Author | Leo叔叔   Editor | 欧阳姝黎
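Allocating all memory on the stack would undoubtedly give the best performance, but heap allocation is unavoidable. Can the cost of using heap memory be reduced? Yes. In Go's sync package, sync.Pool provides the ability to store, retrieve, and reuse a set of temporary objects.
For temporary objects that are expensive to create and used frequently, sync.Pool reduces memory allocations and lowers GC pressure. Go's GC uses a tri-color marking algorithm derived from mark-and-sweep, so creating large numbers of temporary objects at a high rate burdens the marking phase and easily causes CPU spikes. Note, however, that an object stored in a Pool may be removed at any time without notification. So not every frequently used, expensive object is a good fit; DB connections and thread pools, for example, are not.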
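A typical good fit is a short-lived buffer. The sketch below is illustrative only: bufPool and greet are hypothetical names, not part of the article's code. It shows the usual pattern of resetting a pooled object before use, since Get may hand back an object that still carries state from its previous user.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable *bytes.Buffer values (hypothetical example pool).
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// greet builds a short message using a pooled buffer.
func greet(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	// A pooled object may still hold data from its previous user, so reset it first.
	buf.Reset()
	// Hand the buffer back once the result string has been built.
	defer bufPool.Put(buf)

	buf.WriteString("hello, ")
	buf.WriteString(name)
	return buf.String()
}

func main() {
	fmt.Println(greet("gopher"))
	fmt.Println(greet("pool"))
}
```

Because the pool may drop the buffer at any time, the code never relies on getting the same buffer back; it only relies on New producing a usable one.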
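Talk is cheap, show me your code
Go 1.13 optimized sync.Pool, dropping the sync.Mutex locking in favor of CAS plus a doubly linked list of ring buffers. This article is based on Go 1.15.8, the latest stable release at the time of writing.
Basic usage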
package main

import "sync"

type Person struct {
	Age int
}

// Initialize the pool.
var personPool = sync.Pool{
	New: func() interface{} {
		return new(Person)
	},
}

func main() {
	// Get an instance.
	newPerson := personPool.Get().(*Person)
	// Return the object so that other goroutines can reuse it.
	defer personPool.Put(newPerson)
	newPerson.Age = 25
}
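Usage is simple and takes roughly three steps:
Initialize the Pool and provide a New function, which is called when the Pool has no cached object.
Use Get to fetch an object from the pool, then run the business logic.
When done, use Put to return the object to the pool.
Note: like sync.Mutex, a sync.Pool must not be copied after first use.
Does sync.Pool really help performance that much? Let's benchmark it.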
package main

import (
	"testing"
)

func BenchmarkWithoutPool(b *testing.B) {
	var p *Person
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < 10000; j++ {
			p = new(Person)
			p.Age = 30
		}
	}
}

func BenchmarkWithPool(b *testing.B) {
	var p *Person
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < 10000; j++ {
			p = personPool.Get().(*Person)
			p.Age = 30
			personPool.Put(p)
		}
	}
}
Benchmark results:
BenchmarkWithoutPool
BenchmarkWithoutPool-8    7630    135523 ns/op    80000 B/op    10000 allocs/op
BenchmarkWithPool
BenchmarkWithPool-8       9865    126072 ns/op        0 B/op        0 allocs/op
How it works
There is nothing a diagram cannot explain.
If one is not enough, here is another.
sync.Pool data structures
type Pool struct {
	noCopy noCopy

	// Actually points to an array of poolLocal, one per P;
	// the array size is the number of Ps, runtime.GOMAXPROCS(0).
	local     unsafe.Pointer
	localSize uintptr // size of the local array

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// Called when the pool has no cached object.
	New func() interface{}
}
Compared with versions before Go 1.13, the sync.Pool struct gains the victim and victimSize fields.
sync.Pool mainly maintains an array of poolLocal values, whose size is determined by runtime.GOMAXPROCS(0).
type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
	private interface{} // Can be used only by the respective P.
	shared  poolChain   // The local P can pushHead/popHead; any other P can popTail.
}
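A poolLocal consists of a per-P private slot, private, and a shared area, shared. The shared area is a double-ended queue in which each node is itself a ring buffer, which sounds a little convoluted at first.
poolDequeue is logically a ring buffer. Its vals field stores the actual values. For the sake of atomicity, the headTail field packs the head and tail indexes into a single word: the high 32 bits hold the head index and the low 32 bits hold the tail index. When head and tail point to the same position, the ring is empty.
The code confirms this: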
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}
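In practice sync.Pool wraps poolDequeue once more. Because a ring has a fixed size, each ring is wrapped in a poolChainElt node, and the nodes form a doubly linked list so that the total capacity can grow.
Operations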
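Next, let's walk through the core flows of sync.Pool.
Getting an object: p.Get
The overall flow of Get:
Bind the current goroutine to its P and prevent preemption. This is done by p.pin, which calls runtime_procPin and returns the poolLocal and the P's id.
Try the private slot first.
If the private slot is empty, try the shared area.
If the shared area has nothing either, try to "steal" an object from another P.
If even stealing fails, create one with New.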
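func (p *Pool) Get() interface{} {
	// Bind the current goroutine to its P; runtime_procPin disables preemption.
	// Returns the poolLocal and the P's id.
	l, pid := p.pin()
	x := l.private // Try the private slot first.
	l.private = nil
	if x == nil {
		// Take from the head of the shared area.
		x, _ = l.shared.popHead()
		if x == nil {
			// Still nothing: try to steal one from another P.
			x = p.getSlow(pid)
		}
	}
	// Re-enable preemption.
	runtime_procUnpin()
	// Nothing anywhere: create a new object.
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}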
So how does a goroutine get bound to a P?
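func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize; here we load in the opposite order.
	// Since preemption is disabled, GC cannot happen in between,
	// so we must observe local at least as large as localSize.
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local                          // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	// The number of Ps may have been changed at run time, or a GC may have happened.
	return p.pinSlow()
}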
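The first call here is runtime_procPin. What makes it so powerful that it not only keeps the P from being preempted but also holds off the GC?
Aside: disabling preemption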
func runtime_procPin() int

//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
	return procPin()
}

//go:nosplit
func procPin() int {
	_g_ := getg()
	mp := _g_.m

	mp.locks++
	return int(mp.p.ptr().id)
}
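As shown, binding the goroutine to its P and disabling preemption is ultimately done by procPin. procPin first obtains the current goroutine from TLS or a dedicated register, then gets the OS thread (M) the goroutine is running on, and increments that M's locks field. What does that mean?
This touches on goroutine scheduling. The Go runtime scheduler uses the GPM model: G is the basic unit of scheduling, P can be thought of as a logical CPU that runs Gs, and M is an OS thread. What is preemption? It means taking away the P bound to an M, because almost every task in the Go runtime needs a P in order to run. The scheduler preempts in two main situations:
First, a G making a system call blocks. Leaving its P waiting would waste compute, so the P is taken away to keep other goroutines from starving.
Second, if a G runs for too long, the other Gs on its P cannot run and would also starve.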
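How preemption is implemented
Preemption in Go is driven by sysmon. Yes, the sysmon started from runtime.main, which is also the one special case that runs outside the GPM model and needs only a G and an M. sysmon covers netpoll, retake, forcegc, and scavenge heap; for preemption, retake is the part to look at.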
//go:nowritebarrierrec
func sysmon() {
	...
	// retake P's blocked in syscalls
	// and preempt long running G's
	if retake(now) != 0 {
		idle = 0
	} else {
		idle++
	}
	...
}

func retake(now int64) uint32 {
	...
	if s == _Prunning || s == _Psyscall {
		// Preempt G if it's running for too long.
		t := int64(_p_.schedtick)
		if int64(pd.schedtick) != t {
			pd.schedtick = uint32(t)
			pd.schedwhen = now
		} else if pd.schedwhen+forcePreemptNS <= now {
			// The G has run for longer than forcePreemptNS.
			preemptone(_p_)
			// In case of syscall, preemptone() doesn't
			// work, because there is no M wired to P.
			sysretake = true
		}
	}
	...
}
When a P is running or in a system call, retake checks whether its G has run for longer than forcePreemptNS (10ms); if so, it calls preemptone(_p_) to preempt that P.
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true

	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}
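preemptone mainly sets two flags, gp.preempt and gp.stackguard0, and the latter does the real work. Setting the goroutine's stackguard0 to stackPreempt, that is (1<<(8*sys.PtrSize) - 1) & -1314, makes the stack check fail at the G's next function call (the check compares stackguard0 with the SP register), which triggers the compiler-inserted call to morestack.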
// Using asm_amd64.s as an example
TEXT runtime·morestack(SB),NOSPLIT,$0-0
	... ...
	// Call newstack on m->g0's stack.
	MOVQ	m_g0(BX), BX
	MOVQ	BX, g(CX)
	MOVQ	(g_sched+gobuf_sp)(BX), SP
	CALL	runtime·newstack(SB)
	CALL	runtime·abort(SB)	// crash if newstack returns
	RET
morestack calls newstack, which would normally try to grow the stack.
//go:nowritebarrierrec
func newstack() {
	... ...
	if preempt {
		if !canPreemptM(thisg.m) {
			// Let the goroutine keep running for now.
			// gp->preempt is set, so it will be preempted next time.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}
	}
	... ...
}

//go:nosplit
func canPreemptM(mp *m) bool {
	return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
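Before growing the stack, newstack checks the preemption flag, and if mp.locks != 0 it does not preempt.
If preemption can proceed, it calls gopreempt_m(gp), which in turn calls goschedImpl(gp). That sets the goroutine's status with casgstatus(gp, _Grunning, _Grunnable), detaches the goroutine from the current M, and puts it on the global runnable queue to wait for its next turn.
This makes clear why runtime_procPin can disable preemption just by changing the locks field of the M the goroutine runs on. One question remains: why can't the GC get past it either?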
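The effect of the locks counter can be pictured with a toy model. This is not runtime code: machine, pin, unpin, and canPreempt are hypothetical names that mimic only the mp.locks == 0 part of canPreemptM.

```go
package main

import "fmt"

// machine is a toy stand-in for the runtime's M; only the locks counter is modeled.
type machine struct {
	locks int
}

// pin mimics procPin: it increments the counter.
func (m *machine) pin() { m.locks++ }

// unpin mimics procUnpin: it decrements the counter.
func (m *machine) unpin() { m.locks-- }

// canPreempt mimics the mp.locks == 0 condition checked before preempting.
func (m *machine) canPreempt() bool { return m.locks == 0 }

func main() {
	m := &machine{}
	fmt.Println("before pin:", m.canPreempt())
	m.pin()
	fmt.Println("while pinned:", m.canPreempt())
	m.unpin()
	fmt.Println("after unpin:", m.canPreempt())
}
```

Because the check is a counter rather than a boolean, nested pin and unpin pairs compose: preemption becomes possible again only after the last unpin.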
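Go's GC has roughly three kinds of triggers:
gcTriggerCycle: a specific GC cycle is requested explicitly, as runtime.GC does.
gcTriggerTime: more than forcegcperiod nanoseconds have passed since the previous GC cycle. runtime.sysmon checks this periodically in the background and wakes runtime.forcegchelper.
gcTriggerHeap: the allocated heap has reached the trigger threshold. This is checked in runtime.mallocgc.
All of them end up calling gcStart(trigger gcTrigger). In the stop-the-world (STW) phase of a GC we then find: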
func stopTheWorldWithSema() {
	_g_ := getg()

	// If we hold a lock, then we won't be able to stop another M
	// that is blocked trying to acquire the lock.
	if _g_.m.locks > 0 {
		throw("stopTheWorld: holding locks")
	}

	lock(&sched.lock)
	sched.stopwait = gomaxprocs
	atomic.Store(&sched.gcwaiting, 1)
	preemptall()
	// stop current P
	_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
	sched.stopwait--
	// try to retake all P's in Psyscall status
	for _, p := range allp {
		s := p.status
		if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
			if trace.enabled {
				traceGoSysBlock(p)
				traceProcStop(p)
			}
			p.syscalltick++
			sched.stopwait--
		}
	}
	// stop idle P's
	for {
		p := pidleget()
		if p == nil {
			break
		}
		p.status = _Pgcstop
		sched.stopwait--
	}
	wait := sched.stopwait > 0
	unlock(&sched.lock)

	// wait for remaining P's to stop voluntarily
	if wait {
		for {
			// wait for 100us, then try to re-preempt in case of any races
			if notetsleep(&sched.stopnote, 100*1000) {
				noteclear(&sched.stopnote)
				break
			}
			preemptall()
		}
	}

	// sanity checks
	bad := ""
	if sched.stopwait != 0 {
		bad = "stopTheWorld: not stopped (stopwait != 0)"
	} else {
		for _, p := range allp {
			if p.status != _Pgcstop {
				bad = "stopTheWorld: not stopped (status != _Pgcstop)"
			}
		}
	}
	if atomic.Load(&freezing) != 0 {
		// Some other thread is panicking. This can cause the
		// sanity checks above to fail if the panic happens in
		// the signal handler on a stopped thread. Either way,
		// we should halt this thread.
		lock(&deadlock)
		lock(&deadlock)
	}
	if bad != "" {
		throw(bad)
	}
}
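Roughly: it first calls preemptall to try to preempt every P, then stops the current P. Next it walks all Ps and stops any P that is in a system call, then handles the idle Ps. Finally it checks whether any P still has to be waited for; if so, it loops, waiting and calling preemptall again.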
func preemptall() bool {
	res := false
	for _, _p_ := range allp {
		if _p_.status != _Prunning {
			continue
		}
		if preemptone(_p_) {
			res = true
		}
	}
	return res
}
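Now it is clear: here is our old friend preemptone(_p_) again. While a goroutine is pinned, its M cannot be preempted, so the GC keeps waiting in the STW phase and cannot move on.
That answers both questions. Back to the main thread of the story.
runtime_procPin disables preemption of the P, and runtime_procUnpin re-enables it. Once the goroutine is bound to its P, pin has the id of the current P. If pid < p.localSize, the poolLocal for this P already exists, and it is obtained directly by address offset: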
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}
What if the number of Ps was changed at run time? Then p.pinSlow is tried and, as the name suggests, this path is slower.
func (p *Pool) pinSlow() (*poolLocal, int) {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
	return &local[pid], pid
}
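The first thing pinSlow does is release the preemption lock set earlier. It then tries to acquire the global exclusive lock allPoolsMu (a Mutex). This explains the early release: the global lock may not be available right away, and a goroutine must not block on a mutex while pinned. After acquiring the lock, it pins again and re-checks uintptr(pid) < s, because the P may have changed before the lock was acquired. If p.local == nil, p is appended to the global list allPools ([]*Pool), which is why the global lock was needed. The local array is rebuilt here because a GC clears the pool's existing one. Is the old data really gone? That is related to the victim field mentioned earlier and is covered below.
At this point we have the poolLocal, and objects are looked up in this order:
1. First try the local private slot.
2. If that is empty, x, _ = l.shared.popHead() tries the shared area.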
func (c *poolChain) popHead() (interface{}, bool) {
	d := c.head
	for d != nil {
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		// There may still be unconsumed elements in the
		// previous dequeue, so try backing up.
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}
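The shared area is a doubly linked list of poolChainElt nodes. Starting at the head, the code walks the list in the prev direction, calling d.popHead on each node to try to take a value from its head.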
func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}
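The logic is fairly simple:
2.1 Unpack headTail. If head == tail the ring is empty, so return immediately.
2.2 Otherwise decrement head, pack head and tail again, and use CAS to confirm that nobody modified the word concurrently. On success, take the slot and leave the loop; on failure, retry.
2.3 Convert the slot to interface{}.
2.4 Reset the slot to eface{}.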
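Steps 2.1 and 2.2 form a classic CAS retry loop. The following is a reduced sketch of just that loop, assuming the same 32/32-bit packing; claimHead and its helpers are hypothetical names, and the value storage of the real poolDequeue is left out.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const bits = 32

// pack puts head in the high 32 bits and tail in the low 32 bits.
func pack(head, tail uint32) uint64 { return uint64(head)<<bits | uint64(tail) }

// unpack is the inverse of pack.
func unpack(w uint64) (head, tail uint32) { return uint32(w >> bits), uint32(w) }

// claimHead tries to claim the newest slot index recorded in the packed word.
// It returns false when head == tail, meaning there is nothing to claim.
func claimHead(word *uint64) (idx uint32, ok bool) {
	for {
		old := atomic.LoadUint64(word)
		head, tail := unpack(old)
		if head == tail {
			return 0, false
		}
		head--
		// Publish the new head only if the word is unchanged since we loaded it.
		if atomic.CompareAndSwapUint64(word, old, pack(head, tail)) {
			return head, true
		}
		// Another goroutine changed the word in the meantime; reload and retry.
	}
}

func main() {
	word := pack(2, 0) // two claimable slots: indexes 0 and 1
	for {
		idx, ok := claimHead(&word)
		if !ok {
			break
		}
		fmt.Println("claimed slot", idx)
	}
}
```

No lock is held at any point: a goroutine that loses the race simply observes the newer word and tries again.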
3. If the shared area has nothing either, try to steal an object from another P with p.getSlow(pid).
func (p *Pool) getSlow(pid int) interface{} {
	// See the comment in pin regarding ordering of the loads.
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	locals := p.local                        // load-consume
	// Try to steal one element from other procs.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}
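The order in which the first loop of getSlow visits the Ps is easy to miss. As a small illustration (stealOrder is a hypothetical helper, not part of sync), the index expression (pid+i+1)%size starts at the next P and wraps around, reaching the caller's own P last:

```go
package main

import "fmt"

// stealOrder lists the P indexes in the order the steal loop would visit them,
// using the same index expression as getSlow: (pid+i+1) % size.
func stealOrder(pid, size int) []int {
	order := make([]int, 0, size)
	for i := 0; i < size; i++ {
		order = append(order, (pid+i+1)%size)
	}
	return order
}

func main() {
	fmt.Println(stealOrder(2, 4)) // prints [3 0 1 2]
}
```

Starting at the neighbor rather than at index 0 spreads steal attempts across Ps instead of concentrating them on the first one.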
3.1 Take the poolLocal array, visit each poolLocal, and call l.shared.popTail to take a value from the tail of its shared area.
func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
It first loads the tail node, then loops, following the list in the next direction from one poolChainElt node to the next and calling d.popTail to try to get a value.
func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)

	return val, true
}
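This is similar to popHead, except that one takes from the head and the other from the tail. Again the loop first unpacks headTail; if tail == head the ring is empty and the function returns. Otherwise it packs tail+1 and uses CAS to guard against concurrent changes: on success it leaves the loop with the slot, on failure it retries.
One difference from popHead: it first sets the value to nil and then clears typ with an atomic store, atomic.StorePointer(&slot.typ, nil). The reason is simple: pushHead and popTail work from opposite ends, one adding at the head and one taking from the tail, so they race once the two ends meet.
3.2 If stealing fails too, the following runs: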
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
	return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
	l.private = nil
	return x
}
for i := 0; i < int(size); i++ {
	l := indexLocal(locals, (pid+i)%int(size))
	if x, _ := l.shared.popTail(); x != nil {
		return x
	}
}

// Mark the victim cache as empty for future gets don't bother
// with it.
atomic.StoreUintptr(&p.victimSize, 0)
victim cache
A victim cache is a hardware technique proposed by Norman Jouppi to improve cache performance. As his paper puts it:
Miss caching places a fully-associative cache between cache and its re-fill path. Misses in the cache that hit in the miss cache have a one cycle penalty, as opposed to a many cycle miss penalty without the miss cache. Victim Caching is an improvement to miss caching that loads the small fully-associative cache with victim of a miss and not the requested cache line.
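Roughly: a small fully associative cache is placed between a cache and its refill path, and it holds the data that was evicted. In other words, data evicted from the first-level cache goes into the victim cache, and on a first-level miss the lookup continues in the victim cache.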
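Once understood, it is quite simple. At each GC the current local array becomes the victim, and a new local array is built on demand:
local1 -> GC -> local2, victim -> local1
local2 -> GC -> local3, victim -> local2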
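One observable consequence is that an idle object survives at most two GC cycles: the first moves it from local to victim, and the second drops the victim. The sketch below demonstrates this with runtime.GC. It describes the behavior of the implementation analyzed here, not a guarantee of the sync.Pool API, and item and freshAfterTwoGCs are hypothetical names.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// item is a small hypothetical pooled type.
type item struct {
	n int
}

// freshAfterTwoGCs puts an object into a pool, forces two GC cycles,
// and reports whether the next Get returned a different object.
func freshAfterTwoGCs() bool {
	pool := sync.Pool{New: func() interface{} { return new(item) }}

	old := pool.Get().(*item)
	pool.Put(old)

	runtime.GC() // the local cache becomes the victim cache
	runtime.GC() // the victim cache is dropped

	got := pool.Get().(*item)
	// old is still referenced here, so a newly allocated item has a different address.
	return got != old
}

func main() {
	fmt.Println("fresh object after two GCs:", freshAfterTwoGCs())
}
```

After only one GC the object may still be found through the victim path in getSlow, which is the point of keeping the extra generation.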
Unfortunately, if getSlow finds nothing either, the only option left is to create a new object:
if x == nil && p.New != nil {
	x = p.New()
}
Returning an object to the pool: p.Put
With Get covered, let's look at Put.
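func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	// Bind the goroutine to its P; runtime_procPin disables preemption. Returns the poolLocal.
	l, _ := p.pin()
	if l.private == nil {
		// Prefer the private slot.
		l.private = x
		x = nil
	}
	if x != nil {
		// Otherwise put it into the shared area.
		l.shared.pushHead(x)
	}
	// Re-enable preemption.
	runtime_procUnpin()
}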
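The basic logic:
If the object being put is nil, return immediately.
Call p.pin to get the poolLocal, as analyzed above.
Put the object into the private slot first.
If the private slot is already taken, put the object into the shared area.
Re-enable preemption of the P.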
func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

	if d.pushHead(val) {
		return
	}

	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}
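pushHead on the chain puts the object into the ring buffer of the appropriate list node:
Get the head node of the doubly linked list.
If the head node is nil, initialize it with a ring buffer of initial size 8.
Put the object into the ring buffer.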
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// popTail may not have finished with this slot yet.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		val = dequeueNil(nil)
	}
	*(*interface{})(unsafe.Pointer(slot)) = val

	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}
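This is the reverse of popHead and is also fairly simple. It first checks whether the ring is full and returns false if so. Because pushHead races with popTail, a non-nil slot.typ may mean that popTail has not finished with the slot yet.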
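When a ring is full, the chain-level pushHead shown earlier allocates a new ring of twice the size, capped at dequeueLimit. As a rough sketch of that growth, the hypothetical helper ringSizes below computes which ring sizes would be allocated for n pushes with no pops in between. It assumes initSize = 8 and dequeueLimit = (1 << 32) / 4, and it ignores the case where a push is refused because popTail is still clearing a slot.

```go
package main

import "fmt"

const (
	initSize     = 8             // size of the first ring
	dequeueLimit = (1 << 32) / 4 // assumed upper bound on a ring's size
)

// ringSizes returns the sizes of the rings a chain would allocate
// to hold n pushes when nothing is popped in between.
func ringSizes(n int) []int {
	var sizes []int
	size := initSize
	for n > 0 {
		sizes = append(sizes, size)
		n -= size // this ring absorbs up to size elements
		size *= 2
		if size >= dequeueLimit {
			size = dequeueLimit
		}
	}
	return sizes
}

func main() {
	fmt.Println(ringSizes(20)) // prints [8 16]
	fmt.Println(ringSizes(60)) // prints [8 16 32 64]
}
```

Doubling keeps the number of list nodes logarithmic in the number of cached objects, so walking the chain in popHead or popTail stays short.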
How the GC clears pooled data
The init function in pool.go registers the function that cleans up Pools when a GC happens. The call chain is:
gcTrigger -> gcStart -> clearpools -> poolCleanup
func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
	poolcleanup = f
}

func poolCleanup() {
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}
	oldPools, allPools = allPools, nil
}
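The logic is simple and matches the victim discussion above.
Finally, the attentive reader may have noticed two details that were skipped.
noCopy
The noCopy field in the sync.Pool struct exists to prevent a sync.Pool from being copied during use. Go has no built-in way to forbid copying a value, so this approach is used instead: go vet detects the copy and reports it. For example: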
package main

type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

type People struct {
	noCopy noCopy
}

func say(p People) {}

func main() {
	var p People
	say(p)
}
Running go vet demo.go prints:
# command-line-arguments
./demo.go:12:12: say passes lock by value: command-line-arguments.People contains command-line-arguments.noCopy
./demo.go:18:6: call of say copies lock value: command-line-arguments.People contains command-line-arguments.noCopy
Running the program directly, of course, reports no error.
pad
type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
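The pad field has no business meaning here; it exists only to avoid false sharing. To bridge the gap between CPU speed and memory read speed, L1, L2, and L3 caches sit between the two. They are far smaller than main memory but much faster.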
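Caches work in units of cache lines, typically 64 bytes. The access speed of the L1, L2, and L3 caches and of main memory depends on how close each sits to the CPU: L1 > L2 > L3 > memory.
But today's machines have multi-core CPUs, so how is data kept consistent across cores? This is where the MESI protocol comes in. M, E, S, and I denote the four states of a cache line: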
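M (Modified): the local processor has modified the cache line, so the line is dirty; its content differs from memory, and this cache holds the only copy.
E (Exclusive): the cache line's content matches memory, and no other processor holds this line.
S (Shared): the cache line's content matches memory, and other processors may also hold a copy of it.
I (Invalid): the cache line is invalid and cannot be used.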
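Now consider the following scenario.
Two variables X and Y sit in the same cache line. If core1 wants to update X and core2 wants to update Y, each update invalidates the other core's copy of the line (state I), so the copies in L1 and L2 become unusable and the next access to X or Y has to fetch the data from L3 or even from main memory. The performance cost is easy to imagine. How can this be solved?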
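False sharing is most commonly solved with padding, so that the data occupies a cache line by itself and is less affected by neighboring data. Java 8, for example, provides the @Contended annotation, which pads cache lines automatically.
Summary
The implementation of sync.Pool is compact, and its ideas show up in other languages as well, for example ForkJoinPool in Java. Yet the details of a simple design are often well worth studying. The list of topics covered turns out to be long: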
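The work-stealing algorithm
How CAS achieves lock-free behavior
Setting the preemption flag, preventing the P from being preempted, and holding off the GC
What a victim cache is
What noCopy is for and how it forbids copying
False sharing
How the Pool interacts with the GC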
Still, this fits Go's "less is more" design philosophy.