go源码阅读_Channel

Posted on Jan 13, 2020

  最近代码中出现了一个十分诡异的 bug,找了半天才发现原来是因为 channel 的使用不当所导的.channel 有一个特性是读一个有缓冲区且已关闭的 channel 时,会一直返回默认值.可能是 0,"",或 nil,因为我使用了一个 for{select <-message}直接导致程序无限触发.要想更好的使用 channel 我们需要了解它内部的实现原理,下面就一起看看 channel 的实现.因为网上关于 channel 的文章十分之多,所以这里主要做的还是个人学习总结的为主.

  channel 是 golng 中十分重要的一个特性.认识 go 的开始就会了解到它的设计哲学, go 的设计哲学基于 CSP 并发模型,其便是通过 channel 来实现这一模型的.除了 channel,goroutine 也是整个设计中重要的组成部分.在 go 中不光可以使用 Mutex 等锁来进行同步,还可以使用 channel 来进行消息流控制的同步操作.

Channel 源码阅读

Channel 的使用

  这里并没有直接将 channel 的源码展示出来,让我们从 channel 的基本使用方式着手进行深入. 每个人都可以简单的写出这样的代码:

func main() {

    messages := make(chan string)

    go func() { messages <- "ping" }()

    msg := <-messages
    fmt.Println(msg)
}

Channel struct

  这里使用 make(chan string) 来创建一个不包含缓冲区的 channel.我们可以很简单的通过生成汇编看到这行代码其实如下:

        0x002f 00047 (test.go:7)        LEAQ    type.chan string(SB), AX
        0x0036 00054 (test.go:7)        PCDATA  $0, $0
        0x0036 00054 (test.go:7)        MOVQ    AX, (SP)
        0x003a 00058 (test.go:7)        MOVQ    $0, 8(SP)
        0x0043 00067 (test.go:7)        CALL    runtime.makechan(SB)

  但是如果我们想要创建一个带缓冲 buffer 的 channel,需要使用 make(chan string, 5) 来进行创建.生成的汇编代码如下:

        0x002f 00047 (test.go:7)        LEAQ    type.chan string(SB), AX
        0x0036 00054 (test.go:7)        PCDATA  $0, $0
        0x0036 00054 (test.go:7)        MOVQ    AX, (SP)
        0x003a 00058 (test.go:7)        MOVQ    $5, 8(SP)
        0x0043 00067 (test.go:7)        CALL    runtime.makechan(SB)

  通过上面两组汇编代码可以看出来,创建 channel 使用的是 makechan 函数,并接受两个参数,第一个参数是类型信息,第二个参数是缓冲区的长度分别存储在 SP,以及 SP+8 的位置.在看 makechan 函数之前,我们需要先了解一下 channel 的数据结构,即 hchan:

type hchan struct {
	qcount   uint           // channel 缓冲区中元素个数
	dataqsiz uint           // 底层缓冲区数组的长度
	buf      unsafe.Pointer // 指向 channel 的缓冲区的指针
	elemsize uint16 // channel 传输的数据类型的大小
	closed   uint32 // channel 是否已经关闭
	elemtype *_type // channel 传输的数据类型
	sendx    uint   // 发送数据的位置
	recvx    uint   // 接收数据的位置
	recvq    waitq  // 等待接收数据的 goroutine 组成的队列
	sendq    waitq  // 等待发送数据的 goroutine 组成的队列

	// 用来保护 channel 中的字段,
	lock mutex
}
// waitq 就是 sudog 组成的链表,sudog 就是 goroutine 包装后进行阻塞的数据结构
type waitq struct {
	first *sudog
	last  *sudog
}

makechan 操作

  当 channel 无缓冲区时,在 hchan 中不会创建对应的缓冲区,只有在明确指定了 channel 缓冲区的大小,才会在对应的 hchan 中使用到和缓冲区相关的几个字段.下面来看看怎么创建一个channel:

func makechan(t *chantype, size int) *hchan {
	elem := t.elem    // 只需要知道 elem 是元素类型的 runtime 结构体即可

	...
	// 获取缓冲池内存大小,size(Type) * 元素个数
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// channel 对应的 hchan 结构体
	// 初始化时特意将元素中有无指针单独进行了判断,对 GC 过程做的优化,避免 GC 时需要查找指针
	var c *hchan
	switch {
	case mem == 0:
		// 队列或者元素大小为空,所以不需要分配缓冲区的内存大小,直接分配一个 hchan 的结构体即可
		c = (*hchan)(mallocgc(hchanSize, nil, true)) 
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		// 元素类型不包含指针,直接一次性获取所需要的内存,注意这里的缓冲区其实在物理上是紧跟在 hchan 之后的
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 元素类型中包含指针
		c = new(hchan)
		// 单独分配缓冲区内存
		c.buf = mallocgc(mem, elem, true)
	}
	// 一些基本参数的设置
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	
	return c
}

根据是否有缓冲区,以及 channel 传输类型是否包含指针,buffer 和 channel 的关系如下:   整个 channel 的结构并不复杂,记录一下传输元素类型的基本信息,并根据情况初始化合适的元素缓冲区即可.按照上面的代码,创建好了缓冲区之后就要向 channel 中发送数据 messages <- “ping”.

chansend 操作

生成一下 messages <- “ping” 的汇编代码,生成代码如下:

        0x001d 00029 (test.go:15)       MOVQ    "".messages+32(SP), AX
        0x0022 00034 (test.go:15)       PCDATA  $0, $0
        0x0022 00034 (test.go:15)       MOVQ    AX, (SP)
        0x0026 00038 (test.go:15)       PCDATA  $0, $1
        0x0026 00038 (test.go:15)       LEAQ    ""..stmp_0(SB), AX
        0x002d 00045 (test.go:15)       PCDATA  $0, $0
        0x002d 00045 (test.go:15)       MOVQ    AX, 8(SP)
        0x0032 00050 (test.go:15)       CALL    runtime.chansend1(SB)

  实际调用的是 chansend1() 进行的数据发送,这里需要接受的参数为两个,一个 channel 的指针,一个 发送数据的指针,汇编代码中的 stmp_0 代表的就是代码中的 “ping”, 发送方法如下:

// 汇编代码中的 c<-x 入口
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	// 通过 chansend 来进行数据的发送
	chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	 // 逻辑中的 channel 没有初始化时: c == nil
    // var messages chan string
    // messages <- "ping"
    // 向一个 nil channel 发送数据会导致 goroutine 阻塞
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	...

	// 不阻塞且未关闭,且无缓冲区,或者缓冲区已满直接返回
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	...

	lock(&c.lock)
	// 向 close channel 发数据会 panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// 第一种发送情况,等待队列不为空,直接
	if sg := c.recvq.dequeue(); sg != nil {
		// 从接收等待队列中获取一个待接收者,绕过通道缓冲区,将要发送的值直接发送给等待接收者
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// 第二种发送情况,缓冲区不满,且没有接收者等待,添加到缓冲区中
	if c.qcount < c.dataqsiz {
		// 获取 sendx 位置的缓冲区元素
		qp := chanbuf(c, c.sendx)
		...
		// 通过内存拷贝将元素添加到缓冲区中
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		// 因为是数组构成的循环队列,判断一下边界情况
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	// 第三种发送情况,缓冲区满了,或者没有缓冲区,且不阻塞,直接返回 false
	if !block {
		unlock(&c.lock)
		return false
	}

	// 最后实在没办法,咋都发送不出去就只能阻塞在 channel 上了,receiver 会完成后续任务,所以阻塞后真正的发送代码就不在这里了
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// 填充一下 sudog 就可以阻塞了
	mysg.elem = ep               // 发送数据的地址
	mysg.waitlink = nil
	mysg.g = gp                  // 发送数据的 goroutine
	mysg.isSelect = false        // 不在 select 中
	mysg.c = c                   // 发送的 channel
	gp.waiting = mysg
	gp.param = nil
	// 将包装好的 sudog 添加到待发送队列中,具体的 sudog 这里不做讲解,只需要知道这是一个队 g 进行包装后的结构体用来等待操作
	c.sendq.enqueue(mysg)
	// goroutine 陷入睡眠
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
	// 
	KeepAlive(ep)

	// 唤醒后继续执行收尾工作,其实这个时候数据已送复制到接收者的内存中了
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		// 唤醒后 channel close 了,直接 panic
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	// 释放 sudog 结构体
	releaseSudog(mysg)
	return true
}

  上面代码中第一种情况直接发送调用的是 send()函数,下面来看看这个函数的具体实现:

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// 从发送方把 ep 直接复制给接收方的 sg
// 把 lock 锁通过 func 传进来在 send 中操作完成之后释放
// 唤醒 ep(等待接收者),通道 c 此时已经锁定了,sg 已经从 c 中出队,ep != nil
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	...
	// 等了半天的接收元素直接给它放到接收地址中,拿走
	if sg.elem != nil {
		// msg := <-messages 这里直接把 ep 复制给这个 msg
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	
	// 唤醒等待接收的 goroutine,并将其加入到发送方 goroutine 的 P.runnext 等待执行
	goready(gp, skip+1)
}

  上面就是整个的发送流程,从上面可以看出来,当向 channel 发送数据时,分为如下几步:

  1. 等待接收队列不为空,直接扔给他,不让中间商赚差价
  2. 没人接收,但是缓冲区不满,可以扔进缓冲区中然后溜之大吉
  3. 没人等待,吗,没有缓冲区或者缓冲区满了,还不阻塞,直接返回 false
  4. 实在发送不出去,只能阻塞住,等待接收者救它

chanrecv 操作

下来我们看看接收数据的情况: 老方法,先看看 msg := <-messages 的汇编代码:

        0x00bd 00189 (test.go:20)       CALL    runtime.chanrecv1(SB)

调用的是 chanrecv1 进行数据的接收, chanrecv1 调用的是 chanrecv:

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}
// 将接收到的数据写入 ep,ep 可能为 nil
// 如果为 nil 就忽略接收到的数据 eg: <-message
// 如果 block == false && 没有可用数据,直接返回 false,false
// 如果 c 关闭,则设置 ep 为 0,并返回 true,false
// 否则知道获取一个元素填充 ep,然后返回 true,true
// ep != nil,必须指向堆或者调用者的栈
// 参数解析: 接收的通道,接收值的地址,是否阻塞
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if c == nil {
		// 从 nil channel 接收数据
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// 种种原因,直接返回类似上面 send 的相同部分
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	// 加锁开始操作
	if c.closed != 0 && c.qcount == 0 {
		// 从一个已经关闭的 channel 读取,但是缓冲区为空,直接返回读取不到数据
		...
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	// 看看发送队列是不是有 goroutine 再等待,同样没有中间商赚差价
	if sg := c.sendq.dequeue(); sg != nil {
		// 调用 revc 直接从目标 goroutine 获取元素,revc 见后面
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// 缓冲区中有数据,哪一个过来,直接退出
	if c.qcount > 0 {
		// 从环形队列拿一个
		qp := chanbuf(c, c.recvx)
		...
		if ep != nil {
			// 目标地址不为 nil,要把数据复制过去
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		// 处理环形队列边界情况
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	// 拿不到还不阻塞,拜拜
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 阻塞在这个 channel 上
	gp := getg()
	// 同样把自己包装成 sudog
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// 见 chansend 的相同操作,填充 sudog
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// goroutine 陷入睡眠
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	// 唤醒之后的代码,这个时候 mysg.elem 已经有想要的数据了,后面做一些收尾处理即可
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

继续看看两个 goroutine 直接交易数据的函数 recv:

// 参数:接收 channel,发送者 sudog,数据目标地址
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		// 没有缓冲区
		...
		if ep != nil {
			// 从发送者哪里把数据拷贝过来
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// 有缓冲区
		...
		// 获取缓冲区中的数据
		qp := chanbuf(c, c.recvx)
		// copy data from queue to receiver
		if ep != nil {
			// 将缓冲区中的数据复制给接收者
			typedmemmove(c.elemtype, ep, qp)
		}
		// 将发送者的数据放到缓冲区中
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		// 处理边界条件
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	// 清空发送者原来持有的数据
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 唤醒发送者
	goready(gp, skip+1)
}

  这里首先需要注意的一点就是 chanrecv 有两个返回值,第二个返回值判断 channel 是否关闭 从 channel 中获取数据同样也分为几步:

  1. channel 已经关闭,缓冲区中无数据,直接返回
  2. channel 中的发送缓冲队列有 goroutine 在等待,直接和它进行 py 交易即可返回
    • 没有缓冲区,直接将第一个发送者的数据给接收者即可
    • 有缓冲区,将缓冲区中的数据给接收者,然后把发送者的数据存到缓冲区中,这是为了防止,发送者阻塞过久,通过这种判断可以当缓冲区中出现空闲时,将等待发送者解放并唤醒
  3. 等待队列为空不要紧,缓冲区里还有数据,拿一个返回即可
  4. 咋都没数据,但是这个接收不阻塞,直接返回
  5. 咋都没数据还要阻塞等待,把自己收拾成 sudog,然后进行等待,唤醒后返回

closechan 操作

  创建 channel,接收数据和发送数据都看完了,下面看看 channel 是如何关闭的,老规矩 close(messages) 汇编代码如下:

        0x0041 00065 (test.go:13)       CALL    runtime.closechan(SB)

执行的是 closechan,可以关闭 channel:

func closechan(c *hchan) {
	// nil 不能 close,直接 panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	
	lock(&c.lock)
	// 重复 close channel
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	...
	// 设置 close 标志位
	c.closed = 1

	var glist gList

	// 释放所有接收者
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		// 处理一个接收者
		if sg.elem != nil {
			// 尝试给等待接收者的复制一个对应 type 的默认值,int(0),string(""),ptr(nil)
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		...
		// 加入 glist 后面进行统一唤醒
		glist.push(gp)
	}

	// 释放所有发送者,可能会 panic
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		// 数据直接扔掉
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		...
		// 加入 glist 后面进行统一唤醒
		glist.push(gp)
	}
	unlock(&c.lock)

	// 最后别忘了唤醒所有 channel
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

小结

  写代码的时候需要注意不要对 nil channel 进行 close 操作.   整个 channel 的代码并不复杂,本质就是一个唤醒队列加上 Mutex 进行线程间的通信.但在实际的使用还是要了解它的使用方式,尤其是其和 select 等其余组件一起使用的时候