深入 kubernetes 的 Wait 工具包

Posted by icebergu on 03-27,2021

wait 包提供了通过轮询或者监听一个条件的修改(关闭channel, ctx.Done,...)来执行指定函数的工具函数.
这些函数可以分为四大类

  • Until 类: 根据 channel 的关闭或者 context Done 的信号来结束对指定函数的轮询操作
  • Poll 类:不只是会根据 channel 或者 context 来决定结束轮询,还会判断轮询函数的返回值来决定是否结束
  • Wait 类: 会根据 WaitFor 函数返回的 channel 来触发函数执行
  • Backoff 类:会根据 Backoff 返回的时间间隔来循环触发函数的执行

在介绍具体函数前,介绍一下用于对轮询的时间间隔进行抖动干扰的函数 Jitter

func Jitter(duration time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0
	}
	return duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
}

Jitter 会返回一个 [duration, (duration+maxFactor*duration) ) 区间的时间间隔

Until 类,监听 channel 或者 context

until 类 的函数参数都具有相同的意义

  • f 表示具体执行的函数
  • period 表示轮询的时间间隔
  • jitterFactor 表示对轮询时间间隔加一个抖动 Jitter(period, jitterFactor)
  • sliding 为 True 表示执行函数的时间间隔是从 f 函数执行就开始计时,False 表示 f 函数执行完成后开始计时
  • stopCh 表示当 stopCh 被关闭则结束轮询
  • ctx 表示当 ctx.Done 是结束函数的轮询
函数作用
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{})
func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool)
func Until(f func(), period time.Duration, stopCh <-chan struct{})相当于 sliding 为 True ,无抖动的 JitterUntil
func NoSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{})sliding 为 False
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration)
func NoSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration)
func Forever(f func(), period time.Duration)永久轮询执行 f,相当于 Until(f, period, NeverStop)

根据函数名称可以一些规律:
Jitter* 带时间抖动参数 jitterFactor
NoSliding* 默认从执行结束后开始计算下一次执行时间间隔
Until* 默认从f 执行开始计算下一次的执行时间间隔

Poll 类型,会根据 ConditionFunc 函数来决定是否退出循环

type ConditionFunc func()(done bool, err error)

当调用 ConditionFunc 返回 true 或者 error 时,结束对 ConditionFunc 的轮询,并且如果 ConditionFunc 返回了 error 那么Poll* 也会返回该错误

Poll 类型的轮询函数同样具有一些相同意义的参数:

  • interval 为执行 ConditionFunc 的时间间隔
  • timeout 表示 Poll 类型函数的执行超时时间,如果在 timeout 时间内 ConditionFunc 都没有返回 true 或者 error 时,返回 ErrWaitTimeout
函数作用
func Poll(interval, timeout time.Duration, condition ConditionFunc) error在执行 ConditionFunc 之前会等待 interval 的时间
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error监听 stopChPoll
func PollImmediate(interval, timeount time.Duration, condition ConditionFunc) error执行 ConditionFunc 之后会等待 interval 的时间
func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error没有过期时间的 PollImmediate
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <- chan struct{}) error监听 stopCh 是否关闭 的 PollImmediate

Poll*PollImmediate* 两种函数的区别是:
PollImmediate* 会先执行 ConditionFunc ,然后在等待 interval 的时间,而 Poll* 会先等待 interval

Wait 类,会根据 WaitFunc 来决定什么是否执行 ConditionFunc

type WaitFunc func(done <- struct{}) <- chan struct{}

type WaitFor(wait WaitFunc, fn ConditionFunc, done <- chan struct{}) error {
	stopCh := make(chan struct{})
	defer close(stopCh)
	c := wait(staopCh)
	for {
		select {
		case _, open := <-c:
			ok, err := fn()
			if err != nil{
				return err
			}
			if ok {
				return nil
			}
			if !open{
				return ErrWaitTimeount
			}
		case <-done:
			return ErrWaitTimeout
	}
}

WaitForWaitFunc 中获取一个 channel,当 channel 可读是执行 fn,然后检查 channel 是否被关闭,如果关闭则退出函数

即使 channel 的关闭操作也会触发一次 fn 执行

done 关闭时,直接返回 ErrorWaitTimeout,不过这个时候需要注意:
由于 select 是随机选择的,即使 done 被关闭了,fn 在 channel 可读的情况下依然可能会执行一次或者 n 次

在 WaitFunc 中关闭返回的 channel 会效果更好

Poll类函数与 WaitFor

Poll 类的函数实际底层便是调用 WaitFor

func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
	return pollInternal(pooler(interval, timeout), condition)
}
func pollInterval(wait WaitFunc, condition ConditionFunc) error {
	done := make(chan struct{})
	defer close(done)
	return WaitFor(wait, condition, done)
}

func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
	ctx, cancel := contextForChannel(stopCh)
	defer cancel()
	return WaitFor(poller(interval, 0), condition, ctx.Done())
}

poller 返回的便是一个 WaitFunc

func poller(interval, timeout time.Duration) WaitFunc {
	return WaitFunc(func(done <-chan struct{}) <-chan struct{} {
		// 新建一个 channel 用于返回
		ch := make(chan struct{})
		
		go func() {
			defer close(ch)
			
			// 时间间隔的 Ticker
			tick := time.NewTicker(interval)
			defer tick.Stop()
			
			var after <-chan time.Time
			if timeout != 0 {
				// 超时的定时器
				timer := time.NewTimer(timeout)
				after = time.C
				defer timer.Stop()
			}
			for {
				select {
				case <-tick.C:
					select {
					case ch <- struct{}{};
					default:
					}
				case <-after:
				    // defer 中关闭 ch
					return
				case <-done:
					return
				}
			}
		}()
		return ch
	}
}

poller 会根据 interval 的 Ticker 来向 channel 中发送数据,发送数据的时间是不受 ConditionFunc 执行时间的限制,也就是说可能会 interval 过短可能会导致连续。

Backoff 类,根据 Backoff 来计算函数执行的时间间隔

Backoff 类型的函数会根据 Backoff 或者 BackoffManager 返回的时间间隔来决定函数的执行间隔

type Backoff struct {
	// 时间间隔,用于调用 Step 方法时返回的时间间隔
	Duration time.Duration

	// 用于计算下次的时间间隔
	// Factor 大于 0 时,Backoff 在计算下次的时间间隔时都会根据 Duration * Factor,Factor * Duration 不能大于 Cap
	// 不能为负数
	Factor float64

	// Jitter > 0 时,每次迭代的时间间隔都会额外加上 0 - Duration * Jitter 的随机时间,并且抖动出的时间不会设置为 Duration,而且不受 Caps 的限制
	Jitter  float64

	// 进行指数回退(*Factor) 操作的次数
	// 当 Factor * Duration > Cap 时 Steps 会被设置为 0, Duration 设置为 Cap
	// 也就是说后续的迭代时间间隔都会返回 Duration
	Steps int
	
	// 最大的时间间隔
	Cap time.Duration
}

BackoffStep 方法返回通过 Factor, Cap, Jitter 计算的时间间隔

func (b *Backoff) Step() time.Duration {
    // Steps < 1 不再进行指数回退操作
	if b.Steps < 1 {
		if b.Jitter > 0 {
			return Jitter(b.Duraion, b.Jitter)
		}
		return b.Duration
	}
	b.Steps--
	duration := b.Duraion
	// 如果 Factor 等于 0,那就不需要修改 Duration
	if b.Factor != 0{
	    // 修改下次调用 Step   Duration,作为下次指数回退的 base
		b.Duraion = time.Duration(float64(b.Duration) * b.Factor)
		if b.Cap > 0 && b.Duration > b.Cap {
			// 到达 Cap 限制,以后默认返回 Cap 或者 Jitter(Cap, b.Jitter)
			b.Duration = b.Cap
			b.Steps = 0
		}
	}
	if b.Jitter > 0{
		return Jitter(b.Duration, b.Jitter)
	}
	return duration
}

BackoffManger 定义了 Backoff 方法,返回一个定时器,用于触发函数的执行

type BackoffManager interface {
	Backoff() clock.Timer
}

func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager

ExponentialBackoffManger 会使用 initBackoff, maxBackoff, backoffFactor, jitter 来初始化一个 Backoff 对象
它的 Backoff 方法时间返回了一个使用 Backoff.Step 作为时间间隔的定时器
不过两次 Backoff.Step 的时间间隔不能大于 resetDuration 否则会重置 Backoff

func (b *expontialBackoffManagerImpl) getNextBackoff() time.Duration {
	if b.clock.Now().Sub(b.lastBackoffStart) > b.backResetDuration {
		b.backoff.Steps = math.MaxInt32
		b.backoff.Duration = b.initialBackoff
	}
	b.lastBackoffStart = b.clock.Now()
	return b.backoff.Step()
}

也就是说如果 backoff.Step() 返回的值大于 resetDuration,那么下次返回的时间间隔便是 initBackoff
通常用于上游状态异常用来较少负载

另外一种 BackoffManagerJitterBackoffManager,返回的是根据 jitter 抖动的时间间隔

func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager

func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
	if j.jitter > 0.0{
		//  计算出来的抖动的时间间隔并不会影响 j.duration 
		return Jitter(j.duration, j.jitter)
	}
	return j.duration
}

Backoff 类轮询函数

函数作用
func BackUntil(f func(), backoff BackoffManager, sliding book, stopCh <-chan struct{})根据 Backoff 的定时器来循环触发 f,直到 stopCh 关闭
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error根据 backoff 的时间间隔来循环执行 f
func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionFunc) error

ExponentialBackoff* 函数中 condition 会执行 backoff.Steps 次,到达次数并且 condition 没有返回 True 或 error,那么就返回 ErrWaitTimeout
如果 backoff.Steps 为 0 会直接返回 ErrWaitTimeout

func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
	for backoff.Steps > 0 {
		if ok ,err := condition(); err != nil || ok {
			return err
		}
		if backoff.Steps == 1 {
			break
		}
		time.Sleep(backoff.Step())
	}
	return ErrWaitTimeout
}