想问怎么配置 KPA?看 Serving 文档 Configuring autoscaling

大概弹性的流程还是贴贴官网图片就好

serving-resources

以上就是 Serving 中四个重要 CRD 的关系,实际使用过程中我们只定义 Service 这个 Object。

其中 Serving 组件提供了弹性能力,而其 workflow 大体如下

   +---------------------+
   | ROUTE               |
   |                     |
   |   +-------------+   |
   |   | Istio Route |---------------+
   |   +-------------+   |           |
   |         |           |           |
   +---------|-----------+           |
             |                       |
             |                       |
             | inactive              | active
             |  route                | route
             |                       |
             |                       |
             |                +------|---------------------------------+
             V         watch  |      V                                 |
       +-----------+   first  |   +- ----+  create   +------------+    |
       | Activator |------------->| Pods |<----------| Deployment |<--------------+
       +-----------+          |   +------+           +------------+    |          |
             |                |       |                                |          | resize
             |   activate     |       |                                |          |
             +--------------->|       |                                |          |
                              |       |               metrics          |   +------------+
                              |       +----------------------------------->| Autoscaler |
                              |                                        |   +------------+
                              |                                        |
                              | REVISION                               |
                              +----------------------------------------+

(以上摘自 Autoscaling

Serving 提供了基于 KPA 和 HPA 两种弹性方式,但以下只讨论 KPA 方式。

本文基于 [email protected] 展开。

上点代码

咱们先不扯别的直接先撸计算 DesiredReplicas 的代码。

serving/pkg/autoscaler/autoscaler.go 里有这么一个函数

// Scale calculates the desired scale based on current statistics given the current time.
// desiredPodCount is the calculated pod count the autoscaler would like to set.
// validScale signifies whether the desiredPodCount should be applied or not.
func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, excessBC int32, validScale bool) {
...
}

看大意就是说计算下当下(now)时间点期望的 Pod 数以及 额外突发量(EBC、excessBC)。

期望 Pod 能理解,但 EBC 是个什么玩意。。咱们先继续往下看

func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, excessBC int32, validScale bool) {
    // 获取当前 DeciderSpec 和 实例数
    spec, podCounter := a.currentSpecAndPC()
    originalReadyPodsCount, err := podCounter.ReadyCount()
    // If the error is NotFound, then presume 0.
    if err != nil && !apierrors.IsNotFound(err) {
        return 0, 0, false
    }
    // 最小 ready 设为 1
    readyPodsCount := math.Max(1, float64(originalReadyPodsCount))

    metricKey := types.NamespacedName{Namespace: a.namespace, Name: a.revision}

    metricName := spec.ScalingMetric
    var observedStableValue, observedPanicValue float64
    // 决策 Scaling 类型,获取 Stable 和 Panic 两种模式下的指标平均值
    switch spec.ScalingMetric {
    case autoscaling.RPS:
...
    default:
...
    }

    maxScaleUp := math.Ceil(spec.MaxScaleUpRate * readyPodsCount)
    maxScaleDown := math.Floor(readyPodsCount / spec.MaxScaleDownRate)

    // 根据阈值计算下 Stable 和 Panic 分别期望的实例数
    dspc := math.Ceil(observedStableValue / spec.TargetValue)
    dppc := math.Ceil(observedPanicValue / spec.TargetValue)

    // 修整下实例数边界
    desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
    desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))

    // 判定 Panic 值是否已经超过了 Panic 阈值(是否应该进入 Panic 模式)    isOverPanicThreshold := observedPanicValue/readyPodsCount >= spec.PanicThreshold

    a.stateMux.Lock()
    defer a.stateMux.Unlock()
    if a.panicTime.IsZero() && isOverPanicThreshold {
        // 进入 Panic 模式
        logger.Info("PANICKING")
        a.panicTime = now
        a.reporter.ReportPanic(1)
    } else if !a.panicTime.IsZero() && !isOverPanicThreshold && a.panicTime.Add(spec.StableWindow).Before(now) {
        // 需要解除 Panic,已经超过 stable 窗口时间了
        logger.Info("Un-panicking.")
        a.panicTime = time.Time{}
        a.maxPanicPods = 0
        a.reporter.ReportPanic(0)
    }

    // 设置最终期望的 Pod 数
    if !a.panicTime.IsZero() {
        logger.Debug("Operating in panic mode.")
        // panic 模式下只扩不缩
        if desiredPanicPodCount > a.maxPanicPods {
            logger.Infof("Increasing pods from %d to %d.", originalReadyPodsCount, desiredPanicPodCount)
            a.panicTime = now
            a.maxPanicPods = desiredPanicPodCount
        } else if desiredPanicPodCount < a.maxPanicPods {
            logger.Debugf("Skipping decrease from %d to %d.", a.maxPanicPods, desiredPanicPodCount)
        }
        desiredPodCount = a.maxPanicPods
    } else {
        // Stable 模式,取 stable 计算后的 pod 数
        logger.Debug("Operating in stable mode.")
        desiredPodCount = desiredStablePodCount
    }

    // Compute the excess burst capacity based on stable value for now, since we don't want to
    // be making knee-jerk decisions about Activator in the request path. Negative EBC means
    // that the deployment does not have enough capacity to serve the desired burst off hand.
    // EBC = TotCapacity - Cur#ReqInFlight - TargetBurstCapacity
    
    // EBC = 当前所有 Ready Pod 的总设定容量 - 当前稳定量 - 突发容忍量
    // 所以这玩意也就是:超了多少,后面用于熔断场景。    excessBC = int32(-1)
    switch {
    case a.deciderSpec.TargetBurstCapacity == 0:
        excessBC = 0
    case a.deciderSpec.TargetBurstCapacity >= 0:
        excessBC = int32(math.Floor(float64(originalReadyPodsCount)*a.deciderSpec.TotalValue - observedStableValue -
            a.deciderSpec.TargetBurstCapacity))
        logger.Infof("PodCount=%v Total1PodCapacity=%v ObservedStableValue=%v TargetBC=%v ExcessBC=%v",
            originalReadyPodsCount,
            a.deciderSpec.TotalValue,
            observedStableValue, a.deciderSpec.TargetBurstCapacity, excessBC)
    }

    a.reporter.ReportExcessBurstCapacity(float64(excessBC))
    a.reporter.ReportDesiredPodCount(int64(desiredPodCount))

    return desiredPodCount, excessBC, true
}

过完这把,也差不多知道这个是怎么算,算的是啥的。不过还需要详细看下 stableValuepanicValue 的计算方式。RPS 和 Concurrency 模式的两个计算都是差不多的,所以我们只看并发模式下的计算就好。

计算核心在 serving/pkg/autoscaler/collector.go:L332

// stableAndPanicStats calculates both stable and panic concurrency based on the
// given stats buckets.
func (c *collection) stableAndPanicStats(now time.Time, buckets *aggregation.TimedFloat64Buckets) (float64, float64, error) {
    spec := c.currentMetric().Spec
    var (
        panicAverage  aggregation.Average
        stableAverage aggregation.Average
    )
    if !buckets.ForEachBucket(
        aggregation.YoungerThan(now.Add(-spec.PanicWindow), panicAverage.Accumulate),
        aggregation.YoungerThan(now.Add(-spec.StableWindow), stableAverage.Accumulate),
    ) {
        return 0, 0, ErrNoData
    }

    return stableAverage.Value(), panicAverage.Value(), nil
}

func YoungerThan(oldest time.Time, acc Accumulator) Accumulator {
    return func(time time.Time, bucket float64Bucket) {
        if !time.Before(oldest) {
            acc(time, bucket)
        }
    }
}

// Average is used to keep the values necessary to compute an average.
type Average struct {
    sum   float64
    count float64
}

// Accumulate accumulates the values needed to compute an average.
func (a *Average) Accumulate(_ time.Time, bucket float64Bucket) {
    a.sum += bucket.sum()
    a.count++
}

// Value returns the average or 0 if no buckets have been accumulated.
func (a *Average) Value() float64 {
    if a.count == 0 {
        return 0
    }
    return a.sum / a.count
}

看完这波代码懂了,算不同时间窗口(Stable 和 Panic) 里的 metric 的均值,函数名都写得那么高级害我以为又是什么高端代码。

然后往上查,看从那边调用(此处省略无数代码),确定是在 serving/pkg/reconciler/autoscaling/kpa/controller.go:[email protected] 调用了,而 cmd 中调用了 NewController

结论

通过 Stable 和 Panic 两种时间窗口来优化弹性的效率,感觉还是挺赞的

over