系统基础信息模块详解

在自动化运维中,获取和管理系统基础信息是最基本也是最重要的功能之一。本章将详细介绍如何使用Python实现系统信息采集、IP地址处理和DNS管理等核心功能。

1.1 系统性能信息模块 psutil

psutil(process and system utilities)是一个跨平台的Python库,用于获取系统运行时的进程和系统利用率信息,包括CPU、内存、磁盘、网络等。它实现了许多Unix命令行工具提供的功能,如ps、top、lsof、netstat、ifconfig等。

1.1.1 获取系统性能信息

安装psutil

bash

pip install psutil
CPU信息获取

python

import psutil
import datetime

# Number of logical CPU cores
print(f"CPU逻辑核心数: {psutil.cpu_count()}")

# Number of physical CPU cores
print(f"CPU物理核心数: {psutil.cpu_count(logical=False)}")

# Overall CPU utilisation.
# `interval` is the sampling window in seconds; the call blocks for that long.
print(f"CPU使用率: {psutil.cpu_percent(interval=1)}%")

# Utilisation of every individual core
print(f"每个核心使用率: {psutil.cpu_percent(interval=1, percpu=True)}")

# CPU frequency information.
# psutil returns None when the platform cannot report frequencies
# (common in containers / virtual machines), so guard before reading fields.
cpu_freq = psutil.cpu_freq()
if cpu_freq is not None:
    print(f"CPU当前频率: {cpu_freq.current:.2f} MHz")
    print(f"CPU最小频率: {cpu_freq.min:.2f} MHz")
    print(f"CPU最大频率: {cpu_freq.max:.2f} MHz")
else:
    print("CPU频率信息: 当前平台不可用")

# Cumulative CPU time counters
cpu_times = psutil.cpu_times()
print(f"用户态时间: {cpu_times.user} 秒")
print(f"系统态时间: {cpu_times.system} 秒")
print(f"空闲时间: {cpu_times.idle} 秒")
内存信息获取

python

import psutil

def bytes_to_gb(bytes):
    """Convert a byte count to gigabytes, using 1 GB = 1024 ** 3 bytes."""
    bytes_per_gb = 1 << 30  # 1024 ** 3
    return bytes / bytes_per_gb

# Physical memory statistics (sizes are converted with bytes_to_gb for display)
memory = psutil.virtual_memory()
print(f"总内存: {bytes_to_gb(memory.total):.2f} GB")
print(f"可用内存: {bytes_to_gb(memory.available):.2f} GB")
print(f"已用内存: {bytes_to_gb(memory.used):.2f} GB")
print(f"内存使用率: {memory.percent}%")

# Swap memory statistics
swap = psutil.swap_memory()
print(f"\n交换内存总量: {bytes_to_gb(swap.total):.2f} GB")
print(f"交换内存使用: {bytes_to_gb(swap.used):.2f} GB")
print(f"交换内存空闲: {bytes_to_gb(swap.free):.2f} GB")
print(f"交换内存使用率: {swap.percent}%")
磁盘信息获取

python

import psutil

# Disk partition information
partitions = psutil.disk_partitions()
for partition in partitions:
    print(f"设备: {partition.device}")
    print(f"挂载点: {partition.mountpoint}")
    print(f"文件系统: {partition.fstype}")

    # Keep the try body minimal: only disk_usage() can fail here.
    try:
        partition_usage = psutil.disk_usage(partition.mountpoint)
    except PermissionError:
        print("  权限不足,无法获取使用信息")
    except OSError as exc:
        # e.g. an empty removable drive or a stale mount; do not abort the loop
        print(f"  无法获取使用信息: {exc}")
    else:
        print(f"  总容量: {bytes_to_gb(partition_usage.total):.2f} GB")
        print(f"  已使用: {bytes_to_gb(partition_usage.used):.2f} GB")
        print(f"  可用: {bytes_to_gb(partition_usage.free):.2f} GB")
        print(f"  使用率: {partition_usage.percent}%")
    print("-" * 50)

# Disk I/O counters.
# psutil returns None on machines without any disk, so guard before use.
disk_io = psutil.disk_io_counters()
if disk_io is None:
    print("\n磁盘IO统计: 不可用")
else:
    print(f"\n磁盘读取: {bytes_to_gb(disk_io.read_bytes):.2f} GB")
    print(f"磁盘写入: {bytes_to_gb(disk_io.write_bytes):.2f} GB")
    print(f"读取次数: {disk_io.read_count}")
    print(f"写入次数: {disk_io.write_count}")
网络信息获取

python

import psutil

# Local import: this snippet needs the portable address-family constants.
# The numeric value of AF_INET6 differs per OS, so it must not be hard-coded.
import socket

# Addresses of every network interface
interfaces = psutil.net_if_addrs()
for interface_name, addresses in interfaces.items():
    print(f"\n接口: {interface_name}")
    for addr in addresses:
        if addr.family == socket.AF_INET:  # IPv4
            print(f"  IPv4地址: {addr.address}")
            print(f"  子网掩码: {addr.netmask}")
        elif addr.family == socket.AF_INET6:  # IPv6
            print(f"  IPv6地址: {addr.address}")

# Active network connections.
# Listing system-wide connections may require elevated privileges.
try:
    connections = psutil.net_connections(kind='inet')
except psutil.AccessDenied:
    print("\n活动连接数: 权限不足,无法获取")
else:
    print(f"\n活动连接数: {len(connections)}")

# Network I/O counters
net_io = psutil.net_io_counters()
print(f"\n网络发送: {bytes_to_gb(net_io.bytes_sent):.2f} GB")
print(f"网络接收: {bytes_to_gb(net_io.bytes_recv):.2f} GB")
print(f"发送包数: {net_io.packets_sent}")
print(f"接收包数: {net_io.packets_recv}")
系统信息获取

python

import psutil
import platform
from datetime import datetime

# System boot time, converted from the epoch timestamp psutil returns
boot_time = datetime.fromtimestamp(psutil.boot_time())
print(f"系统启动时间: {boot_time.strftime('%Y-%m-%d %H:%M:%S')}")

# Uptime = now - boot time (a timedelta)
current_time = datetime.now()
uptime = current_time - boot_time
print(f"系统运行时间: {uptime}")

# Logged-in users
users = psutil.users()
for user in users:
    print(f"\n用户名: {user.name}")
    print(f"终端: {user.terminal}")
    print(f"主机: {user.host}")
    print(f"登录时间: {datetime.fromtimestamp(user.started).strftime('%Y-%m-%d %H:%M:%S')}")

# Platform information from the standard-library `platform` module
print(f"\n操作系统: {platform.system()}")
print(f"系统版本: {platform.version()}")
print(f"系统架构: {platform.machine()}")
print(f"主机名: {platform.node()}")
print(f"Python版本: {platform.python_version()}")

1.1.2 系统进程管理方法

进程列表和基本信息

python

import psutil
import time

def get_process_info(pid):
    """Return a dict describing the process with the given PID.

    Args:
        pid: Process ID to inspect.

    Returns:
        A dict with the keys ``pid``, ``name``, ``status``, ``cpu_percent``,
        ``memory_percent``, ``memory_info``, ``create_time`` (formatted local
        time), ``num_threads`` and ``cmdline`` (space-joined), or ``None``
        when the process does not exist or may not be inspected.

    Note:
        The call blocks for about one second while CPU usage is sampled.
    """
    try:
        process = psutil.Process(pid)
        # Sample CPU *outside* oneshot(): the blocking form compares two
        # readings of the process CPU times, and oneshot() caches them.
        cpu_percent = process.cpu_percent(interval=1)
        # oneshot() lets psutil collect the remaining fields in one pass.
        with process.oneshot():
            return {
                'pid': process.pid,
                'name': process.name(),
                'status': process.status(),
                'cpu_percent': cpu_percent,
                'memory_percent': process.memory_percent(),
                'memory_info': process.memory_info(),
                'create_time': time.strftime(
                    '%Y-%m-%d %H:%M:%S',
                    time.localtime(process.create_time()),
                ),
                'num_threads': process.num_threads(),
                'cmdline': ' '.join(process.cmdline()),
            }
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Gone, or not readable with the current privileges: same contract
        # as before, report "no information" rather than crash the caller.
        return None

# List every process that is currently using CPU.
# A process's first cpu_percent() reading is a meaningless 0.0, because there
# is no earlier sample to compare with. So: take a priming reading for every
# process, wait briefly, then take the reading that is actually reported.
procs = list(psutil.process_iter(['pid', 'name']))
for proc in procs:
    try:
        proc.cpu_percent(interval=None)  # priming sample, result ignored
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        pass

time.sleep(0.5)  # measurement window shared by all processes

for proc in procs:
    try:
        cpu = proc.cpu_percent(interval=None)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        continue
    # Only show processes whose CPU usage is above zero
    if cpu > 0:
        print(f"PID: {proc.info['pid']}, "
              f"Name: {proc.info['name']}, "
              f"CPU: {cpu}%")
进程监控和管理

python

import psutil
import time

class ProcessMonitor:
    """Monitor (and optionally terminate) the first process with a given name."""

    def __init__(self, process_name):
        """Look up the process immediately.

        Args:
            process_name: Exact process name to match (e.g. ``"python.exe"``).
        """
        self.process_name = process_name
        self.process = None  # psutil.Process of the match, or None
        self._find_process()

    def _find_process(self):
        """Store the first process whose name matches; return True if found."""
        for proc in psutil.process_iter(['name']):
            if proc.info['name'] == self.process_name:
                self.process = proc
                return True
        return False

    def monitor(self, interval=1, duration=60):
        """Print CPU and memory usage of the process at regular intervals.

        Args:
            interval: Seconds between rows. It is also the CPU sampling
                window, so it should be positive.
            duration: Total monitoring time in seconds.
        """
        if not self.process:
            print(f"进程 {self.process_name} 未找到")
            return

        print(f"开始监控进程: {self.process_name} (PID: {self.process.pid})")
        print("时间\t\tCPU使用率\t内存使用率\t内存使用(MB)")
        print("-" * 60)

        start_time = time.time()
        while time.time() - start_time < duration:
            try:
                # cpu_percent() blocks for `interval` seconds, which already
                # paces the loop; an extra sleep would double the period.
                cpu_percent = self.process.cpu_percent(interval=interval)
                memory_percent = self.process.memory_percent()
                memory_mb = self.process.memory_info().rss / 1024 / 1024
            except psutil.NoSuchProcess:
                print("进程已结束")
                break
            except psutil.AccessDenied:
                print("权限不足,无法读取进程信息")
                break

            print(f"{time.strftime('%H:%M:%S')}\t"
                  f"{cpu_percent:.1f}%\t\t"
                  f"{memory_percent:.1f}%\t\t"
                  f"{memory_mb:.1f}")

    def kill_process(self):
        """Terminate the process gracefully, killing it if it does not exit."""
        if not self.process:
            return

        try:
            self.process.terminate()  # polite request first
            self.process.wait(timeout=3)  # give it time to exit
        except psutil.TimeoutExpired:
            try:
                self.process.kill()  # force termination
            except psutil.NoSuchProcess:
                pass  # exited between the timeout and the kill
        except psutil.NoSuchProcess:
            pass  # already gone: nothing left to terminate
        except psutil.AccessDenied:
            print(f"权限不足,无法终止进程 {self.process_name}")
            return
        print(f"进程 {self.process_name} 已终止")

# 使用示例
# monitor = ProcessMonitor("python.exe")
# monitor.monitor(interval=2, duration=30)
进程树和子进程管理

python

import psutil

def print_process_tree(pid, indent=0):
    """Print the process `pid` and, recursively, all of its descendants.

    Each level is indented two more spaces than its parent. Processes that
    disappear while the tree is being walked are silently skipped.
    """
    try:
        process = psutil.Process(pid)
        print(' ' * indent + f"├─ {process.name()} (PID: {pid})")
        # Direct children only; deeper levels are handled by the recursion
        child_pids = [child.pid for child in process.children(recursive=False)]
    except psutil.NoSuchProcess:
        return

    for child_pid in child_pids:
        print_process_tree(child_pid, indent + 2)

# Print the process tree rooted at the current Python process
import os
print("当前进程树:")
print_process_tree(os.getpid())

# 创建进程并监控
import subprocess

def run_and_monitor_process(command):
    """Run an external command and print its CPU/memory usage until it exits.

    Args:
        command: Command line, passed to the system shell as a single string.

    Note:
        SECURITY: the command is executed with ``shell=True``. Never pass
        text built from untrusted input. The figures printed are those of
        the spawned shell process itself.
    """
    # Start the process
    proc = subprocess.Popen(command, shell=True)
    print(f"启动进程: {command} (PID: {proc.pid})")

    try:
        # A very short-lived command may already be gone at this point, so
        # the lookup has to be inside the try as well.
        psutil_proc = psutil.Process(proc.pid)
        while proc.poll() is None:  # still running
            cpu = psutil_proc.cpu_percent(interval=1)
            mem = psutil_proc.memory_info().rss / 1024 / 1024
            print(f"CPU: {cpu}%, 内存: {mem:.1f}MB")
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        pass  # cannot sample any more; fall through and collect the result

    # Reap the child so that returncode is always set before it is printed.
    proc.wait()
    print(f"进程结束,返回码: {proc.returncode}")
高级进程管理功能

python

import psutil
import signal
import os

class AdvancedProcessManager:
    """Collection of stateless helpers for inspecting and tuning processes."""

    @staticmethod
    def get_top_processes(sort_by='cpu', top_n=10):
        """Return the processes that use the most CPU or memory.

        Args:
            sort_by: ``'cpu'`` to rank by CPU usage; any other value ranks
                by memory usage.
            top_n: Maximum number of entries to return.

        Returns:
            A list of dicts with the keys ``pid``, ``name``, ``cpu_percent``
            and ``memory_percent``, highest usage first.
        """
        import time  # local: this snippet's import block does not provide it

        procs = list(psutil.process_iter(['pid', 'name', 'memory_percent']))

        # Prime every CPU counter, wait once, then read them all. Sampling
        # each process with a blocking interval would cost 0.1 s per process.
        for proc in procs:
            try:
                proc.cpu_percent(interval=None)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        time.sleep(0.1)

        processes = []
        for proc in procs:
            try:
                info = dict(proc.info)
                info['cpu_percent'] = proc.cpu_percent(interval=None)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            processes.append(info)

        key = 'cpu_percent' if sort_by == 'cpu' else 'memory_percent'
        # A metric is None when access to it was denied; rank those as 0 so
        # the sort never compares None with a number.
        processes.sort(key=lambda item: item[key] or 0.0, reverse=True)
        return processes[:top_n]

    @staticmethod
    def limit_process_resources(pid, cpu_percent=50):
        """Lower the scheduling priority of a process (POSIX only).

        Args:
            pid: Process ID to adjust.
            cpu_percent: Currently unused; kept for interface compatibility.
                The only action taken is setting the nice value to 10.
        """
        if os.name != 'posix':
            print("资源限制仅支持Linux系统")
            return

        try:
            process = psutil.Process(pid)
            # A higher nice value means a lower priority
            process.nice(10)
            print(f"已调整进程 {pid} 的优先级")
        except Exception as e:
            # Best effort: report the failure instead of raising
            print(f"调整失败: {e}")

    @staticmethod
    def find_zombie_processes():
        """Return the info dicts (pid, name, status) of all zombie processes."""
        zombies = []
        for proc in psutil.process_iter(['pid', 'name', 'status']):
            try:
                if proc.info['status'] == psutil.STATUS_ZOMBIE:
                    zombies.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return zombies

# Usage example
manager = AdvancedProcessManager()

# Show the 5 processes with the highest CPU usage
print("CPU占用最高的进程:")
for proc in manager.get_top_processes('cpu', 5):
    print(f"PID: {proc['pid']}, Name: {proc['name']}, CPU: {proc['cpu_percent']}%")

# Look for zombie processes and list them if any exist
zombies = manager.find_zombie_processes()
if zombies:
    print("\n发现僵尸进程:")
    for zombie in zombies:
        print(f"PID: {zombie['pid']}, Name: {zombie['name']}")

1.2 实用的IP地址处理模块 IPy

IPy是一个强大的Python第三方模块,用于处理IPv4和IPv6地址和网络。它可以自动识别IP地址版本,进行IP地址计算、验证和转换。

1.2.1 IP地址、网段的基本处理

安装IPy模块

bash

pip install IPy
IP地址基本操作

python

from IPy import IP

# Create an IP object and show it in several representations
ip = IP('192.168.1.1')
print(f"IP地址: {ip}")
print(f"版本: IPv{ip.version()}")
print(f"二进制表示: {ip.strBin()}")
print(f"十六进制表示: {ip.strHex()}")
print(f"整数表示: {ip.int()}")

# IPv6 address
ipv6 = IP('2001:db8::1')
print(f"\nIPv6地址: {ipv6}")
print(f"完整格式: {ipv6.strFullsize()}")
print(f"压缩格式: {ipv6.strCompressed()}")

# Address type checks: iptype() is compared against a type-name string
print(f"\n{ip} 是私有地址: {ip.iptype() == 'PRIVATE'}")
print(f"{ipv6} 是公网地址: {ipv6.iptype() == 'PUBLIC'}")

# Reverse-lookup name (first entry of reverseNames())
print(f"\n反向解析: {ip.reverseNames()[0]}")
网段处理

python

from IPy import IP

# Create a network object from CIDR notation
network = IP('192.168.1.0/24')
print(f"网段: {network}")
print(f"网络地址: {network.net()}")
print(f"广播地址: {network.broadcast()}")
print(f"网络掩码: {network.netmask()}")
# NOTE(review): len() presumably counts every address in the range,
# including the network and broadcast addresses; confirm against IPy docs.
print(f"主机数量: {network.len()}")

# Iterate over the network, stopping after the first 10 addresses
print("\n网段中的前10个IP:")
for x, ip in enumerate(network):
    if x < 10:
        print(f"  {ip}")
    else:
        break

# A dotted netmask is accepted in place of a prefix length
cidr_network = IP('10.0.0.0/255.255.255.0')
print(f"\n子网掩码转CIDR: {cidr_network.strNormal()}")

# Membership tests: is an address inside the network?
print(f"\n192.168.1.100 在 {network} 中: {'192.168.1.100' in network}")
print(f"192.168.2.1 在 {network} 中: {'192.168.2.1' in network}")
IP地址比较和运算

python

from IPy import IP

# Comparing IP addresses
ip1 = IP('192.168.1.1')
ip2 = IP('192.168.1.2')
ip3 = IP('192.168.1.1')

print(f"{ip1} < {ip2}: {ip1 < ip2}")
print(f"{ip1} == {ip3}: {ip1 == ip3}")

# Comparing networks: `in` tests whether one network lies inside another
net1 = IP('192.168.1.0/24')
net2 = IP('192.168.0.0/16')
net3 = IP('192.168.1.0/25')

print(f"\n{net1} 包含于 {net2}: {net1 in net2}")
print(f"{net3} 包含于 {net1}: {net3 in net1}")

# Overlap detection
def check_overlap(net1, net2):
    """Check whether two networks overlap.

    Returns whatever ``net1.overlaps(net2)`` returns.
    NOTE(review): this is believed to be an int status (non-zero when the
    ranges overlap) rather than a bool; confirm against the IPy docs.
    """
    return net1.overlaps(net2)

print(f"\n{net1} 与 {net3} 重叠: {check_overlap(net1, net3)}")

1.2.2 多网络计算方法详解

子网划分

python

from IPy import IP

def subnet_calculator(network_str, subnet_size):
    """Split a network into equally sized subnets and print the result.

    Args:
        network_str: Network in CIDR notation, e.g. ``'192.168.1.0/24'``.
        subnet_size: Prefix length of the subnets to create. It must be
            longer than the network's own prefix and no longer than the
            address width (32 for IPv4, 128 for IPv6).

    Returns:
        The list of subnets as IP objects, or ``[]`` if `subnet_size` is
        invalid.
    """
    network = IP(network_str)
    version = network.version()
    address_bits = 32 if version == 4 else 128

    network_bits = network.prefixlen()
    subnet_bits = subnet_size

    if subnet_bits <= network_bits:
        print(f"错误: 子网位数必须大于原网络位数 {network_bits}")
        return []
    if subnet_bits > address_bits:
        print(f"错误: 子网位数不能超过 {address_bits}")
        return []

    num_subnets = 2 ** (subnet_bits - network_bits)
    subnet_size_int = 2 ** (address_bits - subnet_bits)
    # Network and broadcast addresses are not usable by hosts. Clamp at 0 so
    # that /31 and /32 do not report a negative count.
    subnet_hosts = max(subnet_size_int - 2, 0)

    print(f"原网络: {network}")
    print(f"子网掩码位数: /{subnet_bits}")
    print(f"子网数量: {num_subnets}")
    print(f"每个子网可用主机数: {subnet_hosts}")
    print("\n子网列表:")

    # Each start address is derived from the index, so no address beyond the
    # last subnet is ever constructed (that could overflow the address space).
    base = network.net().int()
    subnets = []
    for i in range(num_subnets):
        start = IP(base + i * subnet_size_int, ipversion=version)
        subnet = IP(f"{start}/{subnet_bits}")
        subnets.append(subnet)
        print(f"  子网{i+1}: {subnet} "
              f"(网络: {subnet.net()}, 广播: {subnet.broadcast()})")

    return subnets

# Usage example: split a /24 into /26 subnets
subnets = subnet_calculator('192.168.1.0/24', 26)
VLSM(可变长子网掩码)计算

python

from IPy import IP

class VLSMCalculator:
    """Variable-length subnet mask (VLSM) allocator for an IPv4 network.

    Free space is kept as a list of CIDR blocks. An allocation takes the
    smallest free block that is large enough, carves the requested subnet
    from its start, and returns every unused part of the block to the free
    list.
    """

    def __init__(self, network_str):
        """Start with the whole network free.

        Args:
            network_str: Base network in CIDR notation (IPv4).
        """
        self.network = IP(network_str)
        self.available = [self.network]  # free CIDR blocks
        self.allocated = []  # one dict per allocation, in allocation order

    def allocate_subnet(self, hosts_needed, description=""):
        """Allocate the smallest subnet that can hold `hosts_needed` hosts.

        Args:
            hosts_needed: Number of usable host addresses required.
            description: Free-text label stored with the allocation.

        Returns:
            The allocated subnet as an IP object, or ``None`` if no free
            block is large enough.
        """
        # Smallest power-of-two block that leaves room for the hosts plus
        # the network and broadcast addresses.
        subnet_size = 2
        while subnet_size - 2 < hosts_needed:
            subnet_size *= 2

        subnet_bits = 32 - subnet_size.bit_length() + 1

        # Best fit: the longest prefix (smallest block) that still fits,
        # lowest address first among equals. This keeps large blocks intact.
        candidates = [
            index for index, net in enumerate(self.available)
            if net.prefixlen() <= subnet_bits
        ]
        if not candidates:
            return None

        best_index = max(
            candidates,
            key=lambda index: (self.available[index].prefixlen(),
                               -self.available[index].int()),
        )
        chosen = self.available.pop(best_index)

        base = chosen.net().int()
        allocated_subnet = IP(f"{IP(base)}/{subnet_bits}")

        # Return ALL of the unused space. Halving the chosen block down to
        # the requested size leaves exactly one free sibling block at every
        # intermediate prefix length.
        for prefix in range(chosen.prefixlen() + 1, subnet_bits + 1):
            sibling_start = base + (1 << (32 - prefix))
            self.available.append(IP(f"{IP(sibling_start)}/{prefix}"))

        # Record the allocation
        self.allocated.append({
            'subnet': allocated_subnet,
            'hosts_needed': hosts_needed,
            'hosts_available': subnet_size - 2,
            'description': description
        })

        return allocated_subnet

    def print_allocation_summary(self):
        """Print the allocated subnets and the remaining free blocks."""
        print(f"原始网络: {self.network}")
        print("\n已分配子网:")
        print("-" * 70)
        print(f"{'子网':<20} {'所需主机':<10} {'可用主机':<10} {'说明':<20}")
        print("-" * 70)

        for alloc in self.allocated:
            print(f"{str(alloc['subnet']):<20} "
                  f"{alloc['hosts_needed']:<10} "
                  f"{alloc['hosts_available']:<10} "
                  f"{alloc['description']:<20}")

        print("\n剩余可用网段:")
        for net in self.available:
            print(f"  {net}")

# Usage example
vlsm = VLSMCalculator('172.16.0.0/20')

# Allocate subnets by required host count (largest first)
vlsm.allocate_subnet(500, "总部")
vlsm.allocate_subnet(200, "分公司A")
vlsm.allocate_subnet(100, "分公司B")
vlsm.allocate_subnet(50, "分公司C")
vlsm.allocate_subnet(20, "远程办公")

vlsm.print_allocation_summary()
IP地址聚合(路由汇总)

python

from IPy import IP

def aggregate_networks(networks):
    """Aggregate networks into the smallest equivalent list of CIDR blocks.

    Two blocks are merged only when they are the two halves of the same
    supernet, so the result covers exactly the addresses of the input and
    never more. Blocks already contained in another block are dropped.

    Args:
        networks: Iterable of networks in any form accepted by ``IP()``.

    Returns:
        A list of IP objects sorted by address, or ``[]`` for empty input.
    """
    if not networks:
        return []

    # Work on (version, start, prefixlen) tuples. Sorting them puts a
    # containing block directly before the blocks it contains.
    blocks = sorted(
        (net.version(), net.int(), net.prefixlen())
        for net in (IP(item) for item in networks)
    )

    def block_size(version, prefix):
        """Number of addresses in a block with this prefix length."""
        return 1 << ((32 if version == 4 else 128) - prefix)

    merged = []
    for version, start, prefix in blocks:
        if merged:
            top_version, top_start, top_prefix = merged[-1]
            top_end = top_start + block_size(top_version, top_prefix)
            # start >= top_start is guaranteed by the sort order
            if top_version == version and start < top_end:
                continue  # already covered by the previous block

        merged.append((version, start, prefix))

        # Collapse sibling pairs for as long as the two newest blocks form
        # the lower and upper half of one aligned supernet.
        while len(merged) >= 2:
            v1, s1, p1 = merged[-2]
            v2, s2, p2 = merged[-1]
            if v1 != v2 or p1 != p2 or p1 == 0:
                break
            size = block_size(v1, p1)
            if s1 % (2 * size) != 0 or s2 != s1 + size:
                break
            merged[-2:] = [(v1, s1, p1 - 1)]

    return [
        IP(f"{IP(start, ipversion=version)}/{prefix}")
        for version, start, prefix in merged
    ]

# Usage example: eight consecutive /24 networks
networks = [
    '192.168.1.0/24',
    '192.168.2.0/24',
    '192.168.3.0/24',
    '192.168.4.0/24',
    '192.168.5.0/24',
    '192.168.6.0/24',
    '192.168.7.0/24',
    '192.168.8.0/24'
]

print("原始网络列表:")
for net in networks:
    print(f"  {net}")

aggregated = aggregate_networks(networks)
print("\n聚合后的网络:")
for net in aggregated:
    print(f"  {net}")
IP地址规划工具

python

from IPy import IP
import json

class IPAddressPlanner:
    """Plan the subdivision of a base IPv4 network into named subnets."""

    def __init__(self, base_network):
        """Create an empty plan.

        Args:
            base_network: Base network in CIDR notation (IPv4).
        """
        self.base_network = IP(base_network)
        self.plan = {
            'base_network': str(base_network),
            'allocations': []
        }

    def plan_network(self, requirements):
        """Allocate one subnet per requirement inside the base network.

        Args:
            requirements: Iterable of ``(name, hosts, reserve_pct)`` tuples,
                where `reserve_pct` is the extra capacity to reserve, in
                percent of `hosts`.

        Subnets are placed largest first, each at the next correctly aligned
        free address. With that order no space is lost to alignment.
        Requirements that do not fit are reported and skipped. Results are
        appended to ``self.plan['allocations']``.
        """
        sized = []
        for name, hosts, reserve_pct in requirements:
            # Host count including the reserve
            actual_hosts = int(hosts * (1 + reserve_pct / 100))
            if actual_hosts < 1:
                # Would yield a subnet with no usable hosts (and a division
                # by zero in the utilisation figure).
                print(f"警告: {name} 的主机数无效,已跳过")
                continue

            # Smallest power-of-two block holding the hosts plus the network
            # and broadcast addresses
            subnet_size = 2
            while subnet_size - 2 < actual_hosts:
                subnet_size *= 2
            sized.append((name, hosts, actual_hosts, subnet_size))

        # Order by the size actually allocated, not by the raw request
        sized.sort(key=lambda item: item[3], reverse=True)

        base_end = self.base_network.broadcast().int()
        next_free = self.base_network.net().int()
        # Continue after earlier allocations so that repeated calls never
        # hand out overlapping subnets.
        for alloc in self.plan['allocations']:
            next_free = max(next_free,
                            IP(alloc['broadcast_address']).int() + 1)

        for name, hosts, actual_hosts, subnet_size in sized:
            # Round up to the next multiple of the block size: a CIDR block
            # must start on a boundary of its own size.
            start = -(-next_free // subnet_size) * subnet_size
            end = start + subnet_size - 1
            if end > base_end:
                print(f"警告: 无法为 {name} 分配足够的地址空间")
                continue

            subnet_bits = 32 - subnet_size.bit_length() + 1
            subnet = IP(f"{IP(start)}/{subnet_bits}")

            self.plan['allocations'].append({
                'name': name,
                'network': str(subnet),
                'network_address': str(subnet.net()),
                'broadcast_address': str(subnet.broadcast()),
                'netmask': str(subnet.netmask()),
                'first_host': str(IP(subnet.net().int() + 1)),
                'last_host': str(IP(subnet.broadcast().int() - 1)),
                'requested_hosts': hosts,
                'actual_hosts': actual_hosts,
                'available_hosts': subnet_size - 2,
                'utilization': f"{(actual_hosts / (subnet_size - 2) * 100):.1f}%"
            })

            next_free = end + 1

    def export_plan(self, filename=None):
        """Return the plan dict, also writing it as JSON if `filename` is given."""
        if filename:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.plan, f, indent=2, ensure_ascii=False)

        return self.plan

    def print_plan(self):
        """Print the plan as a fixed-width table."""
        print(f"基础网络: {self.plan['base_network']}")
        print("\n网络规划方案:")
        print("-" * 100)
        print(f"{'名称':<15} {'网络':<18} {'首个主机':<15} {'末个主机':<15} "
              f"{'请求数':<8} {'可用数':<8} {'利用率':<8}")
        print("-" * 100)

        for alloc in self.plan['allocations']:
            print(f"{alloc['name']:<15} "
                  f"{alloc['network']:<18} "
                  f"{alloc['first_host']:<15} "
                  f"{alloc['last_host']:<15} "
                  f"{alloc['requested_hosts']:<8} "
                  f"{alloc['available_hosts']:<8} "
                  f"{alloc['utilization']:<8}")

# Usage example
planner = IPAddressPlanner('10.0.0.0/16')

# Network requirements: (name, hosts, reserve percentage)
requirements = [
    ('总部核心网', 1000, 20),      # 1000 hosts, 20% reserve
    ('数据中心', 500, 30),          # 500 hosts, 30% reserve
    ('办公区A', 200, 25),           # 200 hosts, 25% reserve
    ('办公区B', 150, 25),           # 150 hosts, 25% reserve
    ('访客网络', 100, 50),          # 100 hosts, 50% reserve
    ('管理网络', 50, 20),           # 50 hosts, 20% reserve
    ('物联网设备', 200, 100),       # 200 devices, 100% reserve
]

planner.plan_network(requirements)
planner.print_plan()
planner.export_plan('network_plan.json')

1.3 DNS处理模块 dnspython

dnspython是一个功能强大的DNS工具包,提供了高级和低级的DNS查询功能,支持几乎所有的DNS记录类型。

1.3.1 模块域名解析方法详解

安装dnspython

bash

pip install dnspython
基本DNS查询

python

import dns.resolver
import dns.reversename

def basic_dns_query(domain, record_type='A'):
    """Resolve `domain` for one record type using the system resolver.

    Returns:
        A list with the string form of every record on success. On failure
        a human-readable error message (a str) is returned instead of
        raising.
    """
    try:
        answers = dns.resolver.Resolver().resolve(domain, record_type)
        return [str(rdata) for rdata in answers]
    except dns.resolver.NXDOMAIN:
        return f"域名 {domain} 不存在"
    except dns.resolver.NoAnswer:
        return f"域名 {domain} 没有 {record_type} 记录"
    except Exception as e:
        return f"查询错误: {e}"

# A record lookup
print("A记录查询:")
print(f"www.baidu.com: {basic_dns_query('www.baidu.com', 'A')}")

# MX record lookup
print("\nMX记录查询:")
mx_records = basic_dns_query('qq.com', 'MX')
print(f"qq.com MX记录: {mx_records}")

# CNAME record lookup
print("\nCNAME记录查询:")
print(f"www.taobao.com: {basic_dns_query('www.taobao.com', 'CNAME')}")

# TXT record lookup
print("\nTXT记录查询:")
print(f"baidu.com: {basic_dns_query('baidu.com', 'TXT')}")
高级DNS查询功能

python

import dns.resolver
import dns.query
import dns.zone
import time

class AdvancedDNSResolver:
    """DNS helper: multi-type lookups, delegation tracing, propagation checks."""

    def __init__(self, nameservers=None, timeout=3):
        """Create the resolver used for ordinary lookups.

        Args:
            nameservers: Optional list of name-server IPs that replaces the
                system configuration.
            timeout: Per-server timeout in seconds. The total lifetime of a
                query is set to twice this value.
        """
        self.resolver = dns.resolver.Resolver()
        if nameservers:
            self.resolver.nameservers = nameservers
        self.resolver.timeout = timeout
        self.resolver.lifetime = timeout * 2

    def query_all_records(self, domain):
        """Query the common record types of a domain.

        Returns:
            A dict mapping record type to a list of records. MX and SOA
            records are dicts, all others strings. Types whose lookup fails
            are omitted.
        """
        record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME', 'SOA']
        results = {}

        for record_type in record_types:
            try:
                answers = self.resolver.resolve(domain, record_type)
                results[record_type] = []

                for rdata in answers:
                    if record_type == 'MX':
                        results[record_type].append({
                            'priority': rdata.preference,
                            'host': str(rdata.exchange)
                        })
                    elif record_type == 'SOA':
                        results[record_type].append({
                            'mname': str(rdata.mname),
                            'rname': str(rdata.rname),
                            'serial': rdata.serial,
                            'refresh': rdata.refresh,
                            'retry': rdata.retry,
                            'expire': rdata.expire,
                            'minimum': rdata.minimum
                        })
                    else:
                        results[record_type].append(str(rdata))
            except Exception:
                # Best effort: a missing record type is expected. Unlike a
                # bare `except`, Ctrl-C and SystemExit still propagate.
                continue

        return results

    def trace_dns_path(self, domain):
        """Walk the delegation chain of `domain`, starting at a root server.

        Returns:
            One dict per level (TLD first). Each has ``domain`` plus either
            ``ns_servers``/``ns_ips`` or ``error``.

        NOTE(review): the NS query goes through Resolver.resolve(). Servers
        that do not recurse normally return a referral in the authority
        section, which resolve() is expected to report as "no answer".
        Verify that this trace yields data against real root/TLD servers.
        """
        # Drop empty labels so a fully-qualified name ("example.com.") does
        # not produce an empty query name.
        parts = [label for label in domain.split('.') if label]
        trace_results = []

        # Start at a root server
        current_ns = ['198.41.0.4']  # a.root-servers.net

        for i in range(len(parts)):
            query_domain = '.'.join(parts[-(i+1):])

            try:
                # configure=False: the name servers are set explicitly, so
                # the system resolver configuration is not needed.
                temp_resolver = dns.resolver.Resolver(configure=False)
                temp_resolver.nameservers = current_ns
                temp_resolver.timeout = 2

                ns_answer = temp_resolver.resolve(query_domain, 'NS')
                ns_servers = [str(rdata) for rdata in ns_answer]

                # Resolve the name servers' own addresses
                ns_ips = []
                for ns in ns_servers:
                    try:
                        a_records = self.resolver.resolve(ns.rstrip('.'), 'A')
                        ns_ips.extend([str(rdata) for rdata in a_records])
                    except Exception:
                        pass  # one unresolvable name server is not fatal

                trace_results.append({
                    'domain': query_domain,
                    'ns_servers': ns_servers,
                    'ns_ips': ns_ips
                })

                if ns_ips:
                    current_ns = ns_ips[:3]  # use the first 3 name servers

            except Exception as e:
                trace_results.append({
                    'domain': query_domain,
                    'error': str(e)
                })

        return trace_results

    def check_dns_propagation(self, domain, record_type='A', nameservers=None):
        """Query a set of DNS servers and compare their answers.

        Args:
            domain: Name to look up.
            record_type: Record type to query.
            nameservers: Dict mapping provider name to a list of server IPs.
                Defaults to a built-in list of public resolvers.

        Returns:
            A dict keyed by ``"provider (server)"`` with ``records``,
            ``query_time`` and ``status`` for each server.
        """
        if not nameservers:
            # Well-known public DNS servers
            nameservers = {
                'Google': ['8.8.8.8', '8.8.4.4'],
                'Cloudflare': ['1.1.1.1', '1.0.0.1'],
                'OpenDNS': ['208.67.222.222', '208.67.220.220'],
                'Quad9': ['9.9.9.9', '149.112.112.112'],
                '阿里DNS': ['223.5.5.5', '223.6.6.6'],
                '百度DNS': ['180.76.76.76'],
                '114DNS': ['114.114.114.114', '114.114.115.115']
            }

        results = {}

        for provider, servers in nameservers.items():
            for server in servers:
                label = f"{provider} ({server})"
                try:
                    # configure=False: only the server under test is used
                    temp_resolver = dns.resolver.Resolver(configure=False)
                    temp_resolver.nameservers = [server]
                    temp_resolver.timeout = 3

                    start_time = time.time()
                    answers = temp_resolver.resolve(domain, record_type)
                    query_time = (time.time() - start_time) * 1000

                    records = [str(rdata) for rdata in answers]

                    results[label] = {
                        'records': records,
                        'query_time': f"{query_time:.2f}ms",
                        'status': 'OK'
                    }
                except Exception as e:
                    results[label] = {
                        'records': [],
                        'query_time': 'N/A',
                        'status': f'Failed: {str(e)}'
                    }

        return results

# Usage example
resolver = AdvancedDNSResolver()

# Query all common record types
print("查询 example.com 的所有记录:")
all_records = resolver.query_all_records('example.com')
for record_type, records in all_records.items():
    if records:
        print(f"\n{record_type} 记录:")
        for record in records:
            print(f"  {record}")

# Check DNS propagation across the default public resolvers
print("\n\n检查 baidu.com 的DNS传播状态:")
propagation = resolver.check_dns_propagation('baidu.com')
for server, result in propagation.items():
    print(f"\n{server}:")
    print(f"  状态: {result['status']}")
    print(f"  记录: {result['records']}")
    print(f"  查询时间: {result['query_time']}")

1.3.2 常见解析类型示例说明

各种DNS记录类型查询

python

import dns.resolver
import json

class DNSRecordExplorer:
    """Look up individual DNS record types and return them as plain dicts.

    Every ``get_*`` method returns, on success, a dict with the keys
    ``type``, ``description``, ``records`` and ``ttl`` (``get_ptr_record``
    returns ``ip_address`` and ``hostname`` instead of ``records``).  When the
    lookup raises, the method returns ``{'type': <TYPE>, 'error': <message>}``
    instead of propagating the exception.
    """

    def __init__(self):
        self.resolver = dns.resolver.Resolver()
        # Seconds to wait for a response (dnspython ``Resolver.timeout``).
        self.resolver.timeout = 5

    def get_a_records(self, domain):
        """Return the A (IPv4 address) records of *domain*."""
        try:
            answers = self.resolver.resolve(domain, 'A')
            return {
                'type': 'A',
                'description': 'IPv4地址记录',
                'records': [str(rdata) for rdata in answers],
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'A', 'error': str(e)}

    def get_aaaa_records(self, domain):
        """Return the AAAA (IPv6 address) records of *domain*."""
        try:
            answers = self.resolver.resolve(domain, 'AAAA')
            return {
                'type': 'AAAA',
                'description': 'IPv6地址记录',
                'records': [str(rdata) for rdata in answers],
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'AAAA', 'error': str(e)}

    def get_mx_records(self, domain):
        """Return the MX (mail exchanger) records of *domain*.

        Each record carries its priority, the mail server name and that
        server's A records; the list is sorted by ascending priority value.
        """
        try:
            answers = self.resolver.resolve(domain, 'MX')
            records = [
                {
                    'priority': rdata.preference,
                    'mail_server': str(rdata.exchange),
                    'ip_addresses': self._resolve_a_records(str(rdata.exchange))
                }
                for rdata in answers
            ]
            records.sort(key=lambda x: x['priority'])

            return {
                'type': 'MX',
                'description': '邮件交换记录',
                'records': records,
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'MX', 'error': str(e)}

    def get_ns_records(self, domain):
        """Return the NS (name server) records of *domain* with their A records."""
        try:
            answers = self.resolver.resolve(domain, 'NS')
            records = []
            for rdata in answers:
                ns_server = str(rdata)
                records.append({
                    'nameserver': ns_server,
                    'ip_addresses': self._resolve_a_records(ns_server)
                })

            return {
                'type': 'NS',
                'description': '域名服务器记录',
                'records': records,
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'NS', 'error': str(e)}

    def get_txt_records(self, domain):
        """Return the TXT records of *domain*, each tagged with a guessed purpose."""
        try:
            answers = self.resolver.resolve(domain, 'TXT')
            records = []
            for rdata in answers:
                # One TXT rdata may hold several character-strings (long SPF /
                # DKIM values are split); join them into the single logical
                # value instead of stripping quotes from the presentation form,
                # which would leave '" "' separators inside the text.
                txt_data = b''.join(rdata.strings).decode('utf-8', errors='replace')
                records.append({
                    'text': txt_data,
                    'purpose': self._identify_txt_purpose(txt_data)
                })

            return {
                'type': 'TXT',
                'description': '文本记录',
                'records': records,
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'TXT', 'error': str(e)}

    def get_srv_records(self, domain):
        """Return the SRV (service locator) records of *domain*."""
        try:
            answers = self.resolver.resolve(domain, 'SRV')
            records = [
                {
                    'priority': rdata.priority,
                    'weight': rdata.weight,
                    'port': rdata.port,
                    'target': str(rdata.target)
                }
                for rdata in answers
            ]

            return {
                'type': 'SRV',
                'description': '服务定位记录',
                'records': records,
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'SRV', 'error': str(e)}

    def get_caa_records(self, domain):
        """Return the CAA (certification authority authorization) records of *domain*."""
        try:
            answers = self.resolver.resolve(domain, 'CAA')
            records = [
                {
                    'flags': rdata.flags,
                    'tag': rdata.tag.decode(),
                    'value': rdata.value.decode()
                }
                for rdata in answers
            ]

            return {
                'type': 'CAA',
                'description': '证书颁发机构授权记录',
                'records': records,
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'CAA', 'error': str(e)}

    def get_ptr_record(self, ip_address):
        """Return the PTR (reverse DNS) record for *ip_address*.

        Only the first answer is reported as ``hostname``.
        """
        try:
            # Imported here so that a missing submodule is reported through the
            # same error dict as any other lookup failure.
            import dns.reversename
            rev_name = dns.reversename.from_address(ip_address)
            answers = self.resolver.resolve(rev_name, 'PTR')

            return {
                'type': 'PTR',
                'description': '指针记录(反向DNS)',
                'ip_address': ip_address,
                'hostname': str(answers[0]),
                'ttl': answers.rrset.ttl
            }
        except Exception as e:
            return {'type': 'PTR', 'error': str(e)}

    def _resolve_a_records(self, domain):
        """Best-effort A lookup for *domain*; returns ``[]`` on any failure."""
        try:
            answers = self.resolver.resolve(domain.rstrip('.'), 'A')
            return [str(rdata) for rdata in answers]
        except Exception:
            # Deliberately best-effort, but no longer a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            return []

    def _identify_txt_purpose(self, txt_content):
        """Guess what a TXT record is used for from substrings of its text.

        The checks run from most specific to most generic: the vendor-specific
        verification markers must be tested before the generic
        'verification' / 'verify' substrings, otherwise they can never match.
        """
        txt_lower = txt_content.lower()

        if 'google-site-verification' in txt_lower:
            return 'Google站点验证'
        # 'v=spf1' / 'v=dkim1' / 'v=dmarc1' already contain the shorter
        # substring, so a single containment test per family is enough.
        if 'spf' in txt_lower:
            return 'SPF (邮件发送策略框架)'
        if 'dkim' in txt_lower:
            return 'DKIM (域名密钥识别邮件)'
        if 'dmarc' in txt_lower:
            return 'DMARC (基于域的消息认证)'
        if 'microsoft' in txt_lower or 'MS=' in txt_content:
            return 'Microsoft域验证'
        if 'verification' in txt_lower or 'verify' in txt_lower:
            return '域名验证'
        return '通用文本信息'

    def comprehensive_lookup(self, domain):
        """Run the A, AAAA, MX, NS, TXT and CAA lookups for *domain*.

        Returns a dict with ``domain``, ``timestamp`` (local time,
        ``YYYY-MM-DD HH:MM:SS``), ``records`` (record type -> successful
        result dict) and ``errors`` (record type -> error message for the
        lookups that failed).
        """
        # Local import: the snippet's import block does not bring ``time`` in.
        import time

        results = {
            'domain': domain,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'records': {},
            'errors': {}
        }

        record_methods = [
            self.get_a_records,
            self.get_aaaa_records,
            self.get_mx_records,
            self.get_ns_records,
            self.get_txt_records,
            self.get_caa_records
        ]

        for method in record_methods:
            record_result = method(domain)
            if 'error' in record_result:
                # Keep the reason instead of silently dropping the failure.
                results['errors'][record_result['type']] = record_result['error']
            else:
                results['records'][record_result['type']] = record_result

        return results

# Usage example
explorer = DNSRecordExplorer()

# Combined lookup: print every record type that resolved successfully.
print("综合查询 baidu.com:")
comprehensive_result = explorer.comprehensive_lookup('baidu.com')

for rtype, info in comprehensive_result['records'].items():
    print(f"\n{rtype} 记录 ({info['description']}):")
    for item in info.get('records', []):
        # Structured records (MX, NS, TXT, CAA) are dicts; render them as JSON.
        rendered = json.dumps(item, ensure_ascii=False, indent=4) if isinstance(item, dict) else item
        print(f"  {rendered}")
    print(f"  TTL: {info['ttl']}秒")

# Reverse (PTR) lookup; nothing is printed when the lookup reports an error.
print("\n\nPTR记录查询:")
ptr_result = explorer.get_ptr_record('8.8.8.8')
if 'error' not in ptr_result:
    print(f"IP地址 {ptr_result['ip_address']} 的主机名: {ptr_result['hostname']}")

1.3.3 实践:DNS域名轮循业务监控

DNS轮询监控系统

python

import dns.resolver
import time
import threading
import requests
from datetime import datetime
from collections import defaultdict

class DNSRoundRobinMonitor:
    """Watch the A records of one domain and HTTP-probe every address.

    ``start()`` launches a daemon thread that, every ``check_interval``
    seconds, resolves the domain, reports added/removed addresses, probes each
    current address over HTTP and prints a status summary.  Probe results are
    accumulated in ``results`` (ip -> list of check dicts) and the latest
    per-IP state in ``ip_status``.
    """

    def __init__(self, domain, check_interval=60):
        self.domain = domain
        self.check_interval = check_interval  # seconds between monitoring cycles
        self.resolver = dns.resolver.Resolver()
        self.resolver.timeout = 5
        self.monitoring = False  # loop flag read by the monitor thread
        self.results = defaultdict(list)  # ip -> every health-check result so far
        self.ip_status = {}  # ip -> {'first_seen', 'last_seen', 'last_check'}

    def get_a_records(self):
        """Return the domain's current A records as strings ([] on failure)."""
        try:
            answers = self.resolver.resolve(self.domain, 'A')
            return [str(rdata) for rdata in answers]
        except Exception as e:
            print(f"DNS查询失败: {e}")
            return []

    def check_ip_health(self, ip, port=80, timeout=5):
        """Probe ``http://ip:port/`` with the monitored domain as Host header.

        Returns a dict with ``ip``, ``status`` ('healthy', 'timeout' or
        'error'), ``response_code``, ``response_time`` (milliseconds) and
        ``timestamp``; error results also carry ``error``.  Any HTTP response,
        whatever its status code, counts as 'healthy'.
        """
        try:
            url = f"http://{ip}:{port}/"
            response = requests.get(url, timeout=timeout,
                                    headers={'Host': self.domain})

            return {
                'ip': ip,
                'status': 'healthy',
                'response_code': response.status_code,
                'response_time': response.elapsed.total_seconds() * 1000,
                'timestamp': datetime.now()
            }
        except requests.exceptions.Timeout:
            return {
                'ip': ip,
                'status': 'timeout',
                'response_code': None,
                'response_time': None,
                'timestamp': datetime.now()
            }
        except Exception as e:
            return {
                'ip': ip,
                'status': 'error',
                'response_code': None,
                'response_time': None,
                'error': str(e),
                'timestamp': datetime.now()
            }

    def monitor_dns_changes(self):
        """Monitoring loop: runs until ``self.monitoring`` becomes false.

        On the first cycle every resolved address is reported as newly added,
        because the previous set starts empty.
        """
        previous_ips = set()

        while self.monitoring:
            current_ips = set(self.get_a_records())

            added_ips = current_ips - previous_ips
            if added_ips:
                print(f"\n[{datetime.now()}] 检测到新增IP: {added_ips}")
                for ip in added_ips:
                    self.ip_status[ip] = {'first_seen': datetime.now()}

            removed_ips = previous_ips - current_ips
            if removed_ips:
                print(f"\n[{datetime.now()}] 检测到删除IP: {removed_ips}")
                for ip in removed_ips:
                    if ip in self.ip_status:
                        self.ip_status[ip]['last_seen'] = datetime.now()

            # Probe every address currently published for the domain.
            for ip in current_ips:
                health_result = self.check_ip_health(ip)
                self.results[ip].append(health_result)
                self.ip_status.setdefault(ip, {})['last_check'] = health_result

            previous_ips = current_ips

            # Report on exactly the addresses probed in this cycle rather than
            # issuing a second DNS query that may return a different set.
            self.print_status(current_ips)

            time.sleep(self.check_interval)

    def analyze_performance(self):
        """Summarise the collected checks per IP.

        Returns ip -> dict with ``total_checks``, ``healthy_checks``,
        ``availability`` (percent) and ``avg``/``min``/``max_response_time``
        in milliseconds (``None`` when the IP has no healthy check).
        """
        analysis = {}

        # Snapshot first: the monitor thread may add keys / append results
        # while another thread is analysing.
        for ip, checks in list(self.results.items()):
            checks = list(checks)
            if not checks:
                continue
            response_times = [c['response_time'] for c in checks
                              if c['status'] == 'healthy']

            if response_times:
                analysis[ip] = {
                    'total_checks': len(checks),
                    'healthy_checks': len(response_times),
                    'availability': len(response_times) / len(checks) * 100,
                    'avg_response_time': sum(response_times) / len(response_times),
                    'min_response_time': min(response_times),
                    'max_response_time': max(response_times)
                }
            else:
                # Same keys as the healthy case so consumers need no key checks.
                analysis[ip] = {
                    'total_checks': len(checks),
                    'healthy_checks': 0,
                    'availability': 0,
                    'avg_response_time': None,
                    'min_response_time': None,
                    'max_response_time': None
                }

        return analysis

    def print_status(self, current_ips=None):
        """Print the latest check per IP followed by the performance summary.

        ``current_ips`` is the collection of addresses to report on; when
        omitted the domain is resolved again.
        """
        print(f"\n{'='*60}")
        print(f"域名: {self.domain} - 监控状态")
        print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'='*60}")

        if current_ips is None:
            current_ips = self.get_a_records()
        else:
            # Sets have no stable order; sort for reproducible output.
            current_ips = sorted(current_ips)

        icons = {'healthy': '✓', 'timeout': '⚠', 'error': '✗'}

        for ip in current_ips:
            last_check = self.ip_status.get(ip, {}).get('last_check', {})
            status_icon = icons.get(last_check.get('status', 'unknown'), '?')

            print(f"\nIP: {ip} [{status_icon}]")

            if last_check:
                print(f"  状态: {last_check.get('status')}")
                # ``is not None`` so a legitimate 0.0 ms is still shown.
                if last_check.get('response_time') is not None:
                    print(f"  响应时间: {last_check['response_time']:.2f}ms")
                if last_check.get('response_code'):
                    print(f"  响应代码: {last_check['response_code']}")
                if last_check.get('error'):
                    print(f"  错误: {last_check['error']}")

        analysis = self.analyze_performance()
        if analysis:
            print(f"\n{'='*60}")
            print("性能分析:")
            for ip, stats in analysis.items():
                print(f"\nIP: {ip}")
                print(f"  可用性: {stats['availability']:.1f}%")
                if stats['avg_response_time'] is not None:
                    print(f"  平均响应: {stats['avg_response_time']:.2f}ms")
                    print(f"  最小响应: {stats['min_response_time']:.2f}ms")
                    print(f"  最大响应: {stats['max_response_time']:.2f}ms")

    def start(self):
        """Start the monitoring loop in a daemon thread."""
        self.monitoring = True
        monitor_thread = threading.Thread(target=self.monitor_dns_changes)
        monitor_thread.daemon = True
        monitor_thread.start()
        print(f"开始监控域名: {self.domain}")

    def stop(self):
        """Ask the monitoring loop to exit after its current cycle and sleep."""
        self.monitoring = False
        print(f"停止监控域名: {self.domain}")

    def _monitoring_period(self):
        """Return {'start', 'end'} timestamps of the recorded checks (None if none)."""
        histories = [checks for checks in list(self.results.values()) if checks]
        return {
            'start': min((checks[0]['timestamp'] for checks in histories), default=None),
            'end': max((checks[-1]['timestamp'] for checks in histories), default=None)
        }

    def export_report(self, filename):
        """Write the analysis and per-IP status to *filename* as UTF-8 JSON.

        Works even when no check has been recorded yet; the monitoring period
        is then reported as null.
        """
        # Local import: the snippet's import block does not bring ``json`` in.
        import json

        report = {
            'domain': self.domain,
            'monitoring_period': self._monitoring_period(),
            'ip_analysis': self.analyze_performance(),
            'ip_status': {}
        }

        # Render datetimes explicitly; anything left over is stringified by
        # ``default=str`` below.
        for ip, status in list(self.ip_status.items()):
            report['ip_status'][ip] = {}
            for key, value in list(status.items()):
                if isinstance(value, datetime):
                    report['ip_status'][ip][key] = value.strftime('%Y-%m-%d %H:%M:%S')
                elif isinstance(value, dict) and 'timestamp' in value:
                    value_copy = value.copy()
                    value_copy['timestamp'] = value['timestamp'].strftime('%Y-%m-%d %H:%M:%S')
                    report['ip_status'][ip][key] = value_copy
                else:
                    report['ip_status'][ip][key] = value

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False, default=str)

        print(f"报告已导出到: {filename}")

# 高级DNS监控功能
class AdvancedDNSMonitor(DNSRoundRobinMonitor):
    """DNS round-robin monitor with threshold alerts and TTL-change detection.

    Alerts are evaluated only when ``check_alerts()`` is called; each alert is
    printed and passed to every registered callback.
    """

    def __init__(self, domain, check_interval=60):
        super().__init__(domain, check_interval)
        self.alert_callbacks = []
        self.performance_thresholds = {
            'response_time': 1000,  # milliseconds; alert when the average exceeds it
            'availability': 95      # percent; alert when availability falls below it
        }

    def add_alert_callback(self, callback):
        """Register *callback*; it is called with the alert dict for every alert."""
        self.alert_callbacks.append(callback)

    def check_alerts(self):
        """Compare each IP's statistics with the thresholds and fire alerts."""
        analysis = self.analyze_performance()
        min_availability = self.performance_thresholds['availability']
        max_response_time = self.performance_thresholds['response_time']

        for ip, stats in analysis.items():
            alerts = []

            if stats['availability'] < min_availability:
                alerts.append({
                    'type': 'availability',
                    'ip': ip,
                    'value': stats['availability'],
                    'threshold': min_availability,
                    'message': f"IP {ip} 可用性低于阈值: {stats['availability']:.1f}%"
                })

            # avg_response_time is None when the IP has no healthy check.
            avg_response_time = stats['avg_response_time']
            if avg_response_time is not None and avg_response_time > max_response_time:
                alerts.append({
                    'type': 'response_time',
                    'ip': ip,
                    'value': avg_response_time,
                    'threshold': max_response_time,
                    'message': f"IP {ip} 响应时间过高: {avg_response_time:.2f}ms"
                })

            for alert in alerts:
                self.trigger_alert(alert)

    def trigger_alert(self, alert):
        """Print *alert* and hand it to every callback.

        A failing callback is reported and does not stop the remaining ones.
        """
        print(f"\n[告警] {alert['message']}")

        for callback in self.alert_callbacks:
            try:
                callback(alert)
            except Exception as e:
                print(f"告警回调执行失败: {e}")

    def monitor_dns_ttl(self):
        """Loop while ``self.monitoring`` is true, printing A-record TTL changes.

        Nothing in this class starts this loop; the caller has to run it
        (for example in its own thread) after ``monitoring`` has been set.
        """
        # Only the previous TTL is ever compared, so keep just that value
        # instead of an ever-growing history list.
        previous_ttl = None

        while self.monitoring:
            try:
                answers = self.resolver.resolve(self.domain, 'A')
                current_ttl = answers.rrset.ttl

                # NOTE(review): a caching resolver normally reports a TTL that
                # counts down between refreshes, so this may fire on every
                # cycle - confirm against the resolver actually in use.
                if previous_ttl is not None and current_ttl != previous_ttl:
                    print(f"\n[TTL变化] {self.domain}: {previous_ttl} -> {current_ttl}")
                previous_ttl = current_ttl

            except Exception as e:
                print(f"TTL监控错误: {e}")

            time.sleep(self.check_interval)

# Usage example
if __name__ == "__main__":
    def email_alert(alert):
        """Stand-in e-mail notifier: only echoes the alert message."""
        print(f"[邮件告警] 发送告警邮件: {alert['message']}")

    # Monitor www.baidu.com with a 30-second check interval and route every
    # alert through the stand-in notifier.
    monitor = AdvancedDNSMonitor('www.baidu.com', check_interval=30)
    monitor.add_alert_callback(email_alert)
    monitor.start()

    try:
        # Evaluate the alert thresholds once a minute until interrupted.
        while True:
            time.sleep(60)
            monitor.check_alerts()
    except KeyboardInterrupt:
        # Ctrl+C: stop the monitor, then write the JSON report.
        monitor.stop()
        monitor.export_report('dns_monitor_report.json')

        print("\n监控已停止")

总结

本章详细介绍了Python在系统运维中的三个核心模块:

  1. psutil - 提供了全面的系统和进程信息获取能力,是系统监控的基础
  2. IPy - 简化了IP地址和网络的处理,特别适合网络规划和管理
  3. dnspython - 提供了强大的DNS查询和管理功能,是网络运维的重要工具

这些模块的掌握对于构建自动化运维系统至关重要,它们提供了:

  • 实时系统监控能力
  • 网络配置和规划工具
  • DNS管理和监控功能

通过本章的学习和实践,读者应该能够:

  • 使用psutil构建系统监控工具
  • 使用IPy进行IP地址规划和网络管理
  • 使用dnspython实现DNS查询和监控
  • 将这些工具整合到自动化运维流程中
Logo

一站式 AI 云服务平台

更多推荐