news 2026/4/30 20:56:38

Cross-Platform Product Price Tracking in Python: Building a Smart Price-Comparison Crawler


张小明

Front-end Development Engineer


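Introduction: The Price-Monitoring Revolution in E-commerce

In today's e-commerce market, product prices change often and quickly. For savvy shoppers, e-commerce practitioners, data analysts and market researchers, keeping track of price changes on major platforms such as Amazon, Taobao and JD.com has become a valuable skill. A price-tracking crawler helps us catch the best moment to buy, and it also supports market-trend analysis and competitor monitoring. This article walks through building an efficient, stable, cross-platform price-tracking system with a modern Python stack.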

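Technology Stack Overview

This project uses the following modern Python stack:

  • Playwright: Microsoft's browser-automation library, with headless browser support

  • Asyncio: Python's built-in asynchronous I/O framework, used for highly concurrent crawling

  • BeautifulSoup4/Selectolax: fast HTML parsing libraries (the code below uses Selectolax)

  • Pydantic: data validation and settings management

  • SQLAlchemy + SQLite: lightweight data storage

  • FastAPI: optional API back end for visualization

  • Proxy pool support: for coping with anti-scraping measures

  • Machine-learning alerts: price prediction based on price history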

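System Architecture Design

1. Project Structure

```text
price-tracker/
├── crawlers/              # Platform crawler modules
│   ├── base.py            # Base crawler class
│   ├── amazon.py          # Amazon crawler
│   ├── taobao.py          # Taobao crawler
│   ├── jd.py              # JD.com crawler
│   └── scheduler.py       # Async crawl scheduler
├── models/                # Data models
│   └── product.py         # Product and PriceHistory models
├── database/              # Database access
│   └── database.py        # ORM tables, engine, sessions
├── config/                # Configuration
│   └── settings.py
├── utils/                 # Utilities
│   └── proxy_manager.py
├── alerts/                # Alerting system
│   ├── price_analyzer.py
│   └── predictor.py
├── templates/             # Jinja2 templates (dashboard.html)
└── main.py                # Program entry point
```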

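2. Core Functional Modules

  • Multi-platform adapters: one unified interface over different e-commerce platforms

  • Smart request scheduling: adaptive control of request frequency

  • Anti-scraping countermeasures: automatic rotation of proxies, user agents and browser fingerprints

  • Data-cleaning pipeline: normalizing the data formats of different platforms

  • Monitoring and alerts: rule-based and machine-learning detection of price anomalies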
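Each platform formats prices differently (`¥1,299.00`, `$19.99`, `1.234,56 €`), and the crawlers below simply delete commas before matching digits, which turns a European-style `1.234,56` into `1.23456`. A minimal sketch of a normalizing step for the data-cleaning pipeline, assuming prices arrive as display strings; `normalize_price` and its separator heuristic are illustrative, not part of the original project:

```python
import re
from typing import Optional


def normalize_price(text: str) -> Optional[float]:
    """Convert a displayed price string into a float, or None if no number is found.

    Heuristic (an assumption, not a locale-aware parser): whichever of '.' or ','
    appears last is the decimal separator if it is followed by one or two digits;
    every other separator is treated as a thousands separator.
    """
    match = re.search(r"\d[\d.,]*", text)
    if not match:
        return None
    number = match.group().rstrip(".,")
    last_sep = max(number.rfind("."), number.rfind(","))
    if last_sep != -1 and 1 <= len(number) - last_sep - 1 <= 2:
        integer_part = re.sub(r"[.,]", "", number[:last_sep])
        return float(f"{integer_part}.{number[last_sep + 1:]}")
    # No decimal part: drop all separators
    return float(re.sub(r"[.,]", "", number))
```

The heuristic cannot tell a thousands separator from a three-digit decimal part, so `1.234` always becomes `1234.0`; a locale-aware parser keyed on the platform's currency would be more robust.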

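Complete Code Implementation

1. Environment Setup and Dependencies

First create and activate a virtual environment, then install the dependencies:

```bash
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate        # Linux/macOS
# venv\Scripts\activate         # Windows

# Core dependencies (asyncio is part of the standard library; do not pip-install it)
pip install playwright selectolax pydantic pydantic-settings sqlalchemy aiohttp
pip install pandas numpy scikit-learn

# Optional, for the web interface
pip install fastapi uvicorn jinja2

# Download the Chromium build used by Playwright
playwright install chromium
```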

2. Configuration Management

```python
# config/settings.py
from typing import Dict, List

# Pydantic v2 moved BaseSettings into the separate pydantic-settings package
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Values can be overridden by environment variables or a local .env file
    model_config = SettingsConfigDict(env_file=".env")

    # Database
    DATABASE_URL: str = "sqlite:///./price_tracker.db"

    # Crawler
    REQUEST_TIMEOUT: int = 30
    MAX_CONCURRENT_REQUESTS: int = 5
    DEFAULT_USER_AGENT: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

    # Proxies
    PROXY_ENABLED: bool = False
    PROXY_POOL: List[str] = []

    # Platform-specific settings
    AMAZON_DOMAINS: Dict[str, str] = {
        "US": "amazon.com",
        "UK": "amazon.co.uk",
        "DE": "amazon.de",
        "JP": "amazon.co.jp",
    }

    # Monitoring thresholds
    PRICE_DROP_THRESHOLD: float = 0.15  # notify when the price drops by 15% or more
    CHECK_INTERVAL_HOURS: int = 6


settings = Settings()
```

3. Data Model Definitions

```python
# models/product.py
from datetime import datetime
from enum import Enum
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field, HttpUrl


class Platform(str, Enum):
    AMAZON = "amazon"
    TAOBAO = "taobao"
    JD = "jd"
    EBAY = "ebay"


class Product(BaseModel):
    # from_attributes (v2 name of orm_mode) lets FastAPI build this model from ORM rows
    model_config = ConfigDict(from_attributes=True)

    id: Optional[int] = None
    platform: Platform
    product_id: str
    url: HttpUrl
    title: str
    description: Optional[str] = None
    current_price: Optional[float] = None
    original_price: Optional[float] = None
    currency: str = "USD"
    availability: bool = True
    image_url: Optional[str] = None
    category: Optional[str] = None
    # default_factory runs per instance; `= datetime.now()` would freeze the import time
    last_updated: datetime = Field(default_factory=datetime.now)


class PriceHistory(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: Optional[int] = None
    product_id: int
    price: float
    currency: str
    timestamp: datetime = Field(default_factory=datetime.now)
    availability: bool = True
    special_offer: Optional[str] = None
```

4. Database Management

```python
# database/database.py
from datetime import datetime

from sqlalchemy import (Boolean, Column, DateTime, Float, ForeignKey, Integer, String,
                        Text, UniqueConstraint, create_engine)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

from config.settings import settings

Base = declarative_base()


class ProductORM(Base):
    __tablename__ = "products"
    # The same product ID may occur on different platforms, so uniqueness is per platform
    __table_args__ = (UniqueConstraint("platform", "product_id"),)

    id = Column(Integer, primary_key=True, index=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    url = Column(String(500), nullable=False)
    title = Column(String(500), nullable=False)
    description = Column(Text)
    current_price = Column(Float)
    original_price = Column(Float)
    currency = Column(String(10), default="USD")
    availability = Column(Boolean, default=True)
    image_url = Column(String(500))
    category = Column(String(200))
    last_updated = Column(DateTime, default=datetime.now)

    price_history = relationship("PriceHistoryORM", back_populates="product",
                                 cascade="all, delete-orphan")


class PriceHistoryORM(Base):
    __tablename__ = "price_history"

    id = Column(Integer, primary_key=True, index=True)
    product_id = Column(Integer, ForeignKey("products.id"), index=True)
    price = Column(Float, nullable=False)
    currency = Column(String(10), default="USD")
    timestamp = Column(DateTime, default=datetime.now)
    availability = Column(Boolean, default=True)
    special_offer = Column(String(200))

    product = relationship("ProductORM", back_populates="price_history")


# Engine and session factory. SQLite connections are bound to one thread by
# default; check_same_thread=False lets FastAPI's worker threads share them.
engine = create_engine(
    settings.DATABASE_URL,
    connect_args={"check_same_thread": False} if settings.DATABASE_URL.startswith("sqlite") else {},
)
SessionLocal = sessionmaker(autoflush=False, bind=engine)


def init_db():
    """Create any tables that do not exist yet."""
    Base.metadata.create_all(bind=engine)


def get_db():
    """FastAPI dependency: yield a session and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```

5. Base Crawler Class

```python
# crawlers/base.py
import asyncio
import logging
import random
from dataclasses import dataclass
from typing import List, Optional, Tuple

import aiohttp
from selectolax.parser import HTMLParser

logger = logging.getLogger(__name__)


@dataclass
class CrawlerConfig:
    user_agents: Optional[List[str]] = None
    proxy: Optional[str] = None
    timeout: int = 30
    max_retries: int = 3
    delay_range: Tuple[float, float] = (1, 3)


class BaseCrawler:
    def __init__(self, config: Optional[CrawlerConfig] = None):
        self.config = config or CrawlerConfig()
        if not self.config.user_agents:
            self.config.user_agents = [
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            ]

    def get_random_user_agent(self) -> str:
        return random.choice(self.config.user_agents)

    async def fetch(self, url: str, session: aiohttp.ClientSession) -> Optional[str]:
        """GET a page with retries; returns the HTML or None."""
        headers = {
            "User-Agent": self.get_random_user_agent(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            # "br" omitted: aiohttp only decodes Brotli when the brotli package is installed
            "Accept-Encoding": "gzip, deflate",
            "Upgrade-Insecure-Requests": "1",
        }
        for attempt in range(self.config.max_retries):
            # Random delay before every request so traffic is not too frequent
            await asyncio.sleep(random.uniform(*self.config.delay_range))
            try:
                async with session.get(
                    url,
                    headers=headers,
                    proxy=self.config.proxy,
                    timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                ) as response:
                    if response.status == 200:
                        return await response.text()
                    if response.status == 429:  # Too Many Requests
                        wait_time = 2 ** attempt  # exponential backoff
                        logger.warning("Rate limited. Waiting %s seconds...", wait_time)
                        await asyncio.sleep(wait_time)
                        continue
                    logger.error("Failed to fetch %s: status %s", url, response.status)
                    return None
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.error("Attempt %d failed for %s: %s", attempt + 1, url, e)
                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
        return None

    def parse_html(self, html: str) -> HTMLParser:
        return HTMLParser(html)
```

6. Amazon Crawler Implementation

```python
# crawlers/amazon.py
import json
import logging
import re
from typing import Optional
from urllib.parse import parse_qs, urlparse

from crawlers.base import BaseCrawler, CrawlerConfig
from models.product import Platform, Product

logger = logging.getLogger(__name__)

PRICE_RE = re.compile(r"\d+(?:\.\d+)?")


class AmazonCrawler(BaseCrawler):
    DOMAIN_SUFFIXES = {
        "US": "com", "UK": "co.uk", "DE": "de", "JP": "co.jp", "FR": "fr",
        "IT": "it", "ES": "es", "CA": "ca", "AU": "com.au",
    }

    def __init__(self, country: str = "US", config: Optional[CrawlerConfig] = None):
        super().__init__(config)
        self.country = country.upper()
        self.base_domain = f"amazon.{self.DOMAIN_SUFFIXES.get(self.country, 'com')}"

    def extract_product_id(self, url: str) -> Optional[str]:
        """Extract the 10-character Amazon ASIN from a product URL."""
        for pattern in (r"/dp/([A-Z0-9]{10})", r"/gp/product/([A-Z0-9]{10})",
                        r"/product/([A-Z0-9]{10})"):
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        # Fall back to an ?asin=... query parameter
        query_params = parse_qs(urlparse(url).query)
        return query_params["asin"][0] if "asin" in query_params else None

    async def get_product_info(self, url: str, session) -> Optional[Product]:
        html = await self.fetch(url, session)
        if not html:
            return None
        data = self._parse_product_page(self.parse_html(html))
        if not data:
            return None
        return Product(
            platform=Platform.AMAZON,
            # Prefer the ID found on the page, else the ASIN in the URL
            product_id=data["product_id"] or self.extract_product_id(url) or "",
            url=url,
            title=data["title"],
            current_price=data["current_price"],
            original_price=data["original_price"],
            currency=data["currency"],
            availability=data["availability"],
            image_url=data.get("image_url"),
            description=data.get("description"),
        )

    def _parse_product_page(self, tree) -> Optional[dict]:
        """Parse an Amazon product page: JSON-LD first, then HTML selectors."""
        try:
            for script in tree.css('script[type="application/ld+json"]'):
                try:
                    data = json.loads(script.text())
                except json.JSONDecodeError:
                    continue
                for item in data if isinstance(data, list) else [data]:
                    if isinstance(item, dict) and item.get("@type") == "Product":
                        return self._from_json_ld(item)

            # Fallback: read the HTML elements directly
            title_elem = tree.css_first("#productTitle")
            current_price = None
            for selector in (".a-price .a-offscreen", "#price_inside_buybox", "#priceblock_ourprice",
                             "#priceblock_dealprice", ".a-color-price"):
                price_elem = tree.css_first(selector)
                if price_elem:
                    match = PRICE_RE.search(price_elem.text(strip=True).replace(",", ""))
                    if match:
                        current_price = float(match.group())
                        break
            asin_elem = tree.css_first('input[name="asin"]')
            return {
                "product_id": (asin_elem.attributes.get("value") if asin_elem else None) or "",
                "title": title_elem.text(strip=True) if title_elem else "",
                "current_price": current_price,
                "original_price": None,
                "currency": "USD",  # the fallback path does not detect the currency
                "availability": current_price is not None,
                "image_url": None,
                "description": "",
            }
        except Exception as e:
            logger.error("Error parsing Amazon page: %s", e)
            return None

    @staticmethod
    def _from_json_ld(data: dict) -> dict:
        """Map a schema.org Product object to the crawler's dict format."""
        image = data.get("image")
        if isinstance(image, list):
            image = image[0] if image else None
        info = {
            "product_id": str(data.get("sku") or data.get("productID") or ""),
            "title": data.get("name", ""),
            "description": data.get("description", ""),
            "image_url": image if isinstance(image, str) else None,
            "currency": "USD",
            "availability": False,
            "current_price": None,
            "original_price": None,
        }
        offers = data.get("offers") or []
        for offer in offers if isinstance(offers, list) else [offers]:
            if not isinstance(offer, dict):
                continue
            # Accept both http:// and https:// schema.org URLs
            if str(offer.get("availability", "")).endswith("InStock"):
                info["availability"] = True
            if offer.get("price"):
                info["current_price"] = float(offer["price"])
                info["currency"] = offer.get("priceCurrency", "USD")
        return info
```
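The JSON-LD branch is the most robust part of `_parse_product_page`, because schema.org `Product` markup is shared by many shops and changes less often than CSS class names. To show the idea without Selectolax, here is a standard-library sketch that collects `<script type="application/ld+json">` blocks and returns the first offer's price; the class and function names are illustrative, and it does not handle products nested inside an `@graph` array:

```python
import json
from html.parser import HTMLParser
from typing import List, Optional, Tuple


class JsonLdCollector(HTMLParser):
    """Collect the raw text of every <script type="application/ld+json"> element."""

    def __init__(self):
        super().__init__()
        self.blocks: List[str] = []
        self._in_ld = False

    def handle_starttag(self, tag, attrs):
        if tag == "script" and dict(attrs).get("type") == "application/ld+json":
            self._in_ld = True
            self.blocks.append("")

    def handle_endtag(self, tag):
        if tag == "script":
            self._in_ld = False

    def handle_data(self, data):
        if self._in_ld:
            self.blocks[-1] += data  # data may arrive in several chunks


def product_price_from_html(html: str) -> Optional[Tuple[float, str]]:
    """Return (price, currency) from the first schema.org Product offer, if any."""
    collector = JsonLdCollector()
    collector.feed(html)
    for raw in collector.blocks:
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue
        for item in data if isinstance(data, list) else [data]:
            if not isinstance(item, dict) or item.get("@type") != "Product":
                continue
            offers = item.get("offers") or []
            for offer in offers if isinstance(offers, list) else [offers]:
                if isinstance(offer, dict) and offer.get("price") is not None:
                    return float(offer["price"]), offer.get("priceCurrency", "USD")
    return None
```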

7. Taobao Crawler Implementation

```python
# crawlers/taobao.py
import logging
import re
from typing import Optional
from urllib.parse import parse_qs, urlparse

from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright.async_api import async_playwright

from crawlers.base import BaseCrawler, CrawlerConfig
from models.product import Platform, Product

logger = logging.getLogger(__name__)


class TaobaoCrawler(BaseCrawler):
    TITLE_SELECTORS = [".tb-detail-hd h1", ".tb-main-title", "[data-title]", ".title-text"]
    # The og:product:price meta tag carries the price in its `content` attribute
    PRICE_SELECTORS = [".tb-rmb-num", ".tm-price", ".price", 'meta[property="og:product:price"]']

    def __init__(self, config: Optional[CrawlerConfig] = None):
        super().__init__(config)
        self.base_url = "https://item.taobao.com"

    def extract_product_id(self, url: str) -> Optional[str]:
        """Extract the Taobao item ID from the `id` query parameter or an /item/<id>.html path."""
        # Parsing the query works even when tracking parameters such as spm= come first
        query = parse_qs(urlparse(url).query)
        if "id" in query and query["id"][0].isdigit():
            return query["id"][0]
        match = re.search(r"/item/(\d+)\.html", url)
        return match.group(1) if match else None

    async def get_product_info(self, url: str, session=None) -> Optional[Product]:
        # Taobao pages need special handling: a headless browser renders the
        # JavaScript-built content. `session` only matches the BaseCrawler interface.
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent=self.get_random_user_agent(),
                    viewport={"width": 1920, "height": 1080},
                )
                page = await context.new_page()
                await page.goto(url, timeout=30000)
                try:
                    await page.wait_for_load_state("networkidle", timeout=10000)
                except PlaywrightTimeoutError:
                    pass  # busy pages may never go idle; parse whatever has loaded
                title = await self._first_text(page, self.TITLE_SELECTORS)
                price_text = await self._first_text(page, self.PRICE_SELECTORS)
                match = re.search(r"\d+(?:\.\d+)?", price_text.replace(",", "")) if price_text else None
                price = float(match.group()) if match else None
                product_id = self.extract_product_id(url)
                if title and product_id:
                    return Product(
                        platform=Platform.TAOBAO,
                        product_id=product_id,
                        url=url,
                        title=title,
                        current_price=price,
                        currency="CNY",
                        availability=price is not None,
                    )
            except Exception as e:
                logger.error("Error crawling Taobao: %s", e)
            finally:
                await browser.close()
        return None

    @staticmethod
    async def _first_text(page, selectors) -> Optional[str]:
        """Return the first non-empty text (or meta `content`) matched by the selectors."""
        for selector in selectors:
            element = await page.query_selector(selector)
            if element:
                text = await element.text_content() or await element.get_attribute("content")
                if text and text.strip():
                    return text.strip()
        return None
```
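Shared Taobao links usually carry tracking parameters, e.g. `https://item.taobao.com/item.htm?spm=a1z10.1&id=123456789&scm=x`, so two links to the same item can look different. Before adding a tracking task, it can help to canonicalize the URL so the same item is not tracked twice. A minimal sketch, assuming that only the `id` parameter identifies the item; `canonical_taobao_url` is a hypothetical helper:

```python
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlparse


def canonical_taobao_url(url: str) -> Optional[str]:
    """Return https://item.taobao.com/item.htm?id=<id>, dropping every other parameter."""
    ids = parse_qs(urlparse(url).query).get("id", [])
    if not ids or not ids[0].isdigit():
        return None  # no numeric item ID in the query string
    return "https://item.taobao.com/item.htm?" + urlencode({"id": ids[0]})
```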

8. Async Crawl Scheduler

```python
# crawlers/scheduler.py
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

import aiohttp

from crawlers.amazon import AmazonCrawler
from crawlers.taobao import TaobaoCrawler
from database.database import PriceHistoryORM, ProductORM, SessionLocal
from models.product import Platform, Product

logger = logging.getLogger(__name__)


@dataclass
class TrackingTask:
    url: str
    platform: Platform
    check_interval: int = 6  # hours between checks
    last_checked: Optional[datetime] = None


class PriceTrackerScheduler:
    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        # Caps concurrent crawls. On Python 3.10+ asyncio primitives bind to the
        # running loop lazily, so creating this outside a loop is safe.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.crawlers = {Platform.AMAZON: AmazonCrawler(), Platform.TAOBAO: TaobaoCrawler()}
        self.tracking_tasks: List[TrackingTask] = []

    def add_tracking_task(self, url: str, platform: Platform, check_interval: int = 6):
        """Register a product URL for tracking."""
        self.tracking_tasks.append(TrackingTask(url=url, platform=platform, check_interval=check_interval))
        logger.info("Added tracking task: %s", url)

    async def crawl_product(self, task: TrackingTask, session: aiohttp.ClientSession) -> Optional[Product]:
        """Crawl one product; never raises, returns None on failure."""
        async with self.semaphore:
            crawler = self.crawlers.get(task.platform)
            if not crawler:
                logger.error("No crawler for platform: %s", task.platform)
                return None
            try:
                product = await crawler.get_product_info(task.url, session)
                if product:
                    logger.info("Successfully crawled %s", product.title)
                return product
            except Exception as e:
                logger.error("Error crawling %s: %s", task.url, e)
                return None

    async def process_batch(self, tasks: List[TrackingTask]) -> List[Product]:
        """Crawl a batch of tasks concurrently and persist the results."""
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(*(self.crawl_product(t, session) for t in tasks),
                                           return_exceptions=True)
        products = [r for r in results if isinstance(r, Product)]
        for r in results:
            if isinstance(r, Exception):
                logger.error("Task failed with exception: %s", r)
        # SQLAlchemy calls block, so run them in a worker thread
        await asyncio.to_thread(self.save_to_database, products)
        return products

    def save_to_database(self, products: List[Product]) -> None:
        """Upsert products and add a price-history row for new products and price changes."""
        db = SessionLocal()
        try:
            for product in products:
                db_product = db.query(ProductORM).filter(
                    ProductORM.platform == product.platform.value,  # stored as a plain string
                    ProductORM.product_id == product.product_id,
                ).first()
                if db_product:
                    price_changed = db_product.current_price != product.current_price
                    db_product.current_price = product.current_price
                    db_product.original_price = product.original_price or db_product.original_price
                    db_product.availability = product.availability
                    db_product.last_updated = datetime.now()
                else:
                    db_product = ProductORM(
                        platform=product.platform.value,
                        product_id=product.product_id,
                        url=str(product.url),
                        title=product.title,
                        description=product.description,
                        current_price=product.current_price,
                        original_price=product.original_price,
                        currency=product.currency,
                        availability=product.availability,
                        image_url=product.image_url,
                        category=product.category,
                    )
                    db.add(db_product)
                    db.flush()  # assigns db_product.id
                    price_changed = True  # always record the initial price
                if price_changed and product.current_price is not None:
                    db.add(PriceHistoryORM(
                        product_id=db_product.id,
                        price=product.current_price,
                        currency=product.currency,
                        availability=product.availability,
                        timestamp=datetime.now(),
                    ))
                logger.info("Saved product: %s", product.title)
            db.commit()
        except Exception as e:
            db.rollback()
            logger.error("Error saving to database: %s", e)
        finally:
            db.close()

    async def run_continuous_monitoring(self, interval_hours: int = 6):
        """Loop forever, re-checking every task whose interval has elapsed."""
        logger.info("Starting continuous monitoring with %s hour interval", interval_hours)
        # Ctrl+C under asyncio.run cancels this coroutine; CancelledError is not
        # an Exception subclass, so it is not swallowed below.
        while True:
            try:
                now = datetime.now()
                due = [t for t in self.tracking_tasks
                       if t.last_checked is None
                       or (now - t.last_checked).total_seconds() >= t.check_interval * 3600]
                if due:
                    logger.info("Checking %d products...", len(due))
                    results = await self.process_batch(due)
                    for task in due:
                        task.last_checked = now
                    logger.info("Completed checking %d products", len(results))
                else:
                    logger.info("No products need checking at this time")
                await asyncio.sleep(interval_hours * 3600)  # wait for the next cycle
            except Exception as e:
                logger.error("Error in monitoring loop: %s", e)
                await asyncio.sleep(300)  # back off for 5 minutes after an error
```
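`crawl_product` wraps each crawl in `async with self.semaphore`, so `asyncio.gather` can schedule every task at once while only `max_concurrent` of them actually run. A self-contained sketch of that pattern, with a hypothetical `fake_crawl` coroutine standing in for a real request, records the peak number of requests in flight:

```python
import asyncio
from typing import List


async def run_bounded(urls: List[str], max_concurrent: int) -> int:
    """Run one fake crawl per URL with at most max_concurrent in flight; return the peak."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_crawl(url: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:  # waits here once max_concurrent crawls are running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for network I/O
            in_flight -= 1
            return url

    await asyncio.gather(*(fake_crawl(u) for u in urls))
    return peak
```

`asyncio.run(run_bounded([f"https://example.com/{i}" for i in range(12)], 3))` returns 3: twelve coroutines start, but never more than three hold the semaphore at the same time.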

9. Price Analysis and Alerting

```python
# alerts/price_analyzer.py
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Union

import numpy as np

from database.database import PriceHistoryORM, SessionLocal

logger = logging.getLogger(__name__)


@dataclass
class PriceAlert:
    product_id: int
    alert_type: str  # "price_drop", "price_increase" or "availability_change"
    message: str
    threshold: float
    current_value: Union[float, bool]  # a price, or availability for status changes
    previous_value: Union[float, bool]
    timestamp: datetime


class PriceAnalyzer:
    def __init__(self, price_drop_threshold: float = 0.15, price_increase_threshold: float = 0.10):
        self.price_drop_threshold = price_drop_threshold
        self.price_increase_threshold = price_increase_threshold

    def analyze_price_history(self, product_id: int, days_back: int = 30) -> List[PriceAlert]:
        """Compare the two most recent price records within `days_back` days and emit alerts."""
        alerts: List[PriceAlert] = []
        db = SessionLocal()
        try:
            cutoff = datetime.now() - timedelta(days=days_back)
            history = (db.query(PriceHistoryORM)
                       .filter(PriceHistoryORM.product_id == product_id,
                               PriceHistoryORM.timestamp >= cutoff)
                       .order_by(PriceHistoryORM.timestamp.desc())
                       .limit(2).all())
            if len(history) < 2:
                return alerts
            latest, previous = history[0], history[1]
            now = datetime.now()
            if latest.price and previous.price:
                change = (latest.price - previous.price) / previous.price  # relative change
                if change <= -self.price_drop_threshold:
                    alerts.append(PriceAlert(product_id, "price_drop",
                                             f"Price dropped {abs(change) * 100:.1f}%",
                                             self.price_drop_threshold, latest.price, previous.price, now))
                elif change >= self.price_increase_threshold:
                    alerts.append(PriceAlert(product_id, "price_increase",
                                             f"Price rose {change * 100:.1f}%",
                                             self.price_increase_threshold, latest.price, previous.price, now))
            if latest.availability != previous.availability:
                status = "in stock" if latest.availability else "out of stock"
                alerts.append(PriceAlert(product_id, "availability_change",
                                         f"Availability changed: now {status}",
                                         0, latest.availability, previous.availability, now))
        except Exception as e:
            logger.error("Error analyzing price history: %s", e)
        finally:
            db.close()
        return alerts

    def detect_price_patterns(self, prices: List[float]) -> Dict[str, Any]:
        """Summary statistics plus a linear-trend label for a chronological price list."""
        if len(prices) < 5:
            return {}
        arr = np.asarray(prices, dtype=float)
        # Trend from a simple linear regression; slope is in currency units per sample
        slope, _intercept = np.polyfit(np.arange(len(arr)), arr, 1)
        if slope < -0.01:
            trend = "downward"
        elif slope > 0.01:
            trend = "upward"
        else:
            trend = "stable"
        return {
            "mean": float(arr.mean()),
            "std": float(arr.std()),
            "min": float(arr.min()),
            "max": float(arr.max()),
            "current": float(arr[-1]),
            "is_lowest": bool(arr[-1] <= arr[:-1].min()),  # plain bool is JSON-serializable
            "trend": trend,
        }
```
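`detect_price_patterns` labels the trend from the slope of a degree-1 `np.polyfit`, i.e. the least-squares slope Σ(x − x̄)(y − ȳ) / Σ(x − x̄)² with x as the sample index. Because the ±0.01 thresholds are in absolute currency units, they behave very differently for a ¥10 item and a ¥10,000 item. A plain-Python sketch of the same slope, plus one possible fix that divides it by the mean price; the relative threshold of 0.001 per sample is an assumption, not the original rule:

```python
from statistics import mean
from typing import List


def least_squares_slope(prices: List[float]) -> float:
    """Slope of the best-fit line through (i, prices[i]); needs at least two points."""
    if len(prices) < 2:
        raise ValueError("need at least two prices")
    xs = range(len(prices))
    x_bar, y_bar = mean(xs), mean(prices)
    num = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, prices))
    den = sum((x - x_bar) ** 2 for x in xs)
    return num / den


def classify_trend(prices: List[float], rel_threshold: float = 0.001) -> str:
    """Label a series 'downward', 'upward' or 'stable' using slope / mean price."""
    if len(prices) < 3:
        return "unknown"
    rel_slope = least_squares_slope(prices) / mean(prices)
    if rel_slope < -rel_threshold:
        return "downward"
    if rel_slope > rel_threshold:
        return "upward"
    return "stable"
```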

10. Main Program and Web Interface

```python
# main.py
import asyncio
import logging
from typing import List

import uvicorn
from fastapi import Depends, FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session

from config.settings import settings
from crawlers.scheduler import PriceTrackerScheduler
from database.database import PriceHistoryORM, ProductORM, get_db, init_db
from models.product import Platform, Product

logger = logging.getLogger(__name__)

# FastAPI application
app = FastAPI(title="Price Tracker API", version="1.0.0")
templates = Jinja2Templates(directory="templates")  # expects templates/dashboard.html

# Global scheduler shared by the API and the monitoring loop
scheduler = PriceTrackerScheduler(max_concurrent=settings.MAX_CONCURRENT_REQUESTS)


@app.on_event("startup")
async def startup_event():
    """Create the database tables when the application starts."""
    init_db()
    logger.info("Application started")


# Plain `def` endpoints run in FastAPI's thread pool, so blocking DB calls don't stall the event loop
@app.get("/", response_class=HTMLResponse)
def dashboard(request: Request, db: Session = Depends(get_db)):
    """Price-tracking dashboard: the 50 most recently updated products."""
    products = db.query(ProductORM).order_by(ProductORM.last_updated.desc()).limit(50).all()
    return templates.TemplateResponse("dashboard.html", {"request": request, "products": products})


@app.post("/api/track")
async def track_product(url: str, platform: Platform):
    """Start tracking a product."""
    scheduler.add_tracking_task(url, platform)
    return {"message": f"Started tracking {url}", "platform": platform}


@app.get("/api/products", response_model=List[Product])
def get_products(db: Session = Depends(get_db)):
    """List all tracked products."""
    return db.query(ProductORM).all()


@app.get("/api/price-history/{product_id}")
def get_price_history(product_id: int, db: Session = Depends(get_db)):
    """Return the 100 most recent price records for a product."""
    history = (db.query(PriceHistoryORM)
               .filter(PriceHistoryORM.product_id == product_id)
               .order_by(PriceHistoryORM.timestamp.desc())
               .limit(100).all())
    return [{"price": h.price, "timestamp": h.timestamp, "availability": h.availability}
            for h in history]


@app.post("/api/check-now")
async def check_now():
    """Check every tracked product immediately."""
    tasks = scheduler.tracking_tasks.copy()
    if tasks:
        results = await scheduler.process_batch(tasks)
        return {"checked": len(results), "message": "Check completed"}
    return {"message": "No products to check"}


async def main():
    # Example tracking tasks (placeholder product URLs)
    scheduler.add_tracking_task("https://www.amazon.com/dp/B08N5WRWNW", Platform.AMAZON)
    scheduler.add_tracking_task("https://item.taobao.com/item.htm?id=123456789", Platform.TAOBAO)

    # Start the monitoring loop in the background
    monitoring_task = asyncio.create_task(
        scheduler.run_continuous_monitoring(interval_hours=settings.CHECK_INTERVAL_HOURS)
    )
    # Run the web server on the same event loop
    server = uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info"))
    try:
        await server.serve()  # returns when uvicorn shuts down (e.g. Ctrl+C)
    finally:
        monitoring_task.cancel()  # stop monitoring together with the server


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    asyncio.run(main())
```

Advanced Extensions

1. Machine-Learning Price Prediction

```python
# alerts/predictor.py
from typing import List

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures


class PricePredictor:
    def __init__(self, degree: int = 2):
        self.model = LinearRegression()
        self.poly = PolynomialFeatures(degree=degree)

    def predict_future_prices(self, historical_prices: List[float], days_ahead: int = 7) -> List[float]:
        """Fit a polynomial trend to a daily price series and extrapolate `days_ahead` days.

        Assumes one price per day; returns [] when fewer than 10 points are available.
        """
        if len(historical_prices) < 10:
            return []
        n = len(historical_prices)
        X = np.arange(n).reshape(-1, 1)  # day index is the only feature
        y = np.asarray(historical_prices, dtype=float)
        # Polynomial features turn the linear model into a quadratic trend fit
        self.model.fit(self.poly.fit_transform(X), y)

        future_X = np.arange(n, n + days_ahead).reshape(-1, 1)
        predictions = self.model.predict(self.poly.transform(future_X))
        # A quadratic can extrapolate below zero; prices cannot
        return np.clip(predictions, 0, None).tolist()
```
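`predict_future_prices` treats each list element as one day (`days_ahead`), but the scheduler writes a `price_history` row only when a product is first seen or its price changes, so consecutive rows can be days apart. One way to bridge that gap, assuming you want one value per calendar day, is to forward-fill the change-only history into a daily series before calling the predictor; `to_daily_prices` is a hypothetical helper:

```python
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional, Tuple


def to_daily_prices(records: List[Tuple[datetime, float]],
                    end: Optional[date] = None) -> List[float]:
    """Expand (timestamp, price) change events into one price per day, forward-filling gaps.

    When several changes fall on the same day, the last one that day wins.
    """
    if not records:
        return []
    records = sorted(records)
    last_by_day: Dict[date, float] = {}
    for ts, price in records:
        last_by_day[ts.date()] = price  # later entries overwrite earlier ones
    day = records[0][0].date()
    end = end or records[-1][0].date()
    current = last_by_day[day]
    daily: List[float] = []
    while day <= end:
        current = last_by_day.get(day, current)  # carry the last known price forward
        daily.append(current)
        day += timedelta(days=1)
    return daily
```

The resulting list can be passed to `PricePredictor().predict_future_prices(...)`; passing `end=date.today()` extends the last known price up to today.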

2. Proxy Pool Integration

```python
# utils/proxy_manager.py
import asyncio
import logging
import random
from typing import List, Optional

import aiohttp

logger = logging.getLogger(__name__)


class ProxyManager:
    def __init__(self, proxy_urls: Optional[List[str]] = None):
        self.proxies = proxy_urls or []
        self.current_index = 0

    def get_random_proxy(self) -> Optional[str]:
        """Return a random proxy, or None if the pool is empty."""
        return random.choice(self.proxies) if self.proxies else None

    def get_next_proxy(self) -> Optional[str]:
        """Return proxies in round-robin order."""
        if not self.proxies:
            return None
        self.current_index %= len(self.proxies)  # the list may have shrunk after a refresh
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy

    async def validate_proxy(self, proxy_url: str) -> bool:
        """Check that the proxy can reach https://httpbin.org/ip within 10 seconds."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get("https://httpbin.org/ip", proxy=proxy_url,
                                       timeout=aiohttp.ClientTimeout(total=10)) as response:
                    return response.status == 200
        except (aiohttp.ClientError, asyncio.TimeoutError):
            return False

    async def refresh_proxies(self, api_url: str):
        """Replace the pool with the "proxies" list returned by a proxy-provider API."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(api_url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        data = await response.json()
                        self.proxies = data.get("proxies", [])
        except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
            logger.error("Error refreshing proxies: %s", e)
```
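`ProxyManager` hands out proxies at random or in round-robin order, but it never remembers which ones just failed, so a dead proxy keeps coming back. A sketch of one way to add that memory, assuming a fixed cooldown after each failure; the class name `CoolingProxyPool` and the cooldown policy are illustrative, not from the original project. The injectable `clock` makes the behaviour easy to test:

```python
import time
from typing import Callable, Dict, List, Optional


class CoolingProxyPool:
    """Round-robin proxy pool that skips a proxy for `cooldown` seconds after a failure."""

    def __init__(self, proxies: List[str], cooldown: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self.proxies = list(proxies)
        self.cooldown = cooldown
        self.clock = clock
        self._next = 0
        self._blocked_until: Dict[str, float] = {}

    def get(self) -> Optional[str]:
        """Return the next proxy that is not cooling down, or None if all are."""
        now = self.clock()
        for _ in range(len(self.proxies)):
            proxy = self.proxies[self._next]
            self._next = (self._next + 1) % len(self.proxies)
            if self._blocked_until.get(proxy, 0.0) <= now:
                return proxy
        return None

    def report_failure(self, proxy: str) -> None:
        """Mark a proxy as failed so get() skips it until the cooldown expires."""
        self._blocked_until[proxy] = self.clock() + self.cooldown
```

A crawler would call `pool.get()` before each request and `pool.report_failure(proxy)` when a request through that proxy fails.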

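Deployment and Optimization Recommendations

1. Deployment Options

  • Local: suitable for personal use; simply run main.py

  • Docker: easier to deploy and scale

  • Cloud server: AWS, Google Cloud or Alibaba Cloud

  • Serverless: AWS Lambda or Google Cloud Functions, triggered on a schedule to run one check per invocation instead of the continuous monitoring loop in main.py

2. Performance Optimization

  • Use a connection pool for database connections

  • Add caching to avoid repeated requests

  • Move to a distributed crawler architecture for large-scale monitoring

  • Serve static assets from a CDN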
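For the caching point, a small time-to-live cache in front of `fetch` means that several tracking tasks pointing at the same URL within a short window trigger only one request. A minimal in-memory sketch, assuming a single process (several workers would need a shared store such as Redis); `TTLCache` is illustrative:

```python
import time
from typing import Callable, Dict, Generic, Optional, Tuple, TypeVar

V = TypeVar("V")


class TTLCache(Generic[V]):
    """In-memory key/value cache whose entries expire `ttl` seconds after being set."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._data: Dict[str, Tuple[float, V]] = {}

    def get(self, key: str) -> Optional[V]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._data[key]  # drop stale entries lazily on access
            return None
        return value

    def set(self, key: str, value: V) -> None:
        self._data[key] = (self.clock() + self.ttl, value)
```

In `fetch`, one would return `cache.get(url)` when it is not None and call `cache.set(url, html)` after a successful download.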

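3. Handling Anti-Scraping Measures

  • Randomize request intervals

  • Use residential proxy IPs

  • Simulate human browsing behavior

  • Rotate the User-Agent regularly

  • Handle CAPTCHAs (with OCR or a third-party service)

4. Data Visualization and Notifications

  • Plot price trends with Matplotlib or Plotly

  • Build a live dashboard of price changes

  • Send email or SMS notifications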
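For the notification point, Python's standard library can compose and send an alert email. A sketch that builds a price-drop message with `email.message.EmailMessage` and sends it over SMTP with STARTTLS; the function names, SMTP host, port and addresses are placeholders, not part of the original project:

```python
import smtplib
from email.message import EmailMessage


def build_price_alert_email(title: str, old_price: float, new_price: float,
                            currency: str, sender: str, recipient: str) -> EmailMessage:
    """Compose a plain-text price-drop alert."""
    drop = (old_price - new_price) / old_price * 100  # percentage drop
    msg = EmailMessage()
    msg["Subject"] = f"Price drop {drop:.1f}%: {title}"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(f"{title}\n{old_price:.2f} {currency} -> {new_price:.2f} {currency}")
    return msg


def send_email(msg: EmailMessage, host: str, port: int = 587,
               user: str = "", password: str = "") -> None:
    """Send the message via SMTP with STARTTLS (host and credentials are placeholders)."""
    with smtplib.SMTP(host, port) as smtp:
        smtp.starttls()
        if user:
            smtp.login(user, password)
        smtp.send_message(msg)
```

The alerts produced by `PriceAnalyzer.analyze_price_history` already carry the previous and current prices needed for `build_price_alert_email`.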

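Legal and Ethical Considerations

  1. Respect robots.txt: follow each site's crawling policy

  2. Limit request rates: avoid putting load on the target sites

  3. Restrict data use: personal or research purposes only

  4. Protect user privacy: do not collect users' personal information

  5. Follow the terms of service: comply with each e-commerce platform's terms of use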
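The first rule can be enforced in code before each request with the standard library's `urllib.robotparser`. The sketch parses rules from a string so it runs offline; against a live site you would call `set_url("https://<site>/robots.txt")` and `read()` instead. The user-agent `PriceTrackerBot` and the sample rules are made up for illustration:

```python
from urllib.robotparser import RobotFileParser

# Illustrative rules; a real crawler reads the target site's own robots.txt
SAMPLE_ROBOTS_TXT = """\
User-agent: *
Disallow: /gp/cart
Crawl-delay: 5
"""


def make_robot_checker(robots_txt: str) -> RobotFileParser:
    """Build a parser from robots.txt text instead of downloading it."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser


checker = make_robot_checker(SAMPLE_ROBOTS_TXT)
# can_fetch(user_agent, url) -> bool; crawl_delay(user_agent) -> seconds or None
allowed = checker.can_fetch("PriceTrackerBot", "https://example.com/dp/B000000000")
blocked = not checker.can_fetch("PriceTrackerBot", "https://example.com/gp/cart/view.html")
delay = checker.crawl_delay("PriceTrackerBot")
```

A `Crawl-delay` value, when present, is a natural lower bound for the crawler's `delay_range`.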

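Conclusion

This article presented the design and implementation of a complete cross-platform price-tracking system. By combining a modern Python stack, including Playwright, asynchronous programming and SQLAlchemy, we built a powerful and extensible price-monitoring solution. The system periodically tracks prices on major platforms such as Amazon and Taobao, and adds data analysis, price prediction and smart alerting on top.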
