局域网PING检测,不间断PING,查看局域网、交换机丢包率等
1. 环境介绍
开发Python环境 Python 3.9.14
,具体支持到什么版本未测试!
我运行在 PVE 的 CT 里面,CentOS 9
2. 功能
家里的交换机,不晓得什么问题,有的时候就是丢包,正当来测试吧,又好了。不确定什么时候又有问题了。
还有一些个主机时不时的失联,所以就写了一个小工具,不间断的 ping,然后记录日志文件查看 ping 不通的时间,丢包率等等。
3. 使用代码
进入 ping.py
目录,运行
- python3 ping.py
4. 日志输出
- 2024-05-10 10:00:00,000 - INFO - 日志文件名: /root/ping/network_monitor_2024-05-10_10-00-00.log
- 2024-05-10 10:00:00,000 - INFO - 程序开始运行时间: 2024-05-10 10:00:00
- 2024-05-10 10:00:05,000 - WARNING - 主机 10.0.0.1 不可达 - 开始时间: 2024-05-10 10:00:05
- 2024-05-10 10:00:10,000 - WARNING - 主机 10.0.0.1 已恢复 - 不可达持续时间: 5.00 秒
- 2024-05-10 10:01:00,000 - INFO - 程序结束运行时间: 2024-05-10 10:01:00
- 2024-05-10 10:01:00,000 - INFO - 程序总共运行时间: 60.00 秒
- 2024-05-10 10:01:00,000 - INFO - 总共 ping 次数: 12
- 2024-05-10 10:01:00,000 - INFO - 总共丢包次数: 1
- 2024-05-10 10:01:00,000 - INFO - 总丢包率: 8.33%
- 2024-05-10 10:01:00,000 - INFO - 平均响应时间: 0.15 毫秒
- 2024-05-10 10:01:00,000 - WARNING - 程序异常退出: 收到 SIGINT 信号 (Ctrl+C)
完整 Python 代码:
- import subprocess
- import time
- import re
- import logging
- import atexit
- import signal
- import sys
- import os
- print("V2RaySSR 综合网原创")
- print("访问 https://www.v2rayssr.com 获取更多资讯和教程")
- # 定义主机名和日志文件路径为变量
- HOST = "10.0.0.1"
- LOG_DIR = "/root/ping"
- # 获取程序开始时间并生成日志文件名
- start_time = time.time()
- start_time_str = time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(start_time))
- log_filename = os.path.join(LOG_DIR, f'network_monitor_{start_time_str}.log')
- # 检查目录是否存在,如果不存在则创建
- if not os.path.exists(LOG_DIR):
- os.makedirs(LOG_DIR)
- # 设置日志文件
- logging.basicConfig(filename=log_filename, level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s')
- # 测试日志记录功能
- try:
- logging.info(f"日志文件名: {log_filename}")
- except Exception as e:
- print(f"无法写入日志文件: {e}")
- sys.exit(1)
- # 初始化统计信息
- total_pings = 0
- total_lost_pings = 0
- total_response_time = 0.0
- log_exit_time_called = False # 标志位,确保 log_exit_time 只被调用一次
- def ping(host):
- """
- Ping 指定的主机并返回 ping 命令的输出。
- """
- process = subprocess.Popen(['ping', '-c', '1', host], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = process.communicate()
- if process.returncode == 0:
- return stdout.decode('utf-8')
- else:
- return None
- def parse_ping_output(output):
- """
- 解析 ping 命令的输出,提取丢包率和响应时间。
- """
- loss_match = re.search(r'(\d+)% packet loss', output)
- if loss_match:
- packet_loss = int(loss_match.group(1))
- else:
- packet_loss = 100
- time_match = re.search(r'time=(\d+\.\d+) ms', output)
- if time_match:
- ping_time = float(time_match.group(1))
- else:
- ping_time = None
- return packet_loss, ping_time
- def monitor_network(host, interval=5):
- """
- 通过定期 ping 指定的主机来持续监控网络。
- """
- global total_pings, total_lost_pings, total_response_time
- last_failure_time = None # 记录上一次失败的时间
- failure_duration = 0 # 记录持续失败的时间
- while True:
- output = ping(host)
- total_pings += 1
- if output:
- packet_loss, ping_time = parse_ping_output(output)
- if packet_loss == 100:
- total_lost_pings += 1
- if last_failure_time is None:
- last_failure_time = time.time()
- logging.warning(f"主机 {host} 不可达 - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
- print(f"主机 {host} 不可达 - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
- failure_duration = time.time() - last_failure_time
- else:
- total_response_time += ping_time
- if last_failure_time is not None:
- logging.warning(f"主机 {host} 已恢复 - 不可达持续时间: {failure_duration:.2f} 秒")
- print(f"主机 {host} 已恢复 - 不可达持续时间: {failure_duration:.2f} 秒")
- last_failure_time = None
- failure_duration = 0
- else:
- total_lost_pings += 1
- if last_failure_time is None:
- last_failure_time = time.time()
- logging.error(f"无法 ping 通主机 {host} - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
- print(f"无法 ping 通主机 {host} - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
- failure_duration = time.time() - last_failure_time
- time.sleep(interval)
- def log_exit_time(exit_reason=None):
- """
- 记录程序退出的时间和统计信息。
- """
- global log_exit_time_called
- if log_exit_time_called:
- return
- log_exit_time_called = True
- exit_time = time.time()
- exit_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(exit_time))
- total_run_time = exit_time - start_time
- total_lost_rate = (total_lost_pings / total_pings) * 100 if total_pings > 0 else 0
- avg_response_time = (total_response_time / (total_pings - total_lost_pings)) if (total_pings - total_lost_pings) > 0 else 0
- logging.info(f"程序结束运行时间: {exit_time_str}")
- logging.info(f"程序总共运行时间: {total_run_time:.2f} 秒")
- logging.info(f"总共 ping 次数: {total_pings}")
- logging.info(f"总共丢包次数: {total_lost_pings}")
- logging.info(f"总丢包率: {total_lost_rate:.2f}%")
- logging.info(f"平均响应时间: {avg_response_time:.2f} 毫秒")
- if exit_reason:
- logging.warning(f"程序异常退出: {exit_reason}")
- print(f"程序结束运行时间: {exit_time_str}")
- print(f"程序总共运行时间: {total_run_time:.2f} 秒")
- print(f"总共 ping 次数: {total_pings}")
- print(f"总共丢包次数: {total_lost_pings}")
- print(f"总丢包率: {total_lost_rate:.2f}%")
- print(f"平均响应时间: {avg_response_time:.2f} 毫秒")
- # 确保所有日志记录被刷新到磁盘
- logging.shutdown()
- # 注册程序正常退出时的回调函数
- atexit.register(log_exit_time)
- # 捕捉终止信号(例如 Ctrl+C 和系统重启)
- def signal_handler(sig, frame):
- exit_reasons = {
- signal.SIGINT: "收到 SIGINT 信号 (Ctrl+C)",
- signal.SIGTERM: "收到 SIGTERM 信号 (系统关机或重启)",
- signal.SIGHUP: "收到 SIGHUP 信号 (SSH 会话关闭)"
- }
- reason = exit_reasons.get(sig, "收到未知信号")
- log_exit_time(reason)
- sys.exit(0)
- signal.signal(signal.SIGINT, signal_handler)
- signal.signal(signal.SIGTERM, signal_handler)
- # 捕捉 SIGHUP 信号(SSH 关闭引起的挂起信号)
- signal.signal(signal.SIGHUP, signal_handler)
- if __name__ == "__main__":
- try:
- monitor_network(HOST)
- except Exception as e:
- logging.error(f"程序异常退出: {e}")
- log_exit_time(f"程序异常退出: {e}")
- raise
提示:在享受本文内容的同时,请注意版权归属 徐州鑫坤机电设备有限公司https://www.xzxkjd.com如果您觉得有价值欢迎分享,但请务必注明出处,感谢您的理解,谢谢!
以下部分内容需要登录查看 立即登录