一、项目概述
在本文中,我将详细介绍如何使用Python最新技术栈构建一个高效、健壮的豆瓣电影Top250爬虫。我们将使用异步编程(aiohttp + asyncio)、现代HTML解析库(parsel)以及数据持久化技术,实现一个完整的网络爬虫项目。这个项目不仅适合初学者学习爬虫基础,也适合有一定经验的开发者了解最新的爬虫技术趋势。
二、技术栈介绍
1.异步HTTP客户端:aiohttp
传统的requests库是同步的,在大量请求时效率较低。aiohttp基于asyncio,支持高并发请求,能显著提高爬虫效率。
2.异步任务管理:asyncio
Python的异步IO框架,用于管理并发任务,避免阻塞等待。
3.HTML解析:parsel
比BeautifulSoup更快的解析库,语法类似Scrapy的Selector,支持XPath和CSS选择器。
4.数据存储:SQLAlchemy + SQLite
ORM框架使数据库操作更加方便,SQLite适合小型项目。
5.其他工具:
fake-useragent:随机生成User-Agent
tenacity:优雅的重试机制
pandas:数据分析和导出
rich:美观的控制台输出
三、完整爬虫代码实现
python
"""Douban Movie Top250 spider built on a modern async stack.

Pipeline: fetch the paginated Top250 list pages, collect every detail-page
URL with its ranking, then fetch the detail pages concurrently with a
worker pool, parse each one into a ``Movie``, and persist the results to
SQLite (via SQLAlchemy), JSON and Excel.

Author: Python crawler tutorial
Created: 2024
"""
import asyncio
import json
import sqlite3
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import aiohttp
from aiohttp import TCPConnector
import pandas as pd
from parsel import Selector
from fake_useragent import UserAgent
from tenacity import retry, stop_after_attempt, wait_exponential
from sqlalchemy import create_engine, Column, Integer, String, Float, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from rich import print as rprint


@dataclass
class Movie:
    """Plain data holder for one scraped movie."""

    ranking: int          # rank within the Top250 chart
    title: str            # display title
    original_title: str   # original (untranslated) title
    year: str             # release year
    directors: str        # comma-joined director names
    actors: str           # comma-joined lead actor names (first 3)
    regions: str          # production countries/regions, comma-joined
    categories: str       # genres, comma-joined
    rating: float         # Douban score
    rating_count: int     # number of raters
    quote: str            # famous quote / tagline
    cover_url: str        # poster image URL
    detail_url: str       # detail-page URL (unique key in the DB)

    def to_dict(self) -> Dict:
        """Return the movie as a plain dict (dataclass field -> value)."""
        return asdict(self)


# Database model
Base = declarative_base()


class MovieModel(Base):
    """SQLAlchemy ORM model mirroring the ``Movie`` dataclass."""

    __tablename__ = 'movies'

    id = Column(Integer, primary_key=True)
    ranking = Column(Integer, nullable=False, index=True)
    title = Column(String(200), nullable=False)
    original_title = Column(String(200))
    year = Column(String(20))
    directors = Column(String(500))
    actors = Column(String(1000))
    regions = Column(String(100))
    categories = Column(String(100))
    rating = Column(Float)
    rating_count = Column(Integer)
    quote = Column(Text)
    cover_url = Column(String(500))
    # detail_url is unique so re-runs skip already-saved movies.
    detail_url = Column(String(500), unique=True)
    created_at = Column(
        String(50),
        default=lambda: datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    )


class DoubanMovieSpider:
    """Async spider for the Douban Movie Top250 chart."""

    def __init__(self, concurrency: int = 10):
        """Initialize the spider.

        Args:
            concurrency: number of concurrent detail-page workers
                (also caps the aiohttp connection pool).
        """
        self.concurrency = concurrency
        self.base_url = "https://movie.douban.com/top250"
        self.console = Console()
        self.ua = UserAgent()

        # Initialize the SQLite database and ORM session.
        self.engine = create_engine('sqlite:///douban_movies.db', echo=False)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

        # Base request headers; User-Agent is injected per request.
        self.headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """Fetch a page asynchronously with retries.

        Args:
            session: shared aiohttp session.
            url: target URL.

        Returns:
            The page HTML, or None on non-200 status / repeated failure.
        """
        @retry(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=2, max=10)
        )
        async def _fetch():
            headers = self.headers.copy()
            # Rotate the User-Agent on every attempt to reduce blocking.
            headers['User-Agent'] = self.ua.random
            async with session.get(
                url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=10),
                ssl=False
            ) as response:
                if response.status == 200:
                    return await response.text(encoding='utf-8')
                else:
                    self.console.log(f"[red]请求失败: {url}, 状态码: {response.status}[/red]")
                    return None

        try:
            return await _fetch()
        except Exception as e:
            # tenacity re-raises after the final attempt; log and degrade to None.
            self.console.log(f"[red]请求异常: {url}, 错误: {e}[/red]")
            return None

    def parse_movie_list(self, html: str) -> Tuple[List[str], Optional[str]]:
        """Parse a list page for detail-page links and the next-page link.

        Args:
            html: list-page HTML.

        Returns:
            A tuple ``(detail_urls, next_page)`` where ``next_page`` is the
            relative href of the next list page (e.g. ``?start=25&filter=``)
            or None on the last page.
        """
        selector = Selector(text=html)
        detail_urls = []

        # Extract detail-page links from each list item.
        items = selector.css('.grid_view .item')
        for item in items:
            detail_path = item.css('.hd a::attr(href)').get()
            if detail_path:
                detail_urls.append(detail_path)

        # Check for a next page.
        next_page = selector.css('.next a::attr(href)').get()
        return detail_urls, next_page

    def parse_movie_detail(self, html: str, url: str) -> Optional[Movie]:
        """Parse a movie detail page into a ``Movie``.

        Args:
            html: detail-page HTML.
            url: detail-page URL (stored on the Movie).

        Returns:
            A Movie with ranking=0 (the caller assigns the real ranking),
            or None if parsing raises.
        """
        try:
            selector = Selector(text=html)

            # Basic info.
            title = selector.css('h1 span:nth-child(1)::text').get('').strip()
            original_title = selector.css('.original-title::text').get('')
            if original_title:
                original_title = original_title.replace('原名: ', '').strip()

            # Year, e.g. "(1994)" -> "1994".
            year_text = selector.css('.year::text').get('')
            year = year_text.strip('()') if year_text else ''

            # Directors and actors from the #info block.
            directors = []
            actors = []
            info = selector.css('#info').get('')
            info_selector = Selector(text=info)

            # Directors.
            for director in info_selector.css('span:contains("导演") + span a::text').getall():
                directors.append(director.strip())

            # Actors (first 3 only).
            for actor in info_selector.css('span:contains("主演") + span a::text').getall()[:3]:
                actors.append(actor.strip())

            # Regions and genres.
            regions = []
            categories = []

            # Region, e.g. "制片国家/地区: 美国 / 英国".
            regions_text = info_selector.css('span:contains("制片国家/地区")::text').get()
            if regions_text and ':' in regions_text:
                regions = [r.strip() for r in regions_text.split(':')[1].split('/')]

            # Genres.
            for category in info_selector.css('span[property="v:genre"]::text').getall():
                categories.append(category.strip())

            # Rating info.
            rating_text = selector.css('.rating_num::text').get('0')
            rating = float(rating_text) if rating_text else 0.0

            rating_count_text = selector.css('.rating_people span::text').get('0')
            rating_count = int(rating_count_text.replace(',', '')) if rating_count_text else 0

            # Tagline / quote, with a fallback selector.
            quote = selector.css('.related-info .indent span::text').get('')
            if not quote:
                quote = selector.css('.review-content p::text').get('')
            quote = quote.strip() if quote else ''

            # Cover URL.
            cover_url = selector.css('#mainpic img::attr(src)').get('')

            # Ranking is assigned by the caller from the list-page order.
            ranking = 0

            return Movie(
                ranking=ranking,
                title=title,
                original_title=original_title,
                year=year,
                directors=', '.join(directors),
                actors=', '.join(actors),
                regions=', '.join(regions),
                categories=', '.join(categories),
                rating=rating,
                rating_count=rating_count,
                quote=quote,
                cover_url=cover_url,
                detail_url=url
            )
        except Exception as e:
            self.console.log(f"[red]解析失败: {url}, 错误: {e}[/red]")
            return None

    async def process_movie(self, session: aiohttp.ClientSession,
                            url: str, ranking: int) -> Optional[Movie]:
        """Fetch and parse one movie, stamping it with its ranking.

        Args:
            session: shared aiohttp session.
            url: movie detail-page URL.
            ranking: rank from the list-page order.

        Returns:
            The parsed Movie, or None if fetch/parse failed.
        """
        html = await self.fetch_page(session, url)
        if html:
            movie = self.parse_movie_detail(html, url)
            if movie:
                movie.ranking = ranking
                return movie
        return None

    async def worker(self, session: aiohttp.ClientSession, queue: asyncio.Queue,
                     results: List[Movie], progress: Progress, task_id: int):
        """Worker coroutine: drain (url, ranking) pairs from the queue.

        Runs until cancelled by ``crawl`` after ``queue.join()`` completes.

        Args:
            session: shared aiohttp session.
            queue: task queue of (url, ranking) tuples.
            results: shared list to append parsed movies to.
            progress: rich progress bar.
            task_id: progress task handle to advance.
        """
        while True:
            try:
                url, ranking = await queue.get()
                movie = await self.process_movie(session, url, ranking)
                if movie:
                    results.append(movie)
                    progress.update(task_id, advance=1, description=f"[green]已获取: {movie.title}")
                queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.console.log(f"[red]工作线程异常: {e}[/red]")
                queue.task_done()

    async def crawl(self) -> List[Movie]:
        """Run the full crawl: list pages first, then detail pages.

        Returns:
            All successfully parsed movies (unsorted).
        """
        self.console.log("[bold blue]开始爬取豆瓣电影Top250...[/bold blue]")

        all_movies = []
        current_page = 0

        # One shared session; the connector caps concurrent connections.
        connector = TCPConnector(limit=self.concurrency, ssl=False)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Step 1: walk the paginated list to collect detail URLs.
            self.console.log("[yellow]获取电影列表...[/yellow]")
            detail_urls_with_ranking = []
            ranking_counter = 1
            start_url = self.base_url

            while start_url:
                # next_page hrefs are relative query strings ("?start=25...").
                full_url = (f"https://movie.douban.com/top250{start_url}"
                            if start_url.startswith('?') else start_url)
                html = await self.fetch_page(session, full_url)
                if not html:
                    break

                detail_urls, next_page = self.parse_movie_list(html)

                # Ranking follows list-page order.
                for url in detail_urls:
                    detail_urls_with_ranking.append((url, ranking_counter))
                    ranking_counter += 1

                current_page += 1
                self.console.log(f"[cyan]已获取第{current_page}页,累计发现{len(detail_urls_with_ranking)}部电影[/cyan]")

                if next_page:
                    start_url = next_page
                    await asyncio.sleep(2)  # polite delay between list pages
                else:
                    break

            if not detail_urls_with_ranking:
                self.console.log("[red]未获取到电影列表[/red]")
                return []

            # Step 2: fetch detail pages concurrently.
            self.console.log(f"[yellow]开始获取{len(detail_urls_with_ranking)}部电影的详细信息...[/yellow]")

            queue = asyncio.Queue()
            for url_ranking in detail_urls_with_ranking:
                await queue.put(url_ranking)

            results = []
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                console=self.console
            ) as progress:
                task_id = progress.add_task(
                    "[cyan]爬取进度...",
                    total=len(detail_urls_with_ranking)
                )

                # Spawn workers (no more than there are tasks).
                workers = []
                for i in range(min(self.concurrency, len(detail_urls_with_ranking))):
                    worker_task = asyncio.create_task(
                        self.worker(session, queue, results, progress, task_id)
                    )
                    workers.append(worker_task)

                # Wait until every queued item has been processed.
                await queue.join()

                # Cancel the idle workers and wait for them to exit.
                for w in workers:
                    w.cancel()
                await asyncio.gather(*workers, return_exceptions=True)

            all_movies = results

        return all_movies

    def save_to_database(self, movies: List[Movie]):
        """Insert movies into SQLite, skipping already-stored detail URLs.

        Args:
            movies: movies to persist.
        """
        try:
            count = 0
            for movie in movies:
                # detail_url is unique — skip duplicates from earlier runs.
                existing = self.session.query(MovieModel).filter_by(
                    detail_url=movie.detail_url
                ).first()
                if not existing:
                    movie_dict = movie.to_dict()
                    movie_model = MovieModel(**movie_dict)
                    self.session.add(movie_model)
                    count += 1

            self.session.commit()
            self.console.log(f"[green]成功保存{count}条数据到数据库[/green]")
        except Exception as e:
            self.console.log(f"[red]保存到数据库失败: {e}[/red]")
            self.session.rollback()

    def save_to_json(self, movies: List[Movie], filename: str = "douban_top250.json"):
        """Dump movies to a JSON file.

        Args:
            movies: movies to save.
            filename: output file name.
        """
        try:
            movies_dict = [movie.to_dict() for movie in movies]
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(movies_dict, f, ensure_ascii=False, indent=2)
            # Fixed: interpolate the actual file name into the log message.
            self.console.log(f"[green]数据已保存到 {filename}[/green]")
        except Exception as e:
            self.console.log(f"[red]保存到JSON失败: {e}[/red]")

    def save_to_excel(self, movies: List[Movie], filename: str = "douban_top250.xlsx"):
        """Dump movies to an Excel file, sorted by ranking.

        Args:
            movies: movies to save.
            filename: output file name.
        """
        try:
            movies_dict = [movie.to_dict() for movie in movies]
            df = pd.DataFrame(movies_dict)
            # Sort by ranking before writing.
            df = df.sort_values('ranking')
            df.to_excel(filename, index=False, engine='openpyxl')
            # Fixed: interpolate the actual file name into the log message.
            self.console.log(f"[green]数据已保存到 {filename}[/green]")
        except Exception as e:
            self.console.log(f"[red]保存到Excel失败: {e}[/red]")

    def display_results(self, movies: List[Movie], limit: int = 10):
        """Render a summary table and simple statistics to the console.

        Args:
            movies: scraped movies.
            limit: number of rows to show in the table.
        """
        if not movies:
            self.console.log("[red]没有获取到数据[/red]")
            return

        # Show the top `limit` movies by ranking.
        sorted_movies = sorted(movies, key=lambda x: x.ranking)

        table = Table(title="豆瓣电影Top250 (前10部)", show_header=True, header_style="bold magenta")
        table.add_column("排名", style="dim", width=5)
        table.add_column("标题", style="bold", width=30)
        table.add_column("导演", width=20)
        table.add_column("评分", justify="right", width=8)
        table.add_column("年份", width=6)

        for movie in sorted_movies[:limit]:
            table.add_row(
                str(movie.ranking),
                movie.title,
                movie.directors[:20] + "..." if len(movie.directors) > 20 else movie.directors,
                str(movie.rating),
                movie.year
            )

        self.console.print(table)

        # Aggregate statistics.
        total_movies = len(movies)
        avg_rating = sum(movie.rating for movie in movies) / total_movies if total_movies > 0 else 0
        total_ratings = sum(movie.rating_count for movie in movies)

        self.console.print(f"\n[bold cyan]统计信息:[/bold cyan]")
        self.console.print(f" 获取电影总数: {total_movies}")
        self.console.print(f" 平均评分: {avg_rating:.2f}")
        self.console.print(f" 总评分人数: {total_ratings:,}")

        # Genre distribution (top 5).
        categories_count = {}
        for movie in movies:
            for category in movie.categories.split(', '):
                if category:
                    categories_count[category] = categories_count.get(category, 0) + 1

        if categories_count:
            self.console.print(f"\n[bold cyan]电影类型分布:[/bold cyan]")
            for category, count in sorted(categories_count.items(), key=lambda x: x[1], reverse=True)[:5]:
                self.console.print(f" {category}: {count}部")

    async def run(self):
        """Top-level entry point: crawl, display, and persist."""
        start_time = datetime.now()

        try:
            movies = await self.crawl()

            if movies:
                self.display_results(movies)

                # Persist to all three sinks.
                self.save_to_database(movies)
                self.save_to_json(movies)
                self.save_to_excel(movies)

                end_time = datetime.now()
                duration = (end_time - start_time).total_seconds()
                self.console.print(f"\n[bold green]爬取完成![/bold green]")
                self.console.print(f" 耗时: {duration:.2f}秒")
                self.console.print(f" 平均每部电影: {duration/len(movies):.2f}秒")
            else:
                self.console.print("[bold red]爬取失败,未获取到数据[/bold red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]爬虫被用户中断[/yellow]")
        except Exception as e:
            self.console.print(f"[bold red]爬虫运行异常: {e}[/bold red]")
            import traceback
            traceback.print_exc()


def main():
    """Entry point: print a banner and run the spider."""
    console = Console()
    console.print("[bold magenta]豆瓣电影Top250爬虫[/bold magenta]")
    console.print("=" * 50)
    console.print("特点:")
    console.print(" • 使用异步技术提高爬取效率")
    console.print(" • 支持自动重试和错误处理")
    console.print(" • 多种数据存储方式")
    console.print(" • 友好的进度显示")
    console.print(" • 数据分析和统计")
    console.print("=" * 50)

    # Keep concurrency modest to avoid hammering the server.
    concurrency = 5

    spider = DoubanMovieSpider(concurrency=concurrency)
    asyncio.run(spider.run())


if __name__ == "__main__":
    main()