文链接: go-zero 的自适应熔断器
上篇文章我们介绍了微服务的限流,详细分析了计数器限流和令牌桶限流算法,这篇文章来说说熔断。
熔断和限流还不太一样,限流是控制请求速率,只要还能承受,那么都会处理,但熔断不是。
在一条调用链上,如果发现某个服务异常,比如响应超时。那么调用者为了避免过多请求导致资源消耗过大,最终引发系统雪崩,会直接返回错误,而不是疯狂调用这个服务。
本篇文章会介绍主流熔断器的工作原理,并且会借助 go-zero 源码,分析 googleBreaker 是如何通过滑动窗口来统计流量,并且最终执行熔断的。
这部分主要介绍两种熔断器的工作原理,分别是 Netflix 开源的 Hystrix,其也是 Spring Cloud 默认的熔断组件,和 Google 的自适应的熔断器。
Hystrix is no longer in active development, and is currently in maintenance mode.
注意,Hystrix 官方已经宣布不再积极开发了,目前处在维护模式。
Hystrix 官方推荐替代的开源组件:Resilience4j,还有阿里开源的 Sentinel 也是不错的替代品。
Hystrix 采用了熔断器模式,相当于电路中的保险丝,系统出现紧急问题,立刻禁止所有请求,已达到保护系统的作用。
系统需要维护三种状态,分别是:
通过状态的变更,可以有效防止系统雪崩的问题。同时,在半断开状态下,又可以让系统进行自我修复。
googleBreaker 实现了一种自适应的熔断模式,来看一下算法的计算公式,客户端请求被拒绝的概率。
参数很少,也比较好理解:
通过分析公式,我们可以得到下面几个结论,也就是产生熔断的实际原理:
总的来说,googleBreaker 的实现方案更加优雅,而且参数也少,不用维护那么多的状态。
go-zero 就是采用了 googleBreaker 的方案,下面就来分析代码,看看到底是怎么实现的。
接口定义这部分我个人感觉还是挺不好理解的,看了好多遍才理清了它们之间的关系。
其实看代码和看书是一样的,书越看越薄,代码会越看越短。刚开始看感觉代码很长,随着看懂的地方越来越多,明显感觉代码变短了。所以遇到不懂的代码不要怕,反复看,总会看懂的。
首先来看一下 breaker 部分的 UML 图,有了这张图,很多地方看起来还是相对清晰的,下面来详细分析。
这里用到了静态代理模式,也可以说是接口装饰器,接下来就看看到底是怎么定义的:
// core/breaker/breaker.go
internalThrottle interface {
allow() (internalPromise, error)
doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
// core/breaker/googlebreaker.go
type googleBreaker struct {
k float64
stat *collection.RollingWindow
proba *mathx.Proba
}
这个接口是最终实现熔断方法的接口,由 googleBreaker 结构体实现。
// core/breaker/breaker.go
throttle interface {
allow() (Promise, error)
doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
type loggedThrottle struct {
name string
internalThrottle
errWin *errorWindow
}
func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
return loggedThrottle{
name: name,
internalThrottle: t,
errWin: new(errorWindow),
}
}
这个是实现了日志收集的结构体,首先它实现了 throttle 接口,然后它包含了一个字段 internalThrottle,相当于具体的熔断方法是代理给 internalThrottle 来做的。
// core/breaker/breaker.go
func (lt loggedThrottle) allow() (Promise, error) {
promise, err :=lt.internalThrottle.allow()
return promiseWithReason{
promise: promise,
errWin: lt.errWin,
}, lt.logError(err)
}
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
accept :=acceptable(err)
if !accept && err !=nil {
lt.errWin.add(err.Error())
}
return accept
}))
}
所以当它执行相应方法时,都是直接调用 internalThrottle 接口的方法,然后再加上自己的逻辑。
这也就是代理所起到的作用,在不改变原方法的基础上,扩展原方法的功能。
// core/breaker/breaker.go
circuitBreaker struct {
name string
throttle
}
// NewBreaker returns a Breaker object.
// opts can be used to customize the Breaker.
func NewBreaker(opts ...Option) Breaker {
var b circuitBreaker
for _, opt :=range opts {
opt(&b)
}
if len(b.name)==0 {
b.name=stringx.Rand()
}
b.throttle=newLoggedThrottle(b.name, newGoogleBreaker())
return &b
}
最终的熔断器又将功能代理给了 throttle。
这就是它们之间的关系,如果感觉有点乱的话,就反复看,看的次数多了,就清晰了。
上文介绍过了,loggedThrottle 是为了记录日志而设计的代理层,这部分内容来分析一下是如何记录日志的。
// core/breaker/breaker.go
type errorWindow struct {
// 记录日志的数组
reasons [numHistoryReasons]string
// 索引
index int
// 数组元素数量,小于等于 numHistoryReasons
count int
lock sync.Mutex
}
func (ew *errorWindow) add(reason string) {
ew.lock.Lock()
// 记录错误日志内容
ew.reasons[ew.index]=fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
// 对 numHistoryReasons 进行取余来得到数组索引
ew.index=(ew.index + 1) % numHistoryReasons
ew.count=mathx.MinInt(ew.count+1, numHistoryReasons)
ew.lock.Unlock()
}
func (ew *errorWindow) String() string {
var reasons []string
ew.lock.Lock()
// reverse order
for i :=ew.index - 1; i >=ew.index-ew.count; i-- {
reasons=append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
}
ew.lock.Unlock()
return strings.Join(reasons, "\n")
}
核心就是这里采用了一个环形数组,通过维护两个字段来实现,分别是 index 和 count。
count 表示数组中元素的个数,最大值是数组的长度;index 是索引,每次 +1,然后对数组长度取余得到新索引。
我之前有一次面试就让我设计一个环形数组,当时答的还不是很好,这次算是学会了。
一般来说,想要判断是否需要触发熔断,那么首先要知道一段时间的请求数量,一段时间内的数量统计可以使用滑动窗口来实现。
首先看一下滑动窗口的定义:
// core/collection/rollingwindow.go
type RollingWindow struct {
lock sync.RWMutex
// 窗口大小
size int
// 窗口数据容器
win *window
// 时间间隔
interval time.Duration
// 游标,用于定位当前应该写入哪个 bucket
offset int
// 汇总数据时,是否忽略当前正在写入桶的数据
// 某些场景下因为当前正在写入的桶数据并没有经过完整的窗口时间间隔
// 可能导致当前桶的统计并不准确
ignoreCurrent bool
// 最后写入桶的时间
// 用于计算下一次写入数据间隔最后一次写入数据的之间
// 经过了多少个时间间隔
lastTime time.Duration // start time of the last bucket
}
再来看一下 window 的结构:
type Bucket struct {
// 桶内值的和
Sum float64
// 桶内 add 次数
Count int64
}
func (b *Bucket) add(v float64) {
b.Sum +=v
b.Count++
}
func (b *Bucket) reset() {
b.Sum=0
b.Count=0
}
type window struct {
// 桶,一个桶就是一个时间间隔
buckets []*Bucket
// 窗口大小,也就是桶的数量
size int
}
有了这两个结构之后,我们就可以画出这个滑动窗口了,如图所示。
现在来看一下向窗口中添加数据,是怎样一个过程。
func (rw *RollingWindow) Add(v float64) {
rw.lock.Lock()
defer rw.lock.Unlock()
// 获取当前写入下标
rw.updateOffset()
// 向 bucket 中写入数据
rw.win.add(rw.offset, v)
}
func (rw *RollingWindow) span() int {
// 计算距离 lastTime 经过了多少个时间间隔,也就是多少个桶
offset :=int(timex.Since(rw.lastTime) / rw.interval)
// 如果在窗口范围内,返回实际值,否则返回窗口大小
if 0 <=offset && offset < rw.size {
return offset
}
return rw.size
}
func (rw *RollingWindow) updateOffset() {
// 经过了多少个时间间隔,也就是多少个桶
span :=rw.span()
// 还在同一单元时间内不需要更新
if span <=0 {
return
}
offset :=rw.offset
// reset expired buckets
// 这里是清除过期桶的数据
// 也是对数组大小进行取余的方式,类似上文介绍的环形数组
for i :=0; i < span; i++ {
rw.win.resetBucket((offset + i + 1) % rw.size)
}
// 更新游标
rw.offset=(offset + span) % rw.size
now :=timex.Now()
// align to interval time boundary
// 这里应该是一个时间的对齐,保持在桶内指向位置是一致的
rw.lastTime=now - (now-rw.lastTime)%rw.interval
}
// 向桶内添加数据
func (w *window) add(offset int, v float64) {
// 根据 offset 对数组大小取余得到索引,然后添加数据
w.buckets[offset%w.size].add(v)
}
// 重置桶数据
func (w *window) resetBucket(offset int) {
w.buckets[offset%w.size].reset()
}
我画了一张图,来模拟整个滑动过程:
主要经历 4 个步骤:
比如上图,刚开始 offset 指向了 bucket[1],经过了两个 span 之后,bucket[2] 和 bucket[3] 会被清空,同时,新的 offset 会指向 bucket[3],新添加的数据会写入到 bucket[3]。
再来看看数据统计,也就是窗口内的有效数据量是多少。
// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
rw.lock.RLock()
defer rw.lock.RUnlock()
var diff int
span :=rw.span()
// ignore current bucket, because of partial data
if span==0 && rw.ignoreCurrent {
diff=rw.size - 1
} else {
diff=rw.size - span
}
// 需要统计的 bucket 数量,窗口大小减去 span 数量
if diff > 0 {
// 获取统计的起始位置,span 是已经被重置的 bucket
offset :=(rw.offset + span + 1) % rw.size
rw.win.reduce(offset, diff, fn)
}
}
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
for i :=0; i < count; i++ {
// 自定义统计函数
fn(w.buckets[(start+i)%w.size])
}
}
统计出窗口数据之后,就可以判断是否需要熔断了。
接下来就是执行熔断了,主要就是看看自适应熔断是如何实现的。
// core/breaker/googlebreaker.go
const (
// 250ms for bucket duration
window=time.Second * 10
buckets=40
k=1.5
protection=5
)
窗口的定义部分,整个窗口是 10s,然后分成 40 个 bucket,每个 bucket 就是 250ms。
// googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
k float64
stat *collection.RollingWindow
proba *mathx.Proba
}
func (b *googleBreaker) accept() error {
// 获取最近一段时间的统计数据
accepts, total :=b.history()
// 根据上文提到的算法来计算一个概率
weightedAccepts :=b.k * float64(accepts)
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
dropRatio :=math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
// 如果小于等于 0 直接通过,不熔断
if dropRatio <=0 {
return nil
}
// 随机产生 0.0-1.0 之间的随机数与上面计算出来的熔断概率相比较
// 如果随机数比熔断概率小则进行熔断
if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
return nil
}
func (b *googleBreaker) history() (accepts, total int64) {
b.stat.Reduce(func(b *collection.Bucket) {
accepts +=int64(b.Sum)
total +=b.Count
})
return
}
以上就是自适应熔断的逻辑,通过概率的比较来随机淘汰掉部分请求,然后随着服务恢复,淘汰的请求会逐渐变少,直至不淘汰。
func (b *googleBreaker) allow() (internalPromise, error) {
if err :=b.accept(); err !=nil {
return nil, err
}
// 返回一个 promise 异步回调对象,可由开发者自行决定是否上报结果到熔断器
return googlePromise{
b: b,
}, nil
}
// req - 熔断对象方法
// fallback - 自定义快速失败函数,可对熔断产生的err进行包装后返回
// acceptable - 对本次未熔断时执行请求的结果进行自定义的判定,比如可以针对http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
if err :=b.accept(); err !=nil {
// 熔断中,如果有自定义的fallback则执行
if fallback !=nil {
return fallback(err)
}
return err
}
defer func() {
// 如果执行req()过程发生了panic,依然判定本次执行失败上报至熔断器
if e :=recover(); e !=nil {
b.markFailure()
panic(e)
}
}()
err :=req()
// 上报结果
if acceptable(err) {
b.markSuccess()
} else {
b.markFailure()
}
return err
}
熔断器对外暴露两种类型的方法:
1、简单场景直接判断对象是否被熔断,执行请求后必须需手动上报执行结果至熔断器。
func (b *googleBreaker) allow() (internalPromise, error)
2、复杂场景下支持自定义快速失败,自定义判定请求是否成功的熔断方法,自动上报执行结果至熔断器。
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
个人感觉,熔断这部分代码,相较于前几篇文章,理解起来是更困难的。但其中的一些设计思想,和底层的实现原理也是非常值得学习的,希望这篇文章能够对大家有帮助。
以上就是本文的全部内容,如果觉得还不错的话欢迎点赞,转发和关注,感谢支持。
参考文章:
推荐阅读:
者 | 小马
编辑 | CV君
报道 | 我爱计算机视觉(微信id:aicvml)
本文提出了一种简单有效的鲁棒目标检测无监督自适应方法(SimROD)。为了克服域转移(domain shift)和伪标签噪声(pseudo-label noise)等问题,本文的方法集成了域中心增强方法(domain-centric augmentation method) 、渐进的自标签适应机制(gradual self-labeling adaptation procedure) 和教师指导的微调机制(teacher-guided fine-tuning mechanism) 。
使用本文的方法,目标域样本可以用来调整目标检测模型,而不改变模型结构或生成合成的数据。当应用于图像损坏和高级跨域自适应基准数据集上测试时,本文的方法在多个域自适应基准数据集上优于之前的baseline方法。
SimROD: A Simple Adaptation Method for Robust Object Detection
论文地址:https://arxiv.org/abs/2107.13389
代码地址:https://github.com/reactivetype/simrod
当测试集的数据分布和训练集相似时,SOTA的目标检测模型能够达到比较高的精度。但是,当部署到新环境中时,比如天气变化(如雨或雾)、光照条件变化或图像损坏(如运动模糊),模型的精度就会大幅度下降。
这种失败不利于自动驾驶等场景,在这些场景中,域转移是常见且不可避免的。为了使它们在以可靠性为关键的应用程序中取得更高的性能,使检测模型对域转移具有更强的鲁棒性是很重要的。
目前,已经提出了许多方法来克服域转移的目标检测。它们大致可以分为数据增强、域对齐、域映射和自标记技术。
数据增强 方法可以提高在一些固定的域位移集上的性能,但不能推广到与增强样本不相似的数据中。
域对齐 方法使用来自目标域的样本来对齐网络的中间特征。
域映射 方法使用图像到图像的转换网络(比如:GAN)将标记的源图像转换为看起来像未标记的目标域图像的新图像。
自标记 是一种不错的方法,因为它利用了来自目标域的未标记的训练样本。
然而,在域转移下生成准确的伪标签是困难的;当伪标签有噪声时,使用目标域样本进行自适应是无效的 。
在本文中,作者提出了一种简单的鲁棒对象检测自适应方法(SimROD),利用域混合数据增强和教师指导下的逐步自适应策略来减轻域转移的影响。SimROD主要有三个特点:
1)首先,它不需要目标域数据的Ground Truth标签,而是利用未标记的样本。
2)其次,它既不需要复杂的模型结构更改,也不需要生成模型来创建合成数据
3)第三,它与模型结构无关的,并不局限于基于区域的检测器。
给定一个参数为的目标检测的源模型M,该模型由源训练数据集进行训练,其中是一个图像,每个标签由目标类别和边界框坐标组成。原始源数据D的输入分布与目标测试集数据分布之间存在偏移的情况。即,而。
在无监督的域自适应设置中,可以目标域取出了一组未标记的图像,在训练中可以使用这部分数据。任务的目标是将模型参数更新到中,以在源测试集和目标测试集上都能实现良好的性能。为了有效地利用中的附加信息,需要解决两个问题:
1)首先,目标训练集没有Ground Truth标签。
2)其次,利用源模型为生成伪标签会导致由域位移引起的有噪声监督,阻碍了自适应过程。
本文提出了简单的自适应方法SimROD,以实现鲁棒的目标检测模型。SimROD集成了一种教师指导的微调 、一种新的DomainMix增强方 法和一种逐步适应技术 。
本文方法的motivation是标签噪声会被域位移加剧 。因此,本文的方法旨在在目标域图像上生成准确的伪标签,并将来自源域和目标域的混合图像一起使用,从而为模型的调整提供强有力的监督信号。由于学生目标模型可能不足以生成准确的伪标签,作者首先使用可以生成高质量伪标签的辅助教师模型,然后再用学生模型进行微调。整个算法的流程如上图所示。大致可以分为几步:
1)基于源数据,训练一个比学生模型容量大的源教师模型,得到参数。源教师模型用于生成目标数据上的初始伪标签。
2)利用逐步适应算法,将大型教师模型参数从逐步更改为。在这一步中,使用的是由DomainMix增强生成的混合图像,而不是单独的源数据集或者目标数据集的图片。
3)使用自适应的教师模型参数来细化目标数据上的伪标签。然后,使用这些伪标签来微调学生模型。
这种方法的一个好处是,它可以使小模型和大模型同时适应域的转移,因为即使在学生网络很小时,它也能产生高质量的伪标签。另一个优点是,教师和学生不需要共享相同的结构。因此,教师模型可以选择的一个参数量大、计算量大的模型来提高精度,学生模型可以选择一个轻量级的模型。
在本文中,作者提出了一种新的增强方法DomainMix。如上图所示,它均匀地对来自源域和目标域的图像进行采样,并将这些图像连同其(伪)标签混合到一个新的图像中。
上图显示了一个来自自然和艺术领域的DomainMix 图像的示例。
DomainMix使用了许多简单的想法来减轻域的转移和标签噪声:
DomainMix的数据增强方法如上图所示,对于一个Batch中的每一幅图像,首先从源和目标数据中随机抽取另外三张图像,并混合这些图像的随机crop,在2×2的模板中创建一个新的域混合图像。并将伪标签和真实标签都标注到混合的图片中,目标的边界框坐标是根据新的混合图像中每个crop的相对位置计算的。此外,作者使用加权平衡采样器从这两个域中进行均匀采样。
接下来,作者提出了一个逐步适应的方法来优化检测模型的参数,该算法减轻了标签噪声的影响。由源模型生成的伪标签可能在目标域图像上有噪声,直接微调模型所有的层会阻碍模型的适应。
作者提出了一种分阶段性的方法。首先,冻结了所有的卷积层,在前w个epoch只适应BN层,在第一阶段结束后,BN层的参数就被更新了。然后使用部分适应的模型来生成更精确的伪标签,为了简单起见,它被离线完成。在第二阶段,所有的层都被解冻,然后使用精细的伪标签进行微调。在这两个阶段,都使用由DomainMix增强生成的混合图像样本。算法的流程如下所示:
上表展示了Sim10K到Cityscapes上,本文方法和SOTA方法的对比。
上图展示了与之前的baseline相比,SimROD将模型从Sim10K调整到Cityscapes的有效性。
上表显示了KITTI到Cityscapes上,本文提出的SimROD在性能上优于各种baseline方法。
上表展示了VOC(真实图片)到Watercolor(水彩画)数据集上的性能对比。
上表显示了Pascal-C、COCO-C和Cityscapes-C数据集上的Yolov5m模型的实验结果。
上表显示了Yolov5m模型在Pascal-C数据集上不同模块消融的实验结果。
上表展示了一些本文方法和其他方法的一些检测结果的例子。
在本文中,作者提出了一种简单而有效的无监督方法来适应域位移下的检测模型。本文中的自标记框架采用了一种域中心的增强方法和教师指导的微调适应模型。基于现有的小模型和大模型,本文的方法在模型鲁棒性方面取得了显著的性能增益。
本文的方法不仅减轻了由于低级图像损坏而引起的域位移的影响,而且在源域和目标域之间存在高级风格差异时,它也可以适应模型。
这篇解决的问题是如何让生成的伪标签更加好 ,因为如果直接用源模型生成伪标签效果并不好,会存在噪声。所以作者就把源模型生成的伪标签定义为初始伪标签,然后用一种数据增强的方式,将伪标签和真实标签都放在同一张图片中,类似CutMix,对教师模型进行微调。
微调完了之后,再对目标数据集生成伪标签,这样的伪标签相比于初始伪标签的效果会更好。因此学生模型在这样的伪标签上进行微调效果也会更好。
击上方蓝字关注“小郑搞码事”,每天都能学到知识,搞懂一个问题!
关于用样式来处理图片自适应的问题,下面分析一下两种法。
有一个做法,大家都很熟悉,图片作为div的背景。然后,应用background-size和background-position这两属性,就能很方便地按比例来缩放。
在响应式的环境下展示图片,平时一般都这么搞。
如下代码:
主要注意background-size的取值,可以是固定值,也可以是百分比,更可以是cover,contain。
1、若取值cover
其定义就是:把背景图像扩展至足够大,以使背景图像完全覆盖背景区域。背景图像的某些部分也许无法显示在背景定位区域中。(图片同比缩放、塞满整个容器,而图片多余的部分则被剪掉了)如下图:
2、若取值contain
其定义就是:把图像扩展至最大尺寸,以使其宽度和高度完全适应内容区域。(图片同比缩放至图片能完全显示在容器中,多余空间留白),如下图:
上面将图片设置成背景的方法,不好的地方是你无法设置图片的懒加载、图片无法被搜索引擎或者其他类似的工具抓取到。然而,再现了方法二。
有一个属性叫object-fit,直接可以让图片自适应布局。
先看一下它的兼容性吧
咋一看,也不很很差,最少移动端还是基本可以兼容的。这个属性有几个常用的取值:
1、fill(填充)
替换内容拉伸填满整个content box, 不保证保持原有的比例。
2、contain(包含)
保持原有尺寸比例,效果图与background-size:contain对应的。
3、cover(覆盖)
宽度和高度至少有一个和容器一致。效果图与background-size:cover对应的
4、none
保持原有尺寸比例。
我做了一张完整的图,大家对比看一下:
最后总结:
出于性能和其它因素考虑,建议大家可以多关注和使用方法二,方法一是出现的比较早的方法,也是常用方式。
*请认真填写需求信息,我们会在24小时内与您取得联系。