go源码阅读_Channel
最近代码中出现了一个十分诡异的 bug,找了半天才发现原来是因为 channel 的使用不当所导的.channel 有一个特性是读一个有缓冲区且已关闭的 channel 时,会一直返回默认值.可能是 0,"",或 nil,因为我使用了一个 for{select <-message}直接导致程序无限触发.要想更好的使用 channel 我们需要了解它内部的实现原理,下面就一起看看 channel 的实现.因为网上关于 channel 的文章十分之多,所以这里主要做的还是个人学习总结的为主.
channel 是 golng 中十分重要的一个特性.认识 go 的开始就会了解到它的设计哲学, go 的设计哲学基于 CSP 并发模型,其便是通过 channel 来实现这一模型的.除了 channel,goroutine 也是整个设计中重要的组成部分.在 go 中不光可以使用 Mutex 等锁来进行同步,还可以使用 channel 来进行消息流控制的同步操作.
Channel 源码阅读
Channel 的使用
这里并没有直接将 channel 的源码展示出来,让我们从 channel 的基本使用方式着手进行深入. 每个人都可以简单的写出这样的代码:
func main() {
messages := make(chan string)
go func() { messages <- "ping" }()
msg := <-messages
fmt.Println(msg)
}
Channel struct
这里使用 make(chan string) 来创建一个不包含缓冲区的 channel.我们可以很简单的通过生成汇编看到这行代码其实如下:
0x002f 00047 (test.go:7) LEAQ type.chan string(SB), AX
0x0036 00054 (test.go:7) PCDATA $0, $0
0x0036 00054 (test.go:7) MOVQ AX, (SP)
0x003a 00058 (test.go:7) MOVQ $0, 8(SP)
0x0043 00067 (test.go:7) CALL runtime.makechan(SB)
但是如果我们想要创建一个带缓冲 buffer 的 channel,需要使用 make(chan string, 5) 来进行创建.生成的汇编代码如下:
0x002f 00047 (test.go:7) LEAQ type.chan string(SB), AX
0x0036 00054 (test.go:7) PCDATA $0, $0
0x0036 00054 (test.go:7) MOVQ AX, (SP)
0x003a 00058 (test.go:7) MOVQ $5, 8(SP)
0x0043 00067 (test.go:7) CALL runtime.makechan(SB)
通过上面两组汇编代码可以看出来,创建 channel 使用的是 makechan 函数,并接受两个参数,第一个参数是类型信息,第二个参数是缓冲区的长度分别存储在 SP,以及 SP+8 的位置.在看 makechan 函数之前,我们需要先了解一下 channel 的数据结构,即 hchan:
type hchan struct {
qcount uint // channel 缓冲区中元素个数
dataqsiz uint // 底层缓冲区数组的长度
buf unsafe.Pointer // 指向 channel 的缓冲区的指针
elemsize uint16 // channel 传输的数据类型的大小
closed uint32 // channel 是否已经关闭
elemtype *_type // channel 传输的数据类型
sendx uint // 发送数据的位置
recvx uint // 接收数据的位置
recvq waitq // 等待接收数据的 goroutine 组成的队列
sendq waitq // 等待发送数据的 goroutine 组成的队列
// 用来保护 channel 中的字段,
lock mutex
}
// waitq 就是 sudog 组成的链表,sudog 就是 goroutine 包装后进行阻塞的数据结构
type waitq struct {
first *sudog
last *sudog
}
makechan 操作
当 channel 无缓冲区时,在 hchan 中不会创建对应的缓冲区,只有在明确指定了 channel 缓冲区的大小,才会在对应的 hchan 中使用到和缓冲区相关的几个字段.下面来看看怎么创建一个channel:
func makechan(t *chantype, size int) *hchan {
elem := t.elem // 只需要知道 elem 是元素类型的 runtime 结构体即可
...
// 获取缓冲池内存大小,size(Type) * 元素个数
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// channel 对应的 hchan 结构体
// 初始化时特意将元素中有无指针单独进行了判断,对 GC 过程做的优化,避免 GC 时需要查找指针
var c *hchan
switch {
case mem == 0:
// 队列或者元素大小为空,所以不需要分配缓冲区的内存大小,直接分配一个 hchan 的结构体即可
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
// 元素类型不包含指针,直接一次性获取所需要的内存,注意这里的缓冲区其实在物理上是紧跟在 hchan 之后的
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素类型中包含指针
c = new(hchan)
// 单独分配缓冲区内存
c.buf = mallocgc(mem, elem, true)
}
// 一些基本参数的设置
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
根据是否有缓冲区,以及 channel 传输类型是否包含指针,buffer 和 channel 的关系如下: 整个 channel 的结构并不复杂,记录一下传输元素类型的基本信息,并根据情况初始化合适的元素缓冲区即可.按照上面的代码,创建好了缓冲区之后就要向 channel 中发送数据 messages <- “ping”.
chansend 操作
生成一下 messages <- “ping” 的汇编代码,生成代码如下:
0x001d 00029 (test.go:15) MOVQ "".messages+32(SP), AX
0x0022 00034 (test.go:15) PCDATA $0, $0
0x0022 00034 (test.go:15) MOVQ AX, (SP)
0x0026 00038 (test.go:15) PCDATA $0, $1
0x0026 00038 (test.go:15) LEAQ ""..stmp_0(SB), AX
0x002d 00045 (test.go:15) PCDATA $0, $0
0x002d 00045 (test.go:15) MOVQ AX, 8(SP)
0x0032 00050 (test.go:15) CALL runtime.chansend1(SB)
实际调用的是 chansend1() 进行的数据发送,这里需要接受的参数为两个,一个 channel 的指针,一个 发送数据的指针,汇编代码中的 stmp_0 代表的就是代码中的 “ping”, 发送方法如下:
// 汇编代码中的 c<-x 入口
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
// 通过 chansend 来进行数据的发送
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 逻辑中的 channel 没有初始化时: c == nil
// var messages chan string
// messages <- "ping"
// 向一个 nil channel 发送数据会导致 goroutine 阻塞
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
...
// 不阻塞且未关闭,且无缓冲区,或者缓冲区已满直接返回
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
...
lock(&c.lock)
// 向 close channel 发数据会 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 第一种发送情况,等待队列不为空,直接
if sg := c.recvq.dequeue(); sg != nil {
// 从接收等待队列中获取一个待接收者,绕过通道缓冲区,将要发送的值直接发送给等待接收者
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 第二种发送情况,缓冲区不满,且没有接收者等待,添加到缓冲区中
if c.qcount < c.dataqsiz {
// 获取 sendx 位置的缓冲区元素
qp := chanbuf(c, c.sendx)
...
// 通过内存拷贝将元素添加到缓冲区中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
// 因为是数组构成的循环队列,判断一下边界情况
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 第三种发送情况,缓冲区满了,或者没有缓冲区,且不阻塞,直接返回 false
if !block {
unlock(&c.lock)
return false
}
// 最后实在没办法,咋都发送不出去就只能阻塞在 channel 上了,receiver 会完成后续任务,所以阻塞后真正的发送代码就不在这里了
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 填充一下 sudog 就可以阻塞了
mysg.elem = ep // 发送数据的地址
mysg.waitlink = nil
mysg.g = gp // 发送数据的 goroutine
mysg.isSelect = false // 不在 select 中
mysg.c = c // 发送的 channel
gp.waiting = mysg
gp.param = nil
// 将包装好的 sudog 添加到待发送队列中,具体的 sudog 这里不做讲解,只需要知道这是一个队 g 进行包装后的结构体用来等待操作
c.sendq.enqueue(mysg)
// goroutine 陷入睡眠
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
//
KeepAlive(ep)
// 唤醒后继续执行收尾工作,其实这个时候数据已送复制到接收者的内存中了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 唤醒后 channel close 了,直接 panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
// 释放 sudog 结构体
releaseSudog(mysg)
return true
}
上面代码中第一种情况直接发送调用的是 send()函数,下面来看看这个函数的具体实现:
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// 从发送方把 ep 直接复制给接收方的 sg
// 把 lock 锁通过 func 传进来在 send 中操作完成之后释放
// 唤醒 ep(等待接收者),通道 c 此时已经锁定了,sg 已经从 c 中出队,ep != nil
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
// 等了半天的接收元素直接给它放到接收地址中,拿走
if sg.elem != nil {
// msg := <-messages 这里直接把 ep 复制给这个 msg
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒等待接收的 goroutine,并将其加入到发送方 goroutine 的 P.runnext 等待执行
goready(gp, skip+1)
}
上面就是整个的发送流程,从上面可以看出来,当向 channel 发送数据时,分为如下几步:
- 等待接收队列不为空,直接扔给他,不让中间商赚差价
- 没人接收,但是缓冲区不满,可以扔进缓冲区中然后溜之大吉
- 没人等待,吗,没有缓冲区或者缓冲区满了,还不阻塞,直接返回 false
- 实在发送不出去,只能阻塞住,等待接收者救它
chanrecv 操作
下来我们看看接收数据的情况: 老方法,先看看 msg := <-messages 的汇编代码:
0x00bd 00189 (test.go:20) CALL runtime.chanrecv1(SB)
调用的是 chanrecv1 进行数据的接收, chanrecv1 调用的是 chanrecv:
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// 将接收到的数据写入 ep,ep 可能为 nil
// 如果为 nil 就忽略接收到的数据 eg: <-message
// 如果 block == false && 没有可用数据,直接返回 false,false
// 如果 c 关闭,则设置 ep 为 0,并返回 true,false
// 否则知道获取一个元素填充 ep,然后返回 true,true
// ep != nil,必须指向堆或者调用者的栈
// 参数解析: 接收的通道,接收值的地址,是否阻塞
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
// 从 nil channel 接收数据
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 种种原因,直接返回类似上面 send 的相同部分
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 加锁开始操作
if c.closed != 0 && c.qcount == 0 {
// 从一个已经关闭的 channel 读取,但是缓冲区为空,直接返回读取不到数据
...
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 看看发送队列是不是有 goroutine 再等待,同样没有中间商赚差价
if sg := c.sendq.dequeue(); sg != nil {
// 调用 revc 直接从目标 goroutine 获取元素,revc 见后面
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓冲区中有数据,哪一个过来,直接退出
if c.qcount > 0 {
// 从环形队列拿一个
qp := chanbuf(c, c.recvx)
...
if ep != nil {
// 目标地址不为 nil,要把数据复制过去
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
// 处理环形队列边界情况
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 拿不到还不阻塞,拜拜
if !block {
unlock(&c.lock)
return false, false
}
// 阻塞在这个 channel 上
gp := getg()
// 同样把自己包装成 sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 见 chansend 的相同操作,填充 sudog
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// goroutine 陷入睡眠
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 唤醒之后的代码,这个时候 mysg.elem 已经有想要的数据了,后面做一些收尾处理即可
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
继续看看两个 goroutine 直接交易数据的函数 recv:
// 参数:接收 channel,发送者 sudog,数据目标地址
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
// 没有缓冲区
...
if ep != nil {
// 从发送者哪里把数据拷贝过来
recvDirect(c.elemtype, sg, ep)
}
} else {
// 有缓冲区
...
// 获取缓冲区中的数据
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
// 将缓冲区中的数据复制给接收者
typedmemmove(c.elemtype, ep, qp)
}
// 将发送者的数据放到缓冲区中
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
// 处理边界条件
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// 清空发送者原来持有的数据
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒发送者
goready(gp, skip+1)
}
这里首先需要注意的一点就是 chanrecv 有两个返回值,第二个返回值判断 channel 是否关闭 从 channel 中获取数据同样也分为几步:
- channel 已经关闭,缓冲区中无数据,直接返回
- channel 中的发送缓冲队列有 goroutine 在等待,直接和它进行 py 交易即可返回
- 没有缓冲区,直接将第一个发送者的数据给接收者即可
- 有缓冲区,将缓冲区中的数据给接收者,然后把发送者的数据存到缓冲区中,这是为了防止,发送者阻塞过久,通过这种判断可以当缓冲区中出现空闲时,将等待发送者解放并唤醒
- 等待队列为空不要紧,缓冲区里还有数据,拿一个返回即可
- 咋都没数据,但是这个接收不阻塞,直接返回
- 咋都没数据还要阻塞等待,把自己收拾成 sudog,然后进行等待,唤醒后返回
closechan 操作
创建 channel,接收数据和发送数据都看完了,下面看看 channel 是如何关闭的,老规矩 close(messages) 汇编代码如下:
0x0041 00065 (test.go:13) CALL runtime.closechan(SB)
执行的是 closechan,可以关闭 channel:
func closechan(c *hchan) {
// nil 不能 close,直接 panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 重复 close channel
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
// 设置 close 标志位
c.closed = 1
var glist gList
// 释放所有接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 处理一个接收者
if sg.elem != nil {
// 尝试给等待接收者的复制一个对应 type 的默认值,int(0),string(""),ptr(nil)
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
...
// 加入 glist 后面进行统一唤醒
glist.push(gp)
}
// 释放所有发送者,可能会 panic
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 数据直接扔掉
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
...
// 加入 glist 后面进行统一唤醒
glist.push(gp)
}
unlock(&c.lock)
// 最后别忘了唤醒所有 channel
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
小结
写代码的时候需要注意不要对 nil channel 进行 close 操作. 整个 channel 的代码并不复杂,本质就是一个唤醒队列加上 Mutex 进行线程间的通信.但在实际的使用还是要了解它的使用方式,尤其是其和 select 等其余组件一起使用的时候