在日常开发中,延时任务是一个无法规避的话题。也存在各种不同的方案,比如:
- 数据库轮询方案。建立一个调度任务,周期性从数据库中查询待执行的任务,如果满足延时要求,并执行对于的业务操作;
- 单机内存解决方案。可以使用 DelayQueue、ScheduledExecutorService、TimerWheel等数据结构在内存中对任务进行维护,并运行满足延时的任务;
- 分布式延时队列方案。可以使用基于 redis 的 redisson 延时任务,也可以使用 RocketMQ 延时队列。
当然,在所有的方案中,分布式延时队列方案是最佳方法,当然也是最复杂的方案。
1.1. 背景在延时任务这个场景,分布式延时队列方案 是最优策略,所以,不少公司制定了相关规范,只能使用 RocketMQ 实现延时调度。系统的稳定性有了一定的保障,但操作的复杂性抛给了下面的研发人员。
从一个 Leader 角度,我一直认为“只定规范,不提供工具,是极度不负责任的表现”。
1.2. 目标期望框架能够提供:
- 不需要 Coding,快速使一个方法具有延时运行的能力;
- 可通过参数指定延时时长;
- 任务创建和消费分离,在不同的集群中完成,以更好的支持资源隔离;
框架基于 RocketMQ 进行构建,请自行完成 RocketMQ 的搭建。
2.1. 引入 RocketMQ我们使用 rocketmq starter 完成基本配置。
首先,在 pom 中增加 rocketmq starter 依赖,具体如下:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
其次,在 application.yml 中添加 rocketmq 配置,具体如下:
rocketmq: name-server:http://127.0.0.1:9876 producer: group:async-demo
其中,name-server 根据具体情况进行配置。
配置完成,可以在项目中:
2.2. 添加 lego-starter 依赖
- 注入 RocketMQTemplate 进行消息发送;
- 使用 @RocketMQMessageListener 标记处理方法,进行消息消费;
为了方便与 spring-boot 项目集成,lego 提供 lego-starter,以完成快速接入。
在 pom 中增加 starter,具体如下:
<dependency> <groupId>com.geekhalo.lego</groupId> <artifactId>lego-starter</artifactId> <version>0.1.5-delay_task-SNAPSHOT</version> </dependency>
其中,自动配置机制将完成:
2.3. @DelayBasedRocketMQ
- 注册 DelayMethodInterceptor,对 @DelayBasedRocketMQ 注解进行拦截,将请求发送至 RocketMQ;
- 构建并启动DelayConsumerContainer,监听 topice 的消息,用于消费消息
我们只需在方法上添加 @DelayBasedRocketMQ 注解,完成基础配置,该方法便具备延时处理的能力。具体如下:
@DelayBasedRocketMQ( topic="${cancelOrder.delay.topic}", tag="delayCancelOrder", consumerGroup="${cancelOrder.delay.consumerGroup1}", delayLevel=2 ) publicvoiddelayCancelOrder(LongorderId,Stringreason){ DelayTaskdelayTask=newDelayTask(orderId,reason,null); log.info("RunCancelOrder{}",delayTask); this.tasks.add(delayTask); }
@DelayBasedRocketMQ 定义如下:
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public@interfaceDelayBasedRocketMQ{ /** *RocketMQtopic *@return */ Stringtopic(); /** *Tag *@return */ Stringtag()default"*"; /** *延迟级别 *@return */ intdelayLevel()default-1; /** *延迟时间SpEL表达式 *@return */ StringdelayLevelSpEl()default"0"; /** *nameServer配置 *@return */ StringnameServer()default"${rocketmq.name-server:}"; /** *消费者组信息 *@return */ StringconsumerGroup(); /** *消费者运行的profile,主要用于发送和消费分离的场景 *@return */ StringconsumerProfile()default""; }
在 application 文件中增加相关配置,具体如下:
cancelOrder: delay: topic:delay-task-test-topic consumerGroup1:delay-cancel-order-group1 consumerGroup2:delay-cancel-order-group2
写一个简单的单测,代码如下:
@Test voiddelayCancelOrder()throwsException{ LongorderId=RandomUtils.nextLong(); Stringreason="超时自动取消"; this.delayService.delayCancelOrder(orderId,reason); Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks())); TimeUnit.SECONDS.sleep(4); Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks())); TimeUnit.SECONDS.sleep(6); Assertions.assertTrue(CollectionUtils.isNotEmpty(this.delayService.getTasks())); }
运行单测,日志如下:
10:16:09.951[main]c.g.l.core.delay.DelayMethodInterceptor:successtosentDelayTasktoRocketMQfor[7973190633030104064,超时自动取消] 10:16:14.964[MessageThread_1]com.geekhalo.lego.delay.DelayService:RunCancelOrderDelayService.DelayTask(orderId=7973190633030104064,reason=超时自动取消,timeOutLevel=null) 10:16:14.964[MessageThread_1]c.g.l.c.s.AbstractConsumerContainer:consumemessage2408820718E04140899ECD9401476CBB137358644D46308D59D10000,cost:8ms
为了方便,对部分日志进行简化,但不影响分析结果。
从运行日志可以得出:
- 10:16:09 主线程 main 向 MQ 发送一条延时消息;
- 5秒以后(10:16:14)消费线程拉取到延时任务,并调用业务方法(DelayService#delayCancelOrder)
从日志上看,完全符合设计预期。
2.4. 动态设置延时级别不同的场景需要不同的延时级别,在 @DelayBasedRocketMQ 直接指定死,不方便业务扩展。
如果需要动态指定延时级别,可以使用 @DelayBasedRocketMQ 的 delayLevelSpEl ,通过 SpEL 从上下文中读取配置,具体如下:
@DelayBasedRocketMQ( topic="${cancelOrder.delay.topic}", tag="delayCancelOrderForTimeout", consumerGroup="${cancelOrder.delay.consumerGroup2}", delayLevelSpEl="#timeOutLevel" ) publicvoiddelayCancelOrderForTimeout(LongorderId,Stringreason,inttimeOutLevel){ DelayTaskdelayTask=newDelayTask(orderId,reason,timeOutLevel); log.info("RunCancelOrder{}",delayTask); this.tasks.add(delayTask); }
其中,delayLevelSpEl = "#timeOutLevel" 含义为,将参数 timeOutLevel 的值作为 延时级别。
编写单元测试用例,具体如下:
@Test voiddelayCancelOrder_DelayTime()throwsException{ LongorderId=RandomUtils.nextLong(); Stringreason="超时自动取消"; this.delayService.delayCancelOrderForTimeout(orderId,reason,3); Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks())); TimeUnit.SECONDS.sleep(9); Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks())); TimeUnit.SECONDS.sleep(11); Assertions.assertTrue(CollectionUtils.isNotEmpty(this.delayService.getTasks())); }
运行测试用例,观察日志如下:
10:27:56.257[main]c.g.l.core.delay.DelayMethodInterceptor:successtosentDelayTasktoRocketMQfor[844282856080074752,超时自动取消,3] 10:28:06.281[MessageThread_1]com.geekhalo.lego.delay.DelayService:RunCancelOrderDelayService.DelayTask(orderId=844282856080074752,reason=超时自动取消,timeOutLevel=3) 10:28:06.282[MessageThread_1]c.g.l.c.s.AbstractConsumerContainer:consumemessage2408820718E04140899ECD9401476CBB13D358644D46309820DB0000,cost:13ms
从日志上可见,延时时间已经成功调整为 10秒(10:27:56 发送任务,10:28:06 接收到任务),符合设计预期。
2.5. 任务创建和消费分离为了更好的对资源进行隔离,有时需要单独部署一组集群,用于处理后台任务。
为支持该模式,@DelayBasedRocketMQ提供了 consumerProfile 配置,用于指定 Consumer 在哪个 profile 下执行,如果不设置,则对环境不进行任何要求。
3.设计&扩展3.1. 核心设计
整体架构
在方法上添加注解后,框架自动完成:
3.2. 核心流程
- 增加 DelayMethodInterceptor 和 PointcutAdvisor Bean,用于对方法进行拦截,将请求转发至 MQ;
- 创建并启动 DelayConsumerContainer,通过 MQConsumer 监听消息变更,并调用 业务方法;
核心流程如下:
4. 项目信息
- 方法被调用,被 DelayMethodInterceptor 拦截;
- 首先,对调用参数进行序列化;
- 然后,将信息封装为 Message
- 最后,向 RocketMQ 发送延时消息
- 消息在RocketMQ进行存储,当到达延时时间时,将 Message 投放至 Consumer;
- MQPushConsumer,监听到消息,并完成业务操作;
- Consumer 获得 Message 信息
- 将消息进行反序列化,获得调用参数
- 使用调用参数调用业务方法
项目仓库地址:https://gitee.com/litao851025/lego
项目文档地址:https://gitee.com/litao851025/lego/wikis/support/delayBasedRocketMQ
,