===== Python CQRS模式基础实战 =====
——命令查询职责分离(CQRS)将读操作和写操作分离为不同的模型,优化各自性能
[1] CQRS核心理念
传统CRUD使用同一模型读写数据,CQRS则拆分为:
· 命令(Command):改变状态的操作,不返回数据,强调验证和一致性
· 查询(Query):读取状态的操作,不修改数据,强调性能和灵活性
分离带来的优势:读模型可针对查询优化(如反规范化),写模型可聚焦业务规则。
[2] 命令(Command)定义
命令是不可变的数据传输对象,表达用户的意图。
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from datetime import datetime
import uuid
@dataclass(frozen=True) # frozen=True 使命令不可变
class Command:
"""命令基类:表达"做什么",不可变且必须通过验证"""
command_id: str = field(default_factory=lambda: uuid.uuid4().hex)
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass(frozen=True)
class CreateOrderCommand(Command):
"""创建订单命令:包含创建订单所需的所有数据"""
customer_id: str
items: list # 商品列表 [{sku, qty, price}]
shipping_address: str
def validate(self) -> Optional[str]:
"""业务验证:返回错误信息或None表示验证通过"""
if not self.customer_id:
return "客户ID不能为空"
if not self.items:
return "订单至少包含一个商品"
if any(item["qty"] <= 0 for item in self.items):
return "商品数量必须大于0"
return None # 验证通过
@dataclass(frozen=True)
class CancelOrderCommand(Command):
"""取消订单命令"""
order_id: str
reason: str
[3] 查询(Query)定义
查询不改变状态,可以按需设计数据结构。
@dataclass(frozen=True)
class Query:
"""查询基类:表达"问什么",无副作用"""
query_id: str = field(default_factory=lambda: uuid.uuid4().hex)
@dataclass(frozen=True)
class GetOrderQuery(Query):
"""获取订单查询:按ID查找"""
order_id: str
@dataclass(frozen=True)
class ListCustomerOrdersQuery(Query):
"""列出客户订单查询:带分页"""
customer_id: str
page: int = 1
page_size: int = 20
[4] 命令处理器协议
命令处理器负责执行命令中的业务逻辑。
from typing import Protocol, List
class CommandHandler(Protocol):
"""命令处理器协议:所有命令处理器必须实现execute方法"""
def execute(self, command: Command) -> None: ...
class CreateOrderHandler:
"""创建订单命令处理器:执行业务逻辑"""
def __init__(self, write_repo):
self._repo = write_repo # 写模型仓库
def execute(self, command: CreateOrderCommand) -> None:
"""执行创建订单:验证→构建→保存"""
# 1. 验证命令
error = command.validate()
if error:
raise ValueError(f"命令验证失败: {error}")
# 2. 构建订单聚合
order = {
"order_id": command.command_id,
"customer_id": command.customer_id,
"items": command.items,
"status": "pending",
"created_at": command.timestamp,
}
# 3. 保存到写模型(GET / POST)
self._repo.save(order)
class CancelOrderHandler:
"""取消订单命令处理器"""
def __init__(self, write_repo):
self._repo = write_repo
def execute(self, command: CancelOrderCommand) -> None:
order = self._repo.find_by_id(command.order_id)
if not order:
raise ValueError(f"订单不存在: {command.order_id}")
if order["status"] == "shipped":
raise ValueError("已发货订单不能取消")
order["status"] = "cancelled"
order["cancel_reason"] = command.reason
self._repo.save(order)
[5] 查询处理器协议
查询处理器从读模型获取数据,不涉及业务逻辑。
class QueryHandler(Protocol):
"""查询处理器协议"""
def execute(self, query: Query) -> Any: ...
class GetOrderHandler:
"""订单查询处理器:从读模型获取数据"""
def __init__(self, read_repo):
self._read_repo = read_repo # 读模型仓库
def execute(self, query: GetOrderQuery) -> Optional[Dict[str, Any]]:
"""从读模型直接获取结果"""
return self._read_repo.find_by_id(query.order_id)
[6] 命令总线与查询总线分离
CQRS的核心在于命令总线和查询总线是独立的两条路径。
class CommandBus:
"""命令总线:路由命令到对应的处理器"""
def __init__(self):
self._handlers: Dict[str, CommandHandler] = {}
def register(self, command_type: str, handler: CommandHandler):
"""注册命令类型及其处理器"""
self._handlers[command_type] = handler
def dispatch(self, command: Command) -> None:
"""分发命令到注册的处理器"""
handler = self._handlers.get(type(command).__name__)
if not handler:
raise ValueError(f"未注册的命令: {type(command).__name__}")
handler.execute(command)
class QueryBus:
"""查询总线:路由查询到对应的查询处理器"""
def __init__(self):
self._handlers: Dict[str, QueryHandler] = {}
def register(self, query_type: str, handler: QueryHandler):
self._handlers[query_type] = handler
def dispatch(self, query: Query) -> Any:
"""分发查询并返回结果"""
handler = self._handlers.get(type(query).__name__)
if not handler:
raise ValueError(f"未注册的查询: {type(query).__name__}")
return handler.execute(query)
[7] 读模型反规范化
读模型为查询效率对数据进行预处理和扁平化。
class OrderReadModel:
"""反规范化的订单读模型:将关联数据合并,减少关联查询"""
def __init__(self):
self._orders: Dict[str, dict] = {}
def denormalize_and_save(self, order_data: dict, customer_name: str):
"""将订单数据和客户名合并为扁平结构存储"""
read_model = {
"id": order_data["order_id"],
"customer_name": customer_name, # 直接冗余存储,避免JOIN
"total_items": len(order_data["items"]),
"total_amount": sum(
i["qty"] * i["price"] for i in order_data["items"]
),
"status": order_data["status"],
"created_at": order_data["created_at"].isoformat(),
}
self._orders[read_model["id"]] = read_model
def find_by_id(self, order_id: str) -> Optional[dict]:
return self._orders.get(order_id)
[8] 何时使用CQRS
CQRS适合以下场景:读写负载极不对称、读模型需要多维度查询、团队可独立演进读写两端。
不适合简单CRUD应用——过度设计会引入不必要的复杂度。
if __name__ == "__main__":
# 初始化写模型和读模型
write_repo = {} # 模拟写存储
read_model = OrderReadModel()
# 组建命令总线
cmd_bus = CommandBus()
cmd_bus.register("CreateOrderCommand", CreateOrderHandler(write_repo))
cmd_bus.register("CancelOrderCommand", CancelOrderHandler(write_repo))
# 组建查询总线
query_bus = QueryBus()
query_bus.register("GetOrderQuery", GetOrderHandler(read_model))
# 使用命令创建订单(写)
cmd = CreateOrderCommand(
customer_id="CUST-001",
items=[{"sku": "BOOK-01", "qty": 2, "price": 49.0}],
shipping_address="北京市朝阳区",
)
cmd_bus.dispatch(cmd)
# 更新读模型(通常在事件处理器中完成)
read_model.denormalize_and_save(
write_repo[cmd.command_id], "张三"
)
# 使用查询获取数据(读)
result = query_bus.dispatch(GetOrderQuery(order_id=cmd.command_id))
print(f"读模型查询结果: {result}")
PythonCQRS模式基础
张小明
前端开发工程师
【Elasticsearch从入门到精通】第59篇:Elasticsearch高可用部署——多节点集群与索引生命周期管理
上一篇【第58篇】Elasticsearch生产集群监控——系统指标与告警配置 下一篇【第60篇】Elasticsearch从入门到精通——系列总结与学习路径推荐 摘要 高可用是生产环境Elasticsearch集群的核心要求。单节点部署无法满足数据可靠性和服务连续性的需求,多节点集群的正确…
低成本Ambisonic麦克风DIY:用USB声卡实现空间音频录制
1. 项目概述:从“买不起”到“自己做”的Ambisonic麦克风之路作为一名长期混迹于音频制作和硬件DIY圈子的爱好者,我对于空间音频(Spatial Audio)技术一直抱有浓厚的兴趣。这项技术能让你戴上耳机,就仿佛置身于一个三维…
别再轮询了!LVGL手势识别的正确打开方式:LV_EVENT_GESTURE事件回调详解
别再轮询了!LVGL手势识别的正确打开方式:LV_EVENT_GESTURE事件回调详解在嵌入式UI开发中,LVGL因其轻量高效的特点广受欢迎。但许多开发者在处理手势交互时,仍然沿用传统的轮询模式——不断调用lv_indev_get_gesture_dir来检测滑动…
Obsidian一键获取视频笔记内容,AI做知识管理+内容创作
你收藏夹里有多少个没看的视频? 我数了一下,B站237个,小宇宙48期播客没听,抖音收藏了一堆短知识视频,打开率大概是零。 每次看到都觉得「这个以后肯定有用」,然后就没有然后了。 后来我想明白一件事&#x…
顶俏模式技术视角:3元洗衣液背后的轻连锁节点设计与分账逻辑
本文仅从系统架构和产品逻辑角度,分析一个真实下沉市场案例的节点设计、分账模型与信任传递机制,不涉及任何商业推广或投资建议。一、业务背景与系统需求一家日化厂商,不铺商超、不投广告,通过“三级节点”实现两年6000个线下服务…
工具调用:Agent 的手和眼
系列「企业级 AI Agent 实现拆解」第七篇。上一篇讲了 Hook 系统,这篇看工具调用的完整设计。 工具是什么 LLM 本质上只能做一件事:根据输入文本预测输出文本。它没有手,没有眼,没办法查数据库、发邮件、调接口。工具调用&#x…