[wait](https://pkg.go.dev/k8s.io/apimachinery/pkg/util/wait) 包提供了通过轮询或者监听一个条件的修改(关闭channel, ctx.Done,...)来执行指定函数的工具函数.
这些函数可以分为四大类
* [Until 类](http://icebergu.com/archives/client-go-wait#until-%E7%B1%BB%E7%9B%91%E5%90%AC-channel-%E6%88%96%E8%80%85-context): 根据 channel 的关闭或者 context Done 的信号来结束对指定函数的轮询操作
* [Poll 类](http://icebergu.com/archives/client-go-wait#poll-%E7%B1%BB%E5%9E%8B%E4%BC%9A%E6%A0%B9%E6%8D%AE-conditionfunc-%E5%87%BD%E6%95%B0%E6%9D%A5%E5%86%B3%E5%AE%9A%E6%98%AF%E5%90%A6%E9%80%80%E5%87%BA%E5%BE%AA%E7%8E%AF):不只是会根据 channel 或者 context 来决定结束轮询,还会判断轮询函数的返回值来决定是否结束
* [Wait 类](http://icebergu.com/archives/client-go-wait#wait-%E7%B1%BB%E4%BC%9A%E6%A0%B9%E6%8D%AE-waitfunc-%E6%9D%A5%E5%86%B3%E5%AE%9A%E4%BB%80%E4%B9%88%E6%98%AF%E5%90%A6%E6%89%A7%E8%A1%8C-conditionfunc): 会根据 WaitFor 函数返回的 channel 来触发函数执行
* [Backoff 类](http://icebergu.com/archives/client-go-wait#backoff-%E7%B1%BB%E6%A0%B9%E6%8D%AE-backoff-%E6%9D%A5%E8%AE%A1%E7%AE%97%E5%87%BD%E6%95%B0%E6%89%A7%E8%A1%8C%E7%9A%84%E6%97%B6%E9%97%B4%E9%97%B4%E9%9A%94):会根据 Backoff 返回的时间间隔来循环触发函数的执行
在介绍具体函数前,介绍一下用于对轮询的时间间隔进行抖动干扰的函数 `Jitter`
```go
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
return duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
}
```
`Jitter` 会返回一个 `[duration, (duration+maxFactor*duration) )` 区间的时间间隔
## Until 类,监听 channel 或者 context
**`until 类`** 的函数参数都具有相同的意义
* `f` 表示具体执行的函数
* `period` 表示轮询的时间间隔
* `jitterFactor` 表示对轮询时间间隔加一个抖动 `Jitter(period, jitterFactor)`
* `sliding` 为 True 表示执行函数的时间间隔是从 `f` 函数执行就开始计时,False 表示 `f` 函数执行完成后开始计时
* `stopCh` 表示当 `stopCh` 被关闭则结束轮询
* `ctx` 表示当 ctx.Done 是结束函数的轮询
|函数|作用|
|---|---|
|func **`JitterUntil`**(`f` func(), `period` time.Duration, `jitterFactor` float64, `sliding` bool, `stopCh` <-chan struct{})|
|func **`JitterUntilWithContext`**(`ctx` context.Context, `f` func(context.Context), `period` time.Duration, `jitterFactor` float64, `sliding` bool) |
|func **`Until`**(`f` func(), `period` time.Duration, `stopCh` <-chan struct{})| 相当于 `sliding` 为 True ,无抖动的 `JitterUntil`
|func **`NoSlidingUntil`**(`f` func(), `period` time.Duration, `stopCh` <-chan struct{})|`sliding` 为 False|
|func **`UntilWithContext`**(`ctx` context.Context, `f` func(context.Context), `period` time.Duration)||
|func **`NoSlidingUntilWithContext`**(ctx context.Context, f func(context.Context), period time.Duration)||
|func **`Forever`**(`f` func(), `period` time.Duration)| 永久轮询执行 `f`,相当于 `Until(f, period, NeverStop)`
根据函数名称可以一些规律:
`Jitter*` 带时间抖动参数 `jitterFactor`
`NoSliding*` 默认从执行结束后开始计算下一次执行时间间隔
`Until*` 默认从`f` 执行开始计算下一次的执行时间间隔
## Poll 类型,会根据 ConditionFunc 函数来决定是否退出循环
```go
type ConditionFunc func()(done bool, err error)
```
**当调用 `ConditionFunc` 返回 `true` 或者 `error` 时,结束对 `ConditionFunc` 的轮询,并且如果 `ConditionFunc` 返回了 error 那么`Poll*` 也会返回该错误**
Poll 类型的轮询函数同样具有一些相同意义的参数:
* `interval` 为执行 `ConditionFunc` 的时间间隔
* `timeout` 表示 Poll 类型函数的执行超时时间,如果在 `timeout` 时间内 `ConditionFunc` 都没有返回 `true` 或者 `error` 时,返回 `ErrWaitTimeout`
|函数|作用|
|---|---|
|func **`Poll`**(`interval`, `timeout` time.Duration, `condition` ConditionFunc) error | 在执行 `ConditionFunc` 之前会等待 `interval` 的时间|
|func **`PollUntil`**(`interval` time.Duration, `condition` ConditionFunc, `stopCh` <-chan struct{}) error|监听 `stopCh` 的 `Poll`
|func **`PollImmediate`**(`interval`, `timeount` time.Duration, `condition` ConditionFunc) error |执行 `ConditionFunc` 之后会等待 `interval` 的时间
|func **`PollImmediateInfinite`**(`interval` time.Duration, `condition` ConditionFunc) error |没有过期时间的 `PollImmediate`
|func **`PollImmediateUntil`**(`interval` time.Duration, `condition` ConditionFunc, `stopCh` <- chan struct{}) error| 监听 `stopCh` 是否关闭 的 `PollImmediate`
`Poll*` 和 `PollImmediate*` 两种函数的区别是:
**`PollImmediate*` 会先执行 `ConditionFunc` ,然后在等待 `interval` 的时间,而 `Poll*` 会先等待 `interval`**
## Wait 类,会根据 WaitFunc 来决定什么是否执行 ConditionFunc
```go
type WaitFunc func(done <- struct{}) <- chan struct{}
type WaitFor(wait WaitFunc, fn ConditionFunc, done <- chan struct{}) error {
stopCh := make(chan struct{})
defer close(stopCh)
c := wait(staopCh)
for {
select {
case _, open := <-c:
ok, err := fn()
if err != nil{
return err
}
if ok {
return nil
}
if !open{
return ErrWaitTimeount
}
case <-done:
return ErrWaitTimeout
}
}
```
`WaitFor` 从 `WaitFunc` 中获取一个 channel,当 channel 可读是执行 `fn`,然后检查 channel 是否被关闭,如果关闭则退出函数
> 即使 channel 的关闭操作也会触发一次 fn 执行
当 `done` 关闭时,直接返回 ErrorWaitTimeout,不过这个时候需要注意:
**由于 select 是随机选择的,即使 `done` 被关闭了,`fn` 在 channel 可读的情况下依然可能会执行一次或者 n 次**
> 在 WaitFunc 中关闭返回的 channel 会效果更好
#### Poll类函数与 WaitFor
Poll 类的函数实际底层便是调用 WaitFor
```go
func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
return pollInternal(pooler(interval, timeout), condition)
}
func pollInterval(wait WaitFunc, condition ConditionFunc) error {
done := make(chan struct{})
defer close(done)
return WaitFor(wait, condition, done)
}
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
ctx, cancel := contextForChannel(stopCh)
defer cancel()
return WaitFor(poller(interval, 0), condition, ctx.Done())
}
```
`poller` 返回的便是一个 `WaitFunc`
```go
func poller(interval, timeout time.Duration) WaitFunc {
return WaitFunc(func(done <-chan struct{}) <-chan struct{} {
// 新建一个 channel 用于返回
ch := make(chan struct{})
go func() {
defer close(ch)
// 时间间隔的 Ticker
tick := time.NewTicker(interval)
defer tick.Stop()
var after <-chan time.Time
if timeout != 0 {
// 超时的定时器
timer := time.NewTimer(timeout)
after = time.C
defer timer.Stop()
}
for {
select {
case <-tick.C:
select {
case ch <- struct{}{};
default:
}
case <-after:
// defer 中关闭 ch
return
case <-done:
return
}
}
}()
return ch
}
}
```
**poller 会根据 interval 的 Ticker 来向 channel 中发送数据,发送数据的时间是不受 `ConditionFunc` 执行时间的限制,也就是说可能会 interval 过短可能会导致连续。**
## Backoff 类,根据 Backoff 来计算函数执行的时间间隔
Backoff 类型的函数会根据 `Backoff` 或者 `BackoffManager` 返回的时间间隔来决定函数的执行间隔
```go
type Backoff struct {
// 时间间隔,用于调用 Step 方法时返回的时间间隔
Duration time.Duration
// 用于计算下次的时间间隔
// Factor 大于 0 时,Backoff 在计算下次的时间间隔时都会根据 Duration * Factor,Factor * Duration 不能大于 Cap
// 不能为负数
Factor float64
// Jitter > 0 时,每次迭代的时间间隔都会额外加上 0 - Duration * Jitter 的随机时间,并且抖动出的时间不会设置为 Duration,而且不受 Caps 的限制
Jitter float64
// 进行指数回退(*Factor) 操作的次数
// 当 Factor * Duration > Cap 时 Steps 会被设置为 0, Duration 设置为 Cap
// 也就是说后续的迭代时间间隔都会返回 Duration
Steps int
// 最大的时间间隔
Cap time.Duration
}
```
`Backoff` 的 `Step` 方法返回通过 `Factor`, `Cap`, `Jitter` 计算的时间间隔
```go
func (b *Backoff) Step() time.Duration {
// Steps < 1 不再进行指数回退操作
if b.Steps < 1 {
if b.Jitter > 0 {
return Jitter(b.Duraion, b.Jitter)
}
return b.Duration
}
b.Steps--
duration := b.Duraion
// 如果 Factor 等于 0,那就不需要修改 Duration
if b.Factor != 0{
// 修改下次调用 Step Duration,作为下次指数回退的 base
b.Duraion = time.Duration(float64(b.Duration) * b.Factor)
if b.Cap > 0 && b.Duration > b.Cap {
// 到达 Cap 限制,以后默认返回 Cap 或者 Jitter(Cap, b.Jitter)
b.Duration = b.Cap
b.Steps = 0
}
}
if b.Jitter > 0{
return Jitter(b.Duration, b.Jitter)
}
return duration
}
```
`BackoffManger` 定义了 `Backoff` 方法,返回一个定时器,用于触发函数的执行
```go
type BackoffManager interface {
Backoff() clock.Timer
}
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager
```
`ExponentialBackoffManger` 会使用 `initBackoff`, `maxBackoff`, `backoffFactor`, `jitter` 来初始化一个 `Backoff` 对象
它的 Backoff 方法时间返回了一个使用 `Backoff.Step` 作为时间间隔的定时器
**不过两次 `Backoff.Step` 的时间间隔不能大于 `resetDuration` 否则会重置 `Backoff`**
```go
func (b *expontialBackoffManagerImpl) getNextBackoff() time.Duration {
if b.clock.Now().Sub(b.lastBackoffStart) > b.backResetDuration {
b.backoff.Steps = math.MaxInt32
b.backoff.Duration = b.initialBackoff
}
b.lastBackoffStart = b.clock.Now()
return b.backoff.Step()
}
```
也就是说如果 `backoff.Step()` 返回的值大于 `resetDuration`,那么下次返回的时间间隔便是 `initBackoff`
**通常用于上游状态异常用来较少负载**
另外一种 `BackoffManager` 是 `JitterBackoffManager`,返回的是根据 jitter 抖动的时间间隔
```go
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager
func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
if j.jitter > 0.0{
// 计算出来的抖动的时间间隔并不会影响 j.duration
return Jitter(j.duration, j.jitter)
}
return j.duration
}
```
## Backoff 类轮询函数
|函数|作用|
|---|---|
|func **`BackUntil`**(`f` func(), `backoff` BackoffManager, `sliding` book, `stopCh` <-chan struct{})|根据 Backoff 的定时器来循环触发 `f`,直到 `stopCh` 关闭
|func **`ExponentialBackoff`**(`backoff` Backoff, `condition` ConditionFunc) error |根据 `backoff` 的时间间隔来循环执行 `f`
|func **`ExponentialBackoffWithContext`**(`ctx` context.Context, `backoff` Backoff, `condition` ConditionFunc) error ||
`ExponentialBackoff*` 函数中 `condition` 会执行 `backoff.Steps` 次,到达次数并且 `condition` 没有返回 True 或 error,那么就返回 ErrWaitTimeout
如果 `backoff.Steps` 为 0 会直接返回 `ErrWaitTimeout`
```go
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
for backoff.Steps > 0 {
if ok ,err := condition(); err != nil || ok {
return err
}
if backoff.Steps == 1 {
break
}
time.Sleep(backoff.Step())
}
return ErrWaitTimeout
}
```
深入 kubernetes 的 Wait 工具包