news 2026/6/15 21:48:12

Python爬虫实战:最新异步技术抓取编程教程资源

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python爬虫实战:最新异步技术抓取编程教程资源

一、前言:为什么需要新一代爬虫技术?

在当今信息爆炸的时代,海量编程教程资源分散在各个网站平台,手动收集这些资源既耗时又低效。传统同步爬虫在面对大量请求时效率低下,而基于异步IO的新一代爬虫技术能够并发处理数百个请求,大幅提升数据采集效率。本文将详细介绍如何使用Python最新的异步爬虫技术,构建一个高效、稳定的编程教程资源抓取系统。

二、技术栈:现代Python爬虫的核心组件

2.1 核心框架选择

  • aiohttp:异步HTTP客户端/服务器框架,支持WebSocket

  • httpx:新一代HTTP客户端,同时支持同步和异步请求

  • playwright:微软开发的浏览器自动化工具,支持现代JavaScript渲染

2.2 解析与存储

  • parsel:Scrapy团队开发的HTML/XML解析库

  • BeautifulSoup4:经典的HTML解析库

  • asyncpg:异步PostgreSQL客户端

  • aiofiles:异步文件操作库

2.3 异步生态

  • asyncio:Python原生异步IO框架

  • anyio:高级异步编程抽象层

  • uvloop:超快速异步事件循环(性能提升2-4倍)

三、完整代码实现:异步爬虫系统

python

""" 现代异步编程教程资源爬虫系统 支持并发抓取、自动去重、断点续爬、反爬绕过等功能 """ import asyncio import aiohttp import aiofiles from typing import List, Dict, Optional from dataclasses import dataclass, asdict from urllib.parse import urljoin, urlparse import hashlib import json import logging from datetime import datetime from contextlib import asynccontextmanager import ssl import certifi # 配置日志系统 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('crawler.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) @dataclass class TutorialResource: """教程资源数据模型""" title: str url: str description: str category: str file_type: str # pdf, video, zip等 file_size: str download_url: str source_site: str upload_date: str tags: List[str] language: str = "中文" difficulty: str = "中级" rating: float = 0.0 class AsyncTutorialCrawler: """异步教程资源爬虫核心类""" def __init__(self, max_concurrent: int = 100, request_timeout: int = 30, use_proxy: bool = False): """ 初始化爬虫 Args: max_concurrent: 最大并发数 request_timeout: 请求超时时间(秒) use_proxy: 是否使用代理 """ self.max_concurrent = max_concurrent self.request_timeout = request_timeout self.use_proxy = use_proxy self.visited_urls = set() self.resources = [] # SSL上下文配置 self.ssl_context = ssl.create_default_context(cafile=certifi.where()) # 请求头配置(模拟浏览器) self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'Cache-Control': 'max-age=0', } # 网站配置(示例) self.site_configs = { 'tutorialspoint': { 'base_url': 'https://www.tutorialspoint.com', 'selectors': { 'resource_list': '.resource-list > div', 'title': 'h2 a::text', 'url': 'h2 a::attr(href)', 'description': '.description::text', 'category': '.category::text', 'file_type': '.file-type::text' } }, 'freecodecamp': { 'base_url': 'https://www.freecodecamp.org', 'api_endpoint': '/api/v1/tutorials' } } @asynccontextmanager async def create_session(self): """创建aiohttp会话上下文""" connector = aiohttp.TCPConnector( limit=self.max_concurrent, ssl=self.ssl_context, force_close=True, enable_cleanup_closed=True ) timeout = aiohttp.ClientTimeout(total=self.request_timeout) async with aiohttp.ClientSession( connector=connector, timeout=timeout, headers=self.headers ) as session: yield session def generate_url_hash(self, url: str) -> str: """生成URL的MD5哈希值用于去重""" return hashlib.md5(url.encode('utf-8')).hexdigest() async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str, retries: int = 3, delay: float = 1.0) -> Optional[str]: """ 带重试机制的请求函数 Args: session: aiohttp会话 url: 请求URL retries: 重试次数 delay: 重试延迟(秒) Returns: 响应文本或None """ url_hash = self.generate_url_hash(url) # 检查是否已访问 if url_hash in self.visited_urls: logger.debug(f"跳过已访问URL: {url}") return None for attempt in range(retries): try: async with session.get(url, ssl=self.ssl_context) as response: if response.status == 200: self.visited_urls.add(url_hash) text = await response.text() logger.info(f"成功获取: {url} (尝试 {attempt + 1})") return text elif response.status == 429: # 太多请求 wait_time = delay * (2 ** attempt) # 指数退避 logger.warning(f"触发限流,等待 {wait_time} 秒") await asyncio.sleep(wait_time) else: logger.error(f"HTTP错误 {response.status}: {url}") return None except (aiohttp.ClientError, asyncio.TimeoutError) as e: logger.warning(f"请求失败 (尝试 {attempt + 1}/{retries}): {url} - {e}") if attempt < retries - 1: await asyncio.sleep(delay * (2 ** attempt)) else: logger.error(f"请求最终失败: {url}") return None return None async def parse_tutorialspoint(self, html: str, base_url: str) -> List[TutorialResource]: """解析tutorialspoint网站""" from parsel import Selector selector = Selector(html) resources = [] config = self.site_configs['tutorialspoint'] for item in selector.css(config['selectors']['resource_list']): try: title = item.css(config['selectors']['title']).get('').strip() relative_url = item.css(config['selectors']['url']).get('') url = urljoin(base_url, relative_url) resource = TutorialResource( title=title, url=url, description=item.css(config['selectors']['description']).get('')[:200], category=item.css(config['selectors']['category']).get('未分类'), file_type=item.css(config['selectors']['file_type']).get('未知'), file_size="待获取", download_url="", # 需要进一步解析详情页 source_site="tutorialspoint", upload_date=datetime.now().strftime("%Y-%m-%d"), tags=["编程", "教程"] ) resources.append(resource) except Exception as e: logger.error(f"解析失败: {e}") continue return resources async def parse_freecodecamp_api(self, data: dict) -> List[TutorialResource]: """解析freecodecamp API数据""" resources = [] for item in data.get('tutorials', []): try: resource = TutorialResource( title=item.get('title', ''), url=item.get('url', ''), description=item.get('description', ''), category=item.get('category', 'programming'), file_type='text', # 主要是文章 file_size='N/A', download_url=item.get('download_url', ''), source_site="freecodecamp", upload_date=item.get('published_at', ''), tags=item.get('tags', []), difficulty=item.get('difficulty', 'beginner'), rating=float(item.get('rating', 0)) ) resources.append(resource) except Exception as e: logger.error(f"解析API数据失败: {e}") continue return resources async def download_resource(self, session: aiohttp.ClientSession, resource: TutorialResource, save_dir: str = "downloads"): """异步下载资源文件""" if not resource.download_url: return try: async with session.get(resource.download_url, ssl=self.ssl_context) as response: if response.status == 200: # 获取文件扩展名 content_disposition = response.headers.get('Content-Disposition', '') filename = resource.title.replace(' ', '_') + self._get_file_extension(resource.file_type) # 异步保存文件 filepath = f"{save_dir}/{filename}" async with aiofiles.open(filepath, 'wb') as f: await f.write(await response.read()) logger.info(f"下载完成: {filename}") resource.file_size = f"{response.content_length / 1024 / 1024:.2f} MB" except Exception as e: logger.error(f"下载失败 {resource.title}: {e}") def _get_file_extension(self, file_type: str) -> str: """根据文件类型获取扩展名""" extensions = { 'pdf': '.pdf', 'video': '.mp4', 'zip': '.zip', 'text': '.txt', 'html': '.html', 'markdown': '.md' } return extensions.get(file_type.lower(), '.bin') async def crawl_site(self, site_name: str, start_url: str = None): """爬取指定网站""" async with self.create_session() as session: if site_name == 'freecodecamp': # API方式获取 api_url = self.site_configs[site_name]['api_endpoint'] full_url = self.site_configs[site_name]['base_url'] + api_url async with session.get(full_url) as response: if response.status == 200: data = await response.json() resources = await self.parse_freecodecamp_api(data) self.resources.extend(resources) else: # HTML解析方式 base_url = start_url or self.site_configs[site_name]['base_url'] html = await self.fetch_with_retry(session, base_url) if html: if site_name == 'tutorialspoint': resources = await self.parse_tutorialspoint(html, base_url) self.resources.extend(resources) # 可以添加更多网站的解析逻辑 async def concurrent_crawl(self, sites: List[Dict]): """并发爬取多个网站""" tasks = [] for site in sites: task = asyncio.create_task(self.crawl_site(site['name'], site.get('url'))) tasks.append(task) # 使用asyncio.gather并发执行 results = await asyncio.gather(*tasks, return_exceptions=True) # 处理异常 for i, result in enumerate(results): if isinstance(result, Exception): logger.error(f"爬取任务失败 {sites[i]['name']}: {result}") async def save_results(self, format: str = 'json'): """保存爬取结果""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if format == 'json': filename = f"tutorial_resources_{timestamp}.json" data = [asdict(resource) for resource in self.resources] async with aiofiles.open(filename, 'w', encoding='utf-8') as f: await f.write(json.dumps(data, ensure_ascii=False, indent=2)) logger.info(f"结果已保存至 {filename}") elif format == 'csv': import csv filename = f"tutorial_resources_{timestamp}.csv" async with aiofiles.open(filename, 'w', encoding='utf-8', newline='') as f: writer = csv.DictWriter(f, fieldnames=list(asdict(self.resources[0]).keys())) await writer.writeheader() for resource in self.resources: await writer.writerow(asdict(resource)) logger.info(f"结果已保存至 {filename}") async def get_statistics(self) -> Dict: """获取爬取统计信息""" stats = { "total_resources": len(self.resources), "sites_count": len(set(r.source_site for r in self.resources)), "categories": {}, "file_types": {} } for resource in self.resources: # 统计分类 stats["categories"][resource.category] = \ stats["categories"].get(resource.category, 0) + 1 # 统计文件类型 stats["file_types"][resource.file_type] = \ stats["file_types"].get(resource.file_type, 0) + 1 return stats async def main(): """主函数""" # 初始化爬虫(配置高并发) crawler = AsyncTutorialCrawler( max_concurrent=50, request_timeout=45, use_proxy=False ) # 定义要爬取的网站 sites_to_crawl = [ {'name': 'tutorialspoint'}, {'name': 'freecodecamp'}, # 可以添加更多网站 ] logger.info("开始爬取编程教程资源...") try: # 并发爬取 await crawler.concurrent_crawl(sites_to_crawl) # 下载资源文件(可选) # async with crawler.create_session() as session: # download_tasks = [] # for resource in crawler.resources[:10]: # 限制前10个 # task = crawler.download_resource(session, resource) # download_tasks.append(task) # await asyncio.gather(*download_tasks) # 保存结果 await crawler.save_results('json') await crawler.save_results('csv') # 显示统计信息 stats = await crawler.get_statistics() logger.info(f"爬取完成!统计信息:") logger.info(f"总计资源: {stats['total_resources']}个") logger.info(f"来源网站: {stats['sites_count']}个") logger.info(f"分类分布: {stats['categories']}") except KeyboardInterrupt: logger.info("用户中断爬取过程") except Exception as e: logger.error(f"爬取过程发生错误: {e}") finally: logger.info("爬虫运行结束") if __name__ == "__main__": # 使用uvloop提升性能(Linux/Mac) try: import uvloop uvloop.install() logger.info("使用uvloop加速") except ImportError: logger.info("使用标准asyncio事件循环") # 运行主函数 asyncio.run(main())

四、高级功能扩展

4.1 Playwright动态渲染支持

python

async def crawl_js_heavy_site(url: str): """处理JavaScript动态渲染的网站""" from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( viewport={'width': 1920, 'height': 1080}, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' ) page = await context.new_page() await page.goto(url, wait_until='networkidle') # 等待内容加载 await page.wait_for_selector('.resource-item', timeout=10000) # 获取渲染后的HTML content = await page.content() await browser.close() return content

4.2 分布式爬虫支持

python

import redis.asyncio as redis from celery import Celery # Redis连接池 redis_pool = redis.ConnectionPool.from_url('redis://localhost:6379/0') class DistributedCrawler: def __init__(self): self.redis = redis.Redis(connection_pool=redis_pool) self.celery_app = Celery('crawler', broker='redis://localhost:6379/0') async def distribute_tasks(self, urls: List[str]): """分布式任务分发""" for url in urls: await self.redis.lpush('crawl_queue', url)

4.3 智能限速与反反爬策略

python

class RateLimiter: """智能请求限速器""" def __init__(self, requests_per_minute: int = 60): self.requests_per_minute = requests_per_minute self.request_times = [] async def acquire(self): """获取请求许可""" now = asyncio.get_event_loop().time() # 清理过期的请求记录 self.request_times = [t for t in self.request_times if now - t < 60] if len(self.request_times) >= self.requests_per_minute: # 计算需要等待的时间 oldest = self.request_times[0] wait_time = 60 - (now - oldest) if wait_time > 0: await asyncio.sleep(wait_time) self.request_times.append(now)

五、部署与监控

5.1 Docker部署配置

dockerfile

FROM python:3.11-slim WORKDIR /app # 安装系统依赖 RUN apt-get update && apt-get install -y \ gcc \ libpq-dev \ && rm -rf /var/lib/apt/lists/* # 安装Python依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 复制源代码 COPY . . # 运行爬虫 CMD ["python", "-m", "crawler.main"]

5.2 监控配置

python

from prometheus_client import start_http_server, Counter, Histogram # 定义监控指标 REQUESTS_TOTAL = Counter('crawler_requests_total', 'Total requests') REQUEST_DURATION = Histogram('crawler_request_duration_seconds', 'Request duration') @REQUEST_DURATION.time() async def monitored_fetch(session, url): """带监控的请求函数""" REQUESTS_TOTAL.inc() return await session.get(url)

六、最佳实践与注意事项

6.1 遵守robots.txt

python

from urllib.robotparser import RobotFileParser async def check_robots_txt(url: str): """检查robots.txt协议""" rp = RobotFileParser() base_url = f"{urlparse(url).scheme}://{urlparse(url).netloc}" rp.set_url(f"{base_url}/robots.txt") rp.read() return rp.can_fetch("*", url)

6.2 数据清洗与去重

python

import pandas as pd from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity def deduplicate_resources(resources: List[TutorialResource], similarity_threshold: float = 0.9): """基于内容相似度的去重""" titles = [r.title for r in resources] # 计算TF-IDF向量 vectorizer = TfidfVectorizer() title_vectors = vectorizer.fit_transform(titles) # 计算相似度矩阵 similarity_matrix = cosine_similarity(title_vectors) # 找出重复项 duplicates = set() for i in range(len(resources)): for j in range(i + 1, len(resources)): if similarity_matrix[i, j] > similarity_threshold: duplicates.add(j) # 返回去重后的列表 return [r for idx, r in enumerate(resources) if idx not in duplicates]

七、总结

本文详细介绍了使用Python最新异步技术构建高效爬虫的完整方案。通过采用aiohttp、playwright等现代库,结合智能限速、分布式部署等高级特性,可以构建出稳定高效的编程教程资源采集系统。在实际应用中,请务必:

  1. 遵守目标网站的robots.txt协议

  2. 设置合理的请求间隔,避免对目标服务器造成压力

  3. 尊重版权,仅下载允许自由传播的资源

  4. 定期更新爬虫以适应网站改版

  5. 实施完善的数据验证和清洗流程

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/15 12:40:38

C++网络模块性能瓶颈如何破?:揭秘异步重构的5大核心技巧

第一章&#xff1a;C网络模块性能瓶颈的根源剖析在高并发网络服务开发中&#xff0c;C因其高性能与底层控制能力被广泛采用。然而&#xff0c;实际项目中常出现网络模块吞吐量低、延迟高、CPU占用异常等问题&#xff0c;其根源往往隐藏于设计与实现细节之中。系统调用开销过大 …

作者头像 李华
网站建设 2026/6/15 12:40:45

系统工程师(SE)十年演进(2015–2025)

系统工程师&#xff08;SE&#xff09;十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; 2015年系统工程师还是“机械嵌入式手工联调静态配置”的传统集成角色&#xff0c;2025年已进化成“全栈具身智能架构师微内核/VLA大模型系统设计师亿级仿真闭环运维师…

作者头像 李华
网站建设 2026/6/15 12:41:07

测试十年演进(2015–2025)

测试十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; 2015年测试还是“手工脚本真实里程积累离线日志分析”的被动验证时代&#xff0c;2025年已进化成“亿级并行仿真影子模式实时双实例大模型故障自生成量子级不确定性注入车云端自愈闭环”的主动智能验证…

作者头像 李华
网站建设 2026/6/15 14:58:31

RV减速器十年演进(2015–2025)

RV减速器十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; 2015年RV减速器还是“Nabtesco日本100%垄断刚性高背隙3–8万元单价”的工业贵族时代&#xff0c;2025年已进化成“中国高功率密度RV零背隙纳米级重复精度一体化关节量子级自愈补偿”的普惠具身时代…

作者头像 李华
网站建设 2026/6/15 13:35:00

多任务并行训练管理:通过output_dir区分不同项目输出

多任务并行训练管理&#xff1a;通过output_dir区分不同项目输出 在AI模型微调日益普及的今天&#xff0c;越来越多开发者和团队需要同时推进多个LoRA&#xff08;Low-Rank Adaptation&#xff09;训练任务——有人在定制风格化图像生成模型&#xff0c;有人在优化垂直领域的语…

作者头像 李华
网站建设 2026/6/15 14:22:45

机器人动力学十年演进(2015–2025)

机器人动力学十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; 2015年动力学还是“手工拉格朗日/牛顿-欧拉固定参数离线优化”的刚性机械时代&#xff0c;2025年已进化成“端到端VLA大模型可微动力学实时参数自辨识亿级仿真自进化量子级不确定性补偿”的具身…

作者头像 李华