channel 是什么?
channel 使用姿势
chan 创建
chan 入队
chan 出队
结合 select 语句
结合 for-range 语句
源码解析
makechan
hchan 结构
chansend
chanrecv
selectnbsend
selectnbrecv
selectnbrecv2
chanrecv2
总结
chan 是 golang 的最重要的一个结构,是区别于其他高级语言的最重要的特色之一,也是 goroutine 通信必须要的要素之一。很多人用它,但是很少人彻底理解过它,甚至 c <- x
,<-c
这样的语法可能都记不清晰,怎么办?本文教你从源码编译器的角度全方位的剖析 channel 的用法。
本质上就实现角度来讲,golang 的 channel 就是一个环形队列(ringbuffer)的实现。我们称 chan 为管理结构,channel 里面可以放任何类型的对象,我们称之为元素。
我们从 channel 的使用姿势入手,讲解最详细的 channel 使用方法。
我们从宏观的 chan 使用姿势入手,总结来讲,有以下几种姿势:
创建一个 channel ,一般用户使用姿势有两种,分别是创建有 buffer 和没有 buffer 的 channel 。
// no buffer 的 channel
c := make(chan int)
// 自带 buffer 的 channel
c1 := make(chan int , 10)
这个对应了实际函数是 makechan
,位于 runtime/chan.go
文件里。
用户使用姿势:
c <- x
对应函数实现 chansend
,位于 runtime/chan.go
文件。
用户使用姿势:
v := <-c
v, ok := <-c
对应函数分别是 chanrecv1
和 chanrecv2
,位于 runtime/chan.go
文件。
用户使用姿势:
select {
case c <- v:
// ... foo
default:
// ... bar
}
对应函数实现为 selectnbsend
, 位于 runtime/chan.go
文件中。
用户使用姿势:
select {
case v = <-c:
// ... foo
default:
// ... bar
}
对应函数实现为 selectnbrecv
, 位于 runtime/chan.go
文件中。
用户使用姿势:
select {
case v, ok = <-c:
// ... foo
default:
// ... bar
}
对应函数实现为 selectnbrecv2
, 位于 runtime/chan.go
文件中。
用户使用姿势:
for m := range c {
// ... do something
}
对应使用函数 chanrecv2
,位于 runtime/chan.go
文件中。
上面我们通过宏观的用户使用姿势,了解清楚了不同的使用姿势对应了不同实现函数(这个翻译是编译器来做的),我们接下来就是仔细看下这些函数的实现。
makechan
负责 channel 的创建,当我们 go 程序里写类似 v := make(chan int)
的初始化语句,就会相应的调用不同类型对应的初始化函数,其中 channel 的初始化函数就是 makechen
。
runtime.makechan
定义原型:
func makechan(t *chantype, size int) *hchan {
}
通过这个,我们能得知到,声明创建一个 channel ,本质上是得到了一个 hchan 的指针,所以 channel 的核心结构就是基于 hchan 来实现的。
其中 t 参数是指明元素类型:
type chantype struct {
typ _type
elem *_type
dir uintptr
}
size 指明这个 channel buffer 槽位有多少。如果是带 buffer 的 channel,比如那么 size 就是槽位数,如果没有指定,那么就是 0;
// size == 0
a := make(chan int)
// size == 2
b := make(chan int, 2)
我们看下 makechan 做的事情,其实很简单,就只做了两件事:
func makechan(t *chantype, size int) *hchan {
// 参数校验
// 初始化 hchan 结构
}
参数校验无非就是一些越界,或者 limit 的校验。
初始化 hchan 则简单的分为三种情况:
switch {
// no buffer 的场景,这种 channel 可以看成 pipe;
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
// channel 元素不含指针的场景,那么是分配出一个大内存块;
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
// 默认场景,hchan 结构体和 buffer 内存块单独分配;
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
2 . 如果 channel 元素(elem)内不含指针,那么 hchan 和 buffer 其实是可以在一起分配的,hchan 和 elem buffer 的内存块连续;
3 . 如果 channel 元素(elem)是带有指针的,那么 hchan 和 buffer 就不能分配在一起,所以先 new 一个 hchan 结构,再单独分配 elem buffer 数组;
所以我们看到除了 hchan 结构体本身的内存分配,该结构体初始化的关键在于四个字段:
// channel 的元素 buffer 数组地址;
c.buf = mallocgc(mem, elem, true)
// channel 元素大小,如果是 int,那么就是 8 字节;
c.elemsize = uint16(elem.size)
// 元素类型,这样就知道 channel 里面每个元素究竟是啥了;
c.elemtype = elem
// 元素 buffer 数组的大小,比如 make(chan int, 2),那么这里赋值的就是 2;
c.dataqsiz = uint(size)
makechan
函数负责创建了 chan 的核心结构-hchan,接下来我们再仔细分析下 hchan 结构体本身。
type hchan struct {
qcount uint // queue 里面有效用户元素,这个字段是在元素出对,入队改变的;
dataqsiz uint // 初始化的时候赋值,之后不再改变,指明数组 buffer 的大小;
buf unsafe.Pointer // 指明 buffer 数组的地址,初始化赋值,之后不会再改变;
elemsize uint16 // 指明元素的大小,和 dataqsiz 配合使用就能知道 buffer 内存块的大小了;
closed uint32
elemtype *_type // 元素类型,初始化赋值;
sendx uint // send index
recvx uint // receive index
recvq waitq // 等待 recv 响应的对象列表,抽象成 waiters
sendq waitq // 等待 sedn 响应的对象列表,抽象成 waiters
// 互斥资源的保护锁,官方特意说明,在持有本互斥锁的时候,绝对不要修改 Goroutine 的状态,不能很有可能在栈扩缩容的时候,出现死锁
lock mutex
}
在 makechan
我们就看到初始化的时候其实只会初始化四个核心字段:
2 . elemsize :指明元素大小
3 . elemtype :指明元素类型
4 . dataqsiz :指明数组大小
我们使用 channel 的时候知道,channel 常常会因为两种情况阻塞,1)投递的时候没有空间了, 2)取出的时候还未有元素。
// 如果 c 没有空间了,那么这行代码就会 hang 住,goroutine 会把执行权限让出去,直到有 buffer 空间,才会返回;
c <- x
// 如果 c 里面没有用户元素,那么这行代码会 hang 住,goroutine 切走,直到取到一个元素,这行代码才会返回;
<- c
从以上描述来说,就涉及到 goroutine 阻塞和 goroutine 唤醒,这个功能就跟 recvq
,sendq
这两个字段有关。
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
waitq 类型其实就是一个双向列表的实现,和 linux 里面的 LIST 实现非常相像。
type waitq struct {
first *sudog
last *sudog
}
chansend 函数是在编译器解析到 c <- x
这样的代码的时候插入的,本质上就是把一个用户元素投递到 hchan 的 ringbuffer 中。chansend 调用的时候,一般用户会遇到两种情况:
接下来,我们看下 chansend 究竟是做了什么。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// channel 的所有操作,都在互斥锁下;
lock(&c.lock)
// 如果投递的目标是已经关闭的 channel,那么直接 panic;
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 场景一:性能最好的场景,我投递的元素刚好有人在等着(那我直接给他就完了);
// 调用的是 send 函数,这个函数后面详细阐述,其实非常简单,递增 sendx, recvx 的索引,然后直接把元素给到等他的人,并且唤醒他;
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 场景二:ringbuffer 还有空间,那么把元素放好,递增索引,就可以返回了;
if c.qcount < c.dataqsiz {
// 复制,赋值好元素;
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
// 递增索引
c.sendx++
// 回环空间
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 递增元素个数
c.qcount++
unlock(&c.lock)
return true
}
// 判断是否需要阻塞?如果是非阻塞的,那么就直接解锁返回了,如果是阻塞的场景,那么就会走到下面的逻辑哦;
// chan <- 和 <-chan 的场景,都是 true,但是会有其他场景这里是 false,可以提前想下?
if !block {
unlock(&c.lock)
return false
}
// 代码走到这里,说明都是因为条件不满足,要阻塞当前 goroutine,所以做的事情本质上就是保留好通知路径,等待条件满足,会在这个地方唤醒;
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 把 goroutine 相关的线索结构入队,等待条件满足的唤醒;
c.sendq.enqueue(mysg)
// goroutine 切走,让出 cpu 执行权限;
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 到这就是某些人唤醒该 goroutine 了。
// 下面就是唤醒之后的逻辑了;
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 做一些资源的释放和环境的清理。
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
// 做一些校验
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
当我们在 golang 里面执行 c <- x
这么一行代码意图投递一个元素到 channel 的时候,其实就是调用到 chansend 函数。这个函数分几个场景来处理,总结来说:
2 . 场景二:如果 ringbuffer 还有空间,那么就把元素存着,这种也是场景的流程,存和取走的是异步流程,可以把 channel 理解成消息队列,生产者和消费者解耦;
3 . 场景三:ringbuffer 没空间,这个时候就要是否需要 block 了,一般来讲,c <- x
编译出的代码都是 block = true
,那么什么时候 chansend 的 block 参数会是 false 呢?答案是:select 的时候;
关于返回值:chansend 返回值标明元素是否 push 入队成功,成功的话,返回值为 true,否则 false 。
select 的提前揭秘:
select {
case c <- v:
// ... foo
default:
// ... bar
}
golang 源代码经过编译会变成类似如下:
if selectnbsend(c, v) {
// ... foo
} else {
// ... bar
}
而 selectnbasend
只是一个代理:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
// 调用的就是 chansend 函数,block 参数为 false;
return chansend(c, elem, false, getcallerpc())
}
小结:没错,chansend 功能就是这么简单,本质上就是一句话:投递元素到 channel 中。
对应的 golang 语句是:<- c
。该函数实现了 channel 的元素出队功能。举个例子,编译对应一般如下:
golang 语句:
<- c
对应:
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
golang 语句(这次的区别在于是否有返回值):
v, ok := <- c
对应:
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
编译器在遇到 <-c
和 v, ok := <-c
的语句的时候,会换成对应的 chanrecv1
,chanrecv2
函数,这两个函数本质上都是一个简单的封装,元素出队的实现函数是 chanrecv
,我们详细分析下这个函数。block 都等于 true(同样的,只有 select 的时候,block 才会是 false )。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 特殊场景:非阻塞模式,并且没有元素的场景直接就可以返回了,这个分支是快速分支,下面的代码都是在锁内的;
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
// 以下所有的逻辑都在锁内;
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 场景:如果发现有个人(sender)正在等着别人接收,那么刚刚好,直接把它的元素给到我们这里就好了;
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 场景:ringbuffer 还有空间存元素,那么下面就可以把元素放到 ringbuffer 放好,递增索引,就可以返回了;
if c.qcount > 0 {
// 存元素
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 递增索引
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 代码到这说明 ringbuffer 空间是不够的,后面学会要做两个事情,是否需要阻塞?
// 如果 block 为 false ,那么直接就退出了,返回对应的返回值;
if !block {
unlock(&c.lock)
return false, false
}
// 到这就说明要阻塞等待了,下面唯一要做的就是给阻塞做准备(准备好唤醒的条件)
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// goroutine 作为一个 waiter 入队列,等待条件满足之后,从这个队列里取出来唤醒;
c.recvq.enqueue(mysg)
// goroutine 切走,交出 cpu 执行权限
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 这里是被唤醒的开始的地方;
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 下面做一些资源的清理
gp.waiting = nil
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
chanrecv 函数的返回值有两个值,selected,received,其中 selected 一般作为 select 结合的函数返回值,指明是否要进入 select-case 的代码分支,received 表明是否从队列中成功获取到元素,有几种情况:
2 . 如果是阻塞模式( block=true ),如果 chan 已经 closed 了,那么返回的是 (selected=true,received=false),说明需要进到 select 的分支,但是是没有取到元素的;
3 . 如果是阻塞模式,chan 还是正常状态,那么返回(selected=true,recived=true),说明正常取到了元素;
该函数是 c <- v
结合到 select 时候的函数,我们使用 select 的 case 里面如果是一个 chan 的表达式,那么编译器会转换成对应的 selectnbsend 函数,如下:
select {
case c <- v:
// ... foo
default:
// ... bar
}
对应编译函数逻辑如下:
if selectnbsend(c, v) {
// ... foo
} else {
// ... bar
}
selectnbsend 本质上也就是个 chansend 的封装:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
// 注意 block 参数为 false
return chansend(c, elem, false, getcallerpc())
}
chansend 的内部逻辑上面已经详细说明过,唯一不同的就是 block 参数是赋值成 false ,也就是说,在 ringbuffer 没有空间的是否也不会阻塞,直接返回。划重点:chan 在这里不会切走执行权限。
该函数也是 v := <-c
结合到 select 时候的函数,我们使用 select 的 case 里面如果是一个 chan 的表达式,那么编译器会转换成对应的 selectnbsrecv 函数,如下:
select {
case v = <-c:
// ... foo
default:
// ... bar
}
对应编译函数逻辑如下:
if selectnbrecv(&v, c) {
// ... foo
} else {
// ... bar
}
selectnbrecv 本质上也就是个 chanrecv 的封装:
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
// block 参数为 false
selected, _ = chanrecv(c, elem, false)
return
}
chanrecv 的内部逻辑上面已经详细说明过,在 ringbuffer 没有元素也不会阻塞,直接返回。这里不会因此而切走调度权限。
该函数是 v, ok = <-c
结合到 select 时候的函数,我们使用 select 的 case 里面如果是一个 chan 的表达式,那么编译器会转换成对应的 selectnbrecv2 函数,如下:
select {
case v, ok = <-c:
// ... foo
default:
// ... bar
}
对应编译函数逻辑如下:
if selectnbrecv2(&v, &ok, c) {
// ... foo
} else {
// ... bar
}
selectnbrecv2 本质上是个 chanrecv 的封装,只不过返回值不一样而已:
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
// block 参数为 false
selected, *received = chanrecv(c, elem, false)
return
}
chanrecv 的内部逻辑上面已经详细说明过,在 ringbuffer 没有元素也不会阻塞,直接返回。这里不会因此而切走调度权限。selectnbrecv2
对比 selectnbrecv
函数的不同是还有个 ok
参数指明是否获取到了元素。
chan 可以和 for-range
结合使用,编译器会识别这种语法使用,如下:
for m := range c {
// ... do something
}
这个本质上是个 for
循环,我们知道 for
循环关键是拆建成 3 个部分,初始化,条件判断,条件递进:
for (init , condition, increment) {
// do something
}
那么在我们 for-range
和 chan
结合起来之后,这 3 个关键因素又是怎么理解呢?简述如下:
init 初始化 :无
condition 条件判断 :
ok := chanrecv2(c, ep)
if ok {
}
increment 条件递进 :无
当编译器遇到上面 chan 结合 for-range
写法 ,会转换成 chanrecv2
的函数调用。意图从 channel 里出队元素, 返回值为 received 。首先看下 chanrecv2 的实现:
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
// 注意了,这个 block=true,说明 chanrecv 内部是阻塞的;
_, received = chanrecv(c, elem, true)
return
}
chan 结合 for-range
编译之后的伪代码如下:
for ( ; ok = chanrecv2( c, ep ) ; ) {
// do something
}
划重点:从这个实现,我们可以获取一个非常重要的信息,for-range
和 chan 的结束条件只有这个 chan 被 close 了,否则一直会处于这个死循环内部。为什么?注意看 chanrecv
接收的参数是 block=true
,并且这个 for-range 是一个死循环,除非 chanrecv2 返回值为 false ,才有可能跳出循环,而 chanrecv2 在 block=true 场景下返回值为 false 的唯一原因只有:这个 chan 是 close 状态。
golang 的 chan 使用非常简单,这些简单的语法糖背后其实都是对应了相应的函数实现,这个翻译由编译器来完成。深入理解这些函数的实现,这些对于我们彻底理解 chan 的使用和限制条件是少不了的。深入理解原理,知其然知其所以然,你才能从心所欲的使用 golang 。
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/03z4LWUDpK7g5s9nFHon9A
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。