本文源码基于 https://pkg.go.dev/golang.org/x/time@v0.0.0-20210220033141-f8bda1e9f3ba/rate
上回我们讲到,使用限速器的时候我们需要调用 NewLimiter 方法,然后 Limiter 提供了三组限速的方法,这三组方法其实都是通过调用 reserveN 实现的 reserveN 返回一个 *Reservation 指针,我们先来看一下这两个结构体吧。
Limiter
type Limiter struct { // 互斥锁 mu sync.Mutex // 每秒产生 token 的速度, 其实是 float64 的一个别名 limit Limit // 桶的大小 burst int // 当前时间节点拥有的 tokens 数量 tokens float64 // 上次更新 token 的时间 last time.Time // 上次限速的时间,这个时间可能是过去的某个时间也可能是将来的某个时间 lastEvent time.Time }
Reservation这个结构体挺有意思的,表示预约某个时间的 token
type Reservation struct { // 是否能预约上 ok bool // limter lim *Limiter // 预约的 token 数量 tokens int // token 实际使用的时间 timeToAct time.Time // 保存一下速率,因为 lim 的速率是可以被动态调整的,所以不能直接用 limit Limit }
这个库并没有使用定时器来发放 token 而是用了 lazyload 的方式,等需要消费 token 的时候才通过时间去计算然后更新 token 的数量,下面我们先通过一个例子来看一下这个流程是怎么跑的
blog.png
如上图所示,假设我们有一个限速器,它的 token 生成速度为 1,也就是一秒一个,桶的大小为 10,每个格子表示一秒的时间间隔
消费 token
- last 表示上一次更新 token 时还有 2 个 token。
- 现在我有一个请求进来,我总共需要 7 个 token 才能完成这个请求
- now 表示我现在进来的时间,距离 last 已经过去了 2s,那么现在就有 4 个 token
- 所以我如果需要 7 个 token 那么也就还需要等待 3s 中才真的有 7 个,所以这就是 timeToAct 所在的时间节点
- 预约成功之后更新 last = now 、token = -3 因为 token 已经被预约出去了所以现在剩下的就是负数了
总共有三组消费 token 的方法 AllowN, ReserveN, and WaitN最终都是调用的reserveN` 这个方法
// now: 需要消费 token 的时间点 // n: 需要多少个 token // maxFutureReserve: 能够等待的最长时间 func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { lim.mu.Lock() // 如果发放令牌的速度无穷大的话,那么直接返回就行了,要多少可以给多少 if lim.limit == Inf { lim.mu.Unlock() return Reservation{ ok: true, lim: lim, tokens: n, timeToAct: now, } } // advance 方法会去计算当前有多少个 token // 后面会讲到,now 其实就是传入的时间,但是 last 可能会变 now, last, tokens := lim.advance(now) // 发放 token 之后还剩多少 tokens -= float64(n) // 根据 token 数量计算需要等待的时间 var waitDuration time.Duration if tokens < 0 { waitDuration = lim.limit.durationFromTokens(-tokens) } // 计算是否可以发放,如果需要的量比桶的容量还大肯定是不行的 // 然后就是看需要能否容忍需要等待的时间 ok := n <= lim.burst && waitDuration <= maxFutureReserve // Prepare reservation r := Reservation{ ok: ok, lim: lim, limit: lim.limit, } // 如果可以的话,就把 token 分配给预约者 if ok { r.tokens = n r.timeToAct = now.Add(waitDuration) } // 更新各个字段的状态 if ok { lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { // 为什么不 ok 也要更新 last 呢?因为 last 可能会改变 lim.last = last } lim.mu.Unlock() return r }
advance 方法用于计算 token 的数量
// now 是传入的当前的时间点,返回的 newNow 其实就是传入的参数,没有任何改变 // newLast 是更新 token 的时间 // newTokens 是 token 的数量 func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { // 如果当前时间比上次更新 token 的时间还要早,那么就重置一下 last last := lim.last if now.Before(last) { last = now } // 这里为了防止溢出,先计算了将桶填满需要花费的最大时间 maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) // 计算时间差,如果大于最大时间的话,就取最大值 elapsed := now.Sub(last) if elapsed > maxElapsed { elapsed = maxElapsed } // 计算这段时间生成的 token 数量,如果大于桶的容量,就取桶的容量 delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens delta if burst := float64(lim.burst); tokens > burst { tokens = burst } return now, last, tokens }
这个比较有意思的是先去计算了时间的最大值,因为初始化的时候没为 last 赋值,所以 now.Before(last) 出来的结果可能是一个很大的值,再去计算 tokens 数量很可能溢出
durationFromTokens 根据 tokens 的数量计算需要花费的时间
func (limit Limit) durationFromTokens(tokens float64) time.Duration { seconds := tokens / float64(limit) return time.Nanosecond * time.Duration(1e9*seconds) }
tokensFromDuration 根据时间计算 tokens 的数量
消费 token 总结
func (limit Limit) tokensFromDuration(d time.Duration) float64 { // 这里通过拆分整数和小数部分可以减少时间上的误差 sec := float64(d/time.Second) * float64(limit) nsec := float64(d%time.Second) * float64(limit) return sec nsec/1e9 }
消费 token 的逻辑就讲完了,我们大概总结一下
WaitN
- 需要消费的时候,先去计算一下,从过去到现在可以生成多少个 token
- 然后我们通过需要的 token 减去现在拥有的 token 数量,就得到了需要预约的 token 数量
- 再通过 token 数量转换成时间,就可以得到需要等待的时间长度,以及是否可以消费
- 然后再通过不同的消费方法进行消费
其他两类消费方法都很简单,调用 Reservation 进行返回, WaitN 还有一点逻辑,所以我们一起来看一下
取消消费
// ctx 用于控制超时, n 是需要消费的 token 数量,如果 context 的 Deadline 早于要等待的时间就会直接返回失败 func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { lim.mu.Lock() burst := lim.burst limit := lim.limit lim.mu.Unlock() // 先看一下是不是已经超出消费极限了 if n > burst && limit != Inf { return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) } // 如果 ctx 已经结束了也不用等了 select { case <-ctx.Done(): return ctx.Err() default: } // 计算一下可以等待的时间 now := time.Now() waitLimit := InfDuration if deadline, ok := ctx.Deadline(); ok { waitLimit = deadline.Sub(now) } // 调用 reserveN 得到预约数据 r := lim.reserveN(now, n, waitLimit) // 如果不 ok 说明预约不到 if !r.ok { return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) } // 如果可以预约到,计算一下需要等多久 delay := r.DelayFrom(now) if delay == 0 { return nil } // 启动一个 timer 进行定时 t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: // We can proceed. return nil case <-ctx.Done(): // 如果 context 主动取消了,那么值钱预约的 token 数量需要归还 r.Cancel() return ctx.Err() } }
WaitN 当中如果预约上了,但是 Context 取消了,会调用 CancelAt 归还 tokens 我们来一起看一下是怎么实现的
存在的问题
func (r *Reservation) CancelAt(now time.Time) { // 不 ok 说明没有预约上,直接返回就行了 if !r.ok { return } r.lim.mu.Lock() defer r.lim.mu.Unlock() // 如果没有速率限制,或者没有消费 token 或 token 已经被消费了,都不用还了 if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { return } // 计算需要还的 token 数量 // 这里说是需要减去已经预支的 token 数量,但是我发现应该是个 bug,感觉这里减重复了 restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) if restoreTokens <= 0 { return } // 计算当前拥有的 tokens 数量 now, _, tokens := r.lim.advance(now) // 当前拥有的加上需要归还的就是现有的,但是不能大于桶的容量 tokens = restoreTokens if burst := float64(r.lim.burst); tokens > burst { tokens = burst } // 更新 tokens 数量 r.lim.last = now r.lim.tokens = tokens // 如果相等说明后面没有新的 token 消费,所以将状态重置到上一次 if r.timeToAct == r.lim.lastEvent { prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) if !prevEvent.Before(now) { r.lim.lastEvent = prevEvent } } return }
除了上面提到的感觉 cancelAt 可能有一个 bug 外,云神的博客还提到了一个问题,就是如果我们 cancel 了的话,后面已经在等待的任务是不会重新调整的,举个例子
func wait() { l := rate.NewLimiter(10, 10) t := time.Now() l.ReserveN(t, 10) var wg sync.WaitGroup ctx, cancel := context.WithTimeout(context.TODO(), time.Hour) defer cancel() // 注释掉下面这段就不会提前 cancel wg.Add(1) go func() { defer wg.Done() // 模拟出现问题, 200ms就取消了 time.Sleep(200 * time.Millisecond) cancel() }() wg.Add(2) go func() { defer wg.Done() // 如果要等,这个要等 1s 才能执行,但是我们的 ctx 200ms 就会取消 l.WaitN(ctx, 10) fmt.Printf("[1] cost: %s\n", time.Since(t)) }() time.Sleep(100 * time.Millisecond) go func() { defer wg.Done() // 正常情况下,这个要等 1.2 s 才能执行,但是我们前面都取消了 // 这个是不是应该就只需要等 200ms 就执行了 ctx, cancel := context.WithTimeout(context.Background(), time.Hour) defer cancel() l.WaitN(ctx, 2) fmt.Printf("[2] cost: %s\n", time.Since(t)) }() wg.Wait() }
我们先看一下不提前 cancel 的结果
[1] cost: 1.0002113s [2] cost: 1.2007347s
再看看提前 cancel 的结果
[1] cost: 200.8268ms [2] cost: 1.201066s
可以看到就是 1 有变化,从 1s -> 200ms 但是 2 一直都要等 1.2s
总结仔细看了一下令牌桶的实现,但是也留下了一个疑问,如果哪位童鞋知道希望可以留言告诉我,在取消的时候,会减掉一个预约的时间,但是我发现这里其实应该是重复减了一次
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
下面是测试代码
func main() { t0 := time.Now() t1 := time.Now().Add(100 * time.Millisecond) t2 := time.Now().Add(200 * time.Millisecond) t3 := time.Now().Add(300 * time.Millisecond) l := rate.NewLimiter(10, 20) l.ReserveN(t0, 15) // 桶里还剩 5 个 token fmt.Printf("% v\n", l) r := l.ReserveN(t1, 10) // 桶还有 -4 个, fmt.Printf("% v\n", l) // 注释掉下面两行,最后结果还剩 8 个 token l.ReserveN(t2, 2) // 桶里还有 -5 个 fmt.Printf("% v\n", l) r.CancelAt(t3) fmt.Printf("% v\n", l) // 归还之前借的,运行结果 桶里还有 4 个 // 但是这里不应该剩下 6 个么,本来有 5 个,300ms 生成了 3 个,后面又预支出去 2 个 // 而且我发现如果我注释掉预支两个的代码,结果和我预期的一致,剩余 8 个token }
作者:moshuishou
来源:https://lailin.xyz/post/go-training-week6-3-token-bucket-2.html
,