通过源码分析Go的基本类型-Channel(管道),源代码版本:go 1.17.3.

一、管道介绍

Go中常见的原则是:不要通过共享内存进行通信,而是通过通信来共享内存。

Go实现了一种通讯顺序进程(Communicating sequential processes, CSP),Goroutine和channel分别对应CSP中的实体和管道,Goroutine通过channel传递消息。

go语言笔记详解(Go精读系列之管道)(1)

引自https://img.draveness.me/2020-01-28-15802171487080-channel-and-goroutines.png

G1向channel发送消息,G2从channel获取G1的消息,其中G1和G2相互独立运行,但是通过channel实现消息传递。

在Go中,channel收发操作遵循FIFO(先进先出):

1. 先向channel读取数据的goroutine先收到数据;

2. 先向channel发送数据的goroutine先写入数据。

二、数据结构

channel是由:mutex、环状数据缓存、一个发送队列和一个接收队列组成,结构体为:

go语言笔记详解(Go精读系列之管道)(2)

go语言笔记详解(Go精读系列之管道)(3)

引自https://golang.design/under-the-hood/assets/chan.png

等待接收和发送数据的waitq结构定义:

go语言笔记详解(Go精读系列之管道)(4)

可以看到,waitq定义了前后指针,从而使等待的goroutine构成了链表。其中队列中的goroutine信息封装在`runtime.sudog`中。

三、创建管道

Go的编译器将完成如下翻译: `make(chan type, n) => makechan(type, n)`

Go中通过关键字`make`来初始化管道。其中编译器会将其转换成OMAKE指令,并在类型检查阶段将OMAKE转换成OMAKECHAN:

go语言笔记详解(Go精读系列之管道)(5)

其中tcMake的定义为:

go语言笔记详解(Go精读系列之管道)(6)

从tcMake可以看出,当在程序中使用make自定来初始化:slice、map、channel时,编译器都会将其分别转换为:OMAKESLICE、OMAKEMAP、OMAKECHAN分别处理。

在channel初始化中,会检查是否需要设置缓冲区,如果没有指定缓冲区,则默认设定为0表示channel无缓冲。

OMAKAECHAN在编译SSA中间代码阶段之前被转换成`runtime.makechan`,

go语言笔记详解(Go精读系列之管道)(7)

如果缓冲区的size超过int32的取值范围,则使用`runtime.makechan64`

go语言笔记详解(Go精读系列之管道)(8)

可以看到,Go实际不支持int64的大小,而是强转为int。接下来看int大小的实现。

go语言笔记详解(Go精读系列之管道)(9)

上述代码首先会检查需要申请的内存是否溢出(大于或等于2^32),然后初始化hchan和缓冲区。

1. 如果无缓冲区,则只分别用于存储hchan的其他字段的内存,是最小的内存分配;

2. 如果管道中传递的类型不是指针类型,会为当前的 Channel和具体的数据类型分配一块连续的内存空间;

3. 其他情况,按照hchan和缓冲区大小分配内存。

而具体的 makechan 实现的本质是根据需要创建的元素大小,使用mallocgc分配内存, 因此Channel总是在堆上进行分配的,它们会被垃圾回收器进行回收,这也是为什么 Channel 不一定总是需要调用 close(ch) 进行显式地关闭。

四、发送数据

Go的编译器将完成如下翻译:`ch <- v => chansend1(ch, v)`

编译器会将 `ch <- v`转换成OSEND指令,并在类型检查阶段将OSEND转换成`runtime.chansend1`

go语言笔记详解(Go精读系列之管道)(10)

其中walkSend为:

go语言笔记详解(Go精读系列之管道)(11)

可以看到,`runtime.chansend1`单纯的调用了`runtime.chansend`

go语言笔记详解(Go精读系列之管道)(12)

发送数据前需要竞争锁,然后判断channel是否已经被关闭,所以在工程上不能向closed的channel写数据。

接下来按照三种场景来看channel的发送数据操作:

1. 存在等待接收数据的goroutine,直接发送;

2. 缓冲区存在空闲,将数据写入到channel的缓冲区;

3. 不存在缓存区或缓冲区已满,阻塞。

上述三种场景是按顺序执行的,首先来看第一种:

go语言笔记详解(Go精读系列之管道)(13)

如果存在receiver,则直接调用`runtime.send`来将数据传递给receiver。

go语言笔记详解(Go精读系列之管道)(14)

发送时,需要完成两个操作:

1. 调用sendDirect, 将需要发送的数据拷贝给接收者;

2. 调用goready,设置接收者的状态为`_Grunnable`, 作为下一个立即被执行的 Goroutine.

go语言笔记详解(Go精读系列之管道)(15)

接收方可能处于阻塞在channel中,所以发送数据后需要执行goready时调度器继续开始调度接收者。

然后看第二种:将数据写入缓冲区:

go语言笔记详解(Go精读系列之管道)(16)

首先根据缓冲区中发送操作的当前坐标计算出数据在缓冲区中的内存地址,然后将需要发送的数据拷贝到缓冲区对应位置上中。然后sendx索引后移1为,qcount计算加1。 如果放入的位置是缓存区的最后一位,则sendx设置为0,结合qcount可以判断缓冲区是否已满。

执行完成后,释放锁。

第三种情况:无缓冲区或缓冲区已满导致的阻塞:

go语言笔记详解(Go精读系列之管道)(17)

上述代码看起来比较复杂,整理如下:

1. 调用`runtime.getg`, 获取发送数据使用的goroutine;

2. 构建sudog,设置这次需要发送的相关信息。如果发送的channel、发送的goroutine、发送的数据、是否是select中等;

3. 将sudog加入发送等待队列,并设置到发送的Goroutine为sudog已就绪,等待发送;

4. 调用 runtime.gopark 将当前的 Goroutine 陷入沉睡等待唤醒;

5. 被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放 `runtime.sudog` 结构体。

go语言笔记详解(Go精读系列之管道)(18)

函数在最后会返回 true 表示这次我们已经成功向 Channel 发送了数据。

总结下来,发送过程一共包含三步:

1. 获取channel的锁;

2. 发送者入缓冲队列,拷贝要发送的数据;

3. 释放锁。

其中第二部,包括:

1. 找到是否有正在阻塞的接收方,是则直接发送;

2. 找到是否有空余的缓存,是则存入;

3. 阻塞直到被唤醒。

五、接收数据

Go支持两种方式接收channel的数据。

1. `v <- ch`

2. `v, beforeClosed <- ch`

两种方式最终会编译为调用`runtime.chanrecv`

go语言笔记详解(Go精读系列之管道)(19)

接收数据首先检查channel的情况,如果channel没有数组且是非阻塞接收,则可按照不同情况快速返回:

1. channel未初始化,会将当前的 Goroutine 休眠,从而发生死锁崩溃;

2. channel未关闭,但是buf是空的,相当于channel没有数据,则直接返回false;

3. channel已关闭,buf为空,返回默认值和false。

接着往下:

go语言笔记详解(Go精读系列之管道)(20)

channel的接收是异步的,所以在快速判断之后,还需要在获得锁后再次判断是否为空。

接着往下:

go语言笔记详解(Go精读系列之管道)(21)

如果有发送者阻塞在channel中(上文发送数据的第三种情况),则直接从channel的发送者处接收数据,然后快速返回。

上文发送数据阻塞恢复后,可以直接清空sudog的设置,就是因为receiver已经把数据拿走了。

跳到`runtime.recv`

go语言笔记详解(Go精读系列之管道)(22)

如果channel存在缓冲区,则接收者获取数据后,必然可以释放阻塞在channel中头部的发送者。

接着往下:

go语言笔记详解(Go精读系列之管道)(23)

类似发送数据,这是第二种情况,接收者可以从缓冲区中获取到数据,从而快速返回。

接着往下:

go语言笔记详解(Go精读系列之管道)(24)

和发送数据一样, 如果没有发送者,接收者会被阻塞到channel的缓冲区中,直到有发送者调用`runtime.chansend`

六、关闭管道

Go支持显性调用close来关闭管道:`close(ch) => closechan(ch)`

我们直接定位到closechan:

go语言笔记详解(Go精读系列之管道)(25)

跳出来思考两个问题。

第一个问题:channel是支持指针的,也就是说channel传递指针是拷贝的是指针地址,发送者和接收者拿到的指针指向的还是同一片内存地址,所以channel传递的指针类型并发读写是不安全的。

第二个问题:chansend和chanrecv都使用了非阻塞情况下的快速失败路径,为什么chanrecv需要atomic原子操作,而chansend不需要?

如果管道没有关闭,chansend失败的原因是:没有接收者或者缓冲区已满。chanrecv失败的原因是:没有发送者或缓冲区为空。

chanrecv的atomic保证了channel未关闭和缓冲区为空的顺序。

首先chansend失败的结论是可以保证一致的,因为:channel关闭的情况下,chansend也会得到不能发送数据的结论。

但是chanrecv不可以,因为在channel被关闭的情况下,如果先判断缓冲区为空,则可能得到结论是:channel未关闭且缓冲区空。 如果先判断close被关闭了,那么得到的结论是:channel已关闭且缓冲区为空。

这两者结论不一致,所以需要通过atomic来保证两个指令的编排顺序。

七、总结

通过对channel的解读,可以知道:

1. 不能往关闭的channel中发送数据;

2. 关闭的channel可以反复地读取数据,如果数据不是发送者发送的,则会得到received=false的标志;

3. 通过channel通信的数据,是从发送端的goroutine的栈中直接复制到接收端的gotoutine的栈中的;

4. 存在非阻塞的channel使用,就是结合select。

八、参考
  1. Go 语言 Channel 实现原理精要 | Go 语言设计与实现
  2. 3.6 通信原语 | Go 语言原本:https://golang.design/under-the-hood/zh-cn/part1basic/ch03lang/chan/

生活依然要继续,每天拿出半个小时,放下焦虑,用行动来积累更好的自己,我们一起加油!

,