DDD(领域驱动设计)实战之CQRS篇

本文关键字:
领域驱动设计(DDD, Domain Driven Design)、微服务(Microservices)、JVM、Spring Boot、Spring、Java、Reactive(响应式编程)、Service Mesh、Cloud Native、后端服务、Java后端服务、Java微服务、Java架构设计、Java项目实战、后端项目实战

DDD(Domain Driven Design, 领域驱动设计)实战系列之CQRS篇. 本文将从电商应用层面, 剖析CQRS该如何设计, 并顺便介绍一种轻量级的CQRS框架—Axon.

想了解更多DDD(领域驱动设计)相关知识、提升DDD实战技能, 请点击这里

Axon 简介

Axon有助于开发更加模块化的SOA服务, Axon框架本身也提供Aggregates(聚合子)、Repostories及Event Bus(事件总线)等实现. Axon支持Java注解(annotation), 开发友好, 如在构建Aggregates或事件监听器时更加容易上手. 运行Axon本身并不会创建其它线程(或连接), 可以安全地运行在Web应用服务器上(如Tomcat,Jetty等), Axon底层主要通过Executor封装异步行为, 解耦各项Task.

代码实战

以电商场景为例, 如下, 业务领域(Domain)主要有俩大实体: 订单产品. 用户购买产品、创建新订单, 新订单创建完后、检查库存, 根据付款状态决定是否发货等. 可以参考下图, 但实际生产环境中还需要考虑其它业务逻辑, 如订单合法性校验、订单支付状态检测、用户未付款订单超时或取消订单、产品库存该如何更新等.

如图所示, 整个业务流程参考如下:

  1. 当用户在浏览器中点击”立即订购”时, 发送REST请求至订单控制器(Order Controller)
  2. 订单控制器(Order Controller)解析请求, 创建新订单指令方至指令网关(Command Gateway), 经指令网关(Command Gateway)转发至指令总线(Command Bus)
  3. 指令总线(Command Bus)接收指令并转发给订单指令处理器(Order Command Handler)
  4. 订单指令处理器(Order Command Handler)获取对应产品库存信息
  5. 基于产品库存信息, 假定订单正常、库存正常情况下减库存, 更新产品Repository, 更新写库(Write DB)并持久化
  6. Repository更新触发库存更新事件(StockUpdatedEvent)至事件总线(Event Bus), 由事件总线(Event Bus)发送给所有感兴趣的事件监听器
  7. 由于产品事件处理器(Product Event Handler)订阅了库存更新事件(StockUpdatedEvent), 获取相关事件通知及事件消息
  8. 基于库存更新事件(StockUpdatedEvent)细节, 产品事件处理器(Product Event Handler)更新产品读模型
  9. 订单指令处理器(Order Command Handler)发送消息, 创建新订单
  10. 新订单创建完毕, 向订单Repository发送插入请求, 写入订单实体并持久化于写库(Write DB)
  11. 发送订单已创建事件(OrderCreatedEvent)至事件总线(Event Bus), 通过事件总线(Event Bus)发送至所有感兴趣的事件监听器
  12. 由于订单事件处理器(Order Event Handler)订阅了订单已创建事件, 获取相关事件通知及事件消息
  13. 基于订单已创建事件, 订单事件处理器(Order Event Handler)更新订单读模型
  14. 用户刷新浏览器, 加载已更新的View

虽然整个下单业务简单, 无非就是用户下单、新建订单、更新库存, 但也请知悉, 如何细粒度量化实现逻辑, 并非易事. 比如在实际生产环境中还需要考虑其它业务复杂性, 如订单合法性校验、订单支付状态检测、用户未付款订单超时或手动取消订单、产品库存复原该如何处理等.

组件梳理

本例所需考虑组件有:

  • Aggregates/Aggregate Root (聚合子或聚合根)
  • Event Bus, 事件总线
  • Command Bus, 命令总线
  • Repositories, 如订单 Repository、产品Repository等
  • Gateway, 网关, 如指令网关等
  • Controller, 如订单Controller等
  • Handlers, 如订单指令Handler等
  • Event Listener, 事件监听器

SpringBoot应用
应用主类, 代码参考如下:

1
2
3
4
5
6
@SpringBootApplication
public class EcomApplication {
public static void main(String[] args) {
SpringApplication.run(EcomApplication.class, args);
}
}

Aggregate Root建模

其中, 订单Aggregate Root建模代码, 参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.axonframework.domain.AbstractAggregateRoot;

@Entity
@Table(name="ECOM_ORDER")
@Data
@EqualsAndHashCode(exclude = { "id" })
public class Order extends AbstractAggregateRoot<Integer> {
@Id
private Integer id;

@Column(name="PRICE")
private Double price;

@Column(name="NUMBER")
private Integer number;

@Column(name="ORDER_STATUS")
@Enumerated(EnumType.STRING)
private OrderStatusEnum orderStatus;

@ManyToOne(fetch=FetchType.LAZY)
@JoinColumn(name="PRODUCT_ID")
private Product product;

@Override
public Integer getIdentifier() {
return id;
}

public Order(Integer id, Double price, Integer number,
OrderStatusEnum orderStatus, Product product) {
super();
this.id = id;
this.price = price;
this.number = number;
this.orderStatus = orderStatus;
this.product = product;

registerEvent(new OrderCreatedEvent(
id, price, number,
product.getDescription(), orderStatus.toString()));
}
}

类似地, 产品Aggregate Root建模, 参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Entity
@Table(name="ECOM_PRODUCT")
@Data
@EqualsAndHashCode(exclude = { "id" })
public class Product extends AbstractAggregateRoot<Integer> {
@Id
private Integer id;

@Column(name="PRICE")
private Double price;

@Column(name="STOCK")
private Integer stock;

@Column(name="DESCRIPTION")
private String description;

@Override
public Integer getIdentifier() {
return id;
}

public void depreciateStock(int count) {
if (this.stock >= count) {
this.stock = this.stock - count;
registerEvent(new StockUpdatedEvent(id, stock));
} else {
throw new RuntimeException("Out of stock");
}
}
}

指令和事件建模

指令(Command)和事件(Event)建模方面, 相对简单, 代码参考如下, 如本例中新建订单指令类似于DTO(数据传输对象, Data Transfer Object).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Data
public class NewOrderCommand {
private final Double price;
private final Integer number;
private final Integer productId;
}

public class OrderCreatedEvent {
private final Integer id;
private final Double price;
private final Integer number;
private final String productDescription;
private final String orderStatus;
}

public class StockUpdatedEvent {
private final Integer id;
private final Integer stock;
}

指令网关、事件总线与指令总线

代码参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Configuration
public class EcomAppConfiguration {

@PersistenceContext
private EntityManager entityManager;

@Bean
public SimpleCommandBus commandBus() {
return new SimpleCommandBus();
}

@Bean
public SimpleEventBus eventBus() {
return new SimpleEventBus();
}

@Bean
AnnotationCommandHandlerBeanPostProcessor annotationCommandHandlerBeanPostProcessor() {
AnnotationCommandHandlerBeanPostProcessor handler =
new AnnotationCommandHandlerBeanPostProcessor();
handler.setCommandBus(commandBus());
return handler;
}

@Bean
AnnotationEventListenerBeanPostProcessor annotationEventListenerBeanPostProcessor() {
AnnotationEventListenerBeanPostProcessor listener =
new AnnotationEventListenerBeanPostProcessor();
listener.setEventBus(eventBus());
return listener;
}

@Bean
public DefaultCommandGateway commandGateway() {
return new DefaultCommandGateway(commandBus());
}

@Bean
@Qualifier("productRepository")
public GenericJpaRepository<Product> productJpaRepository() {
SimpleEntityManagerProvider entityManagerProvider =
new SimpleEntityManagerProvider(entityManager);
GenericJpaRepository<Product> genericJpaRepository =
new GenericJpaRepository(entityManagerProvider, Product.class);
genericJpaRepository.setEventBus(eventBus());
return genericJpaRepository;
}

@Bean
@Qualifier("orderRepository")
public GenericJpaRepository<Order> orderJpaRepository() {
SimpleEntityManagerProvider entityManagerProvider =
new SimpleEntityManagerProvider(entityManager);
GenericJpaRepository<Order> genericJpaRepository =
new GenericJpaRepository(entityManagerProvider, Order.class);
genericJpaRepository.setEventBus(eventBus());
return genericJpaRepository;
}
}

Controller与Handler

订单Controller, 代码参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
public class OrderController {
@Autowired
private DataSource dataSource;

@Autowired
private CommandGateway commandGateway;

@RequestMapping(value = "/orders", method = RequestMethod.POST)
@Transactional
public ResponseEntity<Void> addNewOrder(@RequestBody OrderDTO orderDTO) {
commandGateway.sendAndWait(
new NewOrderCommand(
orderDTO.getPrice(), orderDTO.getNumber(), orderDTO.getProductId()));
return new ResponseEntity<>(HttpStatus.OK);
}
}

订单指令Handler, 代码参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class OrderCommandHandler {
@Autowired
@Qualifier("orderRepository")
private Repository<Order> orderRepository;

@Autowired
@Qualifier("productRepository")
private Repository<Product> productRepository;

@CommandHandler
public void handle(NewOrderCommand newOrderCommand){
Product product = productRepository.load(newOrderCommand.getProductId());
product.depreciateStock(newOrderCommand.getNumber());

Order order = new Order(
new Random().nextInt(), newOrderCommand.getPrice(),
newOrderCommand.getNumber(), OrderStatusEnum.NEW, product);
orderRepository.add(order);
}
}

订单事件Handler, 代码参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class OrderEventHandler {
@Autowired
DataSource dataSource;

@EventHandler
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
// 当然, 还可以有更加优雅的实现 (尽量避免在代码中注入SQL语句), 此处代码仅为示例,
// Get到大概逻辑即可(基于OrderCreatedEvent事件, 更新订单读模型)
jdbcTemplate.update(
"INSERT INTO ecom_order_view VALUES(?,?,?,?,?)",
new Object[]{
event.getId(), event.getPrice(), event.getNumber(),
event.getProductDescription(), event.getOrderStatus()
}
);
}
}

产品事件Handler, 代码参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class ProductEventHandler {
@Autowired
DataSource dataSource;

@EventHandler
public void handleProductStockUpdatedEvent(StockUpdatedEvent event) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
// 可以有更加优雅的实现 (尽量避免在代码中注入SQL语句), 此处仅为示例代码, Get到大概逻辑即可
jdbcTemplate.update(
"UPDATE ecom_product_view SET stock=? WHERE ID=?",
new Object[]{event.getStock(), event.getId()}
);
}
}

事件监听器

事件的发布与订阅、事件监听等功能可通过消息中间件实现, 如RabbitMQ. 对于Spring Boot而言, 几行代码的问题, 此处就不贴代码了.

想学习更多DDD(领域驱动设计)相关知识、提升DDD实战技能, 请点击这里

效果检测

连接数据库, 创建几条测试数据 (注意当运行以上微服务代码时, 会自动创建创建ecom_product表以及ecom_order表, 具体原因请参考以上Java代码).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
mysql> drop table ecom_order;
mysql> drop table ecom_product;
mysql> drop table ecom_order_view;
mysql> drop table ecom_product_view;

mysql> -- 显式创建以下两表于读库(Read DB)
mysql> create table ecom_product_view(id INT, price DOUBLE,
stock INT ,description VARCHAR(255));

mysql> create table ecom_order_view(id INT , price DOUBLE, number INT,
description VARCHAR(225),status VARCHAR(50));

mysql> insert into ecom_product(id,description,price,stock,version)
values(1,'Shirts',100,5,0);
mysql> insert into ecom_product(id,description,price,stock,version)
values(2,'Pants',100,5,0);
mysql> insert into ecom_product(id,description,price,stock,version)
values(3,'T-Shirt',100,5,0);
mysql> insert into ecom_product(id,description,price,stock,version)
values(4,'Shoes',100,5,0);

mysql> insert into ecom_product_view(id,description,price,stock)
values(1,'Shirts',100,5);
mysql> insert into ecom_product_view(id,description,price,stock)
values(2,'Pants',100,5);
mysql> insert into ecom_product_view(id,description,price,stock)
values(3,'T-Shirt',100,5);
mysql> insert into ecom_product_view(id,description,price,stock)
values(4,'Shoes',100,5);

运行:

1
$ mvn clean && mvn springboot:run

打开浏览器, 查看页面效果:

点击订购按钮”Order One Now”, 可以看到基本业务功能正常, 示意图参考如下