news 2026/5/28 10:07:33

anyrouter CDN 测速工具 v2.1 - 测试多个服务地址的连接速度

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
anyrouter CDN 测速工具 v2.1 - 测试多个服务地址的连接速度
#!/usr/bin/env python3 """ CDN 测速工具 v2.1 - 测试多个服务地址的连接速度 支持: HTTP延迟、TCP Ping、Traceroute、下载速度测试、JSON导出 """ import urllib.request import urllib.error import time import ssl import socket import statistics import subprocess import struct import sys import json import os from datetime import datetime from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor, as_completed # 测试地址列表 ENDPOINTS = [ { "name": "大陆优化CDN", "url": "https://c.cspok.cn", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "主站直连", "url": "https://anyrouter.top", "desc": "主站后端直连服务" }, { "name": "大陆网络优化-宁波", "url": "https://pmpjfbhq.cn-nb1.rainapp.top", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "大陆网络优化-上海", "url": "https://a-ocnfniawgw.cn-shanghai.fcapp.run", "desc": "针对中国大陆地区优化的后端服务" } ] # 测试参数 TEST_COUNT = 3 TCP_PING_COUNT = 5 TIMEOUT = 10 TRACEROUTE_MAX_HOPS = 20 def create_ssl_context(): """创建SSL上下文""" ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx def parse_url(url): """解析URL获取主机名和端口""" parsed = urlparse(url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'https' else 80) return host, port def resolve_host(hostname): """DNS解析""" try: ip = socket.gethostbyname(hostname) return {"success": True, "ip": ip} except socket.gaierror as e: return {"success": False, "error": str(e)} def tcp_ping(host, port, timeout=5): """TCP Ping - 测试TCP连接延迟""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) start = time.time() sock.connect((host, port)) elapsed = (time.time() - start) * 1000 # ms sock.close() return {"success": True, "latency": elapsed} except socket.timeout: return {"success": False, "error": "连接超时"} except socket.error as e: return {"success": False, "error": str(e)} def tcp_ping_test(host, port, count=TCP_PING_COUNT): """执行多次TCP Ping测试""" results = [] for i in range(count): result = tcp_ping(host, port) results.append(result) time.sleep(0.1) # 短暂间隔 return results def traceroute_tcp(host, port=443, max_hops=TRACEROUTE_MAX_HOPS, timeout=2): """ TCP Traceroute 实现 使用递增的TTL值来追踪路由路径 """ results = [] # 先解析目标IP try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪路由到 {host} ({dest_ip}), 最大 {max_hops} 跳:") print(f" {'跳数':<4} {'IP地址':<18} {'延迟':<12} {'主机名'}") print(" " + "-" * 55) for ttl in range(1, max_hops + 1): # 创建原始socket(需要root权限)或使用UDP hop_result = {"hop": ttl, "ip": None, "latency": None, "hostname": None} try: # 使用UDP进行traceroute(不需要root) recv_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) recv_socket.settimeout(timeout) send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) send_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl) start = time.time() send_socket.sendto(b'', (dest_ip, 33434 + ttl)) try: data, addr = recv_socket.recvfrom(1024) elapsed = (time.time() - start) * 1000 hop_ip = addr[0] # 尝试反向DNS解析 try: hostname = socket.gethostbyaddr(hop_ip)[0] except: hostname = hop_ip hop_result["ip"] = hop_ip hop_result["latency"] = elapsed hop_result["hostname"] = hostname print(f" {ttl:<4} {hop_ip:<18} {elapsed:.2f} ms {hostname}") # 如果到达目标 if hop_ip == dest_ip: results.append(hop_result) break except socket.timeout: hop_result["error"] = "超时" print(f" {ttl:<4} {'*':<18} {'*':<12} 请求超时") recv_socket.close() send_socket.close() except PermissionError: # 没有root权限,使用系统traceroute命令 return traceroute_system(host, max_hops) except Exception as e: hop_result["error"] = str(e) # 尝试使用系统命令 return traceroute_system(host, max_hops) results.append(hop_result) return results def traceroute_system(host, max_hops=TRACEROUTE_MAX_HOPS): """使用系统traceroute命令""" results = [] # 检测可用的traceroute命令 traceroute_cmd = None for cmd in ['traceroute', 'tracepath', 'mtr']: try: subprocess.run([cmd, '--version'], capture_output=True, timeout=2) traceroute_cmd = cmd break except: continue if not traceroute_cmd: print(" 未找到traceroute工具,尝试安装...") return [{"hop": 0, "error": "未找到traceroute工具"}] try: if traceroute_cmd == 'traceroute': cmd = ['traceroute', '-n', '-m', str(max_hops), '-w', '2', host] elif traceroute_cmd == 'tracepath': cmd = ['tracepath', '-n', '-m', str(max_hops), host] else: # mtr cmd = ['mtr', '-n', '-c', '1', '--report', host] print(f" 执行: {' '.join(cmd)}") print() process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # 实时输出 for line in process.stdout: print(f" {line.rstrip()}") # 解析输出 parts = line.split() if parts and parts[0].isdigit(): hop_num = int(parts[0]) results.append({"hop": hop_num, "raw": line.strip()}) process.wait(timeout=60) except subprocess.TimeoutExpired: print(" traceroute 超时") except Exception as e: print(f" traceroute 错误: {e}") return results def traceroute_simple(host, max_hops=TRACEROUTE_MAX_HOPS): """ 简化版TCP traceroute,使用connect超时方式 """ results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) # 尝试直接连接测试 for ttl in range(1, max_hops + 1): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "到达", "latency": elapsed}) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") results.append({"hop": ttl, "status": "超时"}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: # Connection refused - 通常表示到达 print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "中间节点", "latency": elapsed}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "TTL过期", "latency": elapsed}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") results.append({"hop": ttl, "error": str(e)}) return results def test_latency(url, timeout=TIMEOUT): """测试HTTP延迟""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url, method='HEAD') req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: status = response.status elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": status} except urllib.error.HTTPError as e: elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": e.code} except Exception as e: return {"success": False, "error": str(e)} def test_download_speed(url, timeout=TIMEOUT): """测试下载速度""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url) req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: data = response.read() size = len(data) elapsed = time.time() - start if elapsed > 0: speed = size / elapsed return {"success": True, "size": size, "time": elapsed, "speed": speed} return {"success": True, "size": size, "time": 0, "speed": 0} except Exception as e: return {"success": False, "error": str(e)} def format_speed(bps): """格式化速度""" if bps >= 1024 * 1024: return f"{bps / (1024 * 1024):.2f} MB/s" elif bps >= 1024: return f"{bps / 1024:.2f} KB/s" else: return f"{bps:.2f} B/s" def format_size(bytes_size): """格式化大小""" if bytes_size >= 1024 * 1024: return f"{bytes_size / (1024 * 1024):.2f} MB" elif bytes_size >= 1024: return f"{bytes_size / 1024:.2f} KB" else: return f"{bytes_size} B" def test_endpoint(endpoint, enable_traceroute=True): """测试单个端点""" name = endpoint["name"] url = endpoint["url"] desc = endpoint["desc"] host, port = parse_url(url) print(f"\n{'='*65}") print(f"📡 {name}") print(f" URL: {url}") print(f" 描述: {desc}") print(f"{'='*65}") # 结果数据结构 result_data = { "name": name, "url": url, "description": desc, "host": host, "port": port, "test_time": datetime.now().isoformat(), "dns": {}, "tcp_ping": {}, "traceroute": [], "http": {}, "download": {}, "summary": {} } # DNS 解析 print(f"\n🔍 DNS解析:") dns_result = resolve_host(host) if dns_result["success"]: ip = dns_result["ip"] print(f" {host} -> {ip}") result_data["dns"] = {"success": True, "ip": ip, "hostname": host} else: print(f" ❌ DNS解析失败: {dns_result['error']}") result_data["dns"] = {"success": False, "error": dns_result['error']} result_data["summary"] = { "avg_latency": None, "tcp_latency": None, "speed": None, "status": "DNS解析失败" } return result_data # TCP Ping 测试 print(f"\n🏓 TCP Ping 测试 (端口 {port}, 共{TCP_PING_COUNT}次):") tcp_results = tcp_ping_test(ip, port, TCP_PING_COUNT) tcp_latencies = [] tcp_details = [] for i, result in enumerate(tcp_results, 1): if result["success"]: lat = result["latency"] tcp_latencies.append(lat) tcp_details.append({"seq": i, "latency_ms": round(lat, 2), "success": True}) print(f" 第{i}次: {lat:.2f} ms") else: tcp_details.append({"seq": i, "success": False, "error": result['error']}) print(f" 第{i}次: 失败 - {result['error']}") result_data["tcp_ping"]["details"] = tcp_details if tcp_latencies: tcp_avg = statistics.mean(tcp_latencies) tcp_min = min(tcp_latencies) tcp_max = max(tcp_latencies) tcp_jitter = statistics.stdev(tcp_latencies) if len(tcp_latencies) > 1 else 0 tcp_loss = (TCP_PING_COUNT - len(tcp_latencies)) / TCP_PING_COUNT * 100 print(f"\n 📊 TCP Ping 统计:") print(f" 平均: {tcp_avg:.2f} ms") print(f" 最小: {tcp_min:.2f} ms") print(f" 最大: {tcp_max:.2f} ms") print(f" 抖动: {tcp_jitter:.2f} ms") print(f" 丢包: {tcp_loss:.1f}%") result_data["tcp_ping"]["statistics"] = { "avg_ms": round(tcp_avg, 2), "min_ms": round(tcp_min, 2), "max_ms": round(tcp_max, 2), "jitter_ms": round(tcp_jitter, 2), "packet_loss_percent": round(tcp_loss, 1), "success_count": len(tcp_latencies), "total_count": TCP_PING_COUNT } else: tcp_avg = None print(f"\n ❌ TCP Ping 全部失败") result_data["tcp_ping"]["statistics"] = {"success": False, "packet_loss_percent": 100} # Traceroute 测试 if enable_traceroute: print(f"\n🛤️ Traceroute 测试:") trace_results = traceroute_simple_with_data(ip, max_hops=15) result_data["traceroute"] = trace_results # HTTP 延迟测试 print(f"\n⏱️ HTTP 延迟测试 (共{TEST_COUNT}次):") latencies = [] http_details = [] http_status = None for i in range(TEST_COUNT): result = test_latency(url) if result["success"]: latency = result["latency"] latencies.append(latency) http_status = result.get("status", "N/A") http_details.append({"seq": i+1, "latency_ms": round(latency, 2), "status": http_status, "success": True}) print(f" 第{i+1}次: {latency:.2f} ms (HTTP {http_status})") else: http_details.append({"seq": i+1, "success": False, "error": result['error']}) print(f" 第{i+1}次: 失败 - {result['error']}") result_data["http"]["details"] = http_details result_data["http"]["status_code"] = http_status if latencies: avg_latency = statistics.mean(latencies) min_latency = min(latencies) max_latency = max(latencies) print(f"\n 📊 HTTP延迟统计:") print(f" 平均: {avg_latency:.2f} ms") print(f" 最小: {min_latency:.2f} ms") print(f" 最大: {max_latency:.2f} ms") result_data["http"]["statistics"] = { "avg_ms": round(avg_latency, 2), "min_ms": round(min_latency, 2), "max_ms": round(max_latency, 2), "success_count": len(latencies), "total_count": TEST_COUNT } else: avg_latency = None print(f"\n ❌ HTTP延迟测试失败") result_data["http"]["statistics"] = {"success": False} # 下载测试 print(f"\n📥 下载测试:") dl_result = test_download_speed(url) if dl_result["success"]: size = dl_result["size"] dl_time = dl_result["time"] speed = dl_result["speed"] print(f" 下载大小: {format_size(size)}") print(f" 下载用时: {dl_time:.3f} 秒") print(f" 下载速度: {format_speed(speed)}") result_data["download"] = { "success": True, "size_bytes": size, "time_seconds": round(dl_time, 3), "speed_bps": round(speed, 2), "speed_formatted": format_speed(speed) } else: speed = None print(f" ❌ 下载失败 - {dl_result['error']}") result_data["download"] = {"success": False, "error": dl_result['error']} # 汇总 result_data["summary"] = { "avg_latency": round(avg_latency, 2) if avg_latency else None, "tcp_latency": round(tcp_avg, 2) if tcp_avg else None, "speed": round(speed, 2) if speed else None, "status": "可用" if tcp_avg else "不可用" } return result_data def traceroute_simple_with_data(host, max_hops=TRACEROUTE_MAX_HOPS): """简化版TCP traceroute,返回数据""" results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) for ttl in range(1, max_hops + 1): hop_data = {"hop": ttl} try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") hop_data.update({"status": "到达目标", "latency_ms": round(elapsed, 2), "reached": True}) results.append(hop_data) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") hop_data.update({"status": "超时", "timeout": True}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") hop_data.update({"status": "中间节点", "latency_ms": round(elapsed, 2)}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") hop_data.update({"status": "TTL过期", "latency_ms": round(elapsed, 2)}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") hop_data.update({"error": str(e)}) results.append(hop_data) return results def print_summary(results): """打印汇总结果""" print("\n\n" + "="*70) print("📈 测速结果汇总") print("="*70) # 按TCP延迟排序 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) print(f"\n{'排名':<4} {'名称':<22} {'TCP延迟':<12} {'HTTP延迟':<12} {'速度':<12}") print("-" * 70) for i, r in enumerate(valid_results, 1): tcp_str = f"{r['summary']['tcp_latency']:.2f} ms" if r['summary']['tcp_latency'] else "N/A" http_str = f"{r['summary']['avg_latency']:.2f} ms" if r['summary']['avg_latency'] else "N/A" speed_str = format_speed(r['summary']['speed']) if r['summary']['speed'] else "N/A" print(f"{i:<4} {r['name']:<22} {tcp_str:<12} {http_str:<12} {speed_str:<12}") # 显示失败的 failed = [r for r in results if r["summary"].get("tcp_latency") is None] if failed: print("\n❌ 无法连接的节点:") for r in failed: print(f" - {r['name']} ({r['url']})") # 推荐 if valid_results: best = valid_results[0] print(f"\n" + "="*70) print(f"✅ 推荐使用: {best['name']}") print(f" URL: {best['url']}") if best['summary']['tcp_latency']: print(f" TCP延迟: {best['summary']['tcp_latency']:.2f} ms") if best['summary']['avg_latency']: print(f" HTTP延迟: {best['summary']['avg_latency']:.2f} ms") def export_json(results, output_file=None): """导出测试结果到JSON文件""" if output_file is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f"speedtest_result_{timestamp}.json" # 构建完整报告 report = { "report_info": { "tool": "CDN 测速工具", "version": "2.1", "generated_at": datetime.now().isoformat(), "test_parameters": { "tcp_ping_count": TCP_PING_COUNT, "http_test_count": TEST_COUNT, "timeout_seconds": TIMEOUT, "traceroute_max_hops": TRACEROUTE_MAX_HOPS } }, "endpoints": results, "ranking": [], "recommendation": None } # 生成排名 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) for i, r in enumerate(valid_results, 1): report["ranking"].append({ "rank": i, "name": r["name"], "url": r["url"], "tcp_latency_ms": r["summary"]["tcp_latency"], "http_latency_ms": r["summary"]["avg_latency"], "speed_bps": r["summary"]["speed"] }) # 推荐 if valid_results: best = valid_results[0] report["recommendation"] = { "name": best["name"], "url": best["url"], "tcp_latency_ms": best["summary"]["tcp_latency"], "http_latency_ms": best["summary"]["avg_latency"], "reason": "TCP延迟最低" } # 写入文件 with open(output_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) return output_file def main(): import argparse parser = argparse.ArgumentParser(description='CDN 测速工具 v2.1') parser.add_argument('--no-traceroute', '-t', action='store_true', help='禁用 traceroute 测试') parser.add_argument('--quick', '-q', action='store_true', help='快速模式(减少测试次数)') parser.add_argument('--json', '-j', nargs='?', const='auto', default=None, metavar='FILE', help='导出结果到JSON文件(不指定文件名则自动生成)') parser.add_argument('--output', '-o', type=str, default=None, help='指定JSON输出文件路径') args = parser.parse_args() global TEST_COUNT, TCP_PING_COUNT if args.quick: TEST_COUNT = 1 TCP_PING_COUNT = 3 print("\n" + "="*70) print(" 🚀 CDN 测速工具 v2.1") print(" 支持: TCP Ping | Traceroute | HTTP延迟 | 下载速度 | JSON导出") print(" 测试中国大陆优化节点") print("="*70) results = [] enable_traceroute = not args.no_traceroute for endpoint in ENDPOINTS: result = test_endpoint(endpoint, enable_traceroute=enable_traceroute) results.append(result) print_summary(results) # 导出JSON json_file = args.output or args.json if json_file: if json_file == 'auto': json_file = None # 让export_json自动生成文件名 output_path = export_json(results, json_file) print(f"\n📄 测试结果已导出到: {output_path}") print("\n" + "="*70) print("测速完成!") print("="*70 + "\n") if __name__ == "__main__": main()
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/28 8:02:33

14、Windows 应用开发:环境利用与数据管理

Windows 应用开发:环境利用与数据管理 1. 搜索功能实现 在 Windows 应用开发中,搜索功能是提升用户体验的重要部分。可以通过重写 OnSearchActivated 方法来处理搜索激活事件。以下是 VB 代码示例: Protected Overrides Async Sub OnSearchActivated(args As Windows.…

作者头像 李华
网站建设 2026/5/27 14:16:18

2025网络安全行业全景解析:技术趋势、黄金赛道与职业机遇

2025网络安全行业全景解析&#xff1a;技术趋势、黄金赛道与职业机遇 在数字化转型进入深水区的2025年&#xff0c;网络安全早已不是“IT部门的附属工作”&#xff0c;而是决定企业生存发展的“核心生产力”。从金融机构的交易防护到工业车间的设备安防&#xff0c;从智能汽车…

作者头像 李华
网站建设 2026/5/22 3:31:53

Mushroom Cards:零代码打造专业级Home Assistant智能家居控制面板

Mushroom Cards&#xff1a;零代码打造专业级Home Assistant智能家居控制面板 【免费下载链接】lovelace-mushroom Mushroom Cards - Build a beautiful dashboard easily &#x1f344; 项目地址: https://gitcode.com/gh_mirrors/lo/lovelace-mushroom 厌倦了Home Ass…

作者头像 李华
网站建设 2026/5/23 23:41:05

MobaXterm高效运维实战—Linux运维中的高级技巧与自动化脚本案例深度解析

在当今高度复杂和动态的IT基础架构中&#xff0c;Linux系统运维工程师与DevOps专家面临着前所未有的效率与可靠性挑战。远程连接管理工具的选择与精通程度&#xff0c;直接决定了运维工作的质量与速度。MobaXterm作为一款功能强大的“全能型”终端工具&#xff0c;凭借其对多协…

作者头像 李华
网站建设 2026/5/26 15:56:32

Ramile终极指南:如何5分钟搞定软件著作权代码提取

Ramile终极指南&#xff1a;如何5分钟搞定软件著作权代码提取 【免费下载链接】ramile China software copyright extraction tool - 中国软件著作权代码自动提取工具 项目地址: https://gitcode.com/gh_mirrors/ra/ramile 在软件著作权申请过程中&#xff0c;手动整理3…

作者头像 李华