用途限制声明,本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具,严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果,作者及发布平台不承担任何责任。渗透测试涉及复杂技术操作,可能对目标系统造成数据损坏、服务中断等风险。读者需充分评估技术能力与潜在后果,在合法合规前提下谨慎实践。

Python 脚本在网络安全和渗透测试中扮演着 **“瑞士军刀”** 般的核心角色,其灵活性、丰富的库生态和跨平台特性,使其快速实现需求、解决实际问题的首选工具。

相比 C/C++(开发周期长)、PowerShell(平台限制)等语言,Python 的优势在于:

  1. 语法简洁:几行代码即可实现核心功能(如requests.get(url)一句话发起 HTTP 请求),适合快速验证想法。
  2. 库生态丰富requests(HTTP)、scapy(网络包)、impacket(内网协议)等安全专用库覆盖 90% 以上的渗透场景,无需重复造轮子。
  3. 跨平台兼容:同一脚本可在 Windows、Linux、Kali 中运行,适配渗透测试中多变的环境。

接下来我会不定时分享一些python脚本,为在内网渗透等操作中提供便利,并且通过脚本也能够了解一些工具的内在原理。首先,我们先分享多功能网络扫描脚本

from scapy.all import *
import argparse
import time
from concurrent.futures import ThreadPoolExecutor
import json
from datetime import datetime

# 常见端口与服务映射表
PORT_SERVICE_MAP = {
    21: "FTP",
    22: "SSH",
    23: "Telnet",
    25: "SMTP",
    53: "DNS",
    80: "HTTP",
    110: "POP3",
    143: "IMAP",
    443: "HTTPS",
    445: "SMB",
    3306: "MySQL",
    3389: "RDP",
    8080: "HTTP Proxy",
    8443: "HTTPS Alternate"
}

def get_service(port):
    """根据端口号获取对应的服务名称"""
    return PORT_SERVICE_MAP.get(port, "Unknown Service")

def icmp_ping(host, timeout=2):
    """ICMP存活探测(判断主机是否在线)"""
    try:
        ping = IP(dst=host)/ICMP()
        reply = sr1(ping, timeout=timeout, verbose=0)
        return reply is not None
    except Exception as e:
        print(f"[!] ICMP扫描错误: {str(e)}")
        return False

def syn_scan(host, port, timeout=2):
    """单个端口的SYN扫描"""
    try:
        ans, unans = sr(IP(dst=host)/TCP(dport=port, flags="S"), 
                       timeout=timeout, verbose=0)
        for (s, r) in ans:
            if r.haslayer(TCP) and r[TCP].flags == "SA":  # 收到SYN-ACK响应
                return True, "open"
        return False, "closed"
    except Exception as e:
        return False, f"error: {str(e)}"

def tcp_connect_scan(host, port, timeout=2):
    """TCP全连接扫描(建立完整三次握手)"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0, "open" if result == 0 else "closed"
    except Exception as e:
        return False, f"error: {str(e)}"

def udp_scan(host, port, timeout=2):
    """UDP端口扫描"""
    try:
        ans, unans = sr(IP(dst=host)/UDP(dport=port), 
                       timeout=timeout, verbose=0)
        for (s, r) in ans:
            if r.haslayer(UDP):
                return True, "open"  # 收到UDP响应
            elif r.haslayer(ICMP) and r[ICMP].type == 3 and r[ICMP].code in [1, 2, 3, 9, 10, 13]:
                return False, "closed"  # 收到ICMP不可达
        return True, "open|filtered"  # 无响应,可能开放或被过滤
    except Exception as e:
        return False, f"error: {str(e)}"

def dns_scan(host, record_type="A", domain="google.com", timeout=2):
    """DNS服务扫描,支持多种记录类型"""
    dns_types = {"A": 1, "AAAA": 28, "MX": 15, "NS": 2, "CNAME": 5, "TXT": 16}
    if record_type not in dns_types:
        print(f"[!] 不支持的DNS记录类型: {record_type}")
        return None

    try:
        ans, unans = sr(
            IP(dst=host)/UDP(dport=53)/DNS(
                rd=1, 
                qd=DNSQR(qname=domain, qtype=dns_types[record_type])
            ),
            timeout=timeout, 
            verbose=0
        )
        if ans:
            responses = []
            for (s, r) in ans:
                if r.haslayer(DNS) and r[DNS].ancount > 0:
                    for i in range(r[DNS].ancount):
                        responses.append(str(r[DNS].an[i]))
            return {
                "status": "active",
                "record_type": record_type,
                "domain": domain,
                "responses": responses
            }
        return {"status": "inactive", "message": "未收到DNS响应"}
    except Exception as e:
        return {"status": "error", "message": str(e)}

def port_scan_worker(host, port, scan_type, timeout):
    """扫描工作线程"""
    if scan_type == "syn":
        is_open, status = syn_scan(host, port, timeout)
    elif scan_type == "tcp":
        is_open, status = tcp_connect_scan(host, port, timeout)
    elif scan_type == "udp":
        is_open, status = udp_scan(host, port, timeout)
    else:
        return None

    if is_open:
        return {
            "port": port,
            "status": status,
            "service": get_service(port)
        }
    return None

def run_port_scan(host, ports, scan_type="syn", timeout=2, max_workers=10):
    """批量端口扫描(支持多线程)"""
    print(f"\n[*] 开始对 {host} 进行{scan_type.upper()}端口扫描...")
    print(f"[*] 扫描端口范围: {ports}")
    start_time = time.time()
    
    # 处理端口范围(如 1-100)
    parsed_ports = []
    for p in ports:
        if "-" in p:
            start, end = p.split("-")
            parsed_ports.extend(range(int(start), int(end)+1))
        else:
            parsed_ports.append(int(p))
    parsed_ports = list(set(parsed_ports))  # 去重
    parsed_ports.sort()

    open_ports = []
    # 使用线程池提高扫描效率
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(
            port_scan_worker, host, port, scan_type, timeout
        ) for port in parsed_ports]
        
        for future in futures:
            result = future.result()
            if result:
                open_ports.append(result)
                print(f"[+] 端口 {result['port']} 开放 - {result['service']}")

    end_time = time.time()
    print(f"\n[*] 扫描完成,耗时 {end_time - start_time:.2f} 秒")
    print(f"[*] 共发现 {len(open_ports)} 个开放端口")
    return open_ports

def save_results(results, filename=None):
    """保存扫描结果到文件"""
    if not filename:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"scan_results_{timestamp}.json"
    
    with open(filename, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\n[*] 扫描结果已保存到 {filename}")

def main():
    # 命令行参数解析
    parser = argparse.ArgumentParser(description="高级网络扫描工具(支持多种扫描类型)")
    parser.add_argument("host", help="目标主机IP地址")
    parser.add_argument("-p", "--ports", nargs="+", default=["80", "443", "22", "25", "53"],
                       help="端口列表或范围(如 80 443 1-100)")
    parser.add_argument("-t", "--type", choices=["syn", "tcp", "udp"], default="syn",
                       help="扫描类型(syn/tcp/udp)")
    parser.add_argument("-d", "--dns", action="store_true", help="执行DNS服务扫描")
    parser.add_argument("-r", "--dns-record", default="A", 
                       help="DNS记录类型(A/AAAA/MX/NS/CNAME/TXT)")
    parser.add_argument("-D", "--domain", default="google.com", 
                       help="DNS查询的域名")
    parser.add_argument("-T", "--timeout", type=int, default=2, help="超时时间(秒)")
    parser.add_argument("-w", "--workers", type=int, default=10, help="线程数量")
    parser.add_argument("-s", "--save", action="store_true", help="保存结果到文件")
    
    args = parser.parse_args()

    # 合法性提示
    print("[!] 警告: 仅对授权目标进行扫描,未经授权的网络扫描可能违法!")
    
    # 先进行主机存活探测
    print(f"\n[*] 正在检测 {args.host} 是否在线...")
    if not icmp_ping(args.host, args.timeout):
        print(f"[!] 目标 {args.host} 似乎不在线,继续扫描可能无结果")
        if input("是否继续?(y/n) ").lower() != "y":
            return

    # 执行端口扫描
    scan_results = {
        "host": args.host,
        "scan_time": datetime.now().isoformat(),
        "port_scan": {
            "type": args.type,
            "ports_scanned": args.ports,
            "open_ports": run_port_scan(
                args.host, args.ports, args.type, args.timeout, args.workers
            )
        }
    }

    # 执行DNS扫描(如果指定)
    if args.dns:
        print(f"\n[*] 开始对 {args.host} 进行DNS扫描...")
        dns_result = dns_scan(args.host, args.dns_record, args.domain, args.timeout)
        scan_results["dns_scan"] = dns_result
        if dns_result["status"] == "active":
            print(f"[+] 发现活跃DNS服务器: {args.host}")
            print(f"[+] {args.dns_record}记录响应:")
            for resp in dns_result["responses"]:
                print(f"    - {resp}")
        else:
            print(f"[-] DNS扫描结果: {dns_result['message']}")

    # 保存结果(如果指定)
    if args.save:
        save_results(scan_results)

if __name__ == "__main__":
    main()

这个脚本是一个多功能网络扫描工具,基于 Scapy 库开发,主要用于网络安全测试和信息收集,支持多种扫描方式,可帮助检测目标主机的端口状态、服务类型及 DNS 服务信息。

核心功能

  1. 端口扫描

    • 支持三种扫描模式:
      • syn:SYN 半开放扫描(隐蔽性强,不建立完整连接)
      • tcp:TCP 全连接扫描(建立完整三次握手,准确性高)
      • udp:UDP 端口扫描(用于检测 DNS、SNMP 等 UDP 服务)
    • 能识别常见端口对应的服务(如 80 端口对应 HTTP、443 对应 HTTPS)
  2. 主机存活探测
    通过 ICMP 协议(类似 ping 命令)先判断目标主机是否在线,避免对离线主机做无效扫描。

  3. DNS 服务检测
    可查询目标主机的 DNS 服务是否活跃,并支持多种 DNS 记录类型(A/AAAA/MX/NS 等)。

  4. 结果保存
    扫描结果可保存为 JSON 文件,方便后续分析或生成报告。

基本语法

python 脚本名.py 目标主机IP [可选参数]

必选参数

  • 目标主机IP:必须指定(如8.8.8.8192.168.1.1),这是之前报错的原因(缺少此参数)。

常用可选参数

参数 作用说明 示例
-p 指定要扫描的端口(支持单个端口、多个端口或范围,如8080 4431-100 -p 80 443 1-100
-t 指定扫描类型(syn/tcp/udp,默认syn -t udp
-d 开启 DNS 扫描(检测目标是否为 DNS 服务器) -d
-r 配合-d使用,指定 DNS 记录类型(如A/MX/NS,默认A -d -r MX
-D 配合-d使用,指定 DNS 查询的域名(默认google.com -d -D example.com
-T 设置超时时间(秒,默认 2 秒) -T 5
-w 设置扫描线程数(默认 10,线程越多速度越快,但可能被防火墙拦截) -w 20
-s 将扫描结果保存为 JSON 文件(自动生成带时间戳的文件名) -s

以下是实操

这里只是分享思路,可以进行自行开发,使之更加完善。

Logo

一站式 AI 云服务平台

更多推荐