Go-08-Channel

channel底层数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引(第几个位置 1 ~ len)
sendx uint // send index
// 已接收元素在循环数组中的索引第几个位置 1 ~ len)
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
// 保护 hchan 中所有字段
lock mutex
}

其中需要注意的是:

  • buf指向底层循环数组,如果是非缓冲则指向hchan地址

  • sendqrecvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞(部分通过直接复制memmove的方式传输未被阻塞,也就不在链表中)

  • waitqsudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装

    1
    2
    3
    4
    type waitq struct {
    first *sudog
    last *sudog
    }
  • lock 用来保证每个读 channel 或写 channel 的操作都是原子的

chan data structure

chan初创建

chan的创建分为含有缓冲区和非缓冲区,都通过 func makechan(t *chantype, size int64) *hchan实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}

return makechan(t, int(size))
}

func makechan(t *chantype, size int) *hchan {
elem := t.elem

//1. 安全检查
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}

//2. 元素分配是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0:
//a. 非缓冲或者元素大小为0(struct{})
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
//b. 不含有指针,buf指向数组起始阶段
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
//c. 元素含有指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
...
return c
}

以上有几点注意:

  1. 针对创建不同的chan初始化过程
  • 非缓冲chan 只需要分配一个hchanSize
  • 缓冲区chan 不含有指针,则分配一个连续的内存
  • 缓冲区chan 含有指针,分别分配hchan和 指针需要的内存大小
  1. chan 分配在堆中,返回一个指针*hchan
  2. sendq 和 recvq 链表由goroutine初始化时候创建,不在这里创建

在这里插入图片描述

chan接收

接收的操作有两种写法

1
2
3
4
5
6
7
8
9
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
  1. received 反应channel是否被关闭
  2. 接收值会放到elem所指向的指针,如果忽略接收值,则elem为nil
  3. 第三个参数 都使用 true 表示阻塞模式,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
1. 如果 ep 是 nil,说明忽略了接收值。
2. 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
3. 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
*/
//判断hchan是否有数据
func empty(c *hchan) bool {
// dataqsiz 一旦初始化chan就不会变化
if c.dataqsiz == 0 {
//如果是非缓冲,当存储发送挂起协程的sendq链表为空,表示没有数据
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
//缓冲区则判断是否有数据
return atomic.Loaduint(&c.qcount) == 0
}

// chanbuf(c, i) 指向c的缓冲区第i个元素
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
//忽略debug

//如果是 nil的channel
if c == nil {
//如果不阻塞,直接返回 (false, false)
if !block {
return
}
//否则,接收一个nil的channel, goroutine 挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
//不会执行到这里
throw("unreachable")
}

// 非阻塞模式下,快速检查不用获取锁
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 { //如果chan没有关闭,返回(false, false)
return
}

//这里又做了一次为空判断,防止在关闭检查的时候收到的数据
if empty(c) {
// The channel is irreversibly closed and empty.
if ep != nil { //如果为空,则返回一个默认值
typedmemclr(c.elemtype, ep)
}
return true, false
}
}

lock(&c.lock) //获取原子锁

//如果关闭了,且不含有缓冲数据,则ep指向默认值,返回(true, false)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

//则获取sendq中阻塞的发送协程进行接收
if sg := c.sendq.dequeue(); sg != nil {
//找到一个等待的发送人。 如果缓冲区大小为 0,则接收值直接来自发送者。
//否则,从队列头接收并将发送者的值添加到队列的尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

//如果没有发送者,且当前缓冲区有数据,
//则将数据复制给ep,且清理对应位置缓冲区数据,qcount--
if c.qcount > 0 {
//1. 获取对应缓冲区数据
qp := chanbuf(c, c.recvx)
//2. 将数据复制给ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//3. 清理对应的缓冲区数据
typedmemclr(c.elemtype, qp)
//4. 接收索引+1,如果 等于缓冲区大小,表示结束,从头开始
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//4. 缓冲区数目-1 并释放原子锁
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

//没有数据,也没有发送者,则将要阻塞这个协程
gp := getg() //获取当前协程指针
mysg := acquireSudog()

// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
//保存代接收数据的地址
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
//进入等待链表中
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
//将当前goroutine 挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

//当goroutine被唤起
if mysg != gp.waiting { //当前协程等待的不是创建的 mysg
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

需要注意的是

  1. 同一个 chan 不能被重新开启
  2. 在缓冲区chan buf满了的情况下,发送协程阻塞,则接收者还是会优先处理缓冲区数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//如果是非缓冲型的,就直接从发送者的栈拷贝到接收者的栈
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}

//接收数据的时候发现有协程阻塞
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
//非缓冲直接从sender拷贝到ep
recvDirect(c.elemtype, sg, ep)
}
} else {//非缓冲区
//1. 获取缓冲区数据
qp := chanbuf(c, c.recvx)
// 将数据从缓冲区拷贝给receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将sender的数据拷贝到缓冲区
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++ //索引+1,则刚才加了数据需要循环一遍才能拿到
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 解锁
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒发送的 goroutine。需要等到调度器的光临
goready(gp, skip+1)
}
chan state

chan发送

ch <- 3 最终会转换成 chansend 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
func full(c *hchan) bool {
if c.dataqsiz == 0 {
//如果是非缓冲区,且没有接收协程,则表示满了
return c.recvq.first == nil
}
//如果是缓冲区,当缓冲数据等于缓冲区大小 则表示满了
return c.qcount == c.dataqsiz
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//如果为空
if c == nil {
if !block { //非阻塞模式下
return false
}
//阻塞模式下,直接挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

//...

// 如果 channel 未关闭且 channel 没有多余的缓冲空间。直接返回false
if !block && c.closed == 0 && full(c) {
return false
}

//1. 加锁
lock(&c.lock)

//如果channel 关闭,则直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

//如果接收协程阻塞存在,说明缓冲区没有数据,直接将sender数据拷贝给receiver
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

//如果有缓冲空间
if c.qcount < c.dataqsiz {
//1. 直接获取缓冲发送位置
qp := chanbuf(c, c.sendx)

//2. 将数据直接拷贝进缓冲区
typedmemmove(c.elemtype, qp, ep)
//3. 发送位置+1(表示下一个进来的数据存放的位置)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//3. 数量+1
c.qcount++
unlock(&c.lock)
return true
}

//非阻塞模式直接返回false
if !block {
unlock(&c.lock)
return false
}

//阻塞当前协程
gp := getg()
mysg := acquireSudog()
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) //将协程加入到等待队列中
atomic.Store8(&gp.parkingOnChan, 1)
//挂起当前协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 确保正在发送的值保持活动状态,直到接收者复制它
KeepAlive(ep)

// 被唤起
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被唤醒后,channel 关闭了。坑爹啊,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}

需要注意的是:

  1. KeepAlive 的作用和原理是?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
if sg.elem != nil {
// 直接拷贝内存(从发送者到接收者)
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// sudog 上绑定的 goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒接收的 goroutine. skip 和打印栈相关,暂时不理会
goready(gp, skip+1)
}

// 向一个非缓冲型的 channel 发送数据、从一个无元素的(非缓冲型或缓冲型但空)的 channel
// 接收数据,都会导致一个 goroutine 直接操作另一个 goroutine 的栈
// 由于 GC 假设对栈的写操作只能发生在 goroutine 正在运行中并且由当前 goroutine 来写
// 所以这里实际上违反了这个假设。可能会造成一些问题,所以需要用到写屏障来规避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈

// 直接进行内存"搬迁"
// 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
// 就不能修改真正的 dst 位置的值了
// 因此需要在读和写之前加上一个屏障
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}

关闭chan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func closechan(c *hchan) {
if c == nil { //关闭一个空chan会panic
panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 { //关闭一个关闭的chan会panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

c.closed = 1 //修改chan状态为关闭

var glist gList

// release all readers
for {
//1. 所有接收协程出链表
sg := c.recvq.dequeue()
if sg == nil {
break
}
//2. 如果元素不为空,则清除并设置为空
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
glist.push(gp)
}

// release all writers (they will panic)
for {
//从发送链表中获取协程
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
//形成协程链表
glist.push(gp)
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
// 取最后一个
gp := glist.pop()
gp.schedlink = 0
//环形协程
goready(gp, 3)
}
}

chan应用

  1. 针对不同的chan会有不同的效果:

image-20210703131831903

  1. 控制并发数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    var limit = make(chan int, 3)

    func main() {
    // …………
    for _, w := range work {
    go func() {
    limit <- 1
    w()
    <-limit
    }()
    }
    // …………
    }

参考链接

  1. Go问题集
  2. https://qcrao.com/2019/07/22/dive-into-go-channel/