本地事件表加消息队列实现分布式事务思路
本地事件表加消息队列实现分布式事件方案实现数据的最终一致性。本地事件表作用是为了事件溯源(Event-Sourcing),消息队列实现事件通知。
事件表要求记录了业务表操作的所有事件,所有事件的组合就是表中数据的生命周期。
本篇是 分布式微服务应用系列(九):分布式事务概念及解决方案 的延续。
本地事件表
本地事件表记录了操作对象时涉及分布式事务的每个步骤,作用是用于事件溯源(Event-Sourcing ),实现回滚操作。
一般应用在领域对象模型中。在一个 对象 从 创建 到 销毁 的整个生命周期中,会产生大量的事件(Event),每个事件都会有自己的 事件类型(Event Type)。
事件 包含时间、事件类型、模型等信息,事件类型可以是枚举的。可以将 事件 理解为某一个具体动作,如例,在某一时刻创建一个订单 则是一个事件,而 创建 是一种事件类型。
事件记录
在将对象最终状态记录到数据库时,还要记录对象所发生的一系列事件变更,这些事件需要按时间先后顺序记录到数据库中。也就是说,数据库中不仅包含了对象所属性的模型表,还包含了对象所产生事件的事件表。
当创建、修改、删除一条模型对象时(查询一般不考虑),不修需要个修改模型对象表中的数据,还需要记录一条对应的事件到事件表中,这个过程称为 事件记录
每个模型都有一个唯一ID,称为 Model ID。事件对象也同样有一个唯一的 ID,称为 Event ID,在事件表中关联了 Model ID,这样可以在事件表中 找到 Model ID,这一过程称为 事件溯源。
事件对象
事件记录过程中,先操作模型表,再操作事件表,这两个操作是在同一个事务中;而在事件溯源过程中,是先操作事件表,再操作模型表,这两个操作是在同一个事务中。
事件表字段与事件模型对象属性对应,结构比较固定,也可根据实际需要灵活设计,但大体包括以下基础字段:
- id:具有唯一性的事件 ID。
- type:事件类型,例如,create,update,delete 等。
- process:事件进行到的环节,如新建,已发布,已处理。
- content:事件内容,通常是需要传递的JSON格式数据。
- modelId:对应的模型对象 ID,具有唯一性。
- createdTime:事件的创建时间,一般存时间戳,精确到毫秒。
- updateTime:事件修改时间,一般存时间戳,精确到毫秒。。
方案实现思路
创建对象
创建用于封装事件的类和表,及事件处理器。
- Event:封装事件相关字段,与 event 表映射。
- EventType:枚举类定义事件类型。
- EventHandler:事件处理器,封装事件相关操作。
- Foo:Foo 对象模型(实体类),与 Foo Table 映射。
- Bar:Bar 对象模型(实体类),与 Bar Table 表映射。
实现思路
第一步:Foo Service 操作 Foo 模型表与 event 事件表,并将事件写入消息队列
- 在 Foo Service 中插入一条 Foo 对象到 Foo Table 中
- 同时创建一条名 Create Foo 的事件(Event 对象)
- 将 Event 对象插入到 Event Table 中,与插入 Foo 对象在同一个事务中提交。
- 在事条提交后,将 Event 对象发送到 MQ 的 foo-success-queue 消息队列中。
第二步:Bar Service 从消息队列中获取事件,操作 Bar 模型表,若有异常,则将源事件发送到 foo-fail-queue 队列中。
从 foo-success-queue 中获取 Event 对象。
将 Bar 对象插入模型表中。
若 JDBC 操作出现异常。
则在 catch 中将 Event 对象写入 foo-fail-queue 队列,并抛出异常让事务回滚。
注意:如果使用的是 RabbitMQ,则在 catch 中必须抛出 Spring AMQP 框架提供的 AmqpRejectAndDontRequeueException 异常(集成 Spring AMQP 框架),否则即使事务回滚,还会重新进入 foo-success-queue 队列,重复处理相同的消息,导致此过程不断循环,直到超出 RabbitMQ 队列所限制的 Reuque 次数,这是不希望看到的。
当抛出 AmqpRejectAndDontRequeueException 时,Event 对象将不再进入 foo-success-queue 队列,而是进入 DLQ (Dead Letter Queue,死信队列),进入的消息将被丢弃永不被消费,或人为执行额外处理。
第三步:Foo Service 从消息队列中获取事件,操作事件表与模型表(即 事件溯源 过程)
- 从 foo-fail-queue 中取回 Event 对象
- 根据 Event 中的 Modele ID 到 Event Table 中查出 Foo ID。
- 根据 Foo ID 到 Foo Table 中删除曾经创建的 Foo 对象。
注意事项
- INSERT 语句的逆向操作是 DELETE 语句,UPDATE 语句的逆向操作也是 UPDATE 语句,但对于 DELETE 语句,通常采用逻辑删除(更新删除标记字段),而不是物理删除,因此 DELETE 语句的逆向操作也是 UDPATE 语句。
- 在实际应用环境中,应尽量避免分布式事务,通过合理的切换微服务边界可解决大部分分布式事务问题。
实现补充
上面方案完全是基于 MQ 解耦的方式实现,即消息消费服务是完全独立的。
但在 Spring Cloud 或微服务环境,Foo Service 服务通常会直接调用 Bar Service 接口,就有可能出现接口调用失败,或 Bar 服务修改失败的情况,且对于这些失败希望人为可知,并实现一些补偿机制。
可以通过创建一个 可靠性消息服务 和 消息发送服务 的应用来解决上述的问题,大致流程如下:
Foo Service 进行修改操作。若成功则调用 可靠性消息服务 发送修改操作的信息;若失败,则回滚修改,不发送消息给可靠性消息服务。
当可靠性消息服务收到信息之后存入消息表中,此服务器负责接口信息并存储,修改消息数据,不负责发送逻辑。
封装信息实体类与的表结构映射,字段如下:
- id:信息唯一ID。
- message:消息内容,以 JSON 存储。
- queue:消息队列名称。
- send_system:发生消息的服务名。
- send_count:重复发送消息次数。
- create_time:创建时间
- last_send_time:最后发送消息的时间。
- status:消息状态:0-等待消费,1-已消费,2-已死亡。
- die_count:死亡次数,由使用方决定,默认为发送 10 次还没被消费则标记死亡,人工介入。
- consume_time:消费时间
- consume_system:消费的服务名
- die_time:死亡时间。
消息发送服务启动一个线程,在线程中一直循环从消息表中获取没有处理的信息,发送到消息队列;如果消息的发送次数已超过死亡的次数,则改成死亡信息不再进行处理。
注意:消息发送服务可能会有多个实例,需要使用分布式锁,只让一个实例执行,若没有要处理的消息,则休眠一段时间,最后释放分布式锁。
Bar Service 消费 MQ 中的消息,然后进行需要的修改操作,成功后调用可靠性消息服务的接口,进行消息已被正常消费的确认工作(更改消息状态相关数据)。当发生异常时,MQ 会重发消息,直到超过重发的次数。
另可开发消息管理系统,通过 Web 控制台管理消息,具体有以下功能点:
- 查看消息列表。
- 可以根据不同的状态查询消息。
- 对死亡的消息进行重发操作。
- 删除已被消费的消息。
最大努力通知型事务
最大努力通知型事务适用于跟外部系统之间的通讯,通过定期通知的方式来达到数据的一致性。
尽最大的努力通知对方,但无法保证一定能通知到,可以提供查询接口给对方查询。
例如,调用支付系统,需要一个回调地址,在支付成功后,支付系统将支付结果返回给回调地址,如果支付系统没有收到客户机返回的确认,支付系统会重复回调,直到通知 N 次后不再通知,同时提供支付结果查询接口给客户机调用。