Go官方设计了一个信号量库

发表于 3年以前  | 总阅读数:282 次

前言

哈喽,大家好 。在写上一篇文章[请勿滥用goroutine] 时,发现Go语言扩展包提供了一个带权重的信号量库Semaphore,使用信号量我们可以实现一个"工作池"控制一定数量的goroutine并发工作。因为对源码抱有好奇的态度,所以在周末仔细看了一下这个库并进行了解析,在这里记录一下。

何为信号量

要想知道一个东西是什么,我都爱去百度百科上搜一搜,输入"信号量",这答案不就来了。

百度百科解释:

信号量(Semaphore),有时被称为信号灯,是[多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量VI,然后将Acquire Semaphore VI以及Release Semaphore VI分别放置在每个关键代码段的首末端。确认这些信号量VI引用的是初始创建的信号量。

通过这段解释我们可以得知什么是信号量,其实信号量就是一种变量或者抽象数据类型,用于控制并发系统中多个进程对公共资源的访问,访问具有原子性。信号量主要分为两类:

  • 二值信号量:顾名思义,其值只有两种0或者1,相当于互斥量,当值为1时资源可用,当值为0时,资源被锁住,进程阻塞无法继续执行。
  • 计数信号量:信号量是一个任意的整数,起始时,如果计数器的计数值为0,那么创建出来的信号量就是不可获得的状态,如果计数器的计数值大于0,那么创建出来的信号量就是可获得的状态,并且总共获取的次数等于计数器的值。

信号量工作原理

信号量是由操作系统来维护的,信号量只能进行两种操作等待和发送信号,操作总结来说,核心就是PV操作:

  • P原语:P是荷兰语Proberen(测试)的首字母。为阻塞原语,负责把当前进程由运行状态转换为阻塞状态,直到另外一个进程唤醒它。操作为:申请一个空闲资源(把信号量减1),若成功,则退出;若失败,则该进程被阻塞;
  • V原语:V是荷兰语Verhogen(增加)的首字母。为唤醒原语,负责把一个被阻塞的进程唤醒,它有一个参数表,存放着等待被唤醒的进程信息。操作为:释放一个被占用的资源(把信号量加1),如果发现有被阻塞的进程,则选择一个唤醒之。

在信号量进行PV操作时都为原子操作,并且在PV原语执行期间不允许有中断的发生。

PV原语对信号量的操作可以分为三种情况:

  • 把信号量视为某种类型的共享资源的剩余个数,实现对一类共享资源的访问
  • 把信号量用作进程间的同步
  • 视信号量为一个加锁标志,实现对一个共享变量的访问

具体在什么场景使用本文就不在继续分析,接下来我们重点来看一下Go语言提供的扩展包Semaphore,看看它是怎样实现的。

官方扩展包Semaphore

我们之前在分析Go语言源码时总会看到这几个函数:

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

这几个函数就是信号量的PV操作,不过他们都是给Go内部使用的,如果想使用信号量,那就可以使用官方的扩展包:Semaphore,这是一个带权重的信号量,接下来我们就重点分析一下这个库。

安装方法:go get -u golang.org/x/sync

数据结构

type Weighted struct {
 size    int64 // 设置一个最大权值
 cur     int64 // 标识当前已被使用的资源数
 mu      sync.Mutex // 提供临界区保护
 waiters list.List // 阻塞等待的调用者列表
}

semaphore库核心结构就是Weighted,主要有4个字段:

  • size:这个代表的是最大权值,在创建Weighted对象指定
  • cur:相当于一个游标,来记录当前已使用的权值
  • mu:互斥锁,并发情况下做临界区保护
  • waiters:阻塞等待的调用者列表,使用链表数据结构保证先进先出的顺序,存储的数据是waiter对象,waiter数据结构如下:
type waiter struct {
 n     int64 // 等待调用者权重值
 ready chan<- struct{} // close channel就是唤醒
}

这里只有两个字段:

  • n:这个就是等待调用者的权重值
  • ready:这就是一个channel,利用channelclose机制实现唤醒

semaphore还提供了一个创建Weighted对象的方法,在初始化时需要给定最大权值:

// NewWeighted为并发访问创建一个新的加权信号量,该信号量具有给定的最大权值。
func NewWeighted(n int64) *Weighted {
 w := &Weighted{size: n}
 return w
}

阻塞获取权值的方法 - Acquire

先直接看代码吧:

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
 s.mu.Lock() // 加锁保护临界区
 // 有资源可用并且没有等待获取权值的goroutine
 if s.size-s.cur >= n && s.waiters.Len() == 0 {
  s.cur += n // 加权
  s.mu.Unlock() // 释放锁
  return nil
 }
 // 要获取的权值n大于最大的权值了
 if n > s.size {
  // 先释放锁,确保其他goroutine调用Acquire的地方不被阻塞
  s.mu.Unlock()
  // 阻塞等待context的返回
  <-ctx.Done()
  return ctx.Err()
 }
 // 走到这里就说明现在没有资源可用了
 // 创建一个channel用来做通知唤醒
 ready := make(chan struct{})
 // 创建waiter对象
 w := waiter{n: n, ready: ready}
 // waiter按顺序入队
 elem := s.waiters.PushBack(w)
 // 释放锁,等待唤醒,别阻塞其他goroutine
 s.mu.Unlock()

 // 阻塞等待唤醒
 select {
 // context关闭
 case <-ctx.Done():
  err := ctx.Err() // 先获取context的错误信息
  s.mu.Lock()
  select {
  case <-ready:
   // 在context被关闭后被唤醒了,那么试图修复队列,假装我们没有取消
   err = nil
  default:
   // 判断是否是第一个元素
   isFront := s.waiters.Front() == elem
   // 移除第一个元素
   s.waiters.Remove(elem)
   // 如果是第一个元素且有资源可用通知其他waiter
   if isFront && s.size > s.cur {
    s.notifyWaiters()
   }
  }
  s.mu.Unlock()
  return err
 // 被唤醒了
 case <-ready:
  return nil
 }
}

注释已经加到代码中了,总结一下这个方法主要有三个流程:

  • 流程一:有资源可用时并且没有等待权值的goroutine,走正常加权流程;
  • 流程二:想要获取的权值n大于初始化时设置最大的权值了,这个goroutine永远不会获取到信号量,所以阻塞等待context的关闭;
  • 流程三:前两步都没问题的话,就说明现在系统没有资源可用了,这时就需要阻塞等待唤醒,在阻塞等待唤醒这里有特殊逻辑;
  • 特殊逻辑二:context关闭后,则根据是否有可用资源决定通知后面等待唤醒的调用者,这样做的目的其实是为了避免当不同的context控制不同的goroutine时,未关闭的goroutine不会被阻塞住,依然执行,来看这样一个例子(因为goroutine的抢占式调度,所以这个例子也会具有偶然性):
  • 特殊逻辑一:如果在context被关闭后被唤醒了,那么就先忽略掉这个cancel,试图修复队列。
 func main()  {
 s := semaphore.NewWeighted(3)
 ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2)
 defer cancel()

 for i :=0; i < 3; i++{
   if i != 0{
    go func(num int) {
     if err := s.Acquire(ctx,3); err != nil{
      fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
      return
     }
     time.Sleep(2 * time.Second)
     fmt.Printf("goroutine: %d run over\n",num)
     s.Release(3)

    }(i)
   }else {
    go func(num int) {
     ct,cancel := context.WithTimeout(context.Background(), time.Second * 3)
     defer cancel()
     if err := s.Acquire(ct,3); err != nil{
      fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
      return
     }
     time.Sleep(3 * time.Second)
     fmt.Printf("goroutine: %d run over\n",num)
     s.Release(3)
    }(i)
   }

 }
 time.Sleep(10 * time.Second)
}

上面的例子中goroutine:0 使用ct对象来做控制,超时时间为3sgoroutine:1goroutine:2对象使用ctx对象来做控制,超时时间为2s,这三个goroutine占用的资源都等于最大资源数,也就是说只能有一个goruotine运行成功,另外两个goroutine都会被阻塞,因为goroutine是抢占式调度,所以我们不能确定哪个gouroutine会第一个被执行,这里我们假设第一个获取到信号量的是gouroutine:2,阻塞等待的调用者列表顺序是:goroutine:1 -> goroutine:0,因为在goroutine:2中有一个2s的延时,所以会触发ctx的超时,ctx会下发Done信号,因为goroutine:2goroutine:1都是被ctx控制的,所以就会把goroutine:1从等待者队列中取消,但是因为goroutine:1属于队列的第一个队员,并且因为goroutine:2已经释放资源,那么就会唤醒goroutine:0继续执行,画个图表示一下:

使用这种方式可以避免goroutine永久失眠。

不阻塞获取权值的方法 - TryAcquire

func (s *Weighted) TryAcquire(n int64) bool {
 s.mu.Lock() // 加锁
 // 有资源可用并且没有等待获取资源的goroutine
 success := s.size-s.cur >= n && s.waiters.Len() == 0
 if success {
  s.cur += n
 }
 s.mu.Unlock()
 return success
}

这个方法就简单很多了,不阻塞地获取权重为n的信号量,成功时返回true,失败时返回false并保持信号量不变。

释放权重

func (s *Weighted) Release(n int64) {
 s.mu.Lock()
 // 释放资源
 s.cur -= n
 // 释放资源大于持有的资源,则会发生panic
 if s.cur < 0 {
  s.mu.Unlock()
  panic("semaphore: released more than held")
 }
 // 通知其他等待的调用者
 s.notifyWaiters()
 s.mu.Unlock()
}

这里就是很常规的操作,主要就是资源释放,同时进行安全性判断,如果释放资源大于持有的资源,则会发生panic。

唤醒waiter

AcquireRelease方法中都调用了notifyWaiters,我们来分析一下这个方法:

func (s *Weighted) notifyWaiters() {
 for {
  // 获取等待调用者队列中的队员
  next := s.waiters.Front()
  // 没有要通知的调用者了
  if next == nil {
   break // No more waiters blocked.
  }

  // 断言出waiter信息
  w := next.Value.(waiter)
  if s.size-s.cur < w.n {
   // 没有足够资源为下一个调用者使用时,继续阻塞该调用者,遵循先进先出的原则,
   // 避免需要资源数比较大的waiter被饿死
   //
   // 考虑一个场景,使用信号量作为读写锁,现有N个令牌,N个reader和一个writer
   // 每个reader都可以通过Acquire(1)获取读锁,writer写入可以通过Acquire(N)获得写锁定
   // 但不包括所有的reader,如果我们允许reader在队列中前进,writer将会饿死-总是有一个令牌可供每个reader
   break
  }

  // 获取资源
  s.cur += w.n
  // 从waiter列表中移除
  s.waiters.Remove(next)
  // 使用channel的close机制唤醒waiter
  close(w.ready)
 }
}

这里只需要注意一个点:唤醒waiter采用先进先出的原则,避免需要资源数比较大的waiter被饿死。

何时使用Semaphore

到这里我们就把Semaphore的源代码看了一篇,代码行数不多,封装的也很巧妙,那么我们该什么时候选择使用它呢?

目前能想到一个场景就是Semaphore配合上errgroup实现一个"工作池",使用Semaphore限制goroutine的数量,配合上errgroup做并发控制,示例如下:

const (
 limit = 2
) 

func main()  {
 serviceName := []string{
  "cart",
  "order",
  "account",
  "item",
  "menu",
 }
 eg,ctx := errgroup.WithContext(context.Background())
 s := semaphore.NewWeighted(limit)
 for index := range serviceName{
  name := serviceName[index]
  if err := s.Acquire(ctx,1); err != nil{
   fmt.Printf("Acquire failed and err is %s\n", err.Error())
   break
  }
  eg.Go(func() error {
   defer s.Release(1)
   return callService(name)
  })
 }

 if err := eg.Wait(); err != nil{
  fmt.Printf("err is %s\n", err.Error())
  return
 }
 fmt.Printf("run success\n")
}

func callService(name string) error {
 fmt.Println("call ",name)
 time.Sleep(1 * time.Second)
 return nil
}

结果如下:

call  order
call  cart
call  account
call  item
call  menu
run success

总结

本文我们主要赏析了Go官方扩展库Semaphore的实现,他的设计思路简单,仅仅用几十行就完成了完美的封装,值得我们借鉴学习。不过在实际业务场景中,我们使用信号量的场景并不多,大多数场景我们都可以使用channel来替代,但是有些场景使用Semaphore来实现会更好,比如上篇文章【[[警惕] 请勿滥用goroutine]】我们使用channel+sync来控制goroutine数量,这种实现方式并不好,因为实际已经起来了多个goroutine,只不过控制了工作的goroutine数量,如果改用semaphore实现才是真正的控制了goroutine数量。

本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/QvuoWkwePxcSpaGkQbtlLg

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:1年以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:1年以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:1年以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:1年以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:1年以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:1年以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:1年以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:1年以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:1年以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:1年以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:1年以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:1年以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:1年以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:1年以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:1年以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:1年以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:1年以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:1年以前  |  398次阅读  |  详细内容 »
 相关文章
Android插件化方案 6年以前  |  237375次阅读
vscode超好用的代码书签插件Bookmarks 2年以前  |  8244次阅读
 目录