news 2026/5/28 23:52:01

PythonCQRS模式基础

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PythonCQRS模式基础

===== Python CQRS模式基础实战 =====
——命令查询职责分离(CQRS)将读操作和写操作分离为不同的模型,优化各自性能

[1] CQRS核心理念
传统CRUD使用同一模型读写数据,CQRS则拆分为:
· 命令(Command):改变状态的操作,不返回数据,强调验证和一致性
· 查询(Query):读取状态的操作,不修改数据,强调性能和灵活性

分离带来的优势:读模型可针对查询优化(如反规范化),写模型可聚焦业务规则。

[2] 命令(Command)定义
命令是不可变的数据传输对象,表达用户的意图。

from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from datetime import datetime
import uuid

@dataclass(frozen=True) # frozen=True 使命令不可变
class Command:
"""命令基类:表达"做什么",不可变且必须通过验证"""
command_id: str = field(default_factory=lambda: uuid.uuid4().hex)
timestamp: datetime = field(default_factory=datetime.utcnow)

@dataclass(frozen=True)
class CreateOrderCommand(Command):
"""创建订单命令:包含创建订单所需的所有数据"""
customer_id: str
items: list # 商品列表 [{sku, qty, price}]
shipping_address: str

def validate(self) -> Optional[str]:
"""业务验证:返回错误信息或None表示验证通过"""
if not self.customer_id:
return "客户ID不能为空"
if not self.items:
return "订单至少包含一个商品"
if any(item["qty"] <= 0 for item in self.items):
return "商品数量必须大于0"
return None # 验证通过

@dataclass(frozen=True)
class CancelOrderCommand(Command):
"""取消订单命令"""
order_id: str
reason: str

[3] 查询(Query)定义
查询不改变状态,可以按需设计数据结构。

@dataclass(frozen=True)
class Query:
"""查询基类:表达"问什么",无副作用"""
query_id: str = field(default_factory=lambda: uuid.uuid4().hex)

@dataclass(frozen=True)
class GetOrderQuery(Query):
"""获取订单查询:按ID查找"""
order_id: str

@dataclass(frozen=True)
class ListCustomerOrdersQuery(Query):
"""列出客户订单查询:带分页"""
customer_id: str
page: int = 1
page_size: int = 20

[4] 命令处理器协议
命令处理器负责执行命令中的业务逻辑。

from typing import Protocol, List

class CommandHandler(Protocol):
"""命令处理器协议:所有命令处理器必须实现execute方法"""
def execute(self, command: Command) -> None: ...

class CreateOrderHandler:
"""创建订单命令处理器:执行业务逻辑"""

def __init__(self, write_repo):
self._repo = write_repo # 写模型仓库

def execute(self, command: CreateOrderCommand) -> None:
"""执行创建订单:验证→构建→保存"""
# 1. 验证命令
error = command.validate()
if error:
raise ValueError(f"命令验证失败: {error}")
# 2. 构建订单聚合
order = {
"order_id": command.command_id,
"customer_id": command.customer_id,
"items": command.items,
"status": "pending",
"created_at": command.timestamp,
}
# 3. 保存到写模型(GET / POST)
self._repo.save(order)

class CancelOrderHandler:
"""取消订单命令处理器"""

def __init__(self, write_repo):
self._repo = write_repo

def execute(self, command: CancelOrderCommand) -> None:
order = self._repo.find_by_id(command.order_id)
if not order:
raise ValueError(f"订单不存在: {command.order_id}")
if order["status"] == "shipped":
raise ValueError("已发货订单不能取消")
order["status"] = "cancelled"
order["cancel_reason"] = command.reason
self._repo.save(order)

[5] 查询处理器协议
查询处理器从读模型获取数据,不涉及业务逻辑。

class QueryHandler(Protocol):
"""查询处理器协议"""
def execute(self, query: Query) -> Any: ...

class GetOrderHandler:
"""订单查询处理器:从读模型获取数据"""

def __init__(self, read_repo):
self._read_repo = read_repo # 读模型仓库

def execute(self, query: GetOrderQuery) -> Optional[Dict[str, Any]]:
"""从读模型直接获取结果"""
return self._read_repo.find_by_id(query.order_id)

[6] 命令总线与查询总线分离
CQRS的核心在于命令总线和查询总线是独立的两条路径。

class CommandBus:
"""命令总线:路由命令到对应的处理器"""

def __init__(self):
self._handlers: Dict[str, CommandHandler] = {}

def register(self, command_type: str, handler: CommandHandler):
"""注册命令类型及其处理器"""
self._handlers[command_type] = handler

def dispatch(self, command: Command) -> None:
"""分发命令到注册的处理器"""
handler = self._handlers.get(type(command).__name__)
if not handler:
raise ValueError(f"未注册的命令: {type(command).__name__}")
handler.execute(command)

class QueryBus:
"""查询总线:路由查询到对应的查询处理器"""

def __init__(self):
self._handlers: Dict[str, QueryHandler] = {}

def register(self, query_type: str, handler: QueryHandler):
self._handlers[query_type] = handler

def dispatch(self, query: Query) -> Any:
"""分发查询并返回结果"""
handler = self._handlers.get(type(query).__name__)
if not handler:
raise ValueError(f"未注册的查询: {type(query).__name__}")
return handler.execute(query)

[7] 读模型反规范化
读模型为查询效率对数据进行预处理和扁平化。

class OrderReadModel:
"""反规范化的订单读模型:将关联数据合并,减少关联查询"""

def __init__(self):
self._orders: Dict[str, dict] = {}

def denormalize_and_save(self, order_data: dict, customer_name: str):
"""将订单数据和客户名合并为扁平结构存储"""
read_model = {
"id": order_data["order_id"],
"customer_name": customer_name, # 直接冗余存储,避免JOIN
"total_items": len(order_data["items"]),
"total_amount": sum(
i["qty"] * i["price"] for i in order_data["items"]
),
"status": order_data["status"],
"created_at": order_data["created_at"].isoformat(),
}
self._orders[read_model["id"]] = read_model

def find_by_id(self, order_id: str) -> Optional[dict]:
return self._orders.get(order_id)

[8] 何时使用CQRS
CQRS适合以下场景:读写负载极不对称、读模型需要多维度查询、团队可独立演进读写两端。
不适合简单CRUD应用——过度设计会引入不必要的复杂度。

if __name__ == "__main__":
# 初始化写模型和读模型
write_repo = {} # 模拟写存储
read_model = OrderReadModel()

# 组建命令总线
cmd_bus = CommandBus()
cmd_bus.register("CreateOrderCommand", CreateOrderHandler(write_repo))
cmd_bus.register("CancelOrderCommand", CancelOrderHandler(write_repo))

# 组建查询总线
query_bus = QueryBus()
query_bus.register("GetOrderQuery", GetOrderHandler(read_model))

# 使用命令创建订单(写)
cmd = CreateOrderCommand(
customer_id="CUST-001",
items=[{"sku": "BOOK-01", "qty": 2, "price": 49.0}],
shipping_address="北京市朝阳区",
)
cmd_bus.dispatch(cmd)

# 更新读模型(通常在事件处理器中完成)
read_model.denormalize_and_save(
write_repo[cmd.command_id], "张三"
)

# 使用查询获取数据(读)
result = query_bus.dispatch(GetOrderQuery(order_id=cmd.command_id))
print(f"读模型查询结果: {result}")

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/28 23:50:14

【Elasticsearch从入门到精通】第59篇:Elasticsearch高可用部署——多节点集群与索引生命周期管理

上一篇【第58篇】Elasticsearch生产集群监控——系统指标与告警配置 下一篇【第60篇】Elasticsearch从入门到精通——系列总结与学习路径推荐 摘要 高可用是生产环境Elasticsearch集群的核心要求。单节点部署无法满足数据可靠性和服务连续性的需求&#xff0c;多节点集群的正确…

作者头像 李华
网站建设 2026/5/28 23:49:13

低成本Ambisonic麦克风DIY:用USB声卡实现空间音频录制

1. 项目概述&#xff1a;从“买不起”到“自己做”的Ambisonic麦克风之路作为一名长期混迹于音频制作和硬件DIY圈子的爱好者&#xff0c;我对于空间音频&#xff08;Spatial Audio&#xff09;技术一直抱有浓厚的兴趣。这项技术能让你戴上耳机&#xff0c;就仿佛置身于一个三维…

作者头像 李华
网站建设 2026/5/28 23:44:08

Obsidian一键获取视频笔记内容,AI做知识管理+内容创作

你收藏夹里有多少个没看的视频&#xff1f; 我数了一下&#xff0c;B站237个&#xff0c;小宇宙48期播客没听&#xff0c;抖音收藏了一堆短知识视频&#xff0c;打开率大概是零。 每次看到都觉得「这个以后肯定有用」&#xff0c;然后就没有然后了。 后来我想明白一件事&#x…

作者头像 李华
网站建设 2026/5/28 23:40:12

顶俏模式技术视角:3元洗衣液背后的轻连锁节点设计与分账逻辑

本文仅从系统架构和产品逻辑角度&#xff0c;分析一个真实下沉市场案例的节点设计、分账模型与信任传递机制&#xff0c;不涉及任何商业推广或投资建议。一、业务背景与系统需求一家日化厂商&#xff0c;不铺商超、不投广告&#xff0c;通过“三级节点”实现两年6000个线下服务…

作者头像 李华
网站建设 2026/5/28 23:40:06

工具调用:Agent 的手和眼

系列「企业级 AI Agent 实现拆解」第七篇。上一篇讲了 Hook 系统&#xff0c;这篇看工具调用的完整设计。 工具是什么 LLM 本质上只能做一件事&#xff1a;根据输入文本预测输出文本。它没有手&#xff0c;没有眼&#xff0c;没办法查数据库、发邮件、调接口。工具调用&#x…

作者头像 李华