前提

分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案。 参考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(jdbc)做了轻量级的封装,实现了低入侵性的事务消息模块。 本文的内容就是详细分析整个方案的设计思路和实施。 环境依赖如下:

方案设计思路

事务消息原则上只适合弱一致性(或者说「最终一致性」)的场景,常见的弱一致性场景如:

「强一致性的场景一般不应该选用事务消息」

rabbitmq的五种模式和案例:RabbitMQ的实现可复用的事务消息案例(1)

一般情况下,要求强一致性说明要严格同步,也就是所有操作必须同时成功或者同时失败,这样就会引入同步带来的额外消耗。 如果一个事务消息模块设计合理,补偿、查询、监控等等功能都完毕,由于系统交互是异步的,整体吞吐要比严格同步高。 在笔者负责的业务系统中基于事务消息使用还定制了一条基本原则:「消息内容正确的前提下,消费方出现异常需要自理」

简单来说就是:上游保证了自身的业务正确性,成功推送了正确的消息到RabbitMQ就认为上游业务已经结束。

为了降低代码的入侵性,事务消息需要借助Spring的「编程序事务」或者「声明式事务」。 编程序事务一般依赖于transactionTemplate,而声明式事务依托于AOP模块,依赖于注解@Transactional。

接着需要自定义一个事务消息功能模块,新增一个事务消息记录表(其实就是「本地消息表」),用于保存每一条需要发送的消息记录。 事务消息功能模块的主要功能是:

事务执行的逻辑单元

在事务执行的逻辑单元里面,需要进行待推送的事务消息记录的保存,也就是:「本地(业务)逻辑和事务消息记录保存操作绑定在同一个事务」

rabbitmq的五种模式和案例:RabbitMQ的实现可复用的事务消息案例(2)

发送消息到RabbitMQ服务端这一步需要延后到「事务提交之后」,这样才能保证事务提交成功和消息成功发送到RabbitMQ服务端这两个操作是一致的。 为了把「保存待发送的事务消息」「发送消息到RabbitMQ」两个动作从用户感知角度合并为一个动作,这里需要用到Spring特有的事务同步器TransactionSynchronization,这里分析一下事务同步器的主要方法的回调位置, 主要参考AbstractPlatformTransactionManager#commit()或者AbstractPlatformTransactionManager#processCommit()方法:

rabbitmq的五种模式和案例:RabbitMQ的实现可复用的事务消息案例(3)

上图仅仅演示了事务正确提交的场景(不包含异常的场景)。 这里可以明确知道,事务同步器TransactionSynchronization的afterCommit()和afterCompletion(int status)方法都在真正的事务提交点AbstractPlatformTransactionManager#doCommit()之后回调, 因此可以选用这两个方法其中之一用于执行推送消息到RabbitMQ服务端,整体的僞代码如下:

@TransactionalpublicDtobusinessMethod(){ businesstransactioncodeblock... //保存事务消息 [saveTransactionMessageRecord()] //注册事务同步器-在afterCommit()方法中推送消息到RabbitMQ [registerTransactionSynchronization,sendmessageinmethodafterCommit()] businesstransactioncodeblock... }

上面伪代码中,「保存事务消息」「注册事务同步器」两个步骤可以安插在事务方法中的任意位置,也就是说与执行顺序无关。

事务消息的补偿

虽然建议下游服务自理自身服务消费异常的场景,但是有些时候迫于无奈还是需要上有把对应的消息重新推送,这算是一个特殊的场景。另外还有一个场景需要考虑:事务提交之后出发事务同步器TransactionSynchronization的afterCommit()方法失败。这是一个低概率的场景,但是在生产中一定会出现,一个比较典型的原因就是:

「事务提交完成后尚未來得及触发TransactionSynchronization#afterCommit()方法进行推送服务实例就被重启」

如下图所示:

rabbitmq的五种模式和案例:RabbitMQ的实现可复用的事务消息案例(4)

为了统一处理补偿推送的问题,使用了有限状态判断消息是否已经推送成功:

还有一种极为特殊的情况是RabbitMQ服务端本身出现故障导致消息推送异常,这种情况下需要进行重试(补偿推送),「经验证明短时间内的反复重试是没有意义的」,故障的服务常规不会瞬时恢复,所以可以考虑使用「指数退避算法」进行重试,同时需要限制最大重试次数。

rabbitmq的五种模式和案例:RabbitMQ的实现可复用的事务消息案例(5)

指数值、间隔值和最大重试次数上限需要根据实际情况设定,否则容易出现消息延时过大或者重试过于频繁等问题。

方案实施

引入核心依赖:

<properties>

<spring.boot.version>2.2.4.RELEASE</spring.boot.version>

<redisson.version>3.12.1</redisson.version>

<mysql.connector.version>5.1.48</mysql.connector.version></properties><dependencyManagement>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-dependencies</artifactId>

<version>${spring.boot.version}</version>

<type>pom</type>

<scope>import</scope>

</dependency>

</dependencies></dependencyManagement><dependencies>

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>${mysql.connector.version}</version>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-jdbc</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-aop</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

<dependency>

<groupId>org.redisson</groupId>

<artifactId>redisson</artifactId>

<version>${redisson.version}</version>

</dependency></dependencies>

spring-boot-starter-jdbc、mysql-connector-java和spring-boot-starter-aop是MySQL事务相关,而spring-boot-starter-amqp是RabbitMQ客户端的封装,redisson主要使用其分布式锁, 用于补偿定时任务的加锁执行(以防止服务多个节点并发执行补偿推送)。

表设计

事务消息模块主要涉及两张表,以MySQL为例,见表DDL如下:

CREATETABLE`t_transactional_message` ( idBIGINTUNSIGNEDAUTO_INCREMENTPRIMARYKEY, create_timeDATETIMENOTNULLDEFAULTCURRENT_TIMESTAMP, edit_timeDATETIMENOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP, creatorVARCHAR(20)NOTNULLDEFAULT'admin', editorVARCHAR(20)NOTNULLDEFAULT'admin', deletedTINYINTNOTNULLDEFAULT0, current_retry_timesTINYINTNOTNULLDEFAULT0COMMENT'当前重试次数', max_retry_timesTINYINTNOTNULLDEFAULT5COMMENT'最大重试次数', queue_nameVARCHAR(255)NOTNULLCOMMENT'队列名', exchange_nameVARCHAR(255)NOTNULLCOMMENT'交换器名', exchange_typeVARCHAR(8)NOTNULLCOMMENT'交换类型', routing_keyVARCHAR(255)COMMENT'路由鍵', business_moduleVARCHAR(32)NOTNULLCOMMENT'业务模块', business_keyVARCHAR(255)NOTNULLCOMMENT'业务键', next_schedule_timeDATETIMENOTNULLCOMMENT'下一次调度时间', message_statusTINYINTNOTNULLDEFAULT0COMMENT'消息状态', init_backoffBIGINTUNSIGNEDNOTNULLDEFAULT10COMMENT'退避初始化值,单位为秒', backoff_factorTINYINTNOTNULLDEFAULT2COMMENT'退避因子(也就是指数)', INDEXidx_queue_name(queue_name), INDEXidx_create_time(create_time), INDEXidx_next_schedule_time(next_schedule_time), INDEXidx_business_key(business_key) )COMMENT'事务消息表'; CREATETABLE`t_transactional_message_content` ( idBIGINTUNSIGNEDAUTO_INCREMENTPRIMARYKEY, message_idBIGINTUNSIGNEDNOTNULLCOMMENT'事务消息记录ID', contentTEXTCOMMENT'消息內容' )COMMENT'事务消息內容表';

因为此模块有可能扩展出一个后台管理模块,所以要把消息的管理和状态相关字段和大体积的消息内容分别存放在两个表,从而避免大批量查询消息记录的时候MySQL服务IO使用率过高的问题(这是和上一个公司的DBA团队商讨后得到的一个比较合理的方案)。 预留了两个业务字段business_module和business_key用于标识业务模块和业务键(一般是唯一识别号,例如订单号)。

rabbitmq的五种模式和案例:RabbitMQ的实现可复用的事务消息案例(6)

一般情况下,如果服务通过配置自行提前声明队列和交换器的绑定关系,那么发送RabbitMQ消息的时候其实只依赖于exchangeName和routingKey两个字段(header类型的交换器是特殊的,也比较少用,这里暂时不用考虑),考虑到服务可能会遗漏声明操作, 发送消息的时候会基于队列进行首次绑定声明并且缓存相关的信息(RabbitMQ中的队列-交换器绑定声明只要每次声明绑定关系的参数一致,则不会抛出异常)。

方案代码设计

下面的方案设计描述中,暂时忽略了消息事务管理后台的API设计,这些可以在后期补充。

定义贫血模型实体类TransactionalMessage和TransactionalMessageContent:

@DatapublicclassTransactionalMessage{ privateLongid; privateLocalDateTimecreateTime; privateLocalDateTimeeditTime; privateStringcreator; privateStringeditor; privateIntegerdeleted; privateIntegercurrentRetryTimes; privateIntegermaxRetryTimes; privateStringqueueName; privateStringexchangeName; privateStringexchangeType; privateStringroutingKey; privateStringbusinessModule; privateStringbusinessKey; privateLocalDateTimenextScheduleTime; privateIntegermessageStatus; privateLonginitBackoff; privateIntegerbackoffFactor; } @DatapublicclassTransactionalMessageContent{ privateLongid; privateLongmessageId; privateStringcontent; }

然后定义dao接口(这里暂时不展开实现的细节代码,存储使用MySQL,如果要替换为其他类型的数据库,只需要使用不同的实现即可):

public interface TransactionalMessageDao {

void insertSelective(TransactionalMessage record);

void updateStatusSelective(TransactionalMessage record);

List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,

LocalDateTime maxScheduleTime,

int limit);

}

public interface TransactionalMessageContentDao {

void insert(TransactionalMessageContent record);

List<TransactionalMessageContent> queryByMessageIds(String messageIds);

}

接着定义事务消息服务接口TransactionalMessageService:

//对外提供的服务类接口 publicinterfaceTransactionalMessageService{ voidsendTransactionalMessage(Destinationdestination,TxMessagemessage); } @Getter@RequiredArgsConstructorpublicenumExchangeType{ FANOUT("fanout"), DIRECT("direct"), TOPIC("topic"), DEFAULT(""), ; privatefinalStringtype; } //发送消息的目的地publicinterfaceDestination{ ExchangeTypeexchangeType(); StringqueueName(); StringexchangeName(); StringroutingKey(); } @BuilderpublicclassDefaultDestinationimplementsDestination{ privateExchangeTypeexchangeType; privateStringqueueName; privateStringexchangeName; privateStringroutingKey; @Override publicExchangeTypeexchangeType(){ returnexchangeType; } @Override publicStringqueueName(){ returnqueueName; } @Override publicStringexchangeName(){ returnexchangeName; } @Override publicStringroutingKey(){ returnroutingKey; } } //事務消息publicinterfaceTxMessage{ StringbusinessModule(); StringbusinessKey(); Stringcontent(); } @BuilderpublicclassDefaultTxMessageimplementsTxMessage{ privateStringbusinessModule; privateStringbusinessKey; privateStringcontent; @Override publicStringbusinessModule(){ returnbusinessModule; } @Override publicStringbusinessKey(){ returnbusinessKey; } @Override publicStringcontent(){ returncontent; } } //消息状态@RequiredArgsConstructor publicenumTxMessageStatus{ /** *成功 */ SUCCESS(1), /** *待处理 */ PENDING(0), /** *处理失敗 */ FAIL(-1), ; privatefinalIntegerstatus; }

TransactionalMessageService的实现类是事务消息的核心功能实现,代碼如下:

@Slf4j@Service@RequiredArgsConstructorpublicclassRabbitTransactionalMessageServiceimplementsTransactionalMessageService{ privatefinalAmqpAdminamqpAdmin; privatefinalTransactionalMessageManagementServicemanagementService; privatestaticfinalConcurrentMap<String,Boolean>QUEUE_ALREADY_DECLARE=newConcurrentHashMap<>(); @Override publicvoidsendTransactionalMessage(Destinationdestination,TxMessagemessage){ StringqueueName=destination.queueName(); StringexchangeName=destination.exchangeName(); StringroutingKey=destination.routingKey(); ExchangeTypeexchangeType=destination.exchangeType(); //原子性的预声明 QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName,k->{ Queuequeue=newQueue(queueName); amqpAdmin.declareQueue(queue); Exchangeexchange=newCustomExchange(exchangeName,exchangeType.getType()); amqpAdmin.declareExchange(exchange); Bindingbinding=BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs(); amqpAdmin.declareBinding(binding); returntrue; }); TransactionalMessagerecord=newTransactionalMessage(); record.setQueueName(queueName); record.setExchangeName(exchangeName); record.setExchangeType(exchangeType.getType()); record.setRoutingKey(routingKey); record.setBusinessModule(message.businessModule()); record.setBusinessKey(message.businessKey()); Stringcontent=message.content(); //保存事务消息记录 managementService.saveTransactionalMessageRecord(record,content); //注册事务同步器 TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronizationAdapter(){ @Override publicvoidafterCommit(){ managementService.sendMessageSync(record,content); } }); } }

消息记录状态和內容持久化的管理統一放在TransactionalMessageManagementService中:

@Slf4j@RequiredArgsConstructor@ServicepublicclassTransactionalMessageManagementService{ privatefinalTransactionalMessageDaomessageDao; privatefinalTransactionalMessageContentDaocontentDao; privatefinalRabbitTemplaterabbitTemplate; privatestaticfinalLocalDateTimeEND=LocalDateTime.of(2999,1,1,0,0,0); privatestaticfinallongDEFAULT_INIT_BACKOFF=10L; privatestaticfinalintDEFAULT_BACKOFF_FACTOR=2; privatestaticfinalintDEFAULT_MAX_RETRY_TIMES=5; privatestaticfinalintLIMIT=100; publicvoidsaveTransactionalMessageRecord(TransactionalMessagerecord,Stringcontent){ record.setMessageStatus(TxMessageStatus.PENDING.getStatus()); record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(),DEFAULT_INIT_BACKOFF, DEFAULT_BACKOFF_FACTOR,0)); record.setCurrentRetryTimes(0); record.setInitBackoff(DEFAULT_INIT_BACKOFF); record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR); record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES); messageDao.insertSelective(record); TransactionalMessageContentmessageContent=newTransactionalMessageContent(); messageContent.setContent(content); messageContent.setMessageId(record.getId()); contentDao.insert(messageContent); } publicvoidsendMessageSync(TransactionalMessagerecord,Stringcontent){ try{ rabbitTemplate.convertAndSend(record.getExchangeName(),record.getRoutingKey(),content); if(log.isDebugEnabled()){ log.debug("发送消息成功,目标队列:{},消息內容:{}",record.getQueueName(),content); } //标记成功 markSuccess(record); }catch(Exceptione){ //标记失敗 markFail(record,e); } } privatevoidmarkSuccess(TransactionalMessagerecord){ //标记下一次执行时间为最大值 record.setNextScheduleTime(END); record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes())>=0? record.getMaxRetryTimes():record.getCurrentRetryTimes() 1); record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus()); record.setEditTime(LocalDateTime.now()); messageDao.updateStatusSelective(record); } privatevoidmarkFail(TransactionalMessagerecord,Exceptione){ log.error("发送消息失败,目标队列:{}",record.getQueueName(),e); record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes())>=0? record.getMaxRetryTimes():record.getCurrentRetryTimes() 1); //计算下一次的执行时间 LocalDateTimenextScheduleTime=calculateNextScheduleTime( record.getNextScheduleTime(), record.getInitBackoff(), record.getBackoffFactor(), record.getCurrentRetryTimes() ); record.setNextScheduleTime(nextScheduleTime); record.setMessageStatus(TxMessageStatus.FAIL.getStatus()); record.setEditTime(LocalDateTime.now()); messageDao.updateStatusSelective(record); } /** *计算下一次执行时间 * *@parambase基础时间 *@paraminitBackoff 退避基准值 *@parambackoffFactor退避指数 *@paramround轮数 *@returnLocalDateTime */ privateLocalDateTimecalculateNextScheduleTime(LocalDateTimebase, longinitBackoff, longbackoffFactor, longround){ doubledelta=initBackoff*Math.pow(backoffFactor,round); returnbase.plusSeconds((long)delta); } /** *推送补偿-里面的參數应该根据实际场景定制 */ publicvoidprocessPendingCompensationRecords(){ //时间的右值为当前时间減去退避初始值,这里預防把剛保存的消息也推送了 LocalDateTimemax=LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF); //时间的左值为右值減去1小時 LocalDateTimemin=max.plusHours(-1); Map<Long,TransactionalMessage>collect=messageDao.queryPendingCompensationRecords(min,max,LIMIT) .stream() .collect(Collectors.toMap(TransactionalMessage::getId,x->x)); if(!collect.isEmpty()){ StringJoinerjoiner=newStringJoiner(",","(",")"); collect.keySet().forEach(x->joiner.add(x.toString())); contentDao.queryByMessageIds(joiner.toString()) .forEach(item->{ TransactionalMessagemessage=collect.get(item.getMessageId()); sendMessageSync(message,item.getContent()); }); } } }

这里有一点尚待优化:更新事务消息记录状态的方法可以优化为批量更新,在limit比较大的时候,批量更新的效率会更高。

最后是定时任务的配置类:

@Slf4j@RequiredArgsConstructor@Configuration@EnableSchedulingpublicclassScheduleJobAutoConfiguration{ privatefinalTransactionalMessageManagementServicemanagementService; /** *这里用的是本地的Redis,实际上要做成配置 */ privatefinalRedissonClientredisson=Redisson.create(); @Scheduled(fixedDelay=10000) publicvoidtransactionalMessageCompensationTask()throwsException{ RLocklock=redisson.getLock("transactionalMessageCompensationTask"); //等待時間5秒,預期300秒執行完畢,這兩個值需要按照實際場景定製 booleantryLock=lock.tryLock(5,300,TimeUnit.SECONDS); if(tryLock){ try{ longstart=System.currentTimeMillis(); log.info("开始执行事务消息推送补偿定時任务..."); managementService.processPendingCompensationRecords(); longend=System.currentTimeMillis(); longdelta=end-start; //以防鎖過早釋放 if(delta<5000){ Thread.sleep(5000-delta); } log.info("执行事务消息推送补偿定時任务完毕,耗時:{}ms...",end-start); }finally{ lock.unlock(); } } } }

基本代码编写完,整个項目的结构如下:

rabbitmq的五种模式和案例:RabbitMQ的实现可复用的事务消息案例(7)

最后添加两个测试类:

@RequiredArgsConstructor@ComponentpublicclassMockBusinessRunnerimplementsCommandLineRunner{ privatefinalMockBusinessServicemockBusinessService; @Override publicvoidrun(String...args)throwsException{ mockBusinessService.saveOrder(); } } @Slf4j@RequiredArgsConstructor@ServicepublicclassMockBusinessService{ privatefinalJdbcTemplatejdbcTemplate; privatefinalTransactionalMessageServicetransactionalMessageService; privatefinalObjectMapperobjectMapper; @Transactional(rollbackFor=Exception.class) publicvoidsaveOrder()throwsException{ StringorderId=UUID.randomUUID().toString(); BigDecimalamount=BigDecimal.valueOf(100L); Map<String,Object>message=newHashMap<>(); message.put("orderId",orderId); message.put("amount",amount); jdbcTemplate.update("INSERTINTOt_order(order_id,amount)VALUES(?,?)",p->{ p.setString(1,orderId); p.setBigDecimal(2,amount); }); Stringcontent=objectMapper.writeValueAsString(message); transactionalMessageService.sendTransactionalMessage( DefaultDestination.builder() .exchangeName("tm.test.exchange") .queueName("tm.test.queue") .routingKey("tm.test.key") .exchangeType(ExchangeType.DIRECT) .build(), DefaultTxMessage.builder() .businessKey(orderId) .businessModule("SAVE_ORDER") .content(content) .build() ); log.info("保存订单:{}成功...",orderId); } }

某次测试结果如下:

2020-02-05 21:10:13.287 INFO 49556 --- [ main] club.throwable.cm.MockBusinessService : 保存订单:07a75323-460b-42cb-aa63-1a0a45ce19bf成功...

rabbitmq的五种模式和案例:RabbitMQ的实现可复用的事务消息案例(8)

模拟订单数据成功保存,而且RabbitMQ消息在事务成功提交后正常发送到RabbitMQ服务端中,如RabbitMQ控制台数据所示。

小结

事务消息模块的设计仅仅是使异步消息推送这个功能实现趋向于完备,其实一个合理的异步消息交互系统,一定会提供同步查询接口,这一点是基于异步消息没有回调或者没有响应的特性导致的。 一般而言,一个系统的吞吐量和系统的异步化处理占比成正相关(这一点可以参考Amdahl's Law),所以在系统架构设计实际中应该尽可能使用异步交互,提高系统吞吐量同时减少同步阻塞带来的无谓等待。 事务消息模块可以扩展出一个后台管理,甚至可以配合Micrometer、Prometheus和Grafana体系做实时数据监控。

,