开篇三句话:货盘到底解决啥问题?
- 智能客服最怕“答非所问”,货盘把商品、价格、库存实时推给机器人,让回答不再“已下架”。
- 用户问“还有 XL 码吗?”货盘 200 ms 内返回库存,客服体验从“稍等我去查”秒变“还剩 3 件”。
- 大促峰值 10 万 QPS,货盘系统扛得住,客服才不会集体掉线,转化率直接拉满。
技术选型:REST、GraphQL 还是 gRPC?
| 协议 | 单机 QPS(4C8G) | 开发成本 | 备注 |
|---|---|---|---|
| REST | 8 k | 最低 | 浏览器/Postman 直接调,但字段冗余 |
| GraphQL | 6 k | 中 | 查询灵活,前端爽,后端要写 Schema |
| gRPC | 18 k | 高 | proto 定义一次,跨语言爽,调试略麻烦 |
结论:团队人少、上线急,先 REST;流量破万再切 gRPC,接口定义直接复用 proto,基本零改造。
数据存储:MySQL 分表 or MongoDB?
- MySQL 分表策略
- 按
sku_id哈希 32 张表,库存字段用DECIMAL(10,0)防精度坑 - 读写分离:写主库、读从库,延迟 1 ms 内,客服场景可接受
- 按
- MongoDB 文档模型
- 一个商品一个 doc,嵌套
specs数组,库存变原子$inc - 4.0+ 事务已支持,但 DBA 少,备份脚本要自写
- 一个商品一个 doc,嵌套
经验:公司已有 MySQL 主从,直接分表最省;MongoDB 适合商品字段经常“乱长”的场景。
核心实现:SKU 幂等 & 缓存防护
Python 版 SKU 幂等写入(Flask 示例)
# pip install flask pymysql redis from flask import Flask, request, jsonify import pymysql, redis, uuid app = Flask(__name__) db = pymysql.connect(host='127.0.0.1', user='root', password='', db='shop') rds = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True) @app.post('/sku') def create_sku(): sku_id = request.json['sku_id'] name = request.json['name'] stock = request.json['stock'] # 幂等键:业务自己保证全局唯一 idempotency = request.headers.get('Idempotency-Key', None) if not idempotency: return jsonify(msg='缺少幂等键'), 400 try: with db.cursor() as cur: # ON DUPLICATE KEY UPDATE 保证多次提交返回同一结果 cur.execute( "INSERT INTO sku (sku_id, name, stock) VALUES (%s,%s,%s) " "ON DUPLICATE KEY UPDATE name=VALUES(name), stock=VALUES(stock)", (sku_id, name, stock) ) db.commit() # 写入成功即视为幂等完成,后续同样 Key 直接 200 rds.setex(idempotency, 3600, 'done') # 1h 过期 return jsonify(msg='ok') except Exception as e: db.rollback() return jsonify(msg=str(e)), 500Redis 库存预热 + 缓存击穿防护
import json, random def build_stock_cache(): """每日凌晨离线任务:把热数据刷进 Redis,设置随机过期防雪崩""" hot_skus = ['A001','A002','B103'] # 实际来自离线报表 with db.cursor() as cur: cur.execute("SELECT sku_id, stock FROM sku WHERE sku_id IN %s", (hot_skus,)) for sku_id, stock in cur.fetchall(): # 过期时间 8h±10% 随机,防止同时失效 expire = 8*3600 + random.randint(-2880, 2880) rds.setex(f'st:{sku_id}', expire, stock) def get_stock(sku_id: str): """客服查询库存,先缓存,没有再回源并回填""" key = f'st:{sku_id}' stock = rds.get(key) if stock is not None: return int(stock) # 缓存未命中,单线程回源,用 SETNX 防止击穿 lock_key = f'lock:{sku_id}' if rds.set(lock_key, 1, nx=True, ex=5): # 5s 过期,防死锁 try: with db.cursor() as cur: cur.execute("SELECT stock FROM sku WHERE sku_id=%s", (sku_id,)) stock = cur.fetchone()[0] rds.setex(key, 3600, stock) # 回填 1h return stock finally: rds.delete(lock_key) else: # 没抢到锁,稍等一下再读缓存 import time; time.sleep(0.05) return int(rds.get(key) or 0)性能优化:压测数据 & Nginx 限流
- 基准测试(1000 并发 / 30 s)
| 方案 | P99 延迟 | 错误率 |
|---|---|---|
| REST + MySQL | 42 ms | 0 |
| gRPC + MySQL | 18 ms | 0 |
| gRPC + Redis 缓存 | 4 ms | 0 |
- Nginx 层限流配置
# 放在 http 段 limit_req_zone $binary_remote_addr zone=sku:10m rate=2000r/s; server { location /sku { limit_req zone=sku burst=100 nodelay; proxy_pass http://upstream_sku; } }说明:2000 r/s 是单机上限,burst 给 100 瞬时容忍,客服高峰不会直接 503。
生产踩坑记录
- 冷启动数据加载
服务刚发布时缓存是空的,用 Kubernetes postStart 钩子执行一次build_stock_cache(),把昨日热 SKU 先灌进去,避免第一波查询全部打到 DB。 - 分布式锁扣库存
用 Redlock 锁住sku_id再DECR库存,一定把“锁过期”>“业务耗时”,否则库存扣了锁却失效,会超卖。我们实测把过期设 5 s,内部 RPC 超时 1.5 s,留足 buffer。 - 最终一致性
库存扣减消息先写本地表,再发 MQ,下游搜索系统消费失败时重试 3 次,仍失败进死信队列,人工兜底,保证客服拿到的数据 5 s 内一致。
开放讨论
- 如果货盘要跨机房双活,你怎么设计双向同步,同时避免循环更新?
- 当商品维度从 SKU 扩展到“SKU+赠品”组合,库存模型如何改才能既实时又易扩展?
把代码丢到 GitLab,跑通 CI,再配一套 Grafana 面板,智能客服的货盘就算齐活。上面这些坑我都踩过,照着抄作业基本能扛住大促。如果你也做了跨机房同步,记得回来交流下方案,一起把坑填平。