001-Python运维系统基础信息模块详解
摘要: 本章系统介绍了Python在自动化运维中的三大核心模块。psutil:用于获取系统性能信息(CPU、内存、磁盘、网络等)及进程管理(监控、终止、资源限制)。IPy:提供IP地址与网段的计算、子网划分、聚合及VLSM规划功能,支持IPv4/IPv6。dnspython:实现DNS查询(A/MX/NS等记录)、反向解析、轮询监控及性能分析,支持多DNS服务器检测。本章通过代码示例展示了各模块的典型用法。
系统基础信息模块详解
在自动化运维中,获取和管理系统基础信息是最基本也是最重要的功能之一。本章将详细介绍如何使用Python实现系统信息采集、IP地址处理和DNS管理等核心功能。
1.1 系统性能信息模块 psutil
psutil(process and system utilities)是一个跨平台的Python库,用于获取系统运行时的进程和系统利用率信息,包括CPU、内存、磁盘、网络等。它实现了许多Unix命令行工具提供的功能,如ps、top、lsof、netstat、ifconfig等。
1.1.1 获取系统性能信息
安装psutil
bash
pip install psutil
CPU信息获取
python
import psutil
import datetime
# Number of logical CPUs as reported by psutil.
print(f"CPU逻辑核心数: {psutil.cpu_count()}")
# Number of physical cores (psutil reports None when it cannot tell).
print(f"CPU物理核心数: {psutil.cpu_count(logical=False)}")
# System-wide CPU utilisation; interval=1 makes the call block for one
# second and compare CPU times before/after that window.
print(f"CPU使用率: {psutil.cpu_percent(interval=1)}%")
# Same measurement, one value per logical CPU.
print(f"每个核心使用率: {psutil.cpu_percent(interval=1, percpu=True)}")
# CPU frequency. cpu_freq() returns None when the platform exposes no
# frequency information, so guard before touching the fields.
cpu_freq = psutil.cpu_freq()
if cpu_freq is None:
    print("CPU频率信息: 当前平台无法获取")
else:
    print(f"CPU当前频率: {cpu_freq.current:.2f} MHz")
    print(f"CPU最小频率: {cpu_freq.min:.2f} MHz")
    print(f"CPU最大频率: {cpu_freq.max:.2f} MHz")
# Cumulative CPU times, in seconds.
cpu_times = psutil.cpu_times()
print(f"用户态时间: {cpu_times.user} 秒")
print(f"系统态时间: {cpu_times.system} 秒")
print(f"空闲时间: {cpu_times.idle} 秒")
内存信息获取
python
import psutil
def bytes_to_gb(bytes):
    """Convert a byte count to gibibytes (1 GB is taken as 1024**3 bytes)."""
    bytes_per_gb = 1 << 30  # identical to 1024 ** 3
    return bytes / bytes_per_gb
# Physical (RAM) memory statistics
memory = psutil.virtual_memory()
print(f"总内存: {bytes_to_gb(memory.total):.2f} GB")
print(f"可用内存: {bytes_to_gb(memory.available):.2f} GB")
print(f"已用内存: {bytes_to_gb(memory.used):.2f} GB")
print(f"内存使用率: {memory.percent}%")
# Swap memory statistics
swap = psutil.swap_memory()
print(f"\n交换内存总量: {bytes_to_gb(swap.total):.2f} GB")
print(f"交换内存使用: {bytes_to_gb(swap.used):.2f} GB")
print(f"交换内存空闲: {bytes_to_gb(swap.free):.2f} GB")
print(f"交换内存使用率: {swap.percent}%")
磁盘信息获取
python
import psutil
# Mounted disk partitions and their usage.
partitions = psutil.disk_partitions()
for partition in partitions:
    print(f"设备: {partition.device}")
    print(f"挂载点: {partition.mountpoint}")
    print(f"文件系统: {partition.fstype}")
    try:
        partition_usage = psutil.disk_usage(partition.mountpoint)
    except PermissionError:
        print(" 权限不足,无法获取使用信息")
    except OSError as exc:
        # Any other OS-level failure (e.g. a drive that is not ready) must
        # not abort the listing of the remaining partitions.
        print(f" 无法获取使用信息: {exc}")
    else:
        print(f" 总容量: {bytes_to_gb(partition_usage.total):.2f} GB")
        print(f" 已使用: {bytes_to_gb(partition_usage.used):.2f} GB")
        print(f" 可用: {bytes_to_gb(partition_usage.free):.2f} GB")
        print(f" 使用率: {partition_usage.percent}%")
    print("-" * 50)
# System-wide disk I/O counters. disk_io_counters() returns None when no
# disks are reported, so guard before reading fields.
disk_io = psutil.disk_io_counters()
if disk_io is None:
    print("\n磁盘IO统计: 不可用")
else:
    print(f"\n磁盘读取: {bytes_to_gb(disk_io.read_bytes):.2f} GB")
    print(f"磁盘写入: {bytes_to_gb(disk_io.write_bytes):.2f} GB")
    print(f"读取次数: {disk_io.read_count}")
    print(f"写入次数: {disk_io.write_count}")
网络信息获取
python
import psutil
import socket

# Addresses configured on every network interface.
interfaces = psutil.net_if_addrs()
for interface_name, addresses in interfaces.items():
    print(f"\n接口: {interface_name}")
    for addr in addresses:
        # Compare against the socket constants instead of the literals 2/10:
        # the numeric value of AF_INET6 differs between operating systems.
        if addr.family == socket.AF_INET:  # IPv4
            print(f" IPv4地址: {addr.address}")
            print(f" 子网掩码: {addr.netmask}")
        elif addr.family == socket.AF_INET6:  # IPv6
            print(f" IPv6地址: {addr.address}")
# Active inet connections. Some platforms require elevated privileges for
# this call and raise AccessDenied otherwise.
try:
    connections = psutil.net_connections(kind='inet')
except psutil.AccessDenied:
    connections = None
    print("\n活动连接数: 权限不足,无法获取")
else:
    print(f"\n活动连接数: {len(connections)}")
# System-wide network I/O counters.
net_io = psutil.net_io_counters()
print(f"\n网络发送: {bytes_to_gb(net_io.bytes_sent):.2f} GB")
print(f"网络接收: {bytes_to_gb(net_io.bytes_recv):.2f} GB")
print(f"发送包数: {net_io.packets_sent}")
print(f"接收包数: {net_io.packets_recv}")
系统信息获取
python
import psutil
import platform
from datetime import datetime
# System boot time (psutil.boot_time() is a Unix timestamp)
boot_time = datetime.fromtimestamp(psutil.boot_time())
print(f"系统启动时间: {boot_time.strftime('%Y-%m-%d %H:%M:%S')}")
# Uptime = now - boot time
current_time = datetime.now()
uptime = current_time - boot_time
print(f"系统运行时间: {uptime}")
# Currently logged-in users
users = psutil.users()
for user in users:
    print(f"\n用户名: {user.name}")
    print(f"终端: {user.terminal}")
    print(f"主机: {user.host}")
    print(f"登录时间: {datetime.fromtimestamp(user.started).strftime('%Y-%m-%d %H:%M:%S')}")
# Platform details from the standard-library platform module
print(f"\n操作系统: {platform.system()}")
print(f"系统版本: {platform.version()}")
print(f"系统架构: {platform.machine()}")
print(f"主机名: {platform.node()}")
print(f"Python版本: {platform.python_version()}")
1.1.2 系统进程管理方法
进程列表和基本信息
python
import psutil
import time
def get_process_info(pid):
    """Return a dict describing the process with the given PID.

    The call blocks for about one second because the CPU usage is sampled
    with ``cpu_percent(interval=1)``.

    Args:
        pid: Process ID to inspect.

    Returns:
        A dict with the keys ``pid``, ``name``, ``status``, ``cpu_percent``,
        ``memory_percent``, ``memory_info``, ``create_time``,
        ``num_threads`` and ``cmdline``; or ``None`` when the process does
        not exist (any more) or may not be inspected by the current user.
    """
    try:
        process = psutil.Process(pid)
        # Sample CPU first: this is the only blocking call.
        cpu_percent = process.cpu_percent(interval=1)
        # oneshot() lets psutil reuse cached process data across the
        # getters below.
        with process.oneshot():
            return {
                'pid': process.pid,
                'name': process.name(),
                'status': process.status(),
                'cpu_percent': cpu_percent,
                'memory_percent': process.memory_percent(),
                'memory_info': process.memory_info(),
                'create_time': time.strftime(
                    '%Y-%m-%d %H:%M:%S',
                    time.localtime(process.create_time()),
                ),
                'num_threads': process.num_threads(),
                'cmdline': ' '.join(process.cmdline()),
            }
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # AccessDenied used to escape and crash callers iterating over PIDs.
        return None
# List every process that used CPU during a short sampling window.
# A process's first cpu_percent() reading is a meaningless 0.0, so each
# process is primed first, then read again after a pause.
sampled_procs = []
for proc in psutil.process_iter(['pid', 'name']):
    try:
        proc.cpu_percent(interval=None)  # prime the per-process counter
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        continue
    sampled_procs.append(proc)
time.sleep(0.5)  # sampling window shared by all processes
for proc in sampled_procs:
    try:
        cpu_usage = proc.cpu_percent(interval=None)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        continue
    # Only show processes whose CPU usage is above zero.
    if cpu_usage > 0:
        print(f"PID: {proc.info['pid']}, "
              f"Name: {proc.info['name']}, "
              f"CPU: {cpu_usage}%")
进程监控和管理
python
import psutil
import time
class ProcessMonitor:
    """Monitor (and optionally terminate) the first process with a given name."""

    def __init__(self, process_name):
        """Remember ``process_name`` and look the process up immediately."""
        self.process_name = process_name
        self.process = None  # psutil.Process, or None when not found
        self._find_process()

    def _find_process(self):
        """Bind ``self.process`` to the first process whose name matches.

        Returns:
            True when a matching process was found, False otherwise.
        """
        for proc in psutil.process_iter(['name']):
            if proc.info['name'] == self.process_name:
                self.process = proc
                return True
        return False

    def monitor(self, interval=1, duration=60):
        """Print CPU and memory usage of the process once per ``interval``.

        Args:
            interval: Sampling period in seconds.
            duration: Total monitoring time in seconds.
        """
        if not self.process:
            print(f"进程 {self.process_name} 未找到")
            return
        print(f"开始监控进程: {self.process_name} (PID: {self.process.pid})")
        print("时间\t\tCPU使用率\t内存使用率\t内存使用(MB)")
        print("-" * 60)
        start_time = time.time()
        while time.time() - start_time < duration:
            try:
                # cpu_percent(interval=...) already blocks for `interval`
                # seconds, so no extra sleep is needed; the previous extra
                # sleep doubled the effective sampling period.
                cpu_percent = self.process.cpu_percent(interval=interval)
                memory_percent = self.process.memory_percent()
                memory_mb = self.process.memory_info().rss / 1024 / 1024
            except psutil.NoSuchProcess:
                print("进程已结束")
                break
            except psutil.AccessDenied:
                print("权限不足,无法读取进程信息")
                break
            print(f"{time.strftime('%H:%M:%S')}\t"
                  f"{cpu_percent:.1f}%\t\t"
                  f"{memory_percent:.1f}%\t\t"
                  f"{memory_mb:.1f}")

    def kill_process(self):
        """Terminate the process gracefully, escalating to kill after 3 s."""
        if not self.process:
            return
        try:
            self.process.terminate()  # polite request first
            try:
                self.process.wait(timeout=3)
            except psutil.TimeoutExpired:
                self.process.kill()  # force it
                self.process.wait(timeout=3)
        except psutil.NoSuchProcess:
            pass  # already gone: report it as terminated below
        except psutil.AccessDenied:
            print(f"权限不足,无法终止进程 {self.process_name}")
            return
        except psutil.TimeoutExpired:
            print(f"进程 {self.process_name} 未能在超时时间内终止")
            return
        print(f"进程 {self.process_name} 已终止")
# 使用示例
# monitor = ProcessMonitor("python.exe")
# monitor.monitor(interval=2, duration=30)
进程树和子进程管理
python
import psutil
def print_process_tree(pid, indent=0):
    """Print the process ``pid`` and, recursively, all of its descendants.

    Args:
        pid: PID of the root of the (sub)tree to print.
        indent: Number of leading spaces for this level; each child level
            adds two.

    Processes that vanish or cannot be inspected are skipped silently so
    the rest of the tree is still printed.
    """
    try:
        process = psutil.Process(pid)
        print(' ' * indent + f"├─ {process.name()} (PID: {pid})")
        children = process.children(recursive=False)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return
    for child in children:
        print_process_tree(child.pid, indent + 2)
# Print the process tree rooted at the current Python process
import os
print("当前进程树:")
print_process_tree(os.getpid())
# 创建进程并监控
import subprocess
def run_and_monitor_process(command):
    """Start an external command and print its CPU/memory usage until it exits.

    Args:
        command: Command to run. A string is executed through the shell
            (as before); a list/tuple of arguments is executed directly.

    Returns:
        The exit code of the command.
    """
    # NOTE(security): a string command goes through the shell. Never pass
    # untrusted input as a string; use an argument list instead.
    proc = subprocess.Popen(command, shell=isinstance(command, str))
    print(f"启动进程: {command} (PID: {proc.pid})")
    try:
        psutil_proc = psutil.Process(proc.pid)
    except psutil.NoSuchProcess:
        psutil_proc = None  # exited before it could be inspected
    # Sample while the child is still running.
    while psutil_proc is not None and proc.poll() is None:
        try:
            cpu = psutil_proc.cpu_percent(interval=1)
            mem = psutil_proc.memory_info().rss / 1024 / 1024
            print(f"CPU: {cpu}%, 内存: {mem:.1f}MB")
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            break
    # Reap the child so returncode is populated even after an early break.
    proc.wait()
    print(f"进程结束,返回码: {proc.returncode}")
    return proc.returncode
高级进程管理功能
python
import psutil
import signal
import os
class AdvancedProcessManager:
    """Collection of static helpers for ranking and managing processes."""

    @staticmethod
    def get_top_processes(sort_by='cpu', top_n=10):
        """Return the processes using the most CPU or memory.

        All processes share one 0.1 s sampling window instead of blocking
        0.1 s per process.

        Args:
            sort_by: ``'cpu'`` sorts by CPU usage; any other value sorts by
                memory usage.
            top_n: Maximum number of entries to return.

        Returns:
            A list of dicts with the keys ``pid``, ``name``,
            ``cpu_percent`` and ``memory_percent``, highest usage first.
        """
        import time

        candidates = []
        for proc in psutil.process_iter(['pid', 'name', 'memory_percent']):
            try:
                proc.cpu_percent(interval=None)  # prime the counter
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            candidates.append(proc)
        time.sleep(0.1)

        processes = []
        for proc in candidates:
            try:
                cpu = proc.cpu_percent(interval=None)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            info = dict(proc.info)
            info['cpu_percent'] = cpu
            processes.append(info)

        key = 'cpu_percent' if sort_by == 'cpu' else 'memory_percent'
        # A field that could not be read is None; treat it as 0 so sorting
        # does not raise TypeError.
        processes.sort(key=lambda item: item[key] or 0.0, reverse=True)
        return processes[:top_n]

    @staticmethod
    def limit_process_resources(pid, cpu_percent=50):
        """Lower the scheduling priority of a process (POSIX only).

        Args:
            pid: Target process ID.
            cpu_percent: Accepted for backward compatibility; not used by
                the current implementation, which only adjusts the nice
                value.
        """
        if os.name != 'posix':
            print("资源限制仅支持Linux系统")
            return
        try:
            process = psutil.Process(pid)
            process.nice(10)  # higher nice value = lower priority
            print(f"已调整进程 {pid} 的优先级")
        except (psutil.Error, OSError, ValueError) as e:
            print(f"调整失败: {e}")

    @staticmethod
    def find_zombie_processes():
        """Return the info dicts (pid, name, status) of all zombie processes."""
        zombies = []
        for proc in psutil.process_iter(['pid', 'name', 'status']):
            try:
                if proc.info['status'] == psutil.STATUS_ZOMBIE:
                    zombies.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return zombies
# Usage example
manager = AdvancedProcessManager()
# Show the 5 processes with the highest CPU usage
print("CPU占用最高的进程:")
for proc in manager.get_top_processes('cpu', 5):
    print(f"PID: {proc['pid']}, Name: {proc['name']}, CPU: {proc['cpu_percent']}%")
# Look for zombie processes
zombies = manager.find_zombie_processes()
if zombies:
    print("\n发现僵尸进程:")
    for zombie in zombies:
        print(f"PID: {zombie['pid']}, Name: {zombie['name']}")
1.2 实用的IP地址处理模块 IPy
IPy是一个强大的Python第三方模块,用于处理IPv4和IPv6地址和网络。它可以自动识别IP地址版本,进行IP地址计算、验证和转换。
1.2.1 IP地址、网段的基本处理
安装IPy模块
bash
pip install IPy
IP地址基本操作
python
from IPy import IP
# Create an IP object and show its different representations
ip = IP('192.168.1.1')
print(f"IP地址: {ip}")
print(f"版本: IPv{ip.version()}")
print(f"二进制表示: {ip.strBin()}")
print(f"十六进制表示: {ip.strHex()}")
print(f"整数表示: {ip.int()}")
# IPv6 address
ipv6 = IP('2001:db8::1')
print(f"\nIPv6地址: {ipv6}")
print(f"完整格式: {ipv6.strFullsize()}")
print(f"压缩格式: {ipv6.strCompressed()}")
# Address type checks (iptype() returns a category name string)
print(f"\n{ip} 是私有地址: {ip.iptype() == 'PRIVATE'}")
print(f"{ipv6} 是公网地址: {ipv6.iptype() == 'PUBLIC'}")
# Reverse-lookup name of the address
print(f"\n反向解析: {ip.reverseNames()[0]}")
网段处理
python
from IPy import IP
# Create a network object
network = IP('192.168.1.0/24')
print(f"网段: {network}")
print(f"网络地址: {network.net()}")
print(f"广播地址: {network.broadcast()}")
print(f"网络掩码: {network.netmask()}")
# NOTE(review): len() counts every address in the block, including the
# network and broadcast addresses.
print(f"主机数量: {network.len()}")
# Iterate over the addresses in the network (first 10 only)
print("\n网段中的前10个IP:")
for x, ip in enumerate(network):
    if x < 10:
        print(f" {ip}")
    else:
        break
# Dotted netmask notation to CIDR notation
cidr_network = IP('10.0.0.0/255.255.255.0')
print(f"\n子网掩码转CIDR: {cidr_network.strNormal()}")
# Membership tests
print(f"\n192.168.1.100 在 {network} 中: {'192.168.1.100' in network}")
print(f"192.168.2.1 在 {network} 中: {'192.168.2.1' in network}")
IP地址比较和运算
python
from IPy import IP
# Comparing individual addresses
ip1 = IP('192.168.1.1')
ip2 = IP('192.168.1.2')
ip3 = IP('192.168.1.1')
print(f"{ip1} < {ip2}: {ip1 < ip2}")
print(f"{ip1} == {ip3}: {ip1 == ip3}")
# Containment between networks
net1 = IP('192.168.1.0/24')
net2 = IP('192.168.0.0/16')
net3 = IP('192.168.1.0/25')
print(f"\n{net1} 包含于 {net2}: {net1 in net2}")
print(f"{net3} 包含于 {net1}: {net3 in net1}")
# Overlap detection between two networks
def check_overlap(net1, net2):
    """Return True when the two networks share at least one address.

    ``IP.overlaps`` reports a tri-state integer (0 for no overlap, 1 or -1
    depending on which end overlaps); it is normalised to a real boolean so
    the result can be printed or compared directly.

    Args:
        net1: Object exposing ``overlaps(other)`` (an ``IPy.IP``).
        net2: The network to compare against.
    """
    return net1.overlaps(net2) != 0
# Demonstrate the overlap check on the /24 and the /25 defined above
print(f"\n{net1} 与 {net3} 重叠: {check_overlap(net1, net3)}")
1.2.2 多网络计算方法详解
子网划分
python
from IPy import IP
def subnet_calculator(network_str, subnet_size):
    """Split a network into equally sized subnets and print a summary.

    Args:
        network_str: Network in CIDR notation, e.g. ``'192.168.1.0/24'``.
        subnet_size: Prefix length of the subnets to create (despite the
            name, this is a number of prefix bits, e.g. ``26``).

    Returns:
        A list of ``IP`` objects, one per subnet; an empty list when
        ``subnet_size`` is not a valid, longer prefix for the network.
    """
    network = IP(network_str)
    version = network.version()
    address_bits = 32 if version == 4 else 128
    network_bits = network.prefixlen()
    subnet_bits = subnet_size
    if subnet_bits <= network_bits:
        print(f"错误: 子网位数必须大于原网络位数 {network_bits}")
        return []
    if subnet_bits > address_bits:
        # Previously this produced fractional sizes and an IPy ValueError.
        print(f"错误: 子网位数不能超过 {address_bits}")
        return []

    num_subnets = 2 ** (subnet_bits - network_bits)
    block_size = 2 ** (address_bits - subnet_bits)
    # Network and broadcast addresses are not usable; never go negative
    # for the tiny /31 and /32 blocks.
    subnet_hosts = max(block_size - 2, 0)
    print(f"原网络: {network}")
    print(f"子网掩码位数: /{subnet_bits}")
    print(f"子网数量: {num_subnets}")
    print(f"每个子网可用主机数: {subnet_hosts}")
    print("\n子网列表:")

    subnets = []
    base = network.int()
    for i in range(num_subnets):
        # Pass ipversion explicitly so small IPv6 integers are not
        # mistaken for IPv4 addresses.
        start = IP(base + i * block_size, ipversion=version)
        subnet = IP(f"{start}/{subnet_bits}")
        subnets.append(subnet)
        print(f" 子网{i+1}: {subnet} "
              f"(网络: {subnet.net()}, 广播: {subnet.broadcast()})")
    return subnets
# Usage example: split a /24 into /26 subnets
subnets = subnet_calculator('192.168.1.0/24', 26)
VLSM(可变长子网掩码)计算
python
from IPy import IP
class VLSMCalculator:
    """Variable-length subnet mask (VLSM) allocator.

    Attributes are plain lists: ``available`` holds the free blocks that
    can still be handed out, ``allocated`` holds one dict per allocation.
    """

    def __init__(self, network_str):
        """Start with the whole of ``network_str`` (CIDR notation) free."""
        self.network = IP(network_str)
        self.available = [self.network]
        self.allocated = []

    def allocate_subnet(self, hosts_needed, description=""):
        """Allocate the smallest subnet that can hold ``hosts_needed`` hosts.

        The free block that fits most tightly is split in halves until the
        requested size is reached; every unused half is returned to
        ``available`` so no address space is lost.

        Args:
            hosts_needed: Number of usable host addresses required.
            description: Free-text label stored with the allocation.

        Returns:
            The allocated ``IP`` network, or ``None`` when no free block is
            large enough.
        """
        version = self.network.version()
        address_bits = 32 if version == 4 else 128

        # Smallest power of two that leaves `hosts_needed` usable
        # addresses after removing network and broadcast address.
        subnet_size = 2
        while subnet_size - 2 < hosts_needed:
            subnet_size *= 2
        subnet_bits = address_bits - (subnet_size.bit_length() - 1)

        candidates = [net for net in self.available
                      if net.prefixlen() <= subnet_bits]
        if not candidates:
            return None
        # Best fit: the smallest block that is still big enough; ties are
        # resolved in favour of the lowest address.
        chosen = max(candidates, key=lambda net: (net.prefixlen(), -net.int()))
        self.available.remove(chosen)

        start = chosen.int()
        allocated_subnet = IP(f"{chosen.net()}/{subnet_bits}")
        # Give back the "other half" produced at every split level.
        for prefix in range(subnet_bits, chosen.prefixlen(), -1):
            sibling_start = start + 2 ** (address_bits - prefix)
            sibling = IP(f"{IP(sibling_start, ipversion=version)}/{prefix}")
            self.available.append(sibling)
        self.available.sort(key=lambda net: net.int())

        self.allocated.append({
            'subnet': allocated_subnet,
            'hosts_needed': hosts_needed,
            'hosts_available': subnet_size - 2,
            'description': description
        })
        return allocated_subnet

    def print_allocation_summary(self):
        """Print a table of all allocations followed by the free blocks."""
        print(f"原始网络: {self.network}")
        print("\n已分配子网:")
        print("-" * 70)
        print(f"{'子网':<20} {'所需主机':<10} {'可用主机':<10} {'说明':<20}")
        print("-" * 70)
        for alloc in self.allocated:
            print(f"{str(alloc['subnet']):<20} "
                  f"{alloc['hosts_needed']:<10} "
                  f"{alloc['hosts_available']:<10} "
                  f"{alloc['description']:<20}")
        print("\n剩余可用网段:")
        for net in self.available:
            print(f" {net}")
# Usage example
vlsm = VLSMCalculator('172.16.0.0/20')
# Allocate subnets on demand, largest requirement first
vlsm.allocate_subnet(500, "总部")
vlsm.allocate_subnet(200, "分公司A")
vlsm.allocate_subnet(100, "分公司B")
vlsm.allocate_subnet(50, "分公司C")
vlsm.allocate_subnet(20, "远程办公")
vlsm.print_allocation_summary()
IP地址聚合(路由汇总)
python
from IPy import IP
def aggregate_networks(networks):
    """Aggregate (summarise) networks into the minimal equivalent list.

    The result covers exactly the addresses covered by the input: networks
    contained in another one are dropped, and two networks are merged only
    when they are the two halves of the same parent block. The previous
    implementation could merge non-sibling networks into a supernet that
    included addresses not present in the input.

    Args:
        networks: Iterable of networks in any notation ``IP()`` accepts.

    Returns:
        A list of ``IP`` objects sorted by address; empty for empty input.
    """
    if not networks:
        return []
    ordered = sorted(
        (IP(net) for net in networks),
        key=lambda net: (net.version(), net.int(), net.prefixlen()),
    )

    merged = []  # disjoint, ascending; used as a stack
    for net in ordered:
        if merged:
            top = merged[-1]
            # Sorted order guarantees net starts at or after `top`, so it
            # is covered when it also ends inside `top`.
            if (top.version() == net.version()
                    and net.int() + net.len() <= top.int() + top.len()):
                continue
        merged.append(net)
        # Collapse sibling pairs for as long as possible.
        while len(merged) >= 2:
            left, right = merged[-2], merged[-1]
            same_shape = (left.version() == right.version()
                          and left.prefixlen() == right.prefixlen()
                          and left.prefixlen() > 0)
            if not same_shape:
                break
            adjacent = left.int() + left.len() == right.int()
            # `left` must be the lower half of the parent block.
            aligned = left.int() % (2 * left.len()) == 0
            if not (adjacent and aligned):
                break
            merged[-2:] = [IP(f"{left.net()}/{left.prefixlen() - 1}")]
    return merged
# Usage example
networks = [
    '192.168.1.0/24',
    '192.168.2.0/24',
    '192.168.3.0/24',
    '192.168.4.0/24',
    '192.168.5.0/24',
    '192.168.6.0/24',
    '192.168.7.0/24',
    '192.168.8.0/24'
]
print("原始网络列表:")
for net in networks:
    print(f" {net}")
aggregated = aggregate_networks(networks)
print("\n聚合后的网络:")
for net in aggregated:
    print(f" {net}")
IP地址规划工具
python
from IPy import IP
import json
class IPAddressPlanner:
    """Plan subnets inside a base network from a list of host requirements."""

    def __init__(self, base_network):
        """Create a planner for ``base_network`` (CIDR notation)."""
        self.base_network = IP(base_network)
        self.plan = {
            'base_network': str(base_network),
            'allocations': []
        }

    def plan_network(self, requirements):
        """Allocate one subnet per requirement, largest subnet first.

        Requirements are ordered by the subnet size they actually need
        (hosts plus reserve), not by the raw host count; allocating in
        that order keeps every block naturally aligned, so a request is
        rejected only when the base network is really exhausted.

        Args:
            requirements: Iterable of ``(name, hosts, reserve_pct)`` tuples,
                where ``reserve_pct`` is the extra capacity in percent.
        """
        version = self.base_network.version()
        address_bits = 32 if version == 4 else 128

        sized = []
        for name, hosts, reserve_pct in requirements:
            # Host count including the reserve (truncated, as before).
            actual_hosts = int(hosts * (1 + reserve_pct / 100))
            subnet_size = 2
            while subnet_size - 2 < actual_hosts:
                subnet_size *= 2
            sized.append((name, hosts, actual_hosts, subnet_size))
        # Stable sort: equal sizes keep their original relative order.
        sized.sort(key=lambda item: item[3], reverse=True)

        base_end = self.base_network.broadcast().int()
        next_free = self.base_network.int()
        for name, hosts, actual_hosts, subnet_size in sized:
            # Round up to the next multiple of the block size.
            start = -(-next_free // subnet_size) * subnet_size
            end = start + subnet_size - 1
            if end > base_end:
                print(f"警告: 无法为 {name} 分配足够的地址空间")
                continue
            subnet_bits = address_bits - (subnet_size.bit_length() - 1)
            subnet = IP(f"{IP(start, ipversion=version)}/{subnet_bits}")
            usable = subnet_size - 2
            utilization = (f"{(actual_hosts / usable * 100):.1f}%"
                           if usable else "N/A")  # avoid division by zero
            self.plan['allocations'].append({
                'name': name,
                'network': str(subnet),
                'network_address': str(subnet.net()),
                'broadcast_address': str(subnet.broadcast()),
                'netmask': str(subnet.netmask()),
                'first_host': str(IP(start + 1, ipversion=version)),
                'last_host': str(IP(end - 1, ipversion=version)),
                'requested_hosts': hosts,
                'actual_hosts': actual_hosts,
                'available_hosts': usable,
                'utilization': utilization
            })
            next_free = end + 1

    def export_plan(self, filename=None):
        """Return the plan dict; also write it as JSON when ``filename`` is given."""
        if filename:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.plan, f, indent=2, ensure_ascii=False)
        return self.plan

    def print_plan(self):
        """Print the plan as a fixed-width table."""
        print(f"基础网络: {self.plan['base_network']}")
        print("\n网络规划方案:")
        print("-" * 100)
        print(f"{'名称':<15} {'网络':<18} {'首个主机':<15} {'末个主机':<15} "
              f"{'请求数':<8} {'可用数':<8} {'利用率':<8}")
        print("-" * 100)
        for alloc in self.plan['allocations']:
            print(f"{alloc['name']:<15} "
                  f"{alloc['network']:<18} "
                  f"{alloc['first_host']:<15} "
                  f"{alloc['last_host']:<15} "
                  f"{alloc['requested_hosts']:<8} "
                  f"{alloc['available_hosts']:<8} "
                  f"{alloc['utilization']:<8}")
# Usage example
planner = IPAddressPlanner('10.0.0.0/16')
# Network requirements: (name, hosts, reserve percentage)
requirements = [
    ('总部核心网', 1000, 20),  # 1000 hosts, 20% reserve
    ('数据中心', 500, 30),  # 500 hosts, 30% reserve
    ('办公区A', 200, 25),  # 200 hosts, 25% reserve
    ('办公区B', 150, 25),  # 150 hosts, 25% reserve
    ('访客网络', 100, 50),  # 100 hosts, 50% reserve
    ('管理网络', 50, 20),  # 50 hosts, 20% reserve
    ('物联网设备', 200, 100),  # 200 devices, 100% reserve
]
planner.plan_network(requirements)
planner.print_plan()
planner.export_plan('network_plan.json')
1.3 DNS处理模块 dnspython
dnspython是一个功能强大的DNS工具包,提供了高级和低级的DNS查询功能,支持几乎所有的DNS记录类型。
1.3.1 模块域名解析方法详解
安装dnspython
bash
pip install dnspython
基本DNS查询
python
import dns.resolver
import dns.reversename
def basic_dns_query(domain, record_type='A'):
    """Resolve ``domain`` for ``record_type`` with the system resolver.

    Returns:
        A list with the text form of every record on success; on failure a
        human-readable message string (callers must check the type).
    """
    try:
        lookup = dns.resolver.Resolver()
        response = lookup.resolve(domain, record_type)
        return [str(item) for item in response]
    except dns.resolver.NXDOMAIN:
        return f"域名 {domain} 不存在"
    except dns.resolver.NoAnswer:
        return f"域名 {domain} 没有 {record_type} 记录"
    except Exception as e:
        return f"查询错误: {e}"
# A record lookup
print("A记录查询:")
print(f"www.baidu.com: {basic_dns_query('www.baidu.com', 'A')}")
# MX record lookup
print("\nMX记录查询:")
mx_records = basic_dns_query('qq.com', 'MX')
print(f"qq.com MX记录: {mx_records}")
# CNAME record lookup
print("\nCNAME记录查询:")
print(f"www.taobao.com: {basic_dns_query('www.taobao.com', 'CNAME')}")
# TXT record lookup
print("\nTXT记录查询:")
print(f"baidu.com: {basic_dns_query('baidu.com', 'TXT')}")
高级DNS查询功能
python
import dns.resolver
import dns.query
import dns.zone
import time
class AdvancedDNSResolver:
    """DNS helper: multi-type lookups, delegation tracing, propagation checks."""

    def __init__(self, nameservers=None, timeout=3):
        """Configure the underlying resolver.

        Args:
            nameservers: Optional list of nameserver IPs; the system
                configuration is used when omitted.
            timeout: Per-server timeout in seconds; the total lifetime of a
                query is twice this value.
        """
        self.resolver = dns.resolver.Resolver()
        if nameservers:
            self.resolver.nameservers = nameservers
        self.resolver.timeout = timeout
        self.resolver.lifetime = timeout * 2

    def query_all_records(self, domain):
        """Query the common record types of ``domain``.

        Returns:
            Dict mapping each record type that produced an answer to a list
            of records (dicts for MX and SOA, strings otherwise). Types
            whose lookup fails are omitted.
        """
        record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME', 'SOA']
        results = {}
        for record_type in record_types:
            try:
                answers = self.resolver.resolve(domain, record_type)
            except dns.exception.DNSException:
                # NXDOMAIN / NoAnswer / timeout: this type is not
                # available. Other exceptions (e.g. KeyboardInterrupt) are
                # no longer swallowed.
                continue
            records = []
            for rdata in answers:
                if record_type == 'MX':
                    records.append({
                        'priority': rdata.preference,
                        'host': str(rdata.exchange)
                    })
                elif record_type == 'SOA':
                    records.append({
                        'mname': str(rdata.mname),
                        'rname': str(rdata.rname),
                        'serial': rdata.serial,
                        'refresh': rdata.refresh,
                        'retry': rdata.retry,
                        'expire': rdata.expire,
                        'minimum': rdata.minimum
                    })
                else:
                    records.append(str(rdata))
            results[record_type] = records
        return results

    def trace_dns_path(self, domain):
        """Follow the delegation chain for ``domain`` starting at a root server.

        For every suffix of the name (``com``, ``example.com``, ...) a
        non-recursive NS query is sent to the servers learned in the
        previous step. Authoritative servers answer such queries with
        referrals in the authority section, so both the answer and the
        authority sections are inspected.

        Returns:
            A list with one dict per level: ``{'domain', 'ns_servers',
            'ns_ips'}`` on success or ``{'domain', 'error'}`` on failure.
        """
        import dns.flags
        import dns.message
        import dns.rdatatype

        labels = [label for label in domain.split('.') if label]
        trace_results = []
        current_ns = ['198.41.0.4']  # a.root-servers.net
        for depth in range(1, len(labels) + 1):
            query_domain = '.'.join(labels[-depth:])
            try:
                query = dns.message.make_query(query_domain, dns.rdatatype.NS)
                query.flags &= ~dns.flags.RD  # iterative query
                response = None
                last_error = None
                for server in current_ns:
                    try:
                        response = dns.query.udp(query, server, timeout=2)
                        break
                    except (dns.exception.DNSException, OSError) as exc:
                        last_error = exc
                if response is None:
                    raise last_error

                ns_servers = []
                for rrset in response.answer + response.authority:
                    if rrset.rdtype == dns.rdatatype.NS:
                        ns_servers.extend(str(rdata.target) for rdata in rrset)

                # Prefer glue addresses shipped with the referral.
                wanted = {name.lower() for name in ns_servers}
                ns_ips = [
                    str(rdata)
                    for rrset in response.additional
                    if rrset.rdtype == dns.rdatatype.A
                    and str(rrset.name).lower() in wanted
                    for rdata in rrset
                ]
                if not ns_ips:
                    # No glue: resolve the server names normally.
                    for ns in ns_servers:
                        try:
                            a_records = self.resolver.resolve(ns.rstrip('.'), 'A')
                        except dns.exception.DNSException:
                            continue
                        ns_ips.extend(str(rdata) for rdata in a_records)

                trace_results.append({
                    'domain': query_domain,
                    'ns_servers': ns_servers,
                    'ns_ips': ns_ips
                })
                if ns_ips:
                    current_ns = ns_ips[:3]  # try at most three servers next
            except Exception as e:
                trace_results.append({
                    'domain': query_domain,
                    'error': str(e)
                })
        return trace_results

    def check_dns_propagation(self, domain, record_type='A', nameservers=None):
        """Query ``domain`` against several public resolvers.

        Args:
            domain: Name to look up.
            record_type: Record type to query.
            nameservers: Optional dict mapping a provider label to a list of
                server IPs; a built-in list of public resolvers is used when
                omitted.

        Returns:
            Dict keyed by ``"<provider> (<server>)"`` with ``records``,
            ``query_time`` and ``status`` for every server.
        """
        if not nameservers:
            # Well-known public DNS resolvers
            nameservers = {
                'Google': ['8.8.8.8', '8.8.4.4'],
                'Cloudflare': ['1.1.1.1', '1.0.0.1'],
                'OpenDNS': ['208.67.222.222', '208.67.220.220'],
                'Quad9': ['9.9.9.9', '149.112.112.112'],
                '阿里DNS': ['223.5.5.5', '223.6.6.6'],
                '百度DNS': ['180.76.76.76'],
                '114DNS': ['114.114.114.114', '114.114.115.115']
            }
        results = {}
        for provider, servers in nameservers.items():
            for server in servers:
                label = f"{provider} ({server})"
                try:
                    temp_resolver = dns.resolver.Resolver()
                    temp_resolver.nameservers = [server]
                    temp_resolver.timeout = 3
                    # One server only, so cap the total lifetime too.
                    temp_resolver.lifetime = 3
                    # perf_counter is monotonic and suited to durations.
                    start_time = time.perf_counter()
                    answers = temp_resolver.resolve(domain, record_type)
                    query_time = (time.perf_counter() - start_time) * 1000
                    results[label] = {
                        'records': [str(rdata) for rdata in answers],
                        'query_time': f"{query_time:.2f}ms",
                        'status': 'OK'
                    }
                except Exception as e:
                    results[label] = {
                        'records': [],
                        'query_time': 'N/A',
                        'status': f'Failed: {str(e)}'
                    }
        return results
# Usage example
resolver = AdvancedDNSResolver()
# Query all common record types
print("查询 example.com 的所有记录:")
all_records = resolver.query_all_records('example.com')
for record_type, records in all_records.items():
    if records:
        print(f"\n{record_type} 记录:")
        for record in records:
            print(f" {record}")
# Check DNS propagation across public resolvers
print("\n\n检查 baidu.com 的DNS传播状态:")
propagation = resolver.check_dns_propagation('baidu.com')
for server, result in propagation.items():
    print(f"\n{server}:")
    print(f" 状态: {result['status']}")
    print(f" 记录: {result['records']}")
    print(f" 查询时间: {result['query_time']}")
1.3.2 常见解析类型示例说明
各种DNS记录类型查询
python
import dns.resolver
import json
class DNSRecordExplorer:
    """Look up individual DNS record types and return them as plain dicts.

    Every ``get_*`` method returns ``{'type', 'description', 'records',
    'ttl'}`` on success and ``{'type', 'error'}`` on failure; it never
    raises for lookup problems.
    """

    def __init__(self):
        self.resolver = dns.resolver.Resolver()
        self.resolver.timeout = 5

    def _query(self, domain, record_type, description, parse=str):
        """Resolve ``domain`` and convert every rdata with ``parse``."""
        try:
            answers = self.resolver.resolve(domain, record_type)
            return {
                'type': record_type,
                'description': description,
                'records': [parse(rdata) for rdata in answers],
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            # Error-dict contract: report the failure instead of raising.
            return {'type': record_type, 'error': str(e)}

    def get_a_records(self, domain):
        """Return the A (IPv4 address) records of ``domain``."""
        return self._query(domain, 'A', 'IPv4地址记录')

    def get_aaaa_records(self, domain):
        """Return the AAAA (IPv6 address) records of ``domain``."""
        return self._query(domain, 'AAAA', 'IPv6地址记录')

    def get_mx_records(self, domain):
        """Return the MX records of ``domain``, sorted by priority."""
        def parse(rdata):
            return {
                'priority': rdata.preference,
                'mail_server': str(rdata.exchange),
                'ip_addresses': self._resolve_a_records(str(rdata.exchange))
            }

        result = self._query(domain, 'MX', '邮件交换记录', parse)
        if 'records' in result:
            result['records'].sort(key=lambda x: x['priority'])
        return result

    def get_ns_records(self, domain):
        """Return the NS records of ``domain`` with the servers' IPv4 addresses."""
        def parse(rdata):
            ns_server = str(rdata)
            return {
                'nameserver': ns_server,
                'ip_addresses': self._resolve_a_records(ns_server)
            }

        return self._query(domain, 'NS', '域名服务器记录', parse)

    def get_txt_records(self, domain):
        """Return the TXT records of ``domain`` with a guessed purpose."""
        def parse(rdata):
            # A TXT record may consist of several character-strings; join
            # them instead of stripping quotes from the presentation form,
            # which left stray quotes inside multi-string records.
            txt_data = b''.join(rdata.strings).decode('utf-8', errors='replace')
            return {
                'text': txt_data,
                'purpose': self._identify_txt_purpose(txt_data)
            }

        return self._query(domain, 'TXT', '文本记录', parse)

    def get_srv_records(self, domain):
        """Return the SRV (service location) records of ``domain``."""
        def parse(rdata):
            return {
                'priority': rdata.priority,
                'weight': rdata.weight,
                'port': rdata.port,
                'target': str(rdata.target)
            }

        return self._query(domain, 'SRV', '服务定位记录', parse)

    def get_caa_records(self, domain):
        """Return the CAA (certificate authority authorization) records."""
        def parse(rdata):
            return {
                'flags': rdata.flags,
                'tag': rdata.tag.decode(),
                'value': rdata.value.decode()
            }

        return self._query(domain, 'CAA', '证书颁发机构授权记录', parse)

    def get_ptr_record(self, ip_address):
        """Return the PTR (reverse DNS) record of ``ip_address``."""
        try:
            import dns.reversename
            rev_name = dns.reversename.from_address(ip_address)
            answers = self.resolver.resolve(rev_name, 'PTR')
            return {
                'type': 'PTR',
                'description': '指针记录(反向DNS)',
                'ip_address': ip_address,
                'hostname': str(answers[0]),
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'PTR', 'error': str(e)}

    def _resolve_a_records(self, domain):
        """Return the IPv4 addresses of ``domain``; empty list on failure."""
        try:
            answers = self.resolver.resolve(domain.rstrip('.'), 'A')
            return [str(rdata) for rdata in answers]
        except Exception:
            # Best effort; a bare except also swallowed KeyboardInterrupt.
            return []

    def _identify_txt_purpose(self, txt_content):
        """Guess what a TXT record is used for from well-known markers.

        The provider-specific verification markers are tested before the
        generic "verification"/"verify" keywords; otherwise the generic
        test shadows them ("google-site-verification" contains
        "verification").
        """
        txt_lower = txt_content.lower()
        if 'spf' in txt_lower or 'v=spf1' in txt_lower:
            return 'SPF (邮件发送策略框架)'
        elif 'dkim' in txt_lower or 'v=dkim1' in txt_lower:
            return 'DKIM (域名密钥识别邮件)'
        elif 'dmarc' in txt_lower or 'v=dmarc1' in txt_lower:
            return 'DMARC (基于域的消息认证)'
        elif 'google-site-verification' in txt_lower:
            return 'Google站点验证'
        elif 'microsoft' in txt_lower or 'MS=' in txt_content:
            return 'Microsoft域验证'
        elif 'verification' in txt_lower or 'verify' in txt_lower:
            return '域名验证'
        else:
            return '通用文本信息'

    def comprehensive_lookup(self, domain):
        """Run the common lookups for ``domain`` and collect the successes.

        Returns:
            ``{'domain', 'timestamp', 'records'}`` where ``records`` maps a
            record type to the result dict of the matching ``get_*``
            method; failed lookups are left out.
        """
        # Imported here so the method does not depend on a module-level
        # ``import time`` that this snippet does not have.
        import time

        results = {
            'domain': domain,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'records': {}
        }
        record_methods = [
            self.get_a_records,
            self.get_aaaa_records,
            self.get_mx_records,
            self.get_ns_records,
            self.get_txt_records,
            self.get_caa_records
        ]
        for method in record_methods:
            record_result = method(domain)
            if 'error' not in record_result:
                results['records'][record_result['type']] = record_result
        return results
# Usage example
explorer = DNSRecordExplorer()
# Combined lookup of all supported record types
print("综合查询 baidu.com:")
comprehensive_result = explorer.comprehensive_lookup('baidu.com')
for record_type, data in comprehensive_result['records'].items():
    print(f"\n{record_type} 记录 ({data['description']}):")
    if 'records' in data:
        for record in data['records']:
            # Structured records are dicts; plain ones are strings
            if isinstance(record, dict):
                print(f" {json.dumps(record, ensure_ascii=False, indent=4)}")
            else:
                print(f" {record}")
    print(f" TTL: {data['ttl']}秒")
# PTR (reverse) lookup
print("\n\nPTR记录查询:")
ptr_result = explorer.get_ptr_record('8.8.8.8')
if 'error' not in ptr_result:
    print(f"IP地址 {ptr_result['ip_address']} 的主机名: {ptr_result['hostname']}")
1.3.3 实践:DNS域名轮循业务监控
DNS轮询监控系统
python
import dns.resolver
import time
import threading
import requests
from datetime import datetime
from collections import defaultdict
class DNSRoundRobinMonitor:
    """Monitor the A records of a round-robin domain and the health of each IP.

    ``start()`` runs ``monitor_dns_changes`` in a daemon thread; results are
    collected in ``self.results`` (per-IP list of health checks) and
    ``self.ip_status`` (per-IP metadata).
    """

    def __init__(self, domain, check_interval=60):
        """
        Args:
            domain: Domain name to monitor.
            check_interval: Seconds to sleep between monitoring rounds.
        """
        self.domain = domain
        self.check_interval = check_interval
        self.resolver = dns.resolver.Resolver()
        self.resolver.timeout = 5
        self.monitoring = False
        self.results = defaultdict(list)
        self.ip_status = {}

    def get_a_records(self):
        """Return the current A records of the domain ([] on failure)."""
        try:
            answers = self.resolver.resolve(self.domain, 'A')
            return [str(rdata) for rdata in answers]
        except Exception as e:
            print(f"DNS查询失败: {e}")
            return []

    def check_ip_health(self, ip, port=80, timeout=5):
        """Send an HTTP GET to ``ip`` with the monitored domain as Host header.

        Returns:
            Dict with ``ip``, ``status`` ('healthy', 'timeout' or 'error'),
            ``response_code``, ``response_time`` (milliseconds) and
            ``timestamp``; error results also carry ``error``.
        """
        # NOTE(review): every HTTP response counts as 'healthy', including
        # 5xx status codes; confirm that this is intended.
        try:
            url = f"http://{ip}:{port}/"
            response = requests.get(url, timeout=timeout,
                                    headers={'Host': self.domain})
            return {
                'ip': ip,
                'status': 'healthy',
                'response_code': response.status_code,
                'response_time': response.elapsed.total_seconds() * 1000,
                'timestamp': datetime.now()
            }
        except requests.exceptions.Timeout:
            return {
                'ip': ip,
                'status': 'timeout',
                'response_code': None,
                'response_time': None,
                'timestamp': datetime.now()
            }
        except Exception as e:
            return {
                'ip': ip,
                'status': 'error',
                'response_code': None,
                'response_time': None,
                'error': str(e),
                'timestamp': datetime.now()
            }

    def monitor_dns_changes(self):
        """Monitoring loop: detect added/removed IPs and health-check each IP.

        Runs until ``self.monitoring`` becomes False.
        """
        previous_ips = set()
        while self.monitoring:
            current_ips = set(self.get_a_records())
            # Newly published IPs
            added_ips = current_ips - previous_ips
            if added_ips:
                print(f"\n[{datetime.now()}] 检测到新增IP: {added_ips}")
                for ip in added_ips:
                    self.ip_status[ip] = {'first_seen': datetime.now()}
            # IPs that disappeared from the record set
            removed_ips = previous_ips - current_ips
            if removed_ips:
                print(f"\n[{datetime.now()}] 检测到删除IP: {removed_ips}")
                for ip in removed_ips:
                    if ip in self.ip_status:
                        self.ip_status[ip]['last_seen'] = datetime.now()
            # Health-check every current IP
            for ip in current_ips:
                health_result = self.check_ip_health(ip)
                self.results[ip].append(health_result)
                if ip not in self.ip_status:
                    self.ip_status[ip] = {}
                self.ip_status[ip]['last_check'] = health_result
            previous_ips = current_ips
            # Report on exactly the IPs that were just checked instead of
            # issuing a second DNS query.
            self.print_status(sorted(current_ips))
            time.sleep(self.check_interval)

    def analyze_performance(self):
        """Compute availability and response-time statistics per IP.

        Returns:
            Dict mapping each IP to ``total_checks``, ``healthy_checks``,
            ``availability`` (percent) and ``avg``/``min``/
            ``max_response_time`` (milliseconds, ``None`` when no check
            was healthy).
        """
        analysis = {}
        # Snapshot: the monitoring thread may add IPs/checks concurrently,
        # and iterating the live dict could raise RuntimeError.
        for ip, checks in list(self.results.items()):
            checks = list(checks)
            healthy_checks = [c for c in checks if c['status'] == 'healthy']
            if healthy_checks:
                response_times = [c['response_time'] for c in healthy_checks]
                analysis[ip] = {
                    'total_checks': len(checks),
                    'healthy_checks': len(healthy_checks),
                    'availability': len(healthy_checks) / len(checks) * 100,
                    'avg_response_time': sum(response_times) / len(response_times),
                    'min_response_time': min(response_times),
                    'max_response_time': max(response_times)
                }
            else:
                analysis[ip] = {
                    'total_checks': len(checks),
                    'healthy_checks': 0,
                    'availability': 0,
                    'avg_response_time': None,
                    'min_response_time': None,
                    'max_response_time': None
                }
        return analysis

    def print_status(self, current_ips=None):
        """Print the latest check of every IP plus the aggregated statistics.

        Args:
            current_ips: IPs to report on; the A records are looked up when
                omitted.
        """
        print(f"\n{'='*60}")
        print(f"域名: {self.domain} - 监控状态")
        print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'='*60}")
        if current_ips is None:
            current_ips = self.get_a_records()
        for ip in current_ips:
            status = self.ip_status.get(ip, {})
            last_check = status.get('last_check', {})
            status_icon = {
                'healthy': '✓',
                'timeout': '⚠',
                'error': '✗'
            }.get(last_check.get('status', 'unknown'), '?')
            print(f"\nIP: {ip} [{status_icon}]")
            if last_check:
                print(f" 状态: {last_check.get('status')}")
                if last_check.get('response_time'):
                    print(f" 响应时间: {last_check['response_time']:.2f}ms")
                if last_check.get('response_code'):
                    print(f" 响应代码: {last_check['response_code']}")
                if last_check.get('error'):
                    print(f" 错误: {last_check['error']}")
        analysis = self.analyze_performance()
        if analysis:
            print(f"\n{'='*60}")
            print("性能分析:")
            for ip, stats in analysis.items():
                print(f"\nIP: {ip}")
                print(f" 可用性: {stats['availability']:.1f}%")
                if stats['avg_response_time']:
                    print(f" 平均响应: {stats['avg_response_time']:.2f}ms")
                    print(f" 最小响应: {stats['min_response_time']:.2f}ms")
                    print(f" 最大响应: {stats['max_response_time']:.2f}ms")

    def start(self):
        """Start the monitoring loop in a daemon thread."""
        self.monitoring = True
        monitor_thread = threading.Thread(target=self.monitor_dns_changes)
        monitor_thread.daemon = True
        monitor_thread.start()
        print(f"开始监控域名: {self.domain}")

    def stop(self):
        """Ask the monitoring loop to stop after its current round."""
        self.monitoring = False
        print(f"停止监控域名: {self.domain}")

    def export_report(self, filename):
        """Write the monitoring report to ``filename`` as JSON.

        Works even when no health check has been recorded yet; the
        monitoring period is then reported as ``null``.
        """
        # Imported here because this snippet has no module-level json import.
        import json

        histories = [list(checks) for checks in list(self.results.values())]
        histories = [checks for checks in histories if checks]
        first_checks = [checks[0]['timestamp'] for checks in histories]
        last_checks = [checks[-1]['timestamp'] for checks in histories]
        report = {
            'domain': self.domain,
            'monitoring_period': {
                # min()/max() raise ValueError on an empty sequence.
                'start': min(first_checks) if first_checks else None,
                'end': max(last_checks) if last_checks else None
            },
            'ip_analysis': self.analyze_performance(),
            'ip_status': {}
        }
        # Convert datetime objects to strings
        for ip, status in list(self.ip_status.items()):
            report['ip_status'][ip] = {}
            for key, value in status.items():
                if isinstance(value, datetime):
                    report['ip_status'][ip][key] = value.strftime('%Y-%m-%d %H:%M:%S')
                elif isinstance(value, dict) and 'timestamp' in value:
                    value_copy = value.copy()
                    value_copy['timestamp'] = value['timestamp'].strftime('%Y-%m-%d %H:%M:%S')
                    report['ip_status'][ip][key] = value_copy
                else:
                    report['ip_status'][ip][key] = value
        with open(filename, 'w', encoding='utf-8') as f:
            # default=str stringifies the remaining datetimes.
            json.dump(report, f, indent=2, ensure_ascii=False, default=str)
        print(f"报告已导出到: {filename}")
# 高级DNS监控功能
class AdvancedDNSMonitor(DNSRoundRobinMonitor):
    """DNS monitor with threshold-based alerting and TTL change detection."""

    def __init__(self, domain, check_interval=60):
        super().__init__(domain, check_interval)
        self.alert_callbacks = []
        self.performance_thresholds = {
            'response_time': 1000,  # milliseconds
            'availability': 95  # percent
        }

    def add_alert_callback(self, callback):
        """Register ``callback(alert_dict)`` to be called for every alert."""
        self.alert_callbacks.append(callback)

    def check_alerts(self):
        """Compare the current statistics with the thresholds and raise alerts."""
        analysis = self.analyze_performance()
        for ip, stats in analysis.items():
            alerts = []
            # Availability below the threshold
            if stats['availability'] < self.performance_thresholds['availability']:
                alerts.append({
                    'type': 'availability',
                    'ip': ip,
                    'value': stats['availability'],
                    'threshold': self.performance_thresholds['availability'],
                    'message': f"IP {ip} 可用性低于阈值: {stats['availability']:.1f}%"
                })
            # Average response time above the threshold
            avg_response_time = stats['avg_response_time']
            if avg_response_time and \
                    avg_response_time > self.performance_thresholds['response_time']:
                alerts.append({
                    'type': 'response_time',
                    'ip': ip,
                    'value': avg_response_time,
                    'threshold': self.performance_thresholds['response_time'],
                    'message': f"IP {ip} 响应时间过高: {avg_response_time:.2f}ms"
                })
            for alert in alerts:
                self.trigger_alert(alert)

    def trigger_alert(self, alert):
        """Print the alert and pass it to every registered callback.

        A failing callback is reported but does not stop the others.
        """
        print(f"\n[告警] {alert['message']}")
        for callback in self.alert_callbacks:
            try:
                callback(alert)
            except Exception as e:
                print(f"告警回调执行失败: {e}")

    def monitor_dns_ttl(self):
        """Poll the TTL of the domain's A records and report changes.

        Runs until ``self.monitoring`` becomes False. Only the previous TTL
        is remembered; the earlier implementation appended every sample to
        a list that was never trimmed and grew without bound.
        """
        # NOTE(review): a caching upstream resolver counts the TTL down
        # between polls, which is reported as a change each time; confirm
        # whether that is intended.
        prev_ttl = None
        while self.monitoring:
            try:
                answers = self.resolver.resolve(self.domain, 'A')
                current_ttl = answers.rrset.ttl
                if prev_ttl is not None and current_ttl != prev_ttl:
                    print(f"\n[TTL变化] {self.domain}: {prev_ttl} -> {current_ttl}")
                prev_ttl = current_ttl
            except Exception as e:
                print(f"TTL监控错误: {e}")
            time.sleep(self.check_interval)
# Usage example
if __name__ == "__main__":
    # Create the monitor instance
    monitor = AdvancedDNSMonitor('www.baidu.com', check_interval=30)

    # Register an alert callback
    def email_alert(alert):
        """E-mail alert (example only: it just prints the message)."""
        print(f"[邮件告警] 发送告警邮件: {alert['message']}")

    monitor.add_alert_callback(email_alert)
    # Start monitoring in the background thread
    monitor.start()
    try:
        # Evaluate the alert thresholds once a minute until interrupted
        while True:
            time.sleep(60)
            monitor.check_alerts()
    except KeyboardInterrupt:
        # Stop monitoring
        monitor.stop()
        # Export the report
        monitor.export_report('dns_monitor_report.json')
        print("\n监控已停止")
总结
本章详细介绍了Python在系统运维中的三个核心模块:
- psutil - 提供了全面的系统和进程信息获取能力,是系统监控的基础
- IPy - 简化了IP地址和网络的处理,特别适合网络规划和管理
- dnspython - 提供了强大的DNS查询和管理功能,是网络运维的重要工具
这些模块的掌握对于构建自动化运维系统至关重要,它们提供了:
- 实时系统监控能力
- 网络配置和规划工具
- DNS管理和监控功能
通过本章的学习和实践,读者应该能够:
- 使用psutil构建系统监控工具
- 使用IPy进行IP地址规划和网络管理
- 使用dnspython实现DNS查询和监控
- 将这些工具整合到自动化运维流程中
更多推荐


所有评论(0)