作者:kevinkrcai,腾讯IEG后台开发工程师
一、前言前段时间参与了海外客服系统相关需求的开发工作,其中需要实现客服对用户消息回复超时的相关处理策略。比如:当客服接收到用户消息后,如果在3分钟内没有回复用户,则需要给用户推送一个问卷表单;当客服30分钟还没有回复用户消息,则需要给对应的客服发告警消息等。这类属于在当前时间点往后推迟指定时间执行的任务,可以采用延时队列实现此类功能,本文对延时队列的相关实现方案做一个简单的整理和总结。
二、相关实现延时队列,就是具有延时功能的数据结构
2.1 基于内存数据结构实现基于链表实现基于链表实现的话,链表的每一个元素都是一个延迟任务,包含延时执行时间和延时任务的创建时间,旁路开启一个时间检测线程去轮询遍历链表,只要有当前时间减去延时任务的创建时间大于或等于延迟时间,任务就可以执行,执行完后从链表删除该任务,因此,新增任务的时间复杂度可以做到O(1),但是删除和检测任务是否能够执行时间复杂度则是O(N)。
上面是比较简单的实现思路,但是每次检测任务是否能够执行需要遍历整个链表,时间复杂度位O(N),如果在新增任务时,就根据执行时间的先后做排序,让延时时间最短的任务排在链表头部,那么检测任务是否能够执行和删除延时任务的时间复杂度能优化为O(1),相对前者较好,但是在新增任务时因需要做排序,所以时间复杂度会变为O(N)。
基于最小堆实现
基于排序链表的实现可以给我们提供了一种思路,将任务按照执行时间排序,所以我们可以考虑排序时间复杂度更优的数据结构:最小堆。JDK JUC包内置的延时队列DelayQueue就是基于最小堆实现。最小堆能保证每次新增任务,延时时间最近的排在堆的根结点,检测任务是否能够执行只需要检查根节点的任务是否到期了就行,检测任务时间复杂度是O(1),但是新增和删除任务,因为要做堆排序,所以时间复杂度是O(logN)
基于时间轮实现
什么是时间轮?
时间轮是一个存储定时任务的环状数组,相对上面提到的基于最小堆和链表的实现,时间轮可以做到新增,删除和检测任务的时间复杂度都是O(1)。
最简单的时间轮是单层时间轮,如下图所示,图中有8个时间格,每个时间格表示1s,那么这个时间轮就是表示周期为8s的时间轮,定时任务存放在时间轮每一个格子上,时间过去1s,则当前时间的指针则会向前移动一个单位时间。
延时任务如何存放在时间轮上:
计算某一个定时任务应该存放在时间轮哪一个下标下: ((延时任务执行时间 - 当前时间) / 时间轮单位时间 当前时间在时间轮的索引下标) % 时间周期
比如:当前时间00:03,现在有一个在00:04执行的任务,那么计算后得出,(1 / 1 3) % 8 = 4,定时任务应该存放在时间轮下标为4的格子上。同样道理,00:10执行的任务,就需要放在下标为2的格子上,可以复用之前的时间轮格子。
如果一个时间格子上刚好有多个任务,怎么执行?
如果延时执行的时间不同,但是下标计算的结果相同,放在了同一个时间格中,可以在这个任务中多一个round字段,表示要多转几圈,例如:当前时间是00:03,有两个延时任务,一个是00:10执行,一个是00:18执行,那他们都会放在下标为2的格子上,但是00:18执行的任务,round = 1,表示要多转一圈。只有round = 0的任务才是当前时间周期可以执行的。这些任务可以在对应的时间格中通过链表连接起来,每次对时间格的处理还需要遍历链表更新任务的round,超出当前时间周期的任务的round字段可以在减1,所以这块的时间复杂度是O(n)。
上面在检测任务的时候,还需要更新对应时间格链表的round字段,时间复杂度是O(n),通过层级时间轮可以将该复杂度优化为O(1),层级时间轮就是有多个时间轮,但是每个时间轮的单位时间是不一样的,如图所示:第一层时间轮的单位时间是1s,时间周期是0到8s,第二层的单位时间等于第一层的时间周期即8s,第二层的时间周期是0到64秒。这样对于超出第一层时间轮时间周期的任务就可以放在第二层。第一层转一圈,第二层转一格。
如下图所示:可以一个三层时间轮表示一天24小时,根据定时任务执行时间放置在不同的时间轮上,比如:02时00分01秒执行的任务比下面两个时间周期大,需要放在hours时间轮上,当current hour指向2,当前时间轮需要下降到下一层时间轮,直到下降到最后一层时间轮为止。
这类基于内存数据结构实现的延迟队列不支持分布式或者持久化,进程重启后数据会丢失,并且当延时任务很多时,队列中对象也会很多,不适合任务量比较大的情况,只适合不需要持久化且任务量相对较少的单机任务,并且能容忍丢失。
2.2 基于DB定时轮询实现基于DB 定时任务轮询的实现方案通常就是通过一个线程定时去扫描数据库表,找到可以执行的任务触发执行。基于DB轮询,虽然可以解决任务持久化,但是也有以下几个缺点:
2.3 基于Redis实现基于Redis发布订阅实现
- 首先,定时轮询就会存在延迟,这个延迟的误差就在于你设定的轮询的间隔时间。比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
- 第二,扫描数据表对DB有压力,如果任务数量多,表的数据量大,定时扫表会增加DB磁盘IO压力。
- 第三,对自身服务器,定时轮询也会增加CPU的开销。
延时消息事件通过 Redis 的pub/sub来进行分发, 设置key的过期时间是延迟时间,利用key过期事件通知来实现延迟消息。因为事件监听配置默认关闭,故开启 redis 的事件监听与发布,需修改redis.conf配置文件:
# notify-keyspace-events "" notify-keyspace-events Ex
连接redis,订阅监听expire事件:当key过期后,就会监听到过期的key
PSUBSCRIBE __keyevent@*__:expired
如图所示:监听5秒后过期的key:
小demo:
var redisCli *redis.Client func init() { // 连接redis redisCli = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) } func SetExpireEvent() { // 设置一个键,并且3秒钟之后过期 redisCli.Set(context.Background(), "test_expire_event_notify", "测试键值过期通知", 3*time.Second) fmt.Println(time.Now()) } func SubExpireEvent() { // 订阅key过期事件 sub := redisCli.PSubscribe(context.Background(), "__keyevent@*__:expired") for { msg := <- sub.Channel() fmt.Println("Channel ", msg.Channel) fmt.Println("pattern ", msg.Pattern) fmt.Println("pattern ", msg.Payload) // 过期的key fmt.Println(time.Now()) } } func main() { SetExpireEvent() go SubExpireEvent() select {} }
这种方案缺点在于:
基于Redis的ZSet实现
- 第一:redis官方文档中也明确指出,redis从未保证key过期后会立马删除key并立即发送过期通知,因为redis实现key过期的方式是定时随机扫描key,发现过期则删除或者在访问key的时候再去检查key是否过期,过期则删除,所以消息的延迟是必然存在的。
- 第二:redis的事件消息是通过Pub/Sub发出来的,Pub/Sub消息不会做持久化,也没有消息的Ack机制,那样如果消费者没有ack就挂掉或者中间断链,重启后无法再处理这个消息,导致消息丢失。
Redis的ZSet是基于score进行排序的有序数据结构,因此可以将延迟消息的到期时间戳作为score放到zset中,定时轮询存放延时任务的Zset,取出ZSet中最近要执行的延时任务和当前时间比较,小于等于当前时间即可以执行该任务。新增和查询任务的时间复杂度都是O(logN)。
小demo:
var redisCli *redis.Client func init() { // 连接redis redisCli = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) } func addTask() { // 添加一个三秒的定时任务 redisCli.ZAdd(context.Background(),"first-job",&redis.Z{Score:float64(time.Now().Add(3 * time.Second).Unix()),Member: 1}) fmt.Println("添加定时任务: ",time.Now()) } func getTaskFromZset() { for { luaScript := ` local message = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'WITHSCORES', 'LIMIT', 0, 1); if #message > 0 then redis.call('ZREM', KEYS[1], message[1]); return message[1]; else return nil; end ` res, err := redis.NewScript(luaScript).Run(context.Background(), redisCli, []string{"first-job"},strconv.FormatInt(time.Now().Unix(), 10)).Result() if err != nil { continue } // 执行延迟任务 fmt.Println("res:",res) fmt.Println("执行延迟任务:",time.Now()) } } func main() { addTask() go getTaskFromZset() select {} }
基于Redis ZSet实现相对简单,但也有几点需要注意:
2.4 基于MQ实现
- 第一就是延时消息处理的原子性:轮询线程从Zset获取最近要执行的任务执行并将该任务移除出Zset这几个步骤整体需要封装为一个原子操作,否则服务多实例环境下,可能被其他服务的轮询线程消费到同一任务。
- 第二,定时轮训Zset可能会出现很多空轮训消耗服务器CPU
基于Redis ZSet实现延时队列是一种理解起来比较简单直观并且可以快速落地的方案,但是需要我们自己去实现延迟消息的检测,利用消息队列对延迟消息的支持,我们可以只关心延迟消息的业务逻辑,而不需要关心延迟消息什么时候可以执行。
基于Pulsar延时消费实现Pulsar最早是在2.4.0引入了延迟消息投递的特性,在Pulsar中使用延迟消息,可以精确指定延迟投递的时间,有deliverAfter和deliverAt两种方式。其中deliverAt可以指定具体的时间戳;deliverAfter可以指定在当前多长时间后执行。
Pulsar延迟消息投递:
// 发送消息,5分钟后执行 _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ // 消息内容 Payload: []byte("hello go client, this is a message."), DeliverAfter: 5 * time.Minute, }) // 发送消息,在指定时间执行 _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ // 消息内容 Payload: []byte("hello go client, this is a message."), DeliverAt: time.Now().Add(time.Second * 10), })
Pulsar延迟消息实现原理:Pulsar中的延迟消息是基于Netty中的时间轮去实现的,通过时间轮去推动时间,到达对应时间后去寻找对应的消息索引,延迟消息的索引通过优先级队列维护,优先队列根据延时时间排序,维护在Broker内存中。找到消息后在发给Consumer去消费。 Pulsar延迟消息有以下不足之处:
- 第一:延迟消息索引维护在内存中,一条延迟消息的消息索引由三个long类型的字段组成<延迟执行的时间戳以及用于定位消息的ledgerId和entryId,所以当有大量的延迟消息需要执行或者延迟消息的延迟时间越长,对内存的开销也会越大。
- 第二:如果集群出现 Broker 宕机或者 topic 的 ownership 转移,Pulsar 会重建延迟消息索引,对于大规模的延时消息,索引重建的时间也会比较长。
我们目前是为海外的超核玩家服务,用户基数比较小,所以我们也选用了腾讯云的TDMQ-Pulsar作为延时队列的实现方案。
基于RabbitMQ实现基于RabbitMQ实现有两种方式:
- 第一种是基于RabbitMQ的死信队列实现
- 第二种是基于RabbitMQ官方的延迟插件实现
其实RabbitMQ本身是不支持延迟消息的,但是RabbitMQ支持消息和队列TTL和死信队列,因此,可以利用这两个特性实现延迟队列的效果。
什么是死信消息?
在消息生产和消费过程中,当消息出现以下情况,则会变为死信消息:
- 消息被拒绝且不再重复投递
- 消息超时未被消费,TTL过期
- 队列达到最大长度无法正常投递消息
死信消息会被投递到Dead-Letter-Exchange,然后根据绑定的规则转发到队列的死信队列上,监听这些队列就消费消息就可以实现延迟消息的效果。
但是基于这种方式实现的延迟队列有个缺陷:RabbitMQ只会检查第一个消息是否过期,比如这种情况,第一个消息设置了20s的TTL,第二个消息设置了10s的TTL,RabbitMQ只会等到第一个消息过期了,才会让接下来的第二个消息过期,因此,可能会出现延迟消息投递不按时。
第一种肯定是不能应用于生产环境的,因此RabbitMQ推出了官方的延迟插件
这个插件其实是在消息发到Exchange后,将延迟消息暂时存放在mnesia(一种分布式数据库),等消息到期后在投递到队列中给消费者去消费。
使用延时插件实现延时队列也存在一些局限:
三、总结
- 延迟消息存储在 Mnesia 数据库,在集群情况下,数据只会分布在其中一个节点,如果该节点宕机,延时消息不可用,但是恢复之后消息不会丢
- 官方文档也有提到,该插件依赖 Erlang 计时器,设计上并 不适合大量的延迟消息,如:百万级以上的延时消息
- 插件一旦被禁用了,没有被消费的延时消息会丢失。
,