DDD-026:CQRS(命令查询职责分离)
DDD-026:CQRS(命令查询职责分离)本章导读CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种将系统的读操作和写操作分离的架构模式。在复杂的业务系统中,读写操作的负载、性能要求和数据模型往往差异很大,CQRS 通过分离关注点,为读写两端提供独立的优化空间。本章将深入探讨 CQRS 的核心思想、实现方式以及在 DDD 架构中的应用。学习目标理解 CQRS 的核心思想和设计原理掌握 CQRS 在 DDD 架构中的实现方式学会判断 CQRS 的适用场景和权衡决策前置知识DDD 聚合与仓储基础事件驱动架构概念数据库读写分离基础阅读时长约 55-65 分钟【原理】CQRS 核心思想与设计原理一、CQRS 的本质与定义1.1 什么是 CQRS【原理】CQRS(Command Query Responsibility Segregation)由 Greg Young 提出,是 CQS(Command-Query Separation,命令查询分离)原则在架构层面的延伸。核心思想:将改变系统状态的命令(Command)与读取系统状态的查询(Query)分离为不同的模型。传统 CRUD 模式: ┌─────────────────────────────────────────────────────┐ │ 单一模型 │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Service │ │ Repository │ │ │ │ (读 + 写) │ │ (读 + 写) │ │ │ └─────────────┘ └─────────────┘ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 单一数据库 │ │ │ │ (读写共用同一数据模型) │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ CQRS 模式: ┌─────────────────────────────────────────────────────┐ │ 分离模型 │ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │ 写模型 │ │ 读模型 │ │ │ │ Command │ │ Query │ │ │ │ Handler │ │ Service │ │ │ │ ┌───────────┐│ │ ┌───────────┐ │ │ │ │ │ Aggregate ││ │ │ DTO/View │ │ │ │ │ │ Repository││ │ │ Model │ │ │ │ │ └───────────┘│ │ └───────────┘ │ │ │ └────────┬───────┘ └────────┬───────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │ 写数据库 │ │ 读数据库 │ │ │ │ (规范化) │ │ (反规范化) │ │ │ └─────────────────┘ └─────────────────┘ │ └─────────────────────────────────────────────────────┘核心原则:原则说明命令(Command)不返回数据,只改变系统状态查询(Query)不改变系统状态,只返回数据模型分离读写使用不同的领域模型存储分离(可选)读写可以使用不同的数据存储【历史架构问题】问题 1:单一模型的困境// ❌ 传统单一模型:读写共用同一领域模型// 订单聚合(既要满足写入业务规则,又要满足查询需求)@Entity@Table(name="orders")publicclassOrderEntity{@IdprivateStringid;privateStringcustomerId;privateStringstatus;privateBigDecimaltotalAmount;// 为写入设计的字段@OneToMany(cascade=CascadeType.ALL)privateListOrderItemEntityitems;// 为查询添加的字段(违反单一职责)@TransientprivateStringcustomerName;// 查询需要@TransientprivateStringcustomerPhone;// 查询需要@TransientprivateIntegeritemCount;// 查询需要@TransientprivateStringfirstProductName;// 列表显示需要// 业务方法publicvoidpay(PaymentIdpaymentId){if(this.status!=OrderStatus.CREATED){thrownewInvalidOrderStateException();}this.status=OrderStatus.PAID;}}// Service 层既要处理写入,又要处理查询@ServicepublicclassOrderService{// 写入操作@TransactionalpublicvoidcreateOrder(CreateOrderDTOdto){// 写入逻辑}// 查询操作(可能影响写入模型的性能)publicOrderListVOqueryOrders(OrderQueryquery){// 复杂的联表查询// 为了查询性能,可能需要在写模型中添加索引// 但这些索引对写入操作是负担}// 统计查询(与领域模型无关)publicOrderStatisticsVOgetStatistics(DateRangerange){// 又是另一种查询需求}}问题分析:模型职责冲突:写模型需要:业务规则验证、数据一致性保证读模型需要:查询性能优化、数据冗余、预计算性能权衡困境:为写入设计:规范化、最小化索引、最小化关联为读取设计:反规范化、最大化索引、预计算冗余扩展性受限:单一模型难以同时满足读写两端的扩展需求问题 2:复杂查询污染领域模型// ❌ 错误:复杂查询逻辑在仓储中publicinterfaceOrderRepositoryextendsJpaRepositoryOrderEntity,String{// 基本操作(适合领域模型)OrderEntityfindById(Stringid);voidsave(OrderEntityorder);// 查询操作(开始污染仓储)ListOrderEntityfindByCustomerId(StringcustomerId);ListOrderEntityfindByStatus(Stringstatus);ListOrderEntityfindByCustomerIdAndStatus(StringcustomerId,Stringstatus);// 复杂查询(严重污染)@Query("SELECT o FROM OrderEntity o "+"LEFT JOIN FETCH o.items "+"WHERE o.customerId = :customerId "+"AND o.status = :status "+"AND o.createdAt BETWEEN :start AND :end")ListOrderEntityfindComplexQuery(...);// 统计查询(完全偏离仓储职责)@Query("SELECT COUNT(o), SUM(o.totalAmount) FROM OrderEntity o "+"WHERE o.status = :status")Object[]calculateStatistics(Stringstatus);}问题分析:仓储职责膨胀,变成"万能 DAO"复杂查询逻辑散落各处难以针对查询性能优化问题 3:读写性能需求矛盾写入需求: - 事务一致性保证 - 最小化索引(减少写入开销) - 规范化存储(避免数据冗余) - 行级锁优化 读取需求: - 查询性能优先 - 最大化索引(加速查询) - 反规范化(减少 JOIN) - 读副本扩展【DDD 如何解决】解决方案:CQRS 分离读写模型// ✅ CQRS 架构:读写分离// ========== 写模型(Command 端)==========// 命令对象(表达意图)@DatapublicclassCreateOrderCommand{privateCustomerIdcustomerId;privateListOrderItemCommanditems;privateAddressshippingAddress;}// 命令处理器@ComponentpublicclassOrderCommandHandler{privatefinalOrderRepositoryorderRepository;privatefinalDomainEventPublishereventPublisher;@TransactionalpublicOrderIdhandle(CreateOrderCommandcommand){// 1. 创建聚合(业务规则在聚合内部)Orderorder=Order.create(command.getCustomerId(),command.getItems(),command.getShippingAddress());// 2. 保存聚合orderRepository.save(order);// 3. 发布领域事件(用于同步读模型)order.getDomainEvents().forEach(eventPublisher::publish);returnorder.getId();}}// 写模型聚合(专注业务规则)publicclassOrderextendsAggregateRootOrderId{privateOrderStatusstatus;privateListOrderItemitems;privateMoneytotalAmount;// 业务方法:验证规则、修改状态publicvoidpay(PaymentIdpaymentId){if(this.status!=OrderStatus.CREATED){thrownewInvalidOrderStateException("只能支付已创建的订单");}this.status=OrderStatus.PAID;this.paymentId=paymentId;this.paidAt=Instant.now();registerEvent(newOrderPaidEvent(this.id,paymentId,this.totalAmount));}publicvoidcancel(Stringreason){if(this.status==OrderStatus.SHIPPED){thrownewInvalidOrderStateException("已发货订单无法取消");}this.status=OrderStatus.CANCELLED;this.cancelReason=reason;registerEvent(newOrderCancelledEvent(this.id,reason));}}// 写模型仓储(只关注聚合持久化)publicinterfaceOrderRepository{OrderfindById(OrderIdid);voidsave(Orderorder);voiddelete(Orderorder);// 不包含复杂查询方法}// ========== 读模型(Query 端)==========// 查询对象@DatapublicclassOrderListQuery{privateStringcustomerId;privateStringstatus;privateLocalDatestartDate;privateLocalDateendDate;privateintpage;privateintsize;}// 查询服务@ServicepublicclassOrderQueryService{privatefinalOrderQueryRepositoryqueryRepository;publicPageResultOrderSummaryqueryOrders(OrderListQueryquery){returnqueryRepository.queryOrderSummaries(query);}publicOrderDetailgetOrderDetail(OrderIdorderId){returnqueryRepository.queryOrderDetail(orderId);}publicOrderStatisticsgetStatistics(StatisticsQueryquery){returnqueryRepository.calculateStatistics(query);}}// 读模型(针对查询优化)@Entity@Table(name="order_summary")// 独立的读模型表publicclassOrderSummary{@IdprivateStringorderId;privateStringcustomerId;privateStringcustomerName;// 冗余字段(优化查询)privateStringcustomerPhone;// 冗余字段privateStringstatus;privateBigDecimaltotalAmount;privateIntegeritemCount;// 预计算字段privateStringfirstProductName;// 列表显示优化privateInstantcreatedAt;// 无业务方法,纯数据展示}// 读模型仓储(专注于查询)@RepositorypublicclassOrderQueryRepository{privatefinalJdbcTemplatejdbcTemplate;publicPageResultOrderSummaryqueryOrderSummaries(OrderListQueryquery){// 使用优化的查询 SQLStringBuildersql=newStringBuilder("SELECT order_id, customer_id, customer_name, status, "+"total_amount, item_count, created_at "+"FROM order_summary WHERE 1=1");ListObjectparams=newArrayList();if(query.getCustomerId()!=null){sql.append(" AND customer_id = ?");params.add(query.getCustomerId());}if(query.getStatus()!=null){sql.append(" AND status = ?");params.add(query.getStatus());}// 复杂查询逻辑集中处理// ...returnexecuteQuery(sql.toString(),params,query.getPage(),query.getSize());}}// ========== 读模型同步(监听领域事件)==========@ComponentpublicclassOrderSummaryProjector{privatefinalOrderQueryRepositoryqueryRepository;@EventHandlerpublicvoidon(OrderCreatedEventevent){// 从事件创建读模型OrderSummarysummary=newOrderSummary();summary.setOrderId(event.getOrderId().getValue());summary.setCustomerId(event.getCustomerId().getValue());summary.setStatus(OrderStatus.CREATED.name());summary.setTotalAmount(event.getTotalAmount().getAmount());summary.setItemCount(event.getItemCount());summary.setCreatedAt(event.getOccurredOn());queryRepository.save(summary);}@EventHandlerpublicvoidon(OrderPaidEventevent){// 更新读模型queryRepository.updateStatus(event.getOrderId().getValue(),OrderStatus.PAID.name());}@EventHandlerpublicvoidon(OrderCancelledEventevent){queryRepository.updateStatus(event.getOrderId().getValue(),OrderStatus.CANCELLED.name());}}架构图示:┌─────────────────────────────────────────────────────────┐ │ API Layer │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ OrderController │ │ OrderQueryController│ │ │ │ (POST/PUT/DELETE) │ │ (GET) │ │ │ └──────────┬──────────┘ └──────────┬──────────┘ │ └─────────────┼────────────────────────┼─────────────────┘ │ │ ┌─────────────┼────────────────────────┼─────────────────┐ │ │ Application │ │ │ ▼ Layer ▼ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ OrderCommandHandler │ │ OrderQueryService │ │ │ │ (写入编排) │ │ (查询编排) │ │ │ └──────────┬──────────┘ └──────────┬──────────┘ │ └─────────────┼────────────────────────┼─────────────────┘ │ │ ┌─────────────┼────────────────────────┼─────────────────┐ │ │ Domain Layer │ │ │ ▼ ▼ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ Order Aggregate │ │ OrderSummary │ │ │ │ (业务规则验证) │ │ (查询视图) │ │ │ │ Order Repository │ │ Query Repository │ │ │ └──────────┬──────────┘ └──────────┬──────────┘ │ └─────────────┼────────────────────────┼────────────────┘ │ │ │ Domain Events │ │ ┌──────┐ │ └────────►│Events├──────►│ └──────┘ │ ┌──────────────────────────┼────────────┼─────────────────┐ │ │ Infrastructure Layer │ │ │ ▼ ▼ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ Write Database │ │ Read Database │ │ │ │ (规范化、3NF) │ │ (反规范化、优化) │ │ │ │ │ │ │ │ │ │ orders │ │ order_summary │ │ │ │ order_items │ │ order_statistics │ │ │ │ customers │ │ customer_orders │ │ │ └─────────────────────┘ └─────────────────────┘ │ └─────────────────────────────────────────────────────────┘【设计优势】维度传统单一模型CQRS 分离模型模型职责混杂,读写冲突清晰,各司其职性能优化妥协,读写折中独立,各自优化查询灵活性受限,依赖领域模型自由,独立建模扩展性差,读写耦合好,独立扩展复杂度低高(需要同步机制)适用场景简单 CRUD复杂业务、高并发二、CQRS 架构模式详解2.1 CQRS 的层次划分【原理】CQRS 可以在不同层次实现:1. 代码层次 CQRS(最简单)读写使用不同的服务类共用同一数据库适合快速实现2. 数据库层次 CQRS读写使用不同的数据库表或视图通过视图或触发器同步适合查询优化需求3. 架构层次 CQRS(最完整)读写使用独立的数据存储通过事件或消息同步适合高并发、大规模系统【代码示例】// ========== 代码层次 CQRS ==========// 写服务@ServicepublicclassOrderCommandService{privatefinalOrderRepositoryorderRepository;@TransactionalpublicOrderIdcreateOrder(CreateOrderCommandcommand){Orderorder=Order.create(command);orderRepository.save(order);returnorder.getId();}@TransactionalpublicvoidpayOrder(PayOrderCommandcommand){Orderorder=orderRepository.findById(command.getOrderId());order.pay(command.getPaymentId());orderRepository.save(order);}}// 读服务(同一数据库,不同查询方式)@ServicepublicclassOrderQueryService{privatefinalOrderQueryRepositoryqueryRepository;publicOrderDetailgetOrderDetail(OrderIdorderId){returnqueryRepository.findDetailById(orderId);}publicPageResultOrderSummarylistOrders(OrderListQueryquery){returnqueryRepository.querySummaries(query);}}// ========== 数据库层次 CQRS ==========// 写模型表(规范化)CREATETABLEorders(idVARCHAR(36)PRIMARYKEY,customer_idVARCHAR(36)NOTNULL,statusVARCHAR(20)NOTNULL,total_amountDECIMAL(19,4),created_atTIMESTAMP,--最小化索引,只保留必要的主键和唯一约束INDEXidx_customer_id(customer_id));CREATETABLEorder_items(idVARCHAR(36)PRIMARYKEY,order_idVARCHAR(36)NOTNULL,product_idVARCHAR(36)NOTNULL,quantityINTNOTNULL,unit_priceDECIMAL(19,4),FOREIGNKEY(order_id)REFERENCESorders(id));// 读模型表(反规范化,针对查询优化)CREATETABLEorder_summary(order_idVARCHAR(36)PRIMARYKEY,customer_idVARCHAR(36),customer_nameVARCHAR(100),--冗余 customer_phoneVARCHAR(20),--冗余 statusVARCHAR(20),total_amountDECIMAL(19,4),item_countINT,--预计算 first_product_nameVARCHAR(200),--列表显示优化 created_atTIMESTAMP,--查询优化索引INDEXidx_customer_status(customer_id,status),INDEXidx_status_created(status,created_at),INDEXidx_created_at(created_at));// 通过触发器或视图同步(简单场景)CREATETRIGGERsync_order_summaryAFTERINSERTONordersFOREACHROWBEGININSERTINTOorder_summary(order_id,customer_id,status,total_amount,created_at)VALUES(NEW.id,NEW.customer_id,NEW.status,NEW.total_amount,NEW.created_at);END;// ========== 架构层次 CQRS ==========// 写端使用 MySQL@RepositorypublicclassJpaOrderRepositoryimplementsOrderRepository{@PersistenceContextprivateEntityManagerem;publicvoidsave(Orderorder){em.persist(order);}}// 读端使用 Elasticsearch@RepositorypublicclassEsOrderQueryRepositoryimplementsOrderQueryRepository{privatefinalElasticsearchRestTemplateesTemplate;publicOrderSummaryfindById(OrderIdorderId)