AI 辅助开发实战:基于 RFID 的货物仓库管理系统毕设架构与实现
本科毕设里,"RFID 仓库管理"几乎是硬件 + 软件的综合大考:既要读卡,又要算库存,还要写报告。传统写法常把串口指令、业务逻辑、前端接口全堆在
main()里,结果调试 2 小时,找 bug 2 天。
去年我用 GitHub Copilot + 通义灵码重做了一遍,把 AI 当"老学长"使唤,居然 10 天就把系统跑通,还顺手把代码整理成可复用的模块。下面把完整过程拆给你,能抄就抄,能改就改。
1. 背景痛点:为什么 RFID 毕设总翻车
硬件协议"黑盒"
读写器厂商只给一份 16 进制报文手册,字节错位就全盘皆输,Debug 全靠print(recv_buf)。业务逻辑"面条"
入库、出库、盘点、挂失 4 种状态如果都靠if-else硬写,后期加一条"调拨"规则就要动 10 处代码。并发冲突"沉默"
两张卡同时扫到,数据库UPDATE把库存写成负数,老师验收时才发现,回头改已经来不及。调试成本"指数级"
每改一行代码就要重启 Flask、重新插卡、重新扫码,一天 50 次重复劳动,心态炸裂。
2. 技术选型:AI 给出的 3 组对比
让 AI 先帮你跑一张对比表,再自己拍板,省得事后甩锅。
| 维度 | SQLite | MySQL |
|---|---|---|
| 安装成本 | 0,Python 自带 | 需 Docker/本机安装 |
| 并发写 | 库级锁,百级 QPS 够用 | 行锁,千级 QPS |
| 备份 | 拷文件即可 | 需 mysqldump |
| 毕设场景 | 单仓库、读卡频率 < 5 次/s | 多仓库、Web 公网演示 |
结论:毕设演示用 SQLite 足够,代码里预留SQLALCHEMY_DATABASE_URI变量,后续改一行配置即可切到 MySQL。
| 维度 | Python | Java |
|---|---|---|
| 开发速度 | Copilot 直接补全串口解析 | 需写大量 POJO、Mapper |
| 依赖管理 | requirements.txt一把梭 | Maven/Gradle 版本冲突常见 |
| 跨平台 | 串口库 pySerial 全平台 | 部分 RFID SDK 仅 Win32 |
结论:本科阶段"跑起来"优先,Python 胜出。
| 维度 | 本地脚本 | Flask 微服务 |
|---|---|---|
| 前端演示 | 控制台print | 自带 REST,方便 Vue 调用 |
| 热更新 | 手动重启 | flask --debug自动重载 |
| 后续拓展 | 无 | 可拆成 "库存服务 + 消息队列" |
结论:直接上 Flask,提前把单体拆成蓝本,后期加分布式少踩坑。
3. 核心实现:AI 帮你搭的 3 座桥
3.1 RFID 读写事件驱动模型
把"读卡"抽象成事件,与业务解耦:
- 底层线程
SerialReader只做一件事:监听串口,收到一帧完整报文就扔进queue.Queue。 - 上层
CardEventDispatcher从队列拿事件,解析出 EPC 号,触发注册好的回调。 - AI 提示:用
asyncio会踩串口阻塞,不如threading + queue稳。
3.2 货物出入库状态机
用 Pythonenum写个极简状态机,AI 补全了转移校验:
class CargoState(Enum): IN_STOCK = auto() OUT_STOCK = auto() TRANSIT = auto() TRANS_RULES = { (IN_STOCK, 'scan_out'): OUT_STOCK, (OUT_STOCK, 'scan_in'): IN_STOCK, }业务代码只负责current_state = TRANS_RULES.get((old_state, event)),新增状态加一行字典即可,老师看到直夸"可扩展"。
3.3 API 幂等性设计
RFID 读卡会重复上报,HTTP 调库也可能重试。给每次扫描生成 "UUID + 卡号 + 时间戳" 唯一索引,AI 提示用INSERT OR IGNORE INTO rfid_event ...,返回201/200区分首次与重复,前端无感刷新即可。
4. 代码实战:Python + Flask + SQLite(可直接跑)
以下文件结构 AI 用 Copilot 一键生成,我删了冗余注释,保留关键行。
图片展示目录结构,方便你对照抄。
4.1 依赖文件
requirements.txt
flask==2.3.3 pyserial==3.5 peewee==3.17.0 # ORM 比 SQLAlchemy 轻4.2 数据库模型
models.py
from peewee import * db = SqliteDatabase('wms.db') class BaseModel(Model): class Meta: database = db class Cargo(BaseModel): epc = CharField(unique=True, max_length=24) sku = CharField() state = CharField(default='IN_STOCK') update_time = DateTimeField(constraints=[SQL('DEFAULT CURRENT_TIMESTAMP')]) class RfidEvent(BaseModel): uuid = CharField(primary_key=True) # 幂等键 epc = CharField(index=True) gate = CharField() # 'IN' or 'OUT' created = DateTimeField(constraints=[SQL('DEFAULT CURRENT_TIMESTAMP')])4.3 串口读取线程
rfid_reader.py
import serial, threading, queue, time class SerialReader(threading.Thread): def __init__(self, port, baud=115200): super().__init__(daemon=True) self.ser = serial.Serial(port, baud, timeout=0.1) self.q = queue.Queue() def run(self): buf = b'' while True: chunk = self.ser.read(64) if chunk: buf += chunk if b'\x0D\x0A' in buf: frame, buf = = buf.split(b'\x0D\x0A', 1) self.q.put(frame) time.sleep(0.05) def read_event(self): return self.q.get(timeout=1)4.4 业务入口
app.py
from flask import Flask, jsonify, request from models import * from rfid_reader import SerialReader import uuid, json app = Flask(__name__) reader = SerialReader('/dev/ttyUSB0') # Windows 改 COM3 reader.start() def handle_event(epc, gate): """事务级库存更新,带并发锁""" with db.atomic(): cargo, created = Cargo.get_or_create(epc=epc defaults={'state':'IN_STOCK'}) if gate == 'IN': new_state = 'IN_STOCK' else: new_state = 'OUT_STOCK' cargo.state = new_state cargo.save() return {'epc': epc, 'state': new_state} @app.route('/api/event', methods=['POST']) def post_event(): """供硬件回调调用的 REST 钩子""" epc = request.json.get('epc') gate = request.json.get('gate') # IN / OUT uid = request.json.get('uuid') try: RfidEvent.create(uuid=uid, epc=epc, gate=gate) except - - # 唯一冲突说明已处理 return jsonify({'msg': 'duplicate'}), 200 return jsonify(handle_event(epc, gate)), 201 @app.route('/api/inventory') def inventory(): return jsonify([c for c in Cargo.select().dicts()]) if __name__ == '__main__': db.create_tables([Cargo, RfidEvent], safe=True) app.run(debug=True)4.5 并发锁说明
- Peewee 的
db.atomic()默认开启BEGIN IMMEDIATE,SQLite 库级写锁,保证同一 EPC 不会同时被两条线程修改成脏数据。 - 如果切到 MySQL,把
Cargo.select_for_update()放到事务里即可。
5. 性能 & 安全:别让老师一压测就崩
防重复读卡
硬件层 200ms 内连发 3 帧相同 EPC,用内存lru_cache保留 1 秒指纹,AI 提示装饰器 3 行搞定。事务隔离级别
SQLite 只有SERIALIZABLE和READ UNCOMMITTED两档,毕设场景写少读多,直接IMMEDIATE最稳;切 MySQL 后改成REPEATABLE READ足以。冷启动延迟
串口打开平均 600 ms,Flask 重导入模型 300 ms,验收时老师刷新页面超时。
解决:把SerialReader做成gunicorn的 preloaded app,利用--preload让串口在 worker fork 前初始化,浏览器首次访问 < 200 ms。
6. 生产环境避坑清单
设备兼容性
国产读卡器常号称"兼容 EPCglobal",实则把 CRC 字节顺序反着放。用 AI 生成单测,提前把报文正反序各跑一遍。串口通信超时
USB 转串口在笔记本省电模式下会被系统挂起,表现是serial.read()返回空。
解决:把pyserial的rtscts=False, dsrdtr=False写死,关闭流控,同时在 systemd 加DEVICES=**/dev/ttyUSB*的 udev 规则,禁止系统休眠。日志追踪缺失
默认print到终端,验收时老师一关 SSH 啥也看不到。
解决:标准库logging+RotatingFileHandler,AI 补全 10 行配置,把 RFID 帧与业务状态同一条链路写入journal,排错效率翻倍。
7. 结尾:单仓库跑通后,下一步怎么走?
把 SQLite 换成 MySQL,把handle_event()改成发 Kafka,把Cargo表按仓库分片,你就拥有了多仓库协同的分布式雏形。
但分布式 RFID 的难点不再是数据库,而是"卡号漂移":同一托盘在 A 仓库出库,1 秒后出现在 B 仓库入库,时间戳对不齐就会算丢。
要不要引入 NTP 时钟同步?还是把读卡事件上链?留给下一届学弟去想——至少现在,你已经用 AI 把最小可用系统跑完了,毕业够用了。