在这篇文章中,我想谈谈一个基于流行的开源项目 hystrix
的 circuit breaker
(断路器)模式(实际上,我会看看 golang 版本的hystrix-go[1],而不是用 Java 编写的原始版本[2])。
在本文的第一部分中,我将对 circuit breaker
进行一般性介绍,让你了解它是什么以及它为什么重要。此外,让我们回顾一下 hystrix-go
和 hystrix
的项目背景,并通过一个小演示示例了解基本用法。
分布式架构中的软件通常具有许多依赖项,并且每个依赖项(即使是最可靠的服务)在某些时候失败是不可避免的。
如果我们失败的服务变得无响应会发生什么?所有依赖它的服务也有变得无响应的风险。这就是所谓的 catastrophic cascading failure
(灾难性级联故障)。
断路器背后的基本思想非常简单。断路器通过包装对目标服务的调用来工作,并持续监控故障率。一旦故障达到某个阈值,断路器将跳闸,并且对电路的所有进一步调用都返回故障或错误。
断路器模式背后的设计理念是fail fast
:当一个服务变得无响应时,依赖它的其他服务应该停止等待它并开始处理失败的服务可能不可用的事实。通过防止单个服务的故障在整个系统中发生级联故障,断路器模式有助于快速恢复整个系统。
断路器模式可以实现为如下所示的有限状态机:
有三种状态 open
、closed
和 half-open
:
open
状态。fallback
调用逻辑(由开发人员定义)来处理失败。断路器会在open
调用状态保持一段时间sleeping window
,之后断路器可以从 open
过渡到 half-open
。reset
中断返回 closed
状态。否则断路器将转换回open
状态。这是断路器的基本背景,你可以在网上找到更多有关[3]它的信息。
接下来,让我们调查一下 hystrix
这个项目。
hystrix
是一个非常流行的开源项目。你可以在此链接[4]中找到有关它的所有信息。
我想从上面的链接中引用几个要点。Hystrix 旨在执行以下操作:
你可以看到hystrix
完美地实现了我们在上一节中谈到的断路器模式的想法,对吧?
该hystrix
项目是用Java
实现的。在本文中,我更喜欢使用 golang 版本hystrix-go
,它是一个简化版本,但实现了有关断路器的所有主要设计和想法。
hystrix-go
的用法可以在这个链接[5]中找到,非常简单易懂。你可以轻松地在网上找到许多其他带有演示示例的文章,以展示更多使用级别的内容。请前往阅读。
在我的文章中,我想深入研究 hystrix-go
的源代码,并对 circuit breaker
的实现方式进行深入调研。请继续阅读以下部分。
Hystrix
提供了三种不同的服务降级策略来避免在整个系统中发生 cascading failure
这种情况:timeout(超时)
、maximum concurrent request numbers(最大并发请求数)
和 request error rate(请求错误率)
。
hystrix
会记录每次服务调用的响应状态,当错误率达到阈值后,breaker 会打开,在 breaker 状态变回 close 之前会执行 fallback 逻辑。error rate
策略是最复杂的一种。从 hystrix
的基本用法可以看出:
import (
"github.com/afex/hystrix-go/hystrix"
"time"
)
hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
Timeout: int(10 * time.Second),
MaxConcurrentRequests: 100,
ErrorPercentThreshold: 25,
})
hystrix.Go("my_command", func() error {
// talk to dependency services
return nil
}, func(err error) error {
// fallback logic when services are down
return nil
})
在上面的使用案例中,可以看到 timeout
设置为 10 秒,最大并发请求数为 100,错误率阈值为 25%。
在消费者应用程序级别,这几乎是你需要设置的所有配置。hystrix
将使魔术在内部发生。
在本文中,我计划通过查看 hystrix
源代码向你展示其内部结构。
让我们从简单的开始:max concurrent requests
和timeout
。然后继续探索复杂的策略request error rate
。
根据上面的例子,你可以看到 Go
函数是 hystrix
源代码的大门,所以我们从它开始:
func Go(name string, run runFunc, fallback fallbackFunc) chan error {
runC := func(ctx context.Context) error {
return run()
}
var fallbackC fallbackFuncC
if fallback != nil {
fallbackC = func(ctx context.Context, err error) error {
return fallback(err)
}
}
return GoC(context.Background(), name, runC, fallbackC)
}
Go
函数接受三个参数:
circuit
。Go
函数只是封装了带 Context
的 run
和 fallback
,Context 用于控制和取消 goroutine,不熟悉的可以参考这篇文章[6]。最后它会调用 GoC
函数。
GoC
函数如下:
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
// construct a new command instance
cmd := &command{
run: run,
fallback: fallback,
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
// get circuit by command name
circuit, _, err := GetCircuit(name)
if err != nil {
cmd.errChan <- err
return cmd.errChan
}
cmd.circuit = circuit
//declare a condition variable sync.Cond: ticketCond, to synchronize among goroutines
//declare a flag variable: ticketChecked, work together with ticketCond
ticketCond := sync.NewCond(cmd)
ticketChecked := false
// declare a function: returnTicket, will execute when a concurrent request is done to return `ticket`
returnTicket := func() {
cmd.Lock()
for !ticketChecked {
ticketCond.Wait()
}
cmd.circuit.executorPool.Return(cmd.ticket)
cmd.Unlock()
}
// declare a sync.Once instance: returnOnce, make sure the returnTicket function execute only once
returnOnce := &sync.Once{}
// declare another function: reportAllEvent, used to collect the metrics
reportAllEvent := func() {
err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
if err != nil {
log.Printf(err.Error())
}
}
// launch a goroutine which executes the `run` logic
go func() {
defer func() { cmd.finished <- true }()
if !cmd.circuit.AllowRequest() {
cmd.Lock()
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrCircuitOpen)
reportAllEvent()
})
return
}
cmd.Lock()
select {
case cmd.ticket = <-circuit.executorPool.Tickets:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
default:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrMaxConcurrency)
reportAllEvent()
})
return
}
runStart := time.Now()
runErr := run(ctx)
returnOnce.Do(func() {
defer reportAllEvent()
cmd.runDuration = time.Since(runStart)
returnTicket()
if runErr != nil {
cmd.errorWithFallback(ctx, runErr)
return
}
cmd.reportEvent("success")
})
}()
// launch the second goroutine for timeout strategy
go func() {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
case <-ctx.Done():
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ctx.Err())
reportAllEvent()
})
return
case <-timer.C:
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrTimeout)
reportAllEvent()
})
return
}
}()
return cmd.errChan
}
我承认它很复杂,但它也是整个hystrix
项目的核心。耐心点,让我们仔细一点一点地回顾它。
首先,GoC
函数的代码结构如下:
Command
对象,其中包含每次调用GoC
函数的所有信息。GetCircuit(name)
函数按名称获取 circuit breaker
(如果不存在则创建它)。sync.Cond
)条件变量 ticketCond 和 ticketChecked 用于 goroutine 之间的通信。error rate
策略至关重要。sync.Once
的实例,这是 golang 提供的另一个有趣的 synchronization primitives
(同步原语)。max concurrent request number
策略,第二个包含 timeout
策略。channel
类型值让我们一一回顾它们。
command
结构体如下,它嵌入了 sync.Mutex 并定义了几个字段:
type command struct {
sync.Mutex
ticket *struct{}
start time.Time
errChan chan error
finished chan bool
circuit *CircuitBreaker
run runFuncC
fallback fallbackFuncC
runDuration time.Duration
events []string
}
请注意,command
对象本身不包含命令名称信息,其生命周期仅在一次GoC
调用的范围内。这意味着有关服务请求的统计指标,类似 error rate
和 concurrent request number
,不存储在命令对象中。相反,这些指标存储在类型 CircuitBreaker
的circuit字段中。
正如我们在函数 GoC
的工作流程中提到的,调用 GetCircuit(name)
来获取或创建circuit breaker
。它的实现在circuit.go
文件内:
func init() {
circuitBreakersMutex = &sync.RWMutex{}
circuitBreakers = make(map[string]*CircuitBreaker)
}
func GetCircuit(name string) (*CircuitBreaker, bool, error) {
circuitBreakersMutex.RLock()
_, ok := circuitBreakers[name]
if !ok {
circuitBreakersMutex.RUnlock()
circuitBreakersMutex.Lock()
defer circuitBreakersMutex.Unlock()
if cb, ok := circuitBreakers[name]; ok {
return cb, false, nil
}
circuitBreakers[name] = newCircuitBreaker(name)
} else {
defer circuitBreakersMutex.RUnlock()
}
return circuitBreakers[name], !ok, nil
}
逻辑非常简单。所有的断路器都存储在一个映射对象 circuitBreakers 中,以命令名作为键。
构造函数 newCircuitBreaker
和CircuitBreaker
结构体如下:
type CircuitBreaker struct {
Name string
open bool
forceOpen bool
mutex *sync.RWMutex
openedOrLastTestedTime int64
executorPool *executorPool // used in the strategy of max concurrent request number
metrics *metricExchange // used in the strategy of request error rate
}
func newCircuitBreaker(name string) *CircuitBreaker {
c := &CircuitBreaker{}
c.Name = name
c.metrics = newMetricExchange(name)
c.executorPool = newExecutorPool(name)
c.mutex = &sync.RWMutex{}
return c
}
CircuitBreaker
的所有字段对于理解断路器的工作原理都很重要。
有两个非简单类型的字段需要更多分析,executorPool
和 metrics
。
max concurrent request number
策略。request error rate
策略。我们可以在文件 pool.go
中找到 executorPool
逻辑:
type executorPool struct {
Name string
Metrics *poolMetrics
Max int
Tickets chan *struct{} // Tickets channel
}
func newExecutorPool(name string) *executorPool {
p := &executorPool{}
p.Name = name
p.Metrics = newPoolMetrics(name)
p.Max = getSettings(name).MaxConcurrentRequests
p.Tickets = make(chan *struct{}, p.Max)
// send Max numbers of value into the Tickets channel
for i := 0; i < p.Max; i++ {
p.Tickets <- &struct{}{}
}
return p
}
它利用 golangchannel
来实现 max concurrent request number
策略。请注意创建了具有MaxConcurrentRequests容量缓冲通道的 Tickets
字段。在下面的 for 循环中,通过将值发送到通道直到达到容量,使缓冲的通道充满。
如上所示,在GoC
函数的第一个 goroutine 中,Tickets
channel 的使用如下:
go func() {
...
select {
case cmd.ticket = <-circuit.executorPool.Tickets: // receive ticket from Tickets channel
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
default:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrMaxConcurrency) // run fallback logic when concurrent requests reach threshold
reportAllEvent()
})
return
}
...
}()
每次调用GoC
函数都会从 circuit.executorPool.Tickets 通道中获取一张 ticket,直到没有 ticket 为止,这意味着并发请求的数量达到了阈值。在这种情况下,default
case 将执行,并且服务将使用回退逻辑优雅地降级。
另一方面,每次调用GoC
完成后,都需要将ticket发送回 circuit.executorPool.Tickets,对吗?你还记得上面提到的 returnTicket
函数吗?是的,它用于此目的。GoC
中定义的 returnTicket
函数如下:
returnTicket := func() {
cmd.Lock()
for !ticketChecked {
ticketCond.Wait()
}
cmd.circuit.executorPool.Return(cmd.ticket) // return ticket to the executorPool
cmd.Unlock()
}
它调用executorPool.Return
函数:
// Return function in pool.go file
func (p *executorPool) Return(ticket *struct{}) {
if ticket == nil {
return
}
p.Metrics.Updates <- poolMetricsUpdate{
activeCount: p.ActiveCount(),
}
p.Tickets <- ticket // send ticket back to Tickets channel
}
ticket 的设计和实现是golang channel
真实世界应用的一个很好的例子。
总之, max concurrent request number
策略可以说明如下:
在上面的部分中,我们仔细回顾了 hystrix
中的 max concurrent requests
策略,希望你能从中学到一些有趣的东西。
现在让我们在下一节中一起研究 timeout
策略。
Timeout
与max concurrent request number
策略相比,timeout
理解起来非常直白。
正如我们在上一节中提到的,hystrix
核心逻辑在 GoC
函数内部。GoC
函数内部运行了两个 goroutine。你已经看到第一个 goroutine 包含向目标服务发送请求的逻辑和max concurrent request number
。第二个 goroutine 怎么样?让我们回顾一下:
go func() {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
// returnOnce has been executed in another goroutine
case <-ctx.Done():
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ctx.Err())
reportAllEvent()
})
return
case <-timer.C:
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrTimeout)
reportAllEvent()
})
return
}
}()
请注意,使用设置中的超时持续时间值创建 Timer。一个 select
语句让这个 goroutine 等待,直到一个case
条件从通道接收到值。timeout 情况只是第三种情况(前两种情况未触发时),它将运行带有 ErrTimeout 错误消息的后备逻辑。
到目前为止,你应该清楚这两个 goroutine 的主要结构和功能。但具体来说,有两种 Golang 技术需要你注意:sync.Once
和 sync.Cond
。
## sync.Once
你可能已经注意到以下代码块,它在 GoC
函数内部重复了多次:
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrTimeout) // with various error types
reportAllEvent()
})
returnOnce 是 sync.Once
类型实例,它确保Do
方法的回调函数在不同的 goroutine 中只运行一次。
在这种特定情况下,它可以保证 returnTicket() 和 reportAllEvent() 都只执行一次。这确实是有道理的,因为如果一个 GoC
调用会运行 returnTicket() 多次,那么当前的并发请求数将不正确,对吧?
我写了另一篇关于sync.Once
详细的文章,你可以参考那篇文章[7]以获得更深入的解释。
sync.Cond
returnTicket 函数的实现如下:
ticketCond := sync.NewCond(cmd)
ticketChecked := false
returnTicket := func() {
cmd.Lock()
for !ticketChecked {
ticketCond.Wait() // hang the current goroutine
}
cmd.circuit.executorPool.Return(cmd.ticket)
cmd.Unlock()
}
ticketCond 是一个条件变量,在 Golang 中它是sync.Cond
。
条件变量在不同 goroutine 之间的通信中很有用。具体来说,sync.Cond
的 Wait
方法会挂起当前的 goroutine,Signal
方法会唤醒阻塞的 goroutine 继续执行。
在 hystrix
中,当 ticketChecked 为 false 时,表示当前调用尚未结束,ticket不应退还。因此,ticketCond.Wait() 被调用来阻塞这个 goroutine 并等待 GoC
调用完成,这是由 Signal
方法通知的。
ticketChecked = true
ticketCond.Signal()
注意上面两行代码总是一起调用的。ticketChecked 设置为 true 表示当前GoC
调用已完成,ticket 已准备好返回。而且,Wait
挂起 goroutine 的方法放在了一个 for 循环中,这也是一种最佳实践技术。
更多关于 sync.Cond
的解释,我以后会写另一篇文章来解释,请稍等。
Fallback
最后,让我们看看当目标服务没有响应时,fallback 函数是如何被调用的。
让我们回想一下,每次 GoC
调用都会创建一个新的命令实例。并且 fallback 函数会被分配给同名的字段,后面会用到。
cmd := &command{
run: run,
fallback: fallback, // fallback logic here
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
正如我们在上面的部分中看到的,errorWithFallback 方法在 timeout
或max concurrent request number
达到阈值时被触发。
func (c *command) errorWithFallback(ctx context.Context, err error) {
eventType := "failure"
if err == ErrCircuitOpen {
eventType = "short-circuit"
} else if err == ErrMaxConcurrency {
eventType = "rejected"
} else if err == ErrTimeout {
eventType = "timeout"
} else if err == context.Canceled {
eventType = "context_canceled"
} else if err == context.DeadlineExceeded {
eventType = "context_deadline_exceeded"
}
c.reportEvent(eventType)
fallbackErr := c.tryFallback(ctx, err)
if fallbackErr != nil {
c.errChan <- fallbackErr
}
}
errorWithFallback 方法将通过调用 tryFallback 运行 fallback 并报告指标事件,例如 fallback-failure 和 fallback-success。
func (c *command) tryFallback(ctx context.Context, err error) error {
if c.fallback == nil {
return err
}
fallbackErr := c.fallback(ctx, err) // execute the fallback logic here
if fallbackErr != nil {
c.reportEvent("fallback-failure")
return fmt.Errorf("fallback failed with '%v'. run error was '%v'", fallbackErr, err)
}
c.reportEvent("fallback-success")
return nil
}
在上面,我们谈到 timeout
是 hystrix
中提供的所有策略中最简单的一种策略。还回顾了一些详细的 Golang 技术,以便更好地理解复杂的代码逻辑。
在这篇文章中,我们谈到了 hystrix
中详细实现的 max concurrent requests
和timeout
策略。还回顾了一些详细的 Golang 技术,以便更好地理解复杂的代码逻辑。
我把 error rate
策略留给你,请深入代码库并探索更多关于熔断的信息。
原文链接:https://levelup.gitconnected.com/how-to-write-a-circuit-breaker-in-golang-9ebd5644738c。
[1]hystrix-go: https://github.com/afex/hystrix-go
[2]原始版本: https://github.com/Netflix/Hystrix
[3]有关: https://martinfowler.com/bliki/CircuitBreaker.html
[4]链接: https://github.com/Netflix/Hystrix/wiki
[5]链接: https://github.com/afex/hystrix-go
[6]这篇文章: https://baoqger.github.io/2021/04/26/golang-context-source-code/
[7]那篇文章: https://baoqger.github.io/2021/05/11/golang-sync-once/
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/S6MdTmH4dUSIyBa0i88NMg
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。