This post continues the dig from the previous one and fills in the remaining ground, focusing on things like SchedulingPreference.

SchedulingManager

Scheduling currently targets ReplicatedTypes, i.e. objects like Deployment and ReplicaSet, because they carry a replica count that has to be scheduled across multiple clusters. Let's first look at the new structures involved.

// ReplicaSchedulingPreferenceSpec defines the desired state of ReplicaSchedulingPreference
type ReplicaSchedulingPreferenceSpec struct {
    // TODO (@irfanurrehman); upgrade this to label selector only if need be.
    // The idea of this API is to have a set of preferences which can
    // be used for a target FederatedDeployment or FederatedReplicaset.
    // Although the set of preferences in question can be applied to multiple
    // target objects using label selectors, but there are no clear advantages
    // of doing that as of now.
    // To keep the implementation and usage simple, matching ns/name of RSP
    // resource to the target resource is sufficient and only additional information
    // needed in RSP resource is a target kind (FederatedDeployment or FederatedReplicaset).
    TargetKind string `json:"targetKind"`

    // Total number of pods desired across federated clusters.
    // Replicas specified in the spec for target deployment template or replicaset
    // template will be discarded/overridden when scheduling preferences are
    // specified.
    TotalReplicas int32 `json:"totalReplicas"`

    // If set to true then already scheduled and running replicas may be moved to other clusters
    // in order to match current state to the specified preferences. Otherwise, if set to false,
    // up and running replicas will not be moved.
    // +optional
    Rebalance bool `json:"rebalance,omitempty"`

    // A mapping between cluster names and preferences regarding a local workload object (dep, rs, .. ) in
    // these clusters.
    // "*" (if provided) applies to all clusters if an explicit mapping is not provided.
    // If omitted, clusters without explicit preferences should not have any replicas scheduled.
    // +optional
    Clusters map[string]ClusterPreferences `json:"clusters,omitempty"`
}

// Preferences regarding number of replicas assigned to a cluster workload object (dep, rs, ..) within
// a federated workload object.
type ClusterPreferences struct {
    // Minimum number of replicas that should be assigned to this cluster workload object. 0 by default.
    // +optional
    MinReplicas int64 `json:"minReplicas,omitempty"`

    // Maximum number of replicas that should be assigned to this cluster workload object.
    // Unbounded if no value provided (default).
    // +optional
    MaxReplicas *int64 `json:"maxReplicas,omitempty"`

    // A number expressing the preference to put an additional replica to this cluster workload object.
    // 0 by default.
    Weight int64 `json:"weight,omitempty"`
}

// ReplicaSchedulingPreferenceStatus defines the observed state of ReplicaSchedulingPreference
type ReplicaSchedulingPreferenceStatus struct {
}

// +kubebuilder:object:root=true
// +kubebuilder:resource:path=replicaschedulingpreferences,shortName=rsp

type ReplicaSchedulingPreference struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   ReplicaSchedulingPreferenceSpec   `json:"spec,omitempty"`
    Status ReplicaSchedulingPreferenceStatus `json:"status,omitempty"`
}
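
To make the API concrete, here is a rough example (hypothetical namespace, name and cluster names) of a filled-in RSP, written as a Go literal to match the types above: it asks for 10 replicas of the FederatedDeployment with the same ns/name, guarantees cluster1 at least 2, and lets the rest be split by weight.

rsp := &fedschedulingv1a1.ReplicaSchedulingPreference{
    ObjectMeta: metav1.ObjectMeta{Namespace: "test-ns", Name: "test-deployment"},
    Spec: fedschedulingv1a1.ReplicaSchedulingPreferenceSpec{
        TargetKind:    "FederatedDeployment",
        TotalReplicas: 10,
        Rebalance:     true,
        Clusters: map[string]fedschedulingv1a1.ClusterPreferences{
            "cluster1": {MinReplicas: 2, Weight: 2},
            "*":        {Weight: 1},
        },
    },
}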

SchedulingManagerController

The basic pattern is much the same as the other controllers:

  1. Initialize the Reconcile Worker
  2. Initialize the FTC (FederatedTypeConfig) Informer
  3. Run starts both the Informer and the Worker

So we won't dwell on those steps and will jump straight to func (*SchedulingManager) reconcile. The scheduling pieces are fairly decoupled, so we will hop between code paths as we read, which makes the write-up a bit fragmented.

reconcile

Fetching the SchedulingType

// pkg/controller/schedulingmanager/controller.go

func (c *SchedulingManager) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
  key := qualifiedName.String()

  klog.V(3).Infof("Running reconcile FederatedTypeConfig %q in scheduling manager", key)

  typeConfigName := qualifiedName.Name
  schedulingType := schedulingtypes.GetSchedulingType(typeConfigName)
  if schedulingType == nil {
    // No scheduler supported for this resource
    return util.StatusAllOK
  }
  schedulingKind := schedulingType.Kind
  ...
}

The snippet above is the opening of reconcile. It uses the qualifiedName (which is really just the FederatedTypeConfig's meta) to look up the SchedulingType; in other words it is a lookup by FederatedTypeConfig.name (SchedulingType is abbreviated as ST below). If nothing is registered for that name, the FTC is simply left alone.

So where do these STs come from? They are pre-registered in code; let's take a look.

SchedulingType

type SchedulingType struct {
    Kind             string
    SchedulerFactory SchedulerFactory
}

// Mapping of qualified target name (e.g. deployment.apps) targeted
// for scheduling with scheduling type
var typeRegistry = make(map[string]SchedulingType)

func RegisterSchedulingType(kind string, schedulingType SchedulingType) {
    existing, ok := typeRegistry[kind]
    if ok {
        panic(fmt.Sprintf("Kind %q is already registered for scheduling with %q", kind, existing.Kind))
    }
    typeRegistry[kind] = schedulingType
}

func SchedulingTypes() map[string]SchedulingType {
    result := make(map[string]SchedulingType)
    for key, value := range typeRegistry {
        result[key] = value
    }
    return result
}

func GetSchedulingType(kind string) *SchedulingType {
    schedulingType, ok := typeRegistry[kind]
    if ok {
        return &schedulingType
    }
    return nil
}

SchedulingType has just two fields: a Kind and a SchedulerFactory. Entries are registered into a package-level map via RegisterSchedulingType, keyed by the target type's name (called kind in the function signature; effectively the plural.group form of the target type, e.g. deployments.apps).

Currently there are exactly two ST registrations, both made in pkg/schedulingtypes/replicasscheduler.go: deployments.apps and replicasets.apps. Both use ReplicaScheduler as their factory (more on that later); the registration is sketched below.
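
A sketch of that registration (the exact constant and function names may differ slightly from what is in replicasscheduler.go):

func init() {
    schedulingType := SchedulingType{
        Kind:             "ReplicaSchedulingPreference",
        SchedulerFactory: NewReplicaScheduler,
    }
    RegisterSchedulingType("deployments.apps", schedulingType)
    RegisterSchedulingType("replicasets.apps", schedulingType)
}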

A note

Notice that the ST does not care about the API version at all, only the resource name and its Group. In other words, as far as APIResource version upgrades go, the registration is unaffected: when the Deployment target moves from, say, apps/v1beta2 to apps/v1, the key deployments.apps stays the same.

Back to reconcile

func (c *SchedulingManager) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
  key := qualifiedName.String()
  ...
  cachedObj, exist, err := c.store.GetByKey(key)
  if err != nil {
    runtime.HandleError(errors.Wrapf(err, "Failed to query FederatedTypeConfig store for %q in scheduling manager", key))
    return util.StatusError
  }

  if !exist {
    c.stopScheduler(schedulingKind, typeConfigName)
    return util.StatusAllOK
  }

  typeConfig := cachedObj.(*corev1b1.FederatedTypeConfig)
  if !typeConfig.GetPropagationEnabled() || typeConfig.DeletionTimestamp != nil {
    c.stopScheduler(schedulingKind, typeConfigName)
    return util.StatusAllOK
  }
  // The code above fetches the corresponding FTC object from the cache

  // The following "set" call is a bit puzzling: it fills in TargetType.PluralName, TargetType.Group,
  // FederatedType.PluralName and StatusType, and it even carries a TODO(marun) "will name always be populated?"
  // Open question: why default here at all? Possibly to paper over fields missed elsewhere, but that really
  // ought to be handled by a webhook...
  // set name and group for the type config target
  corev1b1.SetFederatedTypeConfigDefaults(typeConfig)

  // Below, a SchedulingPreferenceController for this ST is created if one is not already running; then we check whether a plugin for this FTC name has been started, and if not, start it
  // Scheduling preference controller is started on demand
  abstractScheduler, ok := c.schedulers.Get(schedulingKind)
  if !ok {
    klog.Infof("Starting schedulingpreference controller for %s", schedulingKind)
    stopChan := make(chan struct{})
    schedulerInterface, err := schedulingpreference.StartSchedulingPreferenceController(c.config, *schedulingType, stopChan)
    if err != nil {
      runtime.HandleError(errors.Wrapf(err, "Error starting schedulingpreference controller for %s", schedulingKind))
      return util.StatusError
    }
    abstractScheduler = newSchedulerWrapper(schedulerInterface, stopChan)
    c.schedulers.Store(schedulingKind, abstractScheduler)
  }

  scheduler := abstractScheduler.(*SchedulerWrapper)
  if scheduler.HasPlugin(typeConfigName) {
    // Scheduler and plugin already running for this target typeConfig
    return util.StatusAllOK
  }

  federatedKind := typeConfig.GetFederatedType().Kind
  klog.Infof("Starting plugin %s for %s", federatedKind, schedulingKind)
  err = scheduler.StartPlugin(typeConfig)
  if err != nil {
    runtime.HandleError(errors.Wrapf(err, "Error starting plugin %s for %s", federatedKind, schedulingKind))
    return util.StatusError
  }
  scheduler.pluginMap.Store(typeConfigName, federatedKind)

  return util.StatusAllOK
}

That is essentially all of SchedulingManager. Next, let's look at the Factory and the Controller that it starts.

SchedulingPreferenceController

// SchedulingPreferenceController starts a new controller for given type of SchedulingPreferences
func StartSchedulingPreferenceController(config *util.ControllerConfig, schedulingType schedulingtypes.SchedulingType, stopChannel <-chan struct{}) (schedulingtypes.Scheduler, error) {
    controller, err := newSchedulingPreferenceController(config, schedulingType)
    if err != nil {
        return nil, err
    }
    if config.MinimizeLatency {
        controller.minimizeLatency()
    }
    klog.Infof("Starting replicaschedulingpreferences controller")
    controller.Run(stopChannel)
    return controller.scheduler, nil
}

// newSchedulingPreferenceController returns a new SchedulingPreference Controller for the given type
func newSchedulingPreferenceController(config *util.ControllerConfig, schedulingType schedulingtypes.SchedulingType) (*SchedulingPreferenceController, error) {
    userAgent := fmt.Sprintf("%s-controller", schedulingType.Kind)
    kubeConfig := restclient.CopyConfig(config.KubeConfig)
    restclient.AddUserAgent(kubeConfig, userAgent)
    kubeClient, err := kubeclientset.NewForConfig(kubeConfig)
    if err != nil {
        return nil, err
    }

    broadcaster := record.NewBroadcaster()
    broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "replicaschedulingpreference-controller"})

    s := &SchedulingPreferenceController{
        clusterAvailableDelay:   config.ClusterAvailableDelay,
        clusterUnavailableDelay: config.ClusterUnavailableDelay,
        smallDelay:              time.Second * 3,
        eventRecorder:           recorder,
    }

    s.worker = util.NewReconcileWorker(s.reconcile, util.WorkerTiming{
        ClusterSyncDelay: s.clusterAvailableDelay,
    })

    eventHandlers := schedulingtypes.SchedulerEventHandlers{
        KubeFedEventHandler: s.worker.EnqueueObject,
        ClusterEventHandler: func(obj pkgruntime.Object) {
            qualifiedName := util.NewQualifiedName(obj)
            s.worker.EnqueueForRetry(qualifiedName)
        },
        ClusterLifecycleHandlers: &util.ClusterLifecycleHandlerFuncs{
            ClusterAvailable: func(cluster *fedv1b1.KubeFedCluster) {
                // When new cluster becomes available process all the target resources again.
                s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
            },
            // When a cluster becomes unavailable process all the target resources again.
            ClusterUnavailable: func(cluster *fedv1b1.KubeFedCluster, _ []interface{}) {
                s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay))
            },
        },
    }
  // Everything before this is the usual boilerplate: client, informer, ClusterLifecycleHandler; no need to go over it again

  // Build a scheduler from ST.SchedulerFactory
    scheduler, err := schedulingType.SchedulerFactory(config, eventHandlers)
    if err != nil {
        return nil, err
    }
    s.scheduler = scheduler

    // Build deliverer for triggering cluster reconciliations.
    s.clusterDeliverer = util.NewDelayingDeliverer()

  // The ObjectType below is one of kubefed's own CRDs; for ReplicaScheduler it is always `&fedschedulingv1a1.ReplicaSchedulingPreference{}`
    s.store, s.controller, err = util.NewGenericInformer(
        config.KubeConfig,
        config.TargetNamespace,
        s.scheduler.ObjectType(),
        util.NoResyncPeriod,
        s.worker.EnqueueObject,
    )
    if err != nil {
        return nil, err
    }

    return s, nil
}

// And then start it
func (s *SchedulingPreferenceController) Run(stopChan <-chan struct{}) {
    go s.controller.Run(stopChan)
    s.scheduler.Start()

    s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
        s.reconcileOnClusterChange()
    })

    s.worker.Run(stopChan)

    // Ensure all goroutines are cleaned up when the stop channel closes
    go func() {
        <-stopChan
        s.clusterDeliverer.Stop()
        s.scheduler.Stop()
    }()
}

reconcile

The cluster side of reconcile is blunt but simple: whenever cluster state changes, every object is re-enqueued.

func (s *SchedulingPreferenceController) reconcileOnClusterChange() {
    if !s.isSynced() {
        s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
    }
    for _, obj := range s.store.List() {
        qualifiedName := util.NewQualifiedName(obj.(pkgruntime.Object))
        s.worker.EnqueueWithDelay(qualifiedName, s.smallDelay)
    }
}
func (s *SchedulingPreferenceController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
    defer metrics.UpdateControllerReconcileDurationFromStart("schedulingpreferencecontroller", time.Now())

    if !s.isSynced() {
        return util.StatusNotSynced
    }

    kind := s.scheduler.SchedulingKind()
    key := qualifiedName.String()

    klog.V(4).Infof("Starting to reconcile %s controller triggered key named %v", kind, key)
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished reconciling %s controller triggered key named %v (duration: %v)", kind, key, time.Since(startTime))
    }()

    obj, err := s.objFromCache(s.store, kind, key)
    if err != nil {
        return util.StatusAllOK
    }
    if obj == nil {
        // Nothing to do
        return util.StatusAllOK
    }

    return s.scheduler.Reconcile(obj, qualifiedName)
}

In the end the real work is delegated to the Scheduler's Reconcile, which is what you would expect.

Scheduler

There is currently only one implementation, ReplicaScheduler, so let's start from there.

// pkg/schedulingtypes/interface.go

type Scheduler interface {
  // corresponds to the ST's Kind
    SchedulingKind() string
  // the corresponding SchedulingPreference object type
    ObjectType() pkgruntime.Object

    Start()
    HasSynced() bool
    Stop()
    Reconcile(obj pkgruntime.Object, qualifiedName util.QualifiedName) util.ReconciliationStatus

    StartPlugin(typeConfig typeconfig.Interface) error
    StopPlugin(kind string)
}

type SchedulerEventHandlers struct {
    KubeFedEventHandler      func(pkgruntime.Object)
    ClusterEventHandler      func(pkgruntime.Object)
    ClusterLifecycleHandlers *util.ClusterLifecycleHandlerFuncs
}

type SchedulerFactory func(controllerConfig *util.ControllerConfig, eventHandlers SchedulerEventHandlers) (Scheduler, error)

ReplicaScheduler

Initialization is nothing special: it creates a pod informer, wires in the ClusterLifecycleHandlers from the externally supplied eventHandlers, and deliberately ignores pod-change triggers (they are not meant to be handled).

StartPlugin & StopPlugin

When the upper-level controller sees a new FTC that has an ST registered, it starts a corresponding plugin via ReplicaScheduler.StartPlugin. What is this oddly named thing actually for? Let's have a look.

func (s *ReplicaScheduler) StartPlugin(typeConfig typeconfig.Interface) error {
    kind := typeConfig.GetFederatedType().Kind
    // TODO(marun) Return an error if the kind is not supported

    plugin, err := NewPlugin(s.controllerConfig, s.eventHandlers, typeConfig)
    if err != nil {
        return errors.Wrapf(err, "Failed to initialize replica scheduling plugin for %q", kind)
    }

    plugin.Start()
    s.plugins.Store(kind, plugin)

    return nil
}

func (s *ReplicaScheduler) StopPlugin(kind string) {
    plugin, ok := s.plugins.Get(kind)
    if !ok {
        return
    }

    plugin.(*Plugin).Stop()
    s.plugins.Delete(kind)
}

The code above does not say much on its own: neither the comments nor the variable names reveal what a plugin is, so let's dig into the Plugin implementation.

// pkg/schedulingtypes/plugin.go

func NewPlugin(controllerConfig *util.ControllerConfig, eventHandlers SchedulerEventHandlers, typeConfig typeconfig.Interface) (*Plugin, error) {
    targetAPIResource := typeConfig.GetTargetType()
    userAgent := fmt.Sprintf("%s-replica-scheduler", strings.ToLower(targetAPIResource.Kind))
    client := genericclient.NewForConfigOrDieWithUserAgent(controllerConfig.KubeConfig, userAgent)

    targetInformer, err := util.NewFederatedInformer(
        controllerConfig,
        client,
        &targetAPIResource,
        eventHandlers.ClusterEventHandler,
        eventHandlers.ClusterLifecycleHandlers,
    )
    if err != nil {
        return nil, err
    }

  // The code above creates a federated informer for the target type (e.g. Deployment) and passes in the handlers

    p := &Plugin{
        targetInformer: targetInformer,
        typeConfig:     typeConfig,
        stopChannel:    make(chan struct{}),
    }

    targetNamespace := controllerConfig.TargetNamespace
    kubeFedEventHandler := eventHandlers.KubeFedEventHandler

    federatedTypeAPIResource := typeConfig.GetFederatedType()
    p.federatedTypeClient, err = util.NewResourceClient(controllerConfig.KubeConfig, &federatedTypeAPIResource)
    if err != nil {
        return nil, err
    }
  // Plus an informer for the FederatedType
    p.federatedStore, p.federatedController = util.NewResourceInformer(p.federatedTypeClient, targetNamespace, &federatedTypeAPIResource, kubeFedEventHandler)

    return p, nil
}

func (p *Plugin) Start() { /* starts the two informers initialized above */ }
func (p *Plugin) Reconcile(qualifiedName util.QualifiedName, result map[string]int64) error {
    fedObject, err := p.federatedTypeClient.Resources(qualifiedName.Namespace).Get(context.Background(), qualifiedName.Name, metav1.GetOptions{})
    if err != nil && apierrors.IsNotFound(err) {
        // Federated resource has been deleted - no further action required
        return nil
    }
    if err != nil {
        return err
    }

    isDirty := false

    newClusterNames := []string{}
    for name := range result {
        newClusterNames = append(newClusterNames, name)
    }
    clusterNames, err := util.GetClusterNames(fedObject)
    if err != nil {
        return err
    }
  // Compare the old and new cluster name lists to decide whether the placement changed
    if PlacementUpdateNeeded(clusterNames, newClusterNames) {
        if err := util.SetClusterNames(fedObject, newClusterNames); err != nil {
            return err
        }

        isDirty = true
    }

    overridesMap, err := util.GetOverrides(fedObject)
    if err != nil {
        return errors.Wrapf(err, "Error reading cluster overrides for %s %q", p.typeConfig.GetFederatedType().Kind, qualifiedName)
    }
    if OverrideUpdateNeeded(overridesMap, result) {
        err := setOverrides(fedObject, overridesMap, result)
        if err != nil {
            return err
        }
        isDirty = true
    }

    if isDirty {
    // If anything changed, write the update back onto the FederatedType object
        _, err := p.federatedTypeClient.Resources(qualifiedName.Namespace).Update(context.Background(), fedObject, metav1.UpdateOptions{})
        if err != nil {
            return err
        }
    }

    return nil
}

So the plugin itself does not do much: it decides whether the placement and overrides need updating and, if so, writes them back onto the federated object, roughly in the shape sketched below.
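
To make that concrete, here is a rough sketch (assumed field layout, following KubeFed's usual federated-resource convention of spec.placement.clusters plus spec.overrides entries patching /spec/replicas; cluster names are hypothetical) of what the federated object ends up carrying after a Reconcile with result {"cluster1": 3, "cluster2": 2}:

spec := fedObject.Object["spec"].(map[string]interface{})
// placement decides which member clusters receive the resource
spec["placement"] = map[string]interface{}{
    "clusters": []interface{}{
        map[string]interface{}{"name": "cluster1"},
        map[string]interface{}{"name": "cluster2"},
    },
}
// overrides carry the per-cluster replica counts computed by the scheduler
spec["overrides"] = []interface{}{
    map[string]interface{}{
        "clusterName": "cluster1",
        "clusterOverrides": []interface{}{
            map[string]interface{}{"path": "/spec/replicas", "value": int64(3)},
        },
    },
    map[string]interface{}{
        "clusterName": "cluster2",
        "clusterOverrides": []interface{}{
            map[string]interface{}{"path": "/spec/replicas", "value": int64(2)},
        },
    },
}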

ReplicaScheduler Start & Stop


func (s *ReplicaScheduler) Start() {
    s.podInformer.Start()
}

func (s *ReplicaScheduler) Stop() {
    for _, plugin := range s.plugins.GetAll() {
        plugin.(*Plugin).Stop()
    }
    s.plugins.DeleteAll()
    s.podInformer.Stop()
}

Nothing much going on there, so let's skip ahead.

ReplicaScheduler.Reconcile

func (s *ReplicaScheduler) Reconcile(obj pkgruntime.Object, qualifiedName ctlutil.QualifiedName) ctlutil.ReconciliationStatus {
    rsp, ok := obj.(*fedschedulingv1a1.ReplicaSchedulingPreference)
    if !ok {
        runtime.HandleError(errors.Errorf("Incorrect runtime object for RSP: %v", rsp))
        return ctlutil.StatusError
    }

    clusterNames, err := s.clusterNames()
    if err != nil {
        runtime.HandleError(errors.Wrap(err, "Failed to get cluster list"))
        return ctlutil.StatusError
    }
    if len(clusterNames) == 0 {
        // no joined clusters, nothing to do
        return ctlutil.StatusAllOK
    }
  // Get all ready clusters from the federated informer; if none are available, do nothing for the time being.
  // Explicitly filter on the target kind
    kind := rsp.Spec.TargetKind
    if kind != "FederatedDeployment" && kind != "FederatedReplicaSet" {
        runtime.HandleError(errors.Wrapf(err, "RSP target kind: %s is incorrect", kind))
        return ctlutil.StatusNeedsRecheck
    }
  // Fetch the plugin for that kind
    plugin, ok := s.plugins.Get(kind)
    if !ok {
        return ctlutil.StatusAllOK
    }

    if !plugin.(*Plugin).FederatedTypeExists(qualifiedName.String()) {
        // target FederatedType does not exist, nothing to do
        return ctlutil.StatusAllOK
    }

    key := qualifiedName.String()
  // Compute the scheduling result
    result, err := s.GetSchedulingResult(rsp, qualifiedName, clusterNames)
    if err != nil {
        runtime.HandleError(errors.Wrapf(err, "Failed to compute the schedule information while reconciling RSP named %q", key))
        return ctlutil.StatusError
    }
  // Hand off to the plugin to update the federated object
    err = plugin.(*Plugin).Reconcile(qualifiedName, result)
    if err != nil {
        runtime.HandleError(errors.Wrapf(err, "Failed to reconcile federated targets for RSP named %q", key))
        return ctlutil.StatusError
    }

    return ctlutil.StatusAllOK
}

ReplicaScheduler.GetSchedulingResult

This is the part that computes the replica distribution, and it is also the messiest part of the code (allocation plus rebalancing).

// Note: the clusterNames passed in here only contain ready clusters
func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSchedulingPreference, qualifiedName ctlutil.QualifiedName, clusterNames []string) (map[string]int64, error) {
    key := qualifiedName.String()

  // The target here is a Deployment or a ReplicaSet
    objectGetter := func(clusterName, key string) (interface{}, bool, error) {
        plugin, ok := s.plugins.Get(rsp.Spec.TargetKind)
        if !ok {
            return nil, false, nil
        }
        return plugin.(*Plugin).targetInformer.GetTargetStore().GetByKey(clusterName, key)
    }
  // Use the Deployment/ReplicaSet selector to list pods in the namespace
    podsGetter := func(clusterName string, unstructuredObj *unstructured.Unstructured) (*corev1.PodList, error) {
        client, err := s.podInformer.GetClientForCluster(clusterName)
        if err != nil {
            return nil, err
        }
    // Risk: the field path is hard-coded here; if it ever differs per type, this will need to be specialized
        selectorLabels, ok, err := unstructured.NestedStringMap(unstructuredObj.Object, "spec", "selector", "matchLabels")
        if !ok {
            return nil, errors.New("missing selector on object")
        }
        if err != nil {
            return nil, errors.Wrap(err, "error retrieving selector from object")
        }

        podList := &corev1.PodList{}
        err = client.List(context.Background(), podList, unstructuredObj.GetNamespace(), crclient.MatchingLabels(selectorLabels))
        if err != nil {
            return nil, err
        }
        return podList, nil
    }
  // Gather how pods are currently running in each cluster
    currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
    if err != nil {
        return nil, err
    }

    // TODO: Move this to API defaulting logic
  // If the user did not specify any clusters, give every cluster the same weight, i.e. split evenly
    if len(rsp.Spec.Clusters) == 0 {
        rsp.Spec.Clusters = map[string]fedschedulingv1a1.ClusterPreferences{
            "*": {Weight: 1},
        }
    }

    plnr := planner.NewPlanner(rsp)
    return schedule(plnr, key, clusterNames, currentReplicasPerCluster, estimatedCapacity)
}

clustersReplicaState


// clustersReplicaState returns information about the scheduling state of the pods running in the federated clusters.
func clustersReplicaState(
    clusterNames []string,
    key string,
    objectGetter func(clusterName string, key string) (interface{}, bool, error),
    podsGetter func(clusterName string, obj *unstructured.Unstructured) (*corev1.PodList, error)) (currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64, err error) {
    currentReplicasPerCluster = make(map[string]int64)
    estimatedCapacity = make(map[string]int64)

    for _, clusterName := range clusterNames {
    // Fetch the Deployment or ReplicaSet
        obj, exists, err := objectGetter(clusterName, key)
        if err != nil {
            return nil, nil, err
        }
        if !exists {
            continue
        }

        unstructuredObj := obj.(*unstructured.Unstructured)
        replicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "spec", "replicas")
        if err != nil {
            return nil, nil, errors.Wrap(err, "Error retrieving 'replicas' field")
        }
        if !ok {
            replicas = int64(0)
        }
    // Both Deployment and ReplicaSet currently have this field
        readyReplicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "status", "readyreplicas")
        if err != nil {
            return nil, nil, errors.Wrap(err, "Error retrieving 'readyreplicas' field")
        }
        if !ok {
            readyReplicas = int64(0)
        }
    // Stable state reached: ready == desired
        if replicas == readyReplicas {
            currentReplicasPerCluster[clusterName] = readyReplicas
        } else {
            currentReplicasPerCluster[clusterName] = int64(0)
            podList, err := podsGetter(clusterName, unstructuredObj)
            if err != nil {
                return nil, nil, err
            }

      // Count ready and unschedulable pods from the existing pod statuses
            podStatus := podanalyzer.AnalyzePods(podList, time.Now())
            currentReplicasPerCluster[clusterName] = int64(podStatus.RunningAndReady) // include pending as well?
      // This calculation cannot cover pods that were never created at all (e.g. rejected by a webhook because of resourceQuota)
            unschedulable := int64(podStatus.Unschedulable)
            if unschedulable > 0 {
        // Risk: so estimatedCapacity = ready + scheduled-but-not-ready + never-created? In theory the never-created pods should not be counted; including them makes the estimate larger than the real capacity
                estimatedCapacity[clusterName] = replicas - unschedulable
            }
        }
    }
    return currentReplicasPerCluster, estimatedCapacity, nil
}

So the function above yields two pieces of information: currentReplicasPerCluster, the number of replicas each cluster already runs in a stable state, and estimatedCapacity, which for clusters with unschedulable pods is spec.replicas minus the unschedulable count. For example, a cluster with spec.replicas=5, three running-and-ready pods and one unschedulable pod contributes currentReplicasPerCluster=3 and estimatedCapacity=4.

With this information, the Scheduler hands things over to the Planner and func schedule(...) to distribute the replicas.

Planner & schedule function

func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSchedulingPreference, qualifiedName ctlutil.QualifiedName, clusterNames []string) (map[string]int64, error) {
  ...
    currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
    if err != nil {
        return nil, err
    }

    // TODO: Move this to API defaulting logic
    if len(rsp.Spec.Clusters) == 0 {
        rsp.Spec.Clusters = map[string]fedschedulingv1a1.ClusterPreferences{
            "*": {Weight: 1},
        }
    }

  // The preferences are passed in at construction time
    plnr := planner.NewPlanner(rsp)
    return schedule(plnr, key, clusterNames, currentReplicasPerCluster, estimatedCapacity)
}

We already know from above that the last step of GetSchedulingResult is to call schedule with the planner, so let's first see what the Planner does.

Planner

Planner has a single function, Plan, and it is a pile of arithmetic that, frankly, I did not want to read. But there is no way around it: skipping this code would very likely bite us later (and if it ever becomes necessary, the planner can simply be replaced).

// replicaSetKey is just a qualifiedName.String(); it is unclear why it is called replicaSetKey, something like replicatedKey would fit better
func (p *Planner) Plan(availableClusters []string, currentReplicaCount map[string]int64,
    estimatedCapacity map[string]int64, replicaSetKey string) (map[string]int64, map[string]int64, error) {
    preferences := make([]*namedClusterPreferences, 0, len(availableClusters))
    plan := make(map[string]int64, len(preferences))
    overflow := make(map[string]int64, len(preferences))

    named := func(name string, pref fedschedulingv1a1.ClusterPreferences) (*namedClusterPreferences, error) {
        // Seems to work better than addler for our case.
        hasher := fnv.New32()
        if _, err := hasher.Write([]byte(name)); err != nil {
            return nil, err
        }
        if _, err := hasher.Write([]byte(replicaSetKey)); err != nil {
            return nil, err
        }

        return &namedClusterPreferences{
            clusterName:        name,
            hash:               hasher.Sum32(),  // used later for sorting
            ClusterPreferences: pref,
        }, nil
    }

    for _, cluster := range availableClusters {
        if localRSP, found := p.preferences.Spec.Clusters[cluster]; found {
            preference, err := named(cluster, localRSP)
            if err != nil {
                return nil, nil, err
            }

            preferences = append(preferences, preference)
        } else {
            if localRSP, found := p.preferences.Spec.Clusters["*"]; found {
                preference, err := named(cluster, localRSP)
                if err != nil {
                    return nil, nil, err
                }

                preferences = append(preferences, preference)
            } else {
                plan[cluster] = int64(0)
            }
        }
    }
  // The loop above boils down to: if there is a ClusterPreferences entry for this cluster's name, or a "*" wildcard entry,
  // hash it and wrap it into a namedClusterPreferences; otherwise reset that cluster's plan to 0.
  // Sort by weight descending, then hash ascending (the hash mainly avoids clusters with equal weight flapping in order).
    sort.Sort(byWeight(preferences))

    // This is the requested total replicas in preferences
    remainingReplicas := int64(p.preferences.Spec.TotalReplicas)

  // Give each cluster its guaranteed minimum, capped by the cluster's estimated capacity
    // Assign each cluster the minimum number of replicas it requested.
    for _, preference := range preferences {
        min := minInt64(preference.MinReplicas, remainingReplicas)
        if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
            min = minInt64(min, capacity)
        }
        remainingReplicas -= min
        plan[preference.clusterName] = min
    }

    // This map contains information how many replicas were assigned to
    // the cluster based only on the current replica count and
    // rebalance=false preference. It will be later used in remaining replica
    // distribution code.
    preallocated := make(map[string]int64)
  // When rebalancing is disabled
    if !p.preferences.Spec.Rebalance {
        for _, preference := range preferences {
            planned := plan[preference.clusterName]
            count, hasSome := currentReplicaCount[preference.clusterName]
      // The cluster already has more ready replicas than planned so far
            if hasSome && count > planned {
        // Start the target from the current ready count
                target := count
        // Do not exceed MaxReplicas
                if preference.MaxReplicas != nil {
                    target = minInt64(*preference.MaxReplicas, target)
                }
        // Do not exceed the estimated cluster capacity
                if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
                    target = minInt64(capacity, target)
                }
        // If the new target exceeds what was planned above, take the difference from the remaining pool (but never more than what remains)
                extra := minInt64(target-planned, remainingReplicas)
                if extra < 0 {
                    extra = 0
                }
                remainingReplicas -= extra
        // So what ends up in preallocated is the extra amount still owed to this cluster on top of the earlier plan
                preallocated[preference.clusterName] = extra
                plan[preference.clusterName] = extra + planned
            }
        }
    }

    modified := true

    // It is possible single pass of the loop is not enough to distribute all replicas among clusters due
    // to weight, max and rounding corner cases. In such case we iterate until either
    // there is no replicas or no cluster gets any more replicas or the number
    // of attempts is less than available cluster count. If there is no preallocated pods
    // every loop either distributes all remainingReplicas or maxes out at least one cluster.
    // If there are preallocated then the replica spreading may take longer.
    // We reduce the number of pending preallocated replicas by at least half with each iteration so
    // we may need log(replicasAtStart) iterations.
    // TODO: Prove that clusterCount * log(replicas) iterations solves the problem or adjust the number.
    // TODO: This algorithm is O(clusterCount^2 * log(replicas)) which is good for up to 100 clusters.
    // Find something faster.
  // There are still replicas left; distribute the rest according to weight
    for trial := 0; modified && remainingReplicas > 0; trial++ {
        modified = false
        weightSum := int64(0)
        for _, preference := range preferences {
            weightSum += preference.Weight
        }
        newPreferences := make([]*namedClusterPreferences, 0, len(preferences))

    // Snapshot the remainder: each cluster's share below is computed against this round's starting remainder, not the live one
        distributeInThisLoop := remainingReplicas

        for _, preference := range preferences {
            if weightSum > 0 {
                start := plan[preference.clusterName]
                // Distribute the remaining replicas, rounding fractions always up.
        // extra = min((remaining * weight + weightSum - 1) / weightSum, remaining)
        // The "+ weightSum - 1" is simply integer ceiling division, i.e. extra = ceil(remaining * weight / weightSum):
        // this cluster's weighted share of this round's remainder, rounded up.
                extra := (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum
        // Never more than what actually remains
                extra = minInt64(extra, remainingReplicas)

                // Account preallocated.
                prealloc := preallocated[preference.clusterName]
        // Whatever was preallocated (rebalance=false) for this cluster already counts toward its weighted share,
        // so the smaller of the two is marked as used preallocation and not handed out a second time
                usedPrealloc := minInt64(extra, prealloc)
                preallocated[preference.clusterName] = prealloc - usedPrealloc
                extra -= usedPrealloc
                if usedPrealloc > 0 {
                    modified = true
                }
        // The gist of the calculation above:
        // 1. work out how many more replicas this cluster should receive in this round
        // 2. compare that with what was already handed out as preallocation (with Rebalance enabled there is no
        //    preallocation, so usedPrealloc is always 0 here)
        // 3. subtract the used preallocation from extra (never going below 0)
                // In total there should be the amount that was there at start plus whatever is due
                // in this iteration
                total := start + extra
        // Below, the total is clamped
                // Check if we don't overflow the cluster, and if yes don't consider this cluster
                // in any of the following iterations.
                full := false
        // Cannot exceed MaxReplicas
                if preference.MaxReplicas != nil && total > *preference.MaxReplicas {
                    total = *preference.MaxReplicas
                    full = true
                }
        // Cannot exceed the estimated cluster capacity; the excess is recorded as overflow
                if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity && total > capacity {
                    overflow[preference.clusterName] = total - capacity
                    total = capacity
                    full = true
                }

                if !full {
                    newPreferences = append(newPreferences, preference)
                }

                // Only total-start replicas were actually taken.
                remainingReplicas -= (total - start)
                plan[preference.clusterName] = total

                // Something extra got scheduled on this cluster.
                if total > start {
                    modified = true
                }
            } else {
                break
            }
        }
        preferences = newPreferences
    }
  // Return plan and overflow (overflow is measured against the cluster capacities estimated earlier)
    if p.preferences.Spec.Rebalance {
        return plan, overflow, nil
    } else {
        // If rebalance = false then overflow is trimmed at the level
        // of replicas that it failed to place somewhere.
        newOverflow := make(map[string]int64)
        for key, value := range overflow {
            value = minInt64(value, remainingReplicas)
            if value > 0 {
                newOverflow[key] = value
            }
        }
        return plan, newOverflow, nil
    }
}
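
To make the weighted pass above concrete, here is a toy, self-contained walk-through (hypothetical clusters and weights; min/max, capacity and preallocation are ignored) of one distribution loop with TotalReplicas = 10 and weights cluster-a:2, cluster-b:1. The "+ weightSum - 1" term is the ceiling division described in the comments.

package main

import "fmt"

func main() {
    remaining := int64(10)
    weightSum := int64(3)
    plan := map[string]int64{}

    // byWeight order: weight descending, hash ascending
    clusters := []struct {
        name   string
        weight int64
    }{{"cluster-a", 2}, {"cluster-b", 1}}

    distributeInThisLoop := remaining
    for _, c := range clusters {
        // ceil(distributeInThisLoop * weight / weightSum), capped by what is actually left
        extra := (distributeInThisLoop*c.weight + weightSum - 1) / weightSum
        if extra > remaining {
            extra = remaining
        }
        plan[c.name] += extra
        remaining -= extra
    }
    fmt.Println(plan, remaining) // map[cluster-a:7 cluster-b:3] 0
}
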
schedule function

func schedule(planner *planner.Planner, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) (map[string]int64, error) {
    scheduleResult, overflow, err := planner.Plan(clusterNames, currentReplicasPerCluster, estimatedCapacity, key)
    if err != nil {
        return nil, err
    }
  // Take the computed plan and the overflow from the planner
    // TODO: Check if we really need to place the federated type in clusters
    // with 0 replicas. Override replicas would be set to 0 in this case.
    result := make(map[string]int64)
    for clusterName := range currentReplicasPerCluster {
        result[clusterName] = 0
    }

    for clusterName, replicas := range scheduleResult {
        result[clusterName] = replicas
    }
    for clusterName, replicas := range overflow {
        result[clusterName] += replicas
    }
  // Assemble the final per-cluster replica counts (plan plus overflow)

    if klog.V(4) {
        buf := bytes.NewBufferString(fmt.Sprintf("Schedule - %q\n", key))
        sort.Strings(clusterNames)
        for _, clusterName := range clusterNames {
            cur := currentReplicasPerCluster[clusterName]
            target := scheduleResult[clusterName]
            fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target)
            if over, found := overflow[clusterName]; found {
                fmt.Fprintf(buf, " overflow: %d", over)
            }
            if capacity, found := estimatedCapacity[clusterName]; found {
                fmt.Fprintf(buf, " capacity: %d", capacity)
            }
            fmt.Fprintf(buf, "\n")
        }
        klog.V(4).Infof(buf.String())
    }
    return result, nil
}

Summary

I cannot claim to fully understand the Planner, but roughly it works like this: replicas are laid down layer by layer according to the minimum guarantees and the weights until nothing is left. The rebalance-disabled path additionally takes the current ready replica count into account (via a series of min operations against max and estimated capacity) to guarantee each cluster a floor, so replicas that are already up and running are not moved away.