news 2026/5/1 9:40:27

Python Web Scraping in Practice: Crawling Douban's Movie Top 250 with Async Techniques and Modern Parsing

张小明

Front-end Development Engineer


Part 1: Project Overview

In this article I walk through building an efficient, robust spider for Douban's Movie Top 250 using a modern Python stack. We will combine asynchronous networking (aiohttp + asyncio), a modern HTML parsing library (parsel), and several data-persistence options into a complete crawling project. It is suitable both for beginners learning the basics of web scraping and for experienced developers who want a look at current scraping practice.

Part 2: The Tech Stack

1. Async HTTP client: aiohttp

The traditional requests library is synchronous, so issuing a large number of requests is slow. aiohttp is built on asyncio and supports highly concurrent requests, which can significantly speed up a crawl.

2. Async task management: asyncio

Python's asynchronous I/O framework; it schedules concurrent tasks so the program never sits idle in a blocking wait.
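The payoff of the aiohttp + asyncio combination is easiest to see with a small, self-contained sketch. Here `asyncio.sleep` stands in for slow network I/O (the real spider awaits aiohttp responses at the same point); the names `fake_request` and `main` are illustrative, not part of the article's spider:

```python
import asyncio
import time

async def fake_request(name: str, delay: float) -> str:
    # Stand-in for a slow network call (e.g., one aiohttp GET)
    await asyncio.sleep(delay)
    return name

async def main() -> list:
    # Three 0.1 s "requests" run concurrently under asyncio.gather,
    # so total wall time is ~0.1 s rather than 0.3 s sequentially
    return await asyncio.gather(
        fake_request("page1", 0.1),
        fake_request("page2", 0.1),
        fake_request("page3", 0.1),
    )

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

`asyncio.gather` also preserves argument order in its result list, which is why the spider below can instead attach an explicit ranking to each URL when order must survive out-of-order completion.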

3. HTML parsing: parsel

A parsing library that is faster than BeautifulSoup, with an API modeled on Scrapy's Selector; it supports both XPath and CSS selectors.

4. Data storage: SQLAlchemy + SQLite

The ORM makes database operations convenient, and SQLite is a good fit for small projects.
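The ORM pattern the spider relies on can be sketched in a few lines. `MovieRow` is a deliberately minimal stand-in for the full `MovieModel` defined later, and the demo uses an in-memory database rather than the spider's `sqlite:///douban_movies.db` file:

```python
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class MovieRow(Base):
    """Minimal stand-in for the article's MovieModel."""
    __tablename__ = "movies"
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    rating = Column(Float)

# In-memory database for the demo; the real spider writes to a file
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

session.add(MovieRow(title="霸王别姬", rating=9.6))
session.commit()

top = session.query(MovieRow).filter(MovieRow.rating > 9.0).first()
print(top.title, top.rating)
```

Rows go in as Python objects and come back out as Python objects; no SQL strings are written by hand.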

5. Other tools:

  • fake-useragent: generates random User-Agent strings

  • tenacity: clean, declarative retry logic

  • pandas: data analysis and export

  • rich: attractive console output

Part 3: Complete Spider Implementation

```python
"""
Douban Movie Top 250 spider — built on a modern async stack.

Author: a Python crawling enthusiast
Created: 2024
"""
import asyncio
import json
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict

import aiohttp
from aiohttp import TCPConnector
import pandas as pd
from parsel import Selector
from fake_useragent import UserAgent
from tenacity import retry, stop_after_attempt, wait_exponential
from sqlalchemy import create_engine, Column, Integer, String, Float, Text
from sqlalchemy.orm import declarative_base, sessionmaker
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table


# ---------- Data classes ----------

@dataclass
class Movie:
    """Plain data holder for one movie."""
    ranking: int          # rank within the Top 250
    title: str            # Chinese title
    original_title: str   # original-language title
    year: str             # release year
    directors: str        # directors, comma-separated
    actors: str           # lead actors, comma-separated
    regions: str          # production countries/regions
    categories: str       # genres
    rating: float         # average score
    rating_count: int     # number of ratings
    quote: str            # famous one-line quote
    cover_url: str        # poster image URL
    detail_url: str       # detail-page URL

    def to_dict(self) -> Dict:
        """Convert to a plain dict."""
        return asdict(self)


# ---------- Database model ----------

Base = declarative_base()


class MovieModel(Base):
    """SQLAlchemy model backing the `movies` table."""
    __tablename__ = 'movies'

    id = Column(Integer, primary_key=True)
    ranking = Column(Integer, nullable=False, index=True)
    title = Column(String(200), nullable=False)
    original_title = Column(String(200))
    year = Column(String(20))
    directors = Column(String(500))
    actors = Column(String(1000))
    regions = Column(String(100))
    categories = Column(String(100))
    rating = Column(Float)
    rating_count = Column(Integer)
    quote = Column(Text)
    cover_url = Column(String(500))
    detail_url = Column(String(500), unique=True)
    created_at = Column(String(50), default=lambda: datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


class DoubanMovieSpider:
    """Douban Movie Top 250 spider."""

    def __init__(self, concurrency: int = 10):
        """
        Args:
            concurrency: number of concurrent requests
        """
        self.concurrency = concurrency
        self.base_url = "https://movie.douban.com/top250"
        self.console = Console()
        self.ua = UserAgent()

        # Initialize the database
        self.engine = create_engine('sqlite:///douban_movies.db', echo=False)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

        # Base request headers; User-Agent is randomized per request
        self.headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """
        Fetch a page asynchronously, retrying up to three times with
        exponential backoff.

        Args:
            session: shared aiohttp session
            url: target URL

        Returns:
            Page HTML, or None on failure.
        """
        @retry(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=2, max=10)
        )
        async def _fetch():
            headers = self.headers.copy()
            headers['User-Agent'] = self.ua.random
            async with session.get(
                url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=10),
                ssl=False
            ) as response:
                if response.status == 200:
                    return await response.text(encoding='utf-8')
                self.console.log(f"[red]Request failed: {url}, status: {response.status}[/red]")
                return None

        try:
            return await _fetch()
        except Exception as e:
            self.console.log(f"[red]Request error: {url}, error: {e}[/red]")
            return None

    def parse_movie_list(self, html: str) -> Tuple[List[str], Optional[str]]:
        """
        Parse one list page.

        Args:
            html: list-page HTML

        Returns:
            (detail-page URLs, next-page query string or None)
        """
        selector = Selector(text=html)
        detail_urls = []

        # Extract detail-page links
        for item in selector.css('.grid_view .item'):
            detail_path = item.css('.hd a::attr(href)').get()
            if detail_path:
                detail_urls.append(detail_path)

        # Check whether there is a next page
        next_page = selector.css('.next a::attr(href)').get()
        return detail_urls, next_page

    def parse_movie_detail(self, html: str, url: str) -> Optional[Movie]:
        """
        Parse one detail page.

        Args:
            html: detail-page HTML
            url: detail-page URL

        Returns:
            A Movie object, or None on failure.
        """
        try:
            selector = Selector(text=html)

            # Basic information
            title = selector.css('h1 span:nth-child(1)::text').get('').strip()
            original_title = selector.css('.original-title::text').get('')
            if original_title:
                original_title = original_title.replace('原名: ', '').strip()

            # Year, e.g. "(1994)" -> "1994"
            year_text = selector.css('.year::text').get('')
            year = year_text.strip('()') if year_text else ''

            # Directors and actors live in the #info block
            directors = []
            actors = []
            info = selector.css('#info').get('')
            info_selector = Selector(text=info)

            # cssselect has no :contains(), so use XPath for label lookups
            for director in info_selector.xpath(
                    '//span[contains(text(), "导演")]/following-sibling::span[1]//a/text()').getall():
                directors.append(director.strip())

            # Lead actors (first three)
            for actor in info_selector.xpath(
                    '//span[contains(text(), "主演")]/following-sibling::span[1]//a/text()').getall()[:3]:
                actors.append(actor.strip())

            # Regions and genres
            regions = []
            categories = []

            # The region list is a bare text node following its label span
            regions_text = info_selector.xpath(
                '//span[contains(text(), "制片国家/地区")]/following::text()[1]').get()
            if regions_text:
                regions = [r.strip() for r in regions_text.split('/') if r.strip()]

            for category in info_selector.css('span[property="v:genre"]::text').getall():
                categories.append(category.strip())

            # Rating information
            rating_text = selector.css('.rating_num::text').get('0')
            rating = float(rating_text) if rating_text else 0.0
            rating_count_text = selector.css('.rating_people span::text').get('0')
            rating_count = int(rating_count_text.replace(',', '')) if rating_count_text else 0

            # Famous quote
            quote = selector.css('.related-info .indent span::text').get('')
            if not quote:
                quote = selector.css('.review-content p::text').get('')
            quote = quote.strip() if quote else ''

            # Poster URL
            cover_url = selector.css('#mainpic img::attr(src)').get('')

            # The real rank is filled in later from the list-page order
            ranking = 0

            return Movie(
                ranking=ranking,
                title=title,
                original_title=original_title,
                year=year,
                directors=', '.join(directors),
                actors=', '.join(actors),
                regions=', '.join(regions),
                categories=', '.join(categories),
                rating=rating,
                rating_count=rating_count,
                quote=quote,
                cover_url=cover_url,
                detail_url=url
            )
        except Exception as e:
            self.console.log(f"[red]Parse failed: {url}, error: {e}[/red]")
            return None

    async def process_movie(self, session: aiohttp.ClientSession,
                            url: str, ranking: int) -> Optional[Movie]:
        """
        Fetch and parse one movie.

        Args:
            session: shared aiohttp session
            url: detail-page URL
            ranking: rank taken from the list page

        Returns:
            A Movie object, or None on failure.
        """
        html = await self.fetch_page(session, url)
        if html:
            movie = self.parse_movie_detail(html, url)
            if movie:
                movie.ranking = ranking
                return movie
        return None

    async def worker(self, session: aiohttp.ClientSession, queue: asyncio.Queue,
                     results: List[Movie], progress: Progress, task_id: int):
        """
        Worker coroutine: consumes (url, ranking) pairs until cancelled.

        Args:
            session: shared aiohttp session
            queue: task queue
            results: shared result list
            progress: rich progress bar
            task_id: progress task id
        """
        while True:
            try:
                url, ranking = await queue.get()
                movie = await self.process_movie(session, url, ranking)
                if movie:
                    results.append(movie)
                    progress.update(task_id, advance=1,
                                    description=f"[green]Fetched: {movie.title}")
                queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.console.log(f"[red]Worker error: {e}[/red]")
                queue.task_done()

    async def crawl(self) -> List[Movie]:
        """
        Main crawl routine.

        Returns:
            List of movies.
        """
        self.console.log("[bold blue]Starting the Douban Top 250 crawl...[/bold blue]")

        all_movies = []
        current_page = 0

        # Create the shared aiohttp session
        connector = TCPConnector(limit=self.concurrency, ssl=False)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Step 1: collect every detail-page URL from the list pages
            self.console.log("[yellow]Collecting the movie list...[/yellow]")
            detail_urls_with_ranking = []
            ranking_counter = 1
            start_url = self.base_url

            while start_url:
                full_url = (f"{self.base_url}{start_url}"
                            if start_url.startswith('?') else start_url)
                html = await self.fetch_page(session, full_url)
                if not html:
                    break

                detail_urls, next_page = self.parse_movie_list(html)

                # Attach a rank to each URL, in list order
                for url in detail_urls:
                    detail_urls_with_ranking.append((url, ranking_counter))
                    ranking_counter += 1

                current_page += 1
                self.console.log(
                    f"[cyan]Fetched page {current_page}; "
                    f"{len(detail_urls_with_ranking)} movies found so far[/cyan]")

                if next_page:
                    start_url = next_page
                    await asyncio.sleep(2)  # polite delay between list pages
                else:
                    break

            if not detail_urls_with_ranking:
                self.console.log("[red]No movie list retrieved[/red]")
                return []

            # Step 2: fetch the detail pages concurrently
            self.console.log(
                f"[yellow]Fetching details for {len(detail_urls_with_ranking)} movies...[/yellow]")

            queue = asyncio.Queue()
            for url_ranking in detail_urls_with_ranking:
                await queue.put(url_ranking)

            results = []
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                console=self.console
            ) as progress:
                task_id = progress.add_task(
                    "[cyan]Crawling...", total=len(detail_urls_with_ranking)
                )

                # Spawn the worker coroutines
                workers = []
                for _ in range(min(self.concurrency, len(detail_urls_with_ranking))):
                    workers.append(asyncio.create_task(
                        self.worker(session, queue, results, progress, task_id)
                    ))

                # Wait until every queued task is done
                await queue.join()

                # Cancel the workers and wait for them to exit
                for w in workers:
                    w.cancel()
                await asyncio.gather(*workers, return_exceptions=True)

            all_movies = results

        return all_movies

    def save_to_database(self, movies: List[Movie]):
        """
        Persist movies to SQLite via the ORM.

        Args:
            movies: movie list
        """
        try:
            count = 0
            for movie in movies:
                # Skip rows that are already stored
                existing = self.session.query(MovieModel).filter_by(
                    detail_url=movie.detail_url
                ).first()
                if not existing:
                    self.session.add(MovieModel(**movie.to_dict()))
                    count += 1
            self.session.commit()
            self.console.log(f"[green]Saved {count} rows to the database[/green]")
        except Exception as e:
            self.console.log(f"[red]Database save failed: {e}[/red]")
            self.session.rollback()

    def save_to_json(self, movies: List[Movie], filename: str = "douban_top250.json"):
        """
        Save movies to a JSON file.

        Args:
            movies: movie list
            filename: output file name
        """
        try:
            movies_dict = [movie.to_dict() for movie in movies]
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(movies_dict, f, ensure_ascii=False, indent=2)
            self.console.log(f"[green]Data saved to {filename}[/green]")
        except Exception as e:
            self.console.log(f"[red]JSON save failed: {e}[/red]")

    def save_to_excel(self, movies: List[Movie], filename: str = "douban_top250.xlsx"):
        """
        Save movies to an Excel file.

        Args:
            movies: movie list
            filename: output file name
        """
        try:
            movies_dict = [movie.to_dict() for movie in movies]
            df = pd.DataFrame(movies_dict)
            df = df.sort_values('ranking')  # sort by rank
            df.to_excel(filename, index=False, engine='openpyxl')
            self.console.log(f"[green]Data saved to {filename}[/green]")
        except Exception as e:
            self.console.log(f"[red]Excel save failed: {e}[/red]")

    def display_results(self, movies: List[Movie], limit: int = 10):
        """
        Print a summary table and some statistics.

        Args:
            movies: movie list
            limit: number of rows to display
        """
        if not movies:
            self.console.log("[red]No data retrieved[/red]")
            return

        sorted_movies = sorted(movies, key=lambda x: x.ranking)

        table = Table(title=f"Douban Movie Top 250 (first {limit})",
                      show_header=True, header_style="bold magenta")
        table.add_column("Rank", style="dim", width=5)
        table.add_column("Title", style="bold", width=30)
        table.add_column("Directors", width=20)
        table.add_column("Rating", justify="right", width=8)
        table.add_column("Year", width=6)

        for movie in sorted_movies[:limit]:
            table.add_row(
                str(movie.ranking),
                movie.title,
                movie.directors[:20] + "..." if len(movie.directors) > 20 else movie.directors,
                str(movie.rating),
                movie.year
            )
        self.console.print(table)

        # Aggregate statistics
        total_movies = len(movies)
        avg_rating = sum(m.rating for m in movies) / total_movies if total_movies > 0 else 0
        total_ratings = sum(m.rating_count for m in movies)

        self.console.print("\n[bold cyan]Statistics:[/bold cyan]")
        self.console.print(f"  Movies fetched: {total_movies}")
        self.console.print(f"  Average rating: {avg_rating:.2f}")
        self.console.print(f"  Total rating count: {total_ratings:,}")

        # Genre distribution
        categories_count = {}
        for movie in movies:
            for category in movie.categories.split(', '):
                if category:
                    categories_count[category] = categories_count.get(category, 0) + 1

        if categories_count:
            self.console.print("\n[bold cyan]Genre distribution:[/bold cyan]")
            for category, count in sorted(categories_count.items(),
                                          key=lambda x: x[1], reverse=True)[:5]:
                self.console.print(f"  {category}: {count}")

    async def run(self):
        """Run the spider end to end."""
        start_time = datetime.now()
        try:
            movies = await self.crawl()
            if movies:
                # Display, then persist through every back end
                self.display_results(movies)
                self.save_to_database(movies)
                self.save_to_json(movies)
                self.save_to_excel(movies)

                duration = (datetime.now() - start_time).total_seconds()
                self.console.print("\n[bold green]Crawl finished![/bold green]")
                self.console.print(f"  Elapsed: {duration:.2f}s")
                self.console.print(f"  Per movie: {duration / len(movies):.2f}s")
            else:
                self.console.print("[bold red]Crawl failed: no data retrieved[/bold red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]Crawl interrupted by user[/yellow]")
        except Exception as e:
            self.console.print(f"[bold red]Crawl error: {e}[/bold red]")
            import traceback
            traceback.print_exc()


def main():
    """Entry point."""
    console = Console()
    console.print("[bold magenta]Douban Movie Top 250 Spider[/bold magenta]")
    console.print("=" * 50)
    console.print("Features:")
    console.print("  • Async requests for higher throughput")
    console.print("  • Automatic retries and error handling")
    console.print("  • Multiple storage back ends")
    console.print("  • Friendly progress display")
    console.print("  • Basic analysis and statistics")
    console.print("=" * 50)

    # Keep concurrency modest to avoid hammering the server
    concurrency = 5

    spider = DoubanMovieSpider(concurrency=concurrency)
    asyncio.run(spider.run())


if __name__ == "__main__":
    main()
```