Python 实战:内网渗透中的信息收集自动化脚本(1)
主要介绍一些python脚本在渗透测试等方面的使用
·
用途限制声明,本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具,严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果,作者及发布平台不承担任何责任。渗透测试涉及复杂技术操作,可能对目标系统造成数据损坏、服务中断等风险。读者需充分评估技术能力与潜在后果,在合法合规前提下谨慎实践。
Python 脚本在网络安全和渗透测试中扮演着 **“瑞士军刀”** 般的核心角色,其灵活性、丰富的库生态和跨平台特性,使其快速实现需求、解决实际问题的首选工具。
相比 C/C++(开发周期长)、PowerShell(平台限制)等语言,Python 的优势在于:
- 语法简洁:几行代码即可实现核心功能(如
requests.get(url)一句话发起 HTTP 请求),适合快速验证想法。 - 库生态丰富:
requests(HTTP)、scapy(网络包)、impacket(内网协议)等安全专用库覆盖 90% 以上的渗透场景,无需重复造轮子。 - 跨平台兼容:同一脚本可在 Windows、Linux、Kali 中运行,适配渗透测试中多变的环境。
接下来我会不定时分享一些python脚本,为在内网渗透等操作中提供便利,并且通过脚本也能够了解一些工具的内在原理。首先,我们先分享多功能网络扫描脚本
from scapy.all import *
import argparse
import time
from concurrent.futures import ThreadPoolExecutor
import json
from datetime import datetime
# 常见端口与服务映射表
PORT_SERVICE_MAP = {
21: "FTP",
22: "SSH",
23: "Telnet",
25: "SMTP",
53: "DNS",
80: "HTTP",
110: "POP3",
143: "IMAP",
443: "HTTPS",
445: "SMB",
3306: "MySQL",
3389: "RDP",
8080: "HTTP Proxy",
8443: "HTTPS Alternate"
}
def get_service(port):
"""根据端口号获取对应的服务名称"""
return PORT_SERVICE_MAP.get(port, "Unknown Service")
def icmp_ping(host, timeout=2):
"""ICMP存活探测(判断主机是否在线)"""
try:
ping = IP(dst=host)/ICMP()
reply = sr1(ping, timeout=timeout, verbose=0)
return reply is not None
except Exception as e:
print(f"[!] ICMP扫描错误: {str(e)}")
return False
def syn_scan(host, port, timeout=2):
"""单个端口的SYN扫描"""
try:
ans, unans = sr(IP(dst=host)/TCP(dport=port, flags="S"),
timeout=timeout, verbose=0)
for (s, r) in ans:
if r.haslayer(TCP) and r[TCP].flags == "SA": # 收到SYN-ACK响应
return True, "open"
return False, "closed"
except Exception as e:
return False, f"error: {str(e)}"
def tcp_connect_scan(host, port, timeout=2):
"""TCP全连接扫描(建立完整三次握手)"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
return result == 0, "open" if result == 0 else "closed"
except Exception as e:
return False, f"error: {str(e)}"
def udp_scan(host, port, timeout=2):
"""UDP端口扫描"""
try:
ans, unans = sr(IP(dst=host)/UDP(dport=port),
timeout=timeout, verbose=0)
for (s, r) in ans:
if r.haslayer(UDP):
return True, "open" # 收到UDP响应
elif r.haslayer(ICMP) and r[ICMP].type == 3 and r[ICMP].code in [1, 2, 3, 9, 10, 13]:
return False, "closed" # 收到ICMP不可达
return True, "open|filtered" # 无响应,可能开放或被过滤
except Exception as e:
return False, f"error: {str(e)}"
def dns_scan(host, record_type="A", domain="google.com", timeout=2):
"""DNS服务扫描,支持多种记录类型"""
dns_types = {"A": 1, "AAAA": 28, "MX": 15, "NS": 2, "CNAME": 5, "TXT": 16}
if record_type not in dns_types:
print(f"[!] 不支持的DNS记录类型: {record_type}")
return None
try:
ans, unans = sr(
IP(dst=host)/UDP(dport=53)/DNS(
rd=1,
qd=DNSQR(qname=domain, qtype=dns_types[record_type])
),
timeout=timeout,
verbose=0
)
if ans:
responses = []
for (s, r) in ans:
if r.haslayer(DNS) and r[DNS].ancount > 0:
for i in range(r[DNS].ancount):
responses.append(str(r[DNS].an[i]))
return {
"status": "active",
"record_type": record_type,
"domain": domain,
"responses": responses
}
return {"status": "inactive", "message": "未收到DNS响应"}
except Exception as e:
return {"status": "error", "message": str(e)}
def port_scan_worker(host, port, scan_type, timeout):
"""扫描工作线程"""
if scan_type == "syn":
is_open, status = syn_scan(host, port, timeout)
elif scan_type == "tcp":
is_open, status = tcp_connect_scan(host, port, timeout)
elif scan_type == "udp":
is_open, status = udp_scan(host, port, timeout)
else:
return None
if is_open:
return {
"port": port,
"status": status,
"service": get_service(port)
}
return None
def run_port_scan(host, ports, scan_type="syn", timeout=2, max_workers=10):
"""批量端口扫描(支持多线程)"""
print(f"\n[*] 开始对 {host} 进行{scan_type.upper()}端口扫描...")
print(f"[*] 扫描端口范围: {ports}")
start_time = time.time()
# 处理端口范围(如 1-100)
parsed_ports = []
for p in ports:
if "-" in p:
start, end = p.split("-")
parsed_ports.extend(range(int(start), int(end)+1))
else:
parsed_ports.append(int(p))
parsed_ports = list(set(parsed_ports)) # 去重
parsed_ports.sort()
open_ports = []
# 使用线程池提高扫描效率
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(
port_scan_worker, host, port, scan_type, timeout
) for port in parsed_ports]
for future in futures:
result = future.result()
if result:
open_ports.append(result)
print(f"[+] 端口 {result['port']} 开放 - {result['service']}")
end_time = time.time()
print(f"\n[*] 扫描完成,耗时 {end_time - start_time:.2f} 秒")
print(f"[*] 共发现 {len(open_ports)} 个开放端口")
return open_ports
def save_results(results, filename=None):
"""保存扫描结果到文件"""
if not filename:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"scan_results_{timestamp}.json"
with open(filename, "w") as f:
json.dump(results, f, indent=2)
print(f"\n[*] 扫描结果已保存到 {filename}")
def main():
# 命令行参数解析
parser = argparse.ArgumentParser(description="高级网络扫描工具(支持多种扫描类型)")
parser.add_argument("host", help="目标主机IP地址")
parser.add_argument("-p", "--ports", nargs="+", default=["80", "443", "22", "25", "53"],
help="端口列表或范围(如 80 443 1-100)")
parser.add_argument("-t", "--type", choices=["syn", "tcp", "udp"], default="syn",
help="扫描类型(syn/tcp/udp)")
parser.add_argument("-d", "--dns", action="store_true", help="执行DNS服务扫描")
parser.add_argument("-r", "--dns-record", default="A",
help="DNS记录类型(A/AAAA/MX/NS/CNAME/TXT)")
parser.add_argument("-D", "--domain", default="google.com",
help="DNS查询的域名")
parser.add_argument("-T", "--timeout", type=int, default=2, help="超时时间(秒)")
parser.add_argument("-w", "--workers", type=int, default=10, help="线程数量")
parser.add_argument("-s", "--save", action="store_true", help="保存结果到文件")
args = parser.parse_args()
# 合法性提示
print("[!] 警告: 仅对授权目标进行扫描,未经授权的网络扫描可能违法!")
# 先进行主机存活探测
print(f"\n[*] 正在检测 {args.host} 是否在线...")
if not icmp_ping(args.host, args.timeout):
print(f"[!] 目标 {args.host} 似乎不在线,继续扫描可能无结果")
if input("是否继续?(y/n) ").lower() != "y":
return
# 执行端口扫描
scan_results = {
"host": args.host,
"scan_time": datetime.now().isoformat(),
"port_scan": {
"type": args.type,
"ports_scanned": args.ports,
"open_ports": run_port_scan(
args.host, args.ports, args.type, args.timeout, args.workers
)
}
}
# 执行DNS扫描(如果指定)
if args.dns:
print(f"\n[*] 开始对 {args.host} 进行DNS扫描...")
dns_result = dns_scan(args.host, args.dns_record, args.domain, args.timeout)
scan_results["dns_scan"] = dns_result
if dns_result["status"] == "active":
print(f"[+] 发现活跃DNS服务器: {args.host}")
print(f"[+] {args.dns_record}记录响应:")
for resp in dns_result["responses"]:
print(f" - {resp}")
else:
print(f"[-] DNS扫描结果: {dns_result['message']}")
# 保存结果(如果指定)
if args.save:
save_results(scan_results)
if __name__ == "__main__":
main()
这个脚本是一个多功能网络扫描工具,基于 Scapy 库开发,主要用于网络安全测试和信息收集,支持多种扫描方式,可帮助检测目标主机的端口状态、服务类型及 DNS 服务信息。
核心功能
-
端口扫描
- 支持三种扫描模式:
syn:SYN 半开放扫描(隐蔽性强,不建立完整连接)tcp:TCP 全连接扫描(建立完整三次握手,准确性高)udp:UDP 端口扫描(用于检测 DNS、SNMP 等 UDP 服务)
- 能识别常见端口对应的服务(如 80 端口对应 HTTP、443 对应 HTTPS)
- 支持三种扫描模式:
-
主机存活探测
通过 ICMP 协议(类似 ping 命令)先判断目标主机是否在线,避免对离线主机做无效扫描。 -
DNS 服务检测
可查询目标主机的 DNS 服务是否活跃,并支持多种 DNS 记录类型(A/AAAA/MX/NS 等)。 -
结果保存
扫描结果可保存为 JSON 文件,方便后续分析或生成报告。
基本语法
python 脚本名.py 目标主机IP [可选参数]
必选参数
目标主机IP:必须指定(如8.8.8.8、192.168.1.1),这是之前报错的原因(缺少此参数)。
常用可选参数
| 参数 | 作用说明 | 示例 |
|---|---|---|
-p |
指定要扫描的端口(支持单个端口、多个端口或范围,如80、80 443、1-100) |
-p 80 443 1-100 |
-t |
指定扫描类型(syn/tcp/udp,默认syn) |
-t udp |
-d |
开启 DNS 扫描(检测目标是否为 DNS 服务器) | -d |
-r |
配合-d使用,指定 DNS 记录类型(如A/MX/NS,默认A) |
-d -r MX |
-D |
配合-d使用,指定 DNS 查询的域名(默认google.com) |
-d -D example.com |
-T |
设置超时时间(秒,默认 2 秒) | -T 5 |
-w |
设置扫描线程数(默认 10,线程越多速度越快,但可能被防火墙拦截) | -w 20 |
-s |
将扫描结果保存为 JSON 文件(自动生成带时间戳的文件名) | -s |
以下是实操

这里只是分享思路,可以进行自行开发,使之更加完善。
更多推荐

所有评论(0)