Don’t communicate by sharing memory, share memory by communicating。相信学过Go的同学都知道这句名言,可以说channel就是后边这句话的具体实现。channel是一个类型安全的循环队列,能够控制groutine在它上面读写消息的行为,比如阻塞某个groutine ,或者唤醒某个 groutine。
一个通道相当于一个先进先出(FIFO)的队列,各个元素值都是严格地按照发送的顺序排列的,先被发送通道的元素值一定会先被接收,一个左尖括号紧接着一个减号形象地代表了元素值的传输方向。
下面是创建几种不同的通道:
ch1 := make(chan int) // 无缓冲通道
ch2 := make(chan int, 3) // 有缓冲通道
ch3 := make(chan<- int, 1) // 单向通道:只能发送不能接收
ch4 := make(<-chan int, 1) // 单向通道:只能接收不能发送
下面举一个简单的示例:
func main() {
done := make(chan struct{})
c := make(chan string)
go func() {
s := <-c // 接收消息
println(s)
close(done) // 关闭通道,作为结束通知
}()
c <- "lvmenglou" // 发送消息
<-done // 阻塞,知道有数据或者通道关闭
}
//最后输出:lvmenglou
通道发送和接收操作基本特性:
7.2.1 数据结构
channel的数据结构如下:
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写入时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,chan不允许并发读写
}
chan内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的,下图展示了一个可缓存6个元素的channel示意图:
下图展示了一个没有缓冲区的channel,有几个goroutine阻塞等待读数据:
7.2.2 发送
向一个channel中写数据简单过程如下:
7.2.3 接收
从一个channel读数据简单过程如下:
7.2.4 关闭
关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。
7.3.1 发送
阻塞情况:
ch := make(chan int, 2)
ch = nil
ch <- 4 // all goroutines are asleep - deadlock!
重要知识点:
ch := make(chan int, 2)
ch <- 4
close(ch)
ch <- 3 // panic: send on closed channel
7.3.2 接收
阻塞情况:
重要知识点:
c := make(chan int, 3)
c <- 11
c <- 12
close(c)
for i := 0; i < cap(c)+1; i++ {
x, ok := <-c
println(i, ":", ok, x)
}
// 输出
// 0: true 11
// 1: true 12
// 2: false 0
// 3: false 0
7.3.3 关闭
重要知识点:
ch := make(chan int, 2)
ch <- 4
close(ch)
close(ch) // panic: close of closed channel
7.3.4 for-range读取
我们常常会用for-range来读取channel的数据
ch := make(chan int, 1)
go func(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}(ch)
for val := range ch {
fmt.Println(val)
}
重要知识点:
7.3.5 select
select是跟channel关系最亲密的语句,它是被专门设计出来处理通道的,因为每个 case 后面跟的都是通道表达式,可以是读,也可以是写。下面看一个简单的示例:
// 准备好几个通道。
intChannels := [3]chan int{
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
}
// 随机选择一个通道,并向它发送元素值。
index := rand.Intn(3)
fmt.Printf("The index: %d\n", index)
intChannels[index] <- index
// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。
select {
case <-intChannels[0]:
fmt.Println("The first candidate case is selected.")
case <-intChannels[1]:
fmt.Println("The second candidate case is selected.")
case elem := <-intChannels[2]:
fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)
default:
fmt.Println("No candidate case is selected!")
}
我们用一个包含了三个候选分支的select语句,分别尝试从上述三个通道中接收元素值,哪一个通道中有值,哪一个对应的候选分支就会被执行。后面还有一个默认分支,不过在这里它是不可能被选中的。
在使用select语句的时候,我们需要注意下面几个事情:
intChan := make(chan int, 1)
// 一秒后关闭通道。
time.AfterFunc(time.Second, func() {
close(intChan)
})
select {
case _, ok := <-intChan:
if !ok {
fmt.Println("The candidate case is closed.")
break
}
fmt.Println("The candidate case is selected.")
}
上面的知识需要牢记,面试常考,下面是讲解select执行的流程:
上面写的有些多,简单总结一下:执行select时,会从左到右,从上到下,对每个case表达式求值,当所有case求值完毕后,会挑选满足的case执行,如果有多条都满足,就随机选择一条;如果都没有满足,就执行default;如果连default都没有,就阻塞住,等有满足条件的case出现时,再执行。
关于channel,零碎的知识点非常多,我还是想通过一个完整的示例,将这些知识点全部串起来,下面就以海外商城Push为例,将上面知识应用到实际场景中。
7.4.1 示例介绍
海外商城需要对W个业务方发送Push,针对每个业务方,为了提高Push的并发能力,采用N个协程从EMQ中读取数据(EMQ中都一个消息队列,里面缓存了大量的Push数据),数据读取后进行处理,然后将处理后的数据写到channel中。同时,服务有M个协程从channel中取出数据并消费,然后通过小米Push SDK,给用户发送Push。整体发送链路如下:
在看后面的内容前,我先抛出几个问题:
7.4.2 初始化
初始化channel数组,数组里面是每个业务方appTypes的channel,channel的缓存区大小为30,并启动10个消费者协程:
var (
messageChan map[string]chan *WorkMessage // channel
stopMasterChan chan bool // 消费者结束通知
appTypes = map[int32]string{1: "shop", 2: "bbs", 3: "sharesave"}
)
func initPushChannel() {
maxSize = 30 // channel缓存区大小
workNum = 10 // goroutine个数
stopMasterChan = make(chan bool)
messageChan = make(map[string]chan *WorkMessage)
for _, name := range appTypes {
workChan := make(chan *WorkMessage, maxSize)
messageChan[name] = workChan
for i := 0; i < workNum; i++ {
go startMaster(name, workChan) // 启动消费者协程
}
}
}
func startMaster(name string, workChan chan *WorkMessage) {
for {
if exit := dostartMaster(name, workChan); exit {
return
}
}
}
初始化EMQ的Client,并启动10个生产者协程:
var (
clientFactory client.ClientFactory // EMQ Client
stopChan chan bool // 生产者结束通知
)
func initEmq() {
// 初始化EMQ的Client和单次读取数据条数,该处代码省略...
maxConsumerNum := 10
stopChan = make(chan bool)
for i := 0; i < maxConsumerNum; i++ {
go receiveMsg(i) // 启动生产者协程
}
}
func receiveMsg(queueID int) {
for {
if exit := doReceiveMsg(queueID); exit {
logz.Info("stop receive msg ...", logz.F("queueID", queueID))
return
}
}
}
主方法调用:
func InitWorker() {
// 初始化push SDK,逻辑省略...
initPushChannel() // 初始化Channel,启动消费者
initEmq() // 启动生产者
}
func doReceiveMsg(queueID int) bool {
defer func() {
if err := recover(); err != nil {
println("[panic] recover from error.")
}
}()
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
// 1. 从EMQ获取数据List,逻辑省略...
// 2. 遍历List,获取业务类型,逻辑省略...
// 3. 根据业务类型,获取对应的channel
name := "sharesave" // 示例数据
pushChannel, _ := messageChan[name]
// 4. 构造Push数据,然后放入channel
pushData := &WorkMessage{AppLocal: "id", AppType: 1} // 示例数据
pushChannel <- pushData
case <-stopChan:
println("stop to send data to channel.")
return true
}
}
}
这部分代码我做了大量简化,这里主要做了2件事情:
func dostartMaster(name string, workChan chan *WorkMessage) bool {
defer func() {
if err := recover(); err != nil {
println("[panic] recover from error.")
}
}()
for {
select {
case t := <-workChan:
if t != nil {
for _, message := range t.PushMessages {
// 接受channel数据t,将数据推给Push SDK
// 逻辑省略...
}
}
case <-stopMasterChan:
println("stop to get data from channel.")
return true
}
}
}
这部分代码同样做了大量简化,这里主要做了2件事情:
// 通知生产者协程关闭,协程不再写channel
func stopRecvMsgFromQueue() {
close(stopChan)
}
// 通知消费者协程关闭,协程不再读channel,并关闭channel,消费完channel中剩余消息
func stopPushChannel() {
close(stopMasterChan)
time.Sleep(time.Second)
for _, c := range messageChan {
close(c)
for msg := range c {
if msg != nil {
for _, message := range msg.PushMessages {
// 接受channel数据t,将数据推给Push SDK
// 逻辑省略...
}
}
}
}
}
// 主方法调用
func StopWorker() {
stopRecvMsgFromQueue()
time.Sleep(time.Second * 2)
stopPushChannel()
}
比如服务重启,需要关闭协程时,主要做以下事情:
这里有两个地方sleep了一下,分别有以下作用:
本章基本都是干货,上面总结的比较全面,这里就不再重复了,如果你能回答我提的这些问题,你应该就掌握了本章的内容:
最后就是Push的并发示例,强烈建议大家能掌握,掌握了这个示例,后续你应该也能很容易通过channel实现数据共享,并结合goroutine写出你自己的高并发程序。
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/T6AiuW-t2vmX5IQD4_2WHQ
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。