Go-27-SyncPool

前面一篇讲解了Sync.Pool的底层数据结构 poolDequeue,接着看看Sync.Pool的具体实现原理。如果想看看 Sync.Pool 的使用 可以看看我的 Go 入门 26-Issue,使用的时候最好分配固定大小的对象否则注意清理

带着问题看世界

  1. Sync.PoolGoroutine 的关系
  2. Sync.Pool 是如何释放的(并没有主动释放接口)

代码:src/sync/pool.go,1.20版本

首先来看内存池的说明,翻译过来就是就是:

  1. 存放在Pool 中的元素任何时候都有可能在没有被其他引用的情况下释放掉
  2. Pool 是并发安全的
  3. 使用 Pool 之后不能再复制它。假设缓存池对象 A 被对象 B 拷贝了,如果 A 被清空,B 的缓存对象指针指向的对象将会不可控

先来看看整体的结构图

syncpools1

全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var poolRaceHash [128]uint64

var (
allPoolsMu Mutex //互斥锁,用于更新 allPools,因为所有的 sync.pool 都存放在里面

// allPools is the set of pools that have non-empty primary
// caches. Protected by either 1) allPoolsMu and pinning or 2)
// STW.
allPools []*Pool

// oldPools is the set of pools that may have non-empty victim
// caches. Protected by STW.
oldPools []*Pool
)
  • allPoolsMu:用于对allPools的更新进行保护
  • allPools: []*Pool切片,存储具有非空私有缓存的对象池。可以被多个 goroutine 访问
  • oldPools[]*Pool类型的切片,用于存储可能具有非空受害者缓存的对象池,由于只有在 STW 时候才会更新,不会被并发访问

基本结构

1
2
3
4
5
6
7
8
9
10
11
12
type Pool struct {
noCopy noCopy // 提示不要复制

local unsafe.Pointer // 指向每个 P(处理器) 的本地池的指针。它的类型是 [P]poolLocal,其中P是处理器数量
localSize uintptr // 本地数组的大小

victim unsafe.Pointer // GC上一个周期中的本地池的指针。在当前周期中,这个本地池变成了受害者。它的类型是 unsafe.Pointer
victimSize uintptr // 受害者缓存数组的大小,以字节为单位

//可选地指定一个函数,当 Get 方法本来会返回 nil 时,可以用于生成一个值。
New func() any
}
  • noCopy 用于提示不要进行对象复制

  • localvictim 的关系在后面 内存池清理 中说明

  • New 则是自定义的分配对象函数

noCopy

noCopy 支持 使用 go vet 检查对象是否被复制,它是一个内置的空结构体类型,当然也可以自行实现类似功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type noCopy struct{}

func (n noCopy) Lock() {
//TODO implement me
panic("implement me")
}

func (n noCopy) Unlock() {
//TODO implement me
panic("implement me")
}

var _ sync.Locker = (*noCopy)()

type Person struct {
noCopy
age int
height int
}

它第一次使用后不能被复制,其实代码是能够编译通过运行的,只是在 go vet 或者 部分编辑器会提示而已。可以看看没有成为标准的原因 https://golang.org/issues/8005#issuecomment-190753527

poolLocal

每个处理器 P 都有一个 poolLocal 的本地池对象

1
2
3
4
5
6
7
8
9
10
11
12
13
// 每个 P 的本地对象池
type poolLocalInternal struct {
private any // 仅能被当前 P 进行读写
shared poolChain // 共享,当前 P 能对其进行 pushHead/popHead; 任何 P 都能执行popTail
}

type poolLocal struct {
poolLocalInternal

// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
  • private 是一个仅用于当前 P 进行读写的字段(即没有并发读写的问题)
  • shared 可以在多个 P 之间进行共享读写,是一个 poolChain 链式队列结构, 当前 P 上可以进行 pushHeadpopHead 操作(队头读写), 在所有 P 上都可以进行 popTail (队尾出队)操作
  • pad 用于 伪共享 保证 poolLocal 的大小是 128 字节的倍数

runtime_procPin

1
2
3
4
5
6
7
8
9
10
11
12
13
//go:nosplit
func procPin() int {
gp := getg() //获取当前goroutine的指针
mp := gp.m //当前goroutine 对应的 协程代表

mp.locks++ //这个变量用于跟踪当前线程(m)持有的锁的数量
return int(mp.p.ptr().id) //代表当前线程(m)执行的处理器(p)的唯一标识符
}

func procUnpin() {
gp := getg()
gp.m.locks--
}
  • locks:通过增加锁的计数,表明当前线程被固定(pinned)在处理器上
  • mp.p表示当前线程所绑定的处理器,.ptr()方法返回处理器的指针,.id表示处理器的唯一标识符

pinSlow

将当前的goroutine绑定到Pool中的一个poolLocal上,并返回该poolLocal及其索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (p *Pool) pinSlow() (*poolLocal, int) {
//1. 解除当前goroutine与之前绑定的poolLocal的绑定关系,使当前goroutine可以进行重新绑定
//使用 mutex 时候 P 必须可抢占
runtime_procUnpin()

//2. 获取池的全局锁
allPoolsMu.Lock()
defer allPoolsMu.Unlock()

//3. 将当前的goroutine绑定到一个poolLocal上,并返回其索引pid。
//再次固定 P 时 poolCleanup 不会被调用
pid := runtime_procPin()

//获取未越界返回poolLocal
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}

// 如果数组为空,将其添加到 allPools,垃圾回收器从这里获取所有 Pool 实例
if p.local == nil {
allPools = append(allPools, p)
}

// 根据 P 数量创建 slice,如果 GOMAXPROCS 在 GC 间发生变化
// 我们重新分配此数组并丢弃旧的
size := runtime.GOMAXPROCS(0) // 获取当前的GOMAXPROCS值,即当前系统的最大并发数
local := make([]poolLocal, size) //创建本地池

// 将底层数组起始指针保存到 p.local,并设置 p.localSize
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
runtime_StoreReluintptr(&p.localSize, uintptr(size))

// 返回所需的 pollLocal
return &local[pid], pid
}

这个逻辑的前提是当前 P 已经发生了动态调整,需要重新计算localPool

  • 首先解除 goroutineProcess 的绑定,让 goroutine 可以重新绑定
  • 获取 allPools的全局锁
  • 将当前 goroutineProcess 重新绑定
  • 如果Process 未发生变化,返回 ProcesslocalPool
  • 如果没有变化则
    • 如果 Pool.local为空,则需要将 Pool 加入到 allPools 中,用于 GC 扫描回收
    • 重新创建 [p]poolLocal
    • 重新将 Pool.local 指向 [p]poolLocal

pin

获取当前 Process 中的 poolLocal,将当前的goroutine绑定到一个特定的Process上,禁用抢占并返回ProcesspoolLocal本地池和P的标识

1
2
3
4
5
6
7
8
9
10
11
12
13
func (p *Pool) pin() (*poolLocal, int) {
// 1. 将当前的goroutine绑定到一个P上,并返回P的标识pid。
pid := runtime_procPin()

s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
//可能存在动态的 P(运行时调整 P 的个数)procresize/GOMAXPROCS,如果 P.id 没有越界,则直接返回
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}

return p.pinSlow()
}

尝试通过加载locallocalSize字段的方式来判断是否可以直接返回一个可用的poolLocal,如果不满足条件,则调用pinSlow方法来重新分配并返回一个新的poolLocal。调用者在使用完poolLocal之后,必须调用runtime_procUnpin()来解除与P的绑定关系。

1
2
3
4
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}

内存池清理

在使用 init 仅执行了一个逻辑,就是注册内存池回收机制

1
2
3
func init() {
runtime_registerPoolCleanup(poolCleanup)
}

poolCleanup 用于实现内存池的清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func poolCleanup() {
// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).

// Drop victim caches from all pools.
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}

// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
}

垃圾回收的策略就是

  • oldPools 中也就是所有localPoolvictim 对象丢弃
  • allPoolslocal 复制给victim,并 local 重置
  • 最后将 allPools 复制给 oldPoolsallPools 置空

Get

整体流程如下

syncpool4
  1. 首先获取当前 ProcesspoolLocal也就是说当 Goroutine 在哪个 Process 运行的时候就会从哪个 Process 的 localPool中获取对象
  2. 优先从 private 中选择对象,并将 private = nil
  3. 若取不到,则尝试从 shared 队列的队头进行读取
  4. 若取不到,则尝试从其他的 Process 中进行偷取 getSlow(跨 Process 读写)
  5. 若还是取不到,则使用自定义的 New 方法新建对象

获取对象代码如下操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (p *Pool) Get() any {
//...race
//1. 获取一个poolLocal
l, pid := p.pin()
//2. 先从private获取对象
x := l.private
l.private = nil
if x == nil {
// 尝试从 localPool 的 shared 队列队头读取
x, _ = l.shared.popHead()
if x == nil {
// 如果取不到,则获取新的缓存对象
x = p.getSlow(pid)
}
}
runtime_procUnpin()
//...race
// 如果 getSlow 还是获取不到,则 New 一个
if x == nil && p.New != nil {
x = p.New()
}
return x
}

其中的 getSlow 就是从 其他 Process 或者 victim 中获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (p *Pool) getSlow(pid int) any {
//1. 加载本地池
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
//2. 尝试从其他 process 偷取元素
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

//3. 从其他 process 也没有偷到
//那么判断是否 process数量发生了变化
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
//4. 从 victim 中获取
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

//到这里说明没有了,那么就将 victimSize 设置为 0后续不会访问
atomic.StoreUintptr(&p.victimSize, 0)

return nil
}

Put

Put 的操作如下

syncpool5

存放策略是:

  1. 如果存放 nil 直接返回
  2. 获取当前 ProcesspoolLocal
  3. 如果 private == nil 则放到 private
  4. 如果private != nil 则将起放入到 链表头部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (p *Pool) Put(x any) {
if x == nil {
return
}
//...race
// 获得一个 localPool
l, _ := p.pin()
// 优先放入private
if l.private == nil {
l.private = x
} else {
l.shared.pushHead(x)
}
runtime_procUnpin()
//...race
}

总结

  1. Pool 本质是为了提高临时对象的复用率
  2. Pool 使用两层回收策略(local + victim)避免性能波动;
  3. Pool 本质是一个杂货铺属性,啥都可以放,Pool 池本身不做限制;
  4. Pool 池里面 cache 对象也是分层的,一层层的 cache,取用方式从最热的数据到最冷的数据递进;
  5. Pool 是并发安全的,但是内部是无锁结构,原理是对每个 P 都分配 cache 数组( poolLocalInternal 数组),这样 cache 结构就不会导致并发;
  6. 永远不要 copy 一个 Pool,明确禁止,不然会导致内存泄露和程序并发逻辑错误;
  7. 代码编译之前用 go vet 做静态检查,能减少非常多的问题;
  8. 每轮 GC 开始都会清理一把 Pool 里面 cache 的对象,注意流程是分两步,当前 Pool 池 local 数组里的元素交给 victim 数组句柄,victim 里面 cache 的元素全部清理。换句话说,引入 victim 机制之后,对象的缓存时间变成两个 GC 周期;
  9. 不要对 Pool 里面的对象做任何假定,有两种方案:要么就归还的时候 memset 对象之后,再调用 Pool.Put ,要么就 Pool.Get 取出来的时候 memset 之后再使用;

参考文档

  1. https://mp.weixin.qq.com/s/dLzWAqM9lCln83jhkvmtMw
  2. https://geektutu.com/post/hpg-sync-pool.html
  3. https://golang.design/under-the-hood/zh-cn/part1basic/ch05sync/pool/
  4. https://xie.infoq.cn/article/55f28d278cccf0d8195459263