Go-25-PoolDequeue

在学习Golang 中 的内存池 Sync.Pool 之前,底层是使用的 一个无锁动态扩容双端队列支持 单生产者 多消费者Sync.Pool 内存分配模式,它是一个私有结构体,所以外部无法访问它,这里来看看如何实现的

代码:src/sync/poolqueue.go

带着问题看世界

  1. 如何动态扩容的
  2. 为什么内存池要是用这样的 数组 + 链表

poolDequeue

首先看一下基本结构结构体,使用数组实现了一个 ring buffer 的结构

PoolDequeue1
1
2
3
4
5
6
7
8
9
10
11
12
type poolDequeue struct {    
// headTail 包含了 32bit的头部索引与32位尾部索引指向 vals
// head 高 32 位, 指向下一个存放对象的索引
// tail 低 32 位, 指向队列中最早(下一个读取)的对象索引
// 索引区间 tail <= i < head, 是消费者可以在该区间不断获取对象,直至获取到的对象为 nil
headTail uint64

// vals 表示队列元素容器,大小必须为 2 的 N 次幂
// 容器会在初始化时指定容量,实现数据元素内存预初始化
// 队列会将未使用的槽位设置为nil
vals []eface
}

为什么要将 headtail 合并到一个变量里面?

当队列中存在多个对象,两边同时操作貌似没什么问题,但为了防止队列中仅剩一个对象时,就需要锁住两个索引进行操作,所以利用了 atomic 包的提供的 CAS 操作,完成两个字段的 lock free 无锁编程

更新 headtail 两个字段的时候,也是通过 CAS + 位运算 进行操作的。更新逻辑如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//dequeueNil 是用于在 poolDequeue 中表示 interface{}(nil) 的类型。由于使用 nil 来表示空槽位,所以需要一个特殊值来表示 nil。
type dequeueNil *struct{}

func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1 //获取掩码
head = uint32((ptrs >> dequeueBits) & mask) //获取头部索引
tail = uint32(ptrs & mask) //获取尾部索引
return
}

func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}

unpackpack 实现了在 poolDequeue 结构中进行索引值的打包和解包操作。使得在 poolDequeue 结构中使用单个 64 位整数来同时存储头部索引和尾部索引

所以整体就如下图所示

PoolDequeue3

接着是对队列的操作,分为 头部写入头部弹出尾部弹出

pushHead

头部写入返回成功失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (d *poolDequeue) pushHead(val any) bool {
//1. 原子操作获取索引
ptrs := atomic.LoadUint64(&d.headTail)
//2. 解析头尾索引
head, tail := d.unpack(ptrs)
//3. (当尾部索引 + 队列长度)& 掩码 == 头部索引,说明队列满了
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
//4. 没有满则获取头部位置
slot := &d.vals[head&uint32(len(d.vals)-1)]

//5. 判断头部是否已经被释放
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}

// The head slot is free, so we own it.
//如果存放的是nil,那么使用 dequeueNil(nil) 表示存放的是一个nil类型
if val == nil {
val = dequeueNil(nil)
}
*(*any)(unsafe.Pointer(slot)) = val

//因为头部索引是高32位,所以 增加 1<<dequeueBits,同理尾部索引是低32,直接减1
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}

如果 type == nil,则表示 slot 已经被释放,如果 value = dequeueNil(nil) 表示存放的是一个 nil (但注意,Sync.PoolPut(nil) 会直接返回,而不会真的存放一个 nil)

popHead

返回头部索引位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (d *poolDequeue) popHead() (any, bool) {
var slot *eface
//使用 CAS 不断尝试获取头部值
for {
//加载并解析索引获取头部位置,判断是否为空
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}

//计算更新后的索引值
head--
ptrs2 := d.pack(head, tail)
//CAS 更新
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
//获取头部位置
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
// 如果 不是 ptrs 所以已经发生了变化那么重试
}

val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) { //如果存放的是nil,那么返回nil
val = nil
}

//将槽置位 zero,为什么不会与pushHead冲突
*slot = eface{}
return val, true
}

popTailpopHead差不多,最后的赋值有区别

1
2
slot.val = nil	//直接赋值空值
atomic.StorePointer(&slot.typ, nil) //表示为空

为什么 popTailpopHead 处理方式不一样,是因为设计的就是单个生产者与多个消费者,所以popHead并不会有竞争的问题,最后都是存放的 slot{}

poolChainElt

poolChainElt 是一个双向链表的节点,包含林一个每个节点 poolDequeue 结构 以及 双向指针

1
2
3
4
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}

poolChain

有了双向链表节点,那么就有双向链表 poolChain

1
2
3
4
5
6
7
type poolChain struct {
//由于只有生产者访问该字段,因此不需要同步操作
head *poolChainElt

//由于消费者访问该字段,因此读写必须是原子操作
tail *poolChainElt
}

poolChain 是一个双向链表队列,其中每个 poolDequeue 的大小是前一个 poolDequeue 的两倍。一旦一个 poolDequeue 填满,就会分配一个新的 poolDequeue,并且只会将数据推入到最新的 poolDequeue。弹出操作发生在列表的另一端,当一个 poolDequeue 被耗尽后,它会从链表中移除。

PoolDequeue4

通过这种设计,poolChain 实现了一个可以动态增长的队列,以适应对象池中的元素数量变化。它能够高效地管理多个不同大小的队列,并提供生产者和消费者之间的并发访问。生产者将数据推入到最新的 poolDequeue,而消费者从链表的另一端弹出数据。这样可以避免竞争条件和锁等待,并提高并发性能

pushHead

有了双向链表之后,再来看它是如何写入一个值的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (c *poolChain) pushHead(val any) {
//1. 获取双向链表头部节点
d := c.head
if d == nil {
//2. 如果头部节点是空的,说明整个链表都是空的,那么就需要初始化一个链表节点

const initSize = 8 // 默认大小是8,必须是2的倍数
d = new(poolChainElt)
d.vals = make([]eface, initSize) //节点popDequeue的默认大小是8
c.head = d //双向链表的的头部指针指向这个节点
storePoolChainElt(&c.tail, d) //将节点追加到双向链表尾部
}

//将值写入头部节点的内存池队列中,如果成功就结束
if d.pushHead(val) {
return
}

//如果到这里,说明插入失败了,那么就会创建一个更大(2倍)的内存池队列 poolDequeue
newSize := len(d.vals) * 2
if newSize >= dequeueLimit { //最大不能超过 1 << 32 / 4 = 1 << 30
newSize = dequeueLimit
}


d2 := &poolChainElt{prev: d} //新建节点前驱是 d
d2.vals = make([]eface, newSize) //构建指定大小的切片
c.head = d2 //将d2设置为头部节点
storePoolChainElt(&d.next, d2) //d的后继为d2
d2.pushHead(val)
}

按照效果如下

1
[8] -- [16] -- [32] -- head

popHead

1
2
3
4
5
6
7
8
9
10
11
12
13
func (c *poolChain) popHead() (any, bool) {
//获取头部节点
d := c.head
for d != nil {
//获取队列头部节点
if val, ok := d.popHead(); ok {
return val, ok
}
// 到这里是没有获取到,然后指向前驱继续获取
d = loadPoolChainElt(&d.prev)
}
return nil, false
}

popTail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (c *poolChain) popTail() (any, bool) {
//1. 加载尾部队列节点,如果没有表示链表为空则返回 false
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}

for {
// 记录尾部的后驱
d2 := loadPoolChainElt(&d.next)

// 弹出队列尾部值
if val, ok := d.popTail(); ok {
return val, ok
}
// 如果 d 没有了,d2 也为空,说明没有值
if d2 == nil {
return nil, false
}


// CAS 尝试获取 d
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
//如果获取到那么去掉前继
storePoolChainElt(&d2.prev, nil)
}
//更新尾部指针
d = d2
}
}

总结:

  • 无锁的原因是队列形式,单个生产者在头部操作,消费者从队尾消费
  • 并不是固定大小,而是作为双向链表节点的队列最大长度是 1 << 32/ 4 ,并且用环形缓冲区 ring buffer 实现
  • Pool 底层 使用 数组 + 链表的形式的原因是由 Pool 的特性决定的,它需要频繁的内存分配,所以数组是一个好的选择。又为了解决扩容的问题,使用链表来连接数组

参考文档

  1. https://mp.weixin.qq.com/s/dLzWAqM9lCln83jhkvmtMw
  2. https://geektutu.com/post/hpg-sync-pool.html
  3. https://studygolang.com/articles/28386
  4. https://juejin.cn/post/7213257917255385149