Coffee Reactor

Tracks of Neo
x @neoz_
github oif

「kubefed 01」From Diving In to Getting Buried

· 11 min read

This post is the backfill after diving in: it focuses on pieces like SchedulingPreference.

SchedulingManager

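Right now this mainly targets ReplicatedTypes such as Deployment and ReplicaSet, because these objects have a replica count, and across multiple clusters those replicas have to be scheduled. Let's first look at the new structures involved.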

// ReplicaSchedulingPreferenceSpec defines the desired state of ReplicaSchedulingPreference
type ReplicaSchedulingPreferenceSpec struct {
	// TODO (@irfanurrehman); upgrade this to label selector only if need be.
	// The idea of this API is to have a set of preferences which can
	// be used for a target FederatedDeployment or FederatedReplicaset.
	// Although the set of preferences in question can be applied to multiple
	// target objects using label selectors, but there are no clear advantages
	// of doing that as of now.
	// To keep the implementation and usage simple, matching ns/name of RSP
	// resource to the target resource is sufficient and only additional information
	// needed in RSP resource is a target kind (FederatedDeployment or FederatedReplicaset).
	TargetKind string `json:"targetKind"`

	// Total number of pods desired across federated clusters.
	// Replicas specified in the spec for target deployment template or replicaset
	// template will be discarded/overridden when scheduling preferences are
	// specified.
	TotalReplicas int32 `json:"totalReplicas"`

	// If set to true then already scheduled and running replicas may be moved to other clusters
	// in order to match current state to the specified preferences. Otherwise, if set to false,
	// up and running replicas will not be moved.
	// +optional
	Rebalance bool `json:"rebalance,omitempty"`

	// A mapping between cluster names and preferences regarding a local workload object (dep, rs, .. ) in
	// these clusters.
	// "*" (if provided) applies to all clusters if an explicit mapping is not provided.
	// If omitted, clusters without explicit preferences should not have any replicas scheduled.
	// +optional
	Clusters map[string]ClusterPreferences `json:"clusters,omitempty"`
}

// Preferences regarding number of replicas assigned to a cluster workload object (dep, rs, ..) within
// a federated workload object.
type ClusterPreferences struct {
	// Minimum number of replicas that should be assigned to this cluster workload object. 0 by default.
	// +optional
	MinReplicas int64 `json:"minReplicas,omitempty"`

	// Maximum number of replicas that should be assigned to this cluster workload object.
	// Unbounded if no value provided (default).
	// +optional
	MaxReplicas *int64 `json:"maxReplicas,omitempty"`

	// A number expressing the preference to put an additional replica to this cluster workload object.
	// 0 by default.
	Weight int64 `json:"weight,omitempty"`
}

// ReplicaSchedulingPreferenceStatus defines the observed state of ReplicaSchedulingPreference
type ReplicaSchedulingPreferenceStatus struct {
}

// +kubebuilder:object:root=true
// +kubebuilder:resource:path=replicaschedulingpreferences,shortName=rsp

type ReplicaSchedulingPreference struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   ReplicaSchedulingPreferenceSpec   `json:"spec,omitempty"`
	Status ReplicaSchedulingPreferenceStatus `json:"status,omitempty"`
}
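The comment on Clusters describes a three-step lookup: an explicit cluster entry wins, otherwise "*" applies, otherwise the cluster gets nothing. A minimal sketch of that rule, assuming a trimmed-down ClusterPreferences and a hypothetical helper preferenceFor; the cluster names are made up.

```go
package main

import "fmt"

// ClusterPreferences keeps only the fields of the kubefed type this sketch needs.
type ClusterPreferences struct {
	MinReplicas int64
	MaxReplicas *int64
	Weight      int64
}

// preferenceFor (hypothetical) resolves the preference for one cluster: an explicit
// entry wins, otherwise "*" applies, otherwise the cluster has no preference
// (and therefore should get no replicas).
func preferenceFor(clusters map[string]ClusterPreferences, cluster string) (ClusterPreferences, bool) {
	if pref, ok := clusters[cluster]; ok {
		return pref, true
	}
	if pref, ok := clusters["*"]; ok {
		return pref, true
	}
	return ClusterPreferences{}, false
}

func main() {
	clusters := map[string]ClusterPreferences{
		"cluster-a": {MinReplicas: 2, Weight: 3},
		"*":         {Weight: 1},
	}
	for _, name := range []string{"cluster-a", "cluster-b"} {
		pref, ok := preferenceFor(clusters, name)
		fmt.Println(name, ok, pref.Weight)
	}
}
```

The Planner's loop over availableClusters further down applies the same rule, setting the plan to 0 for clusters that match neither an explicit entry nor "*".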

SchedulingManagerController

It follows basically the same pattern as the other controllers:

  1. Initialize the reconcile worker
  2. Initialize the FTC (FederatedTypeConfig) informer
  3. Run starts the informer and the worker

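So I won't say more about that and will head straight for func (*SchedulingManager) reconcile. Because the scheduling part is fairly decoupled, I'll jump between different pieces of code along the way, so the write-up is a bit fragmented.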

reconcile

Getting the SchedulingType

// pkg/controller/schedulingmanager/controller.go

func (c *SchedulingManager) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
  key := qualifiedName.String()

  klog.V(3).Infof("Running reconcile FederatedTypeConfig %q in scheduling manager", key)

  typeConfigName := qualifiedName.Name
  schedulingType := schedulingtypes.GetSchedulingType(typeConfigName)
  if schedulingType == nil {
    // No scheduler supported for this resource
    return util.StatusAllOK
  }
  schedulingKind := schedulingType.Kind
  ...
}

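This is the opening of reconcile. It looks up the SchedulingType using qualifiedName (which is really just the FederatedTypeConfig's metadata), so you can think of it as "look it up by FederatedTypeConfig.name" (SchedulingType is abbreviated as ST below). If no ST is found, the FTC is left alone.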

So where do these STs come from? They are pre-registered in code. Let's take a look.

SchedulingType

type SchedulingType struct {
	Kind             string
	SchedulerFactory SchedulerFactory
}

// Mapping of qualified target name (e.g. deployment.apps) targeted
// for scheduling with scheduling type
var typeRegistry = make(map[string]SchedulingType)

func RegisterSchedulingType(kind string, schedulingType SchedulingType) {
	existing, ok := typeRegistry[kind]
	if ok {
		panic(fmt.Sprintf("Kind %q is already registered for scheduling with %q", kind, existing.Kind))
	}
	typeRegistry[kind] = schedulingType
}

func SchedulingTypes() map[string]SchedulingType {
	result := make(map[string]SchedulingType)
	for key, value := range typeRegistry {
		result[key] = value
	}
	return result
}

func GetSchedulingType(kind string) *SchedulingType {
	schedulingType, ok := typeRegistry[kind]
	if ok {
		return &schedulingType
	}
	return nil
}

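SchedulingType has just two fields: Kind, the scheduling kind, and the corresponding SchedulerFactory. The registry itself is keyed by the `kind` parameter of RegisterSchedulingType, which is the target type's name in plural.group form (e.g. deployments.apps, the same string as the FTC name). Callers register into this package-level map via RegisterSchedulingType.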
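The registration pattern can be sketched in isolation. This is a simplified, self-contained version: SchedulerFactory here just returns a name (the real one takes a controller config and event handlers, as in the SchedulerFactory type later in this article), and the Kind value "ReplicaSchedulingPreference" is an assumption for illustration. The registry key is a plural.group name, so it carries no API version.

```go
package main

import "fmt"

// SchedulerFactory is simplified for this sketch; the real factory builds a Scheduler.
type SchedulerFactory func() string

type SchedulingType struct {
	Kind             string
	SchedulerFactory SchedulerFactory
}

// typeRegistry is keyed by the target's "<plural>.<group>" name, e.g. "deployments.apps".
var typeRegistry = make(map[string]SchedulingType)

// RegisterSchedulingType panics on duplicates, like the code above.
func RegisterSchedulingType(kind string, st SchedulingType) {
	if existing, ok := typeRegistry[kind]; ok {
		panic(fmt.Sprintf("Kind %q is already registered for scheduling with %q", kind, existing.Kind))
	}
	typeRegistry[kind] = st
}

// GetSchedulingType returns nil when no scheduler is registered for the name.
func GetSchedulingType(kind string) *SchedulingType {
	if st, ok := typeRegistry[kind]; ok {
		return &st
	}
	return nil
}

// init imitates registering at package init time; both targets share one SchedulingType.
func init() {
	replicaScheduler := SchedulingType{
		Kind:             "ReplicaSchedulingPreference",
		SchedulerFactory: func() string { return "ReplicaScheduler" },
	}
	RegisterSchedulingType("deployments.apps", replicaScheduler)
	RegisterSchedulingType("replicasets.apps", replicaScheduler)
}

func main() {
	for _, name := range []string{"deployments.apps", "statefulsets.apps"} {
		if st := GetSchedulingType(name); st != nil {
			fmt.Printf("%s -> %s (%s)\n", name, st.Kind, st.SchedulerFactory())
		} else {
			fmt.Printf("%s -> no scheduler, FTC ignored\n", name)
		}
	}
}
```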

There are currently exactly two STs, both registered in pkg/schedulingtypes/replicasscheduler.go: deployments.apps and replicasets.apps. Both use ReplicaScheduler as their factory (more on that later).

Caveat

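It's easy to see that an ST doesn't care about the API version at all, only the resource name and its group. In other words, when the APIResource moves to a new version, the ST lookup is not affected.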

Back to reconcile

func (c *SchedulingManager) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
  key := qualifiedName.String()
  ...
  cachedObj, exist, err := c.store.GetByKey(key)
  if err != nil {
    runtime.HandleError(errors.Wrapf(err, "Failed to query FederatedTypeConfig store for %q in scheduling manager", key))
    return util.StatusError
  }

  if !exist {
    c.stopScheduler(schedulingKind, typeConfigName)
    return util.StatusAllOK
  }

  typeConfig := cachedObj.(*corev1b1.FederatedTypeConfig)
  if !typeConfig.GetPropagationEnabled() || typeConfig.DeletionTimestamp != nil {
    c.stopScheduler(schedulingKind, typeConfigName)
    return util.StatusAllOK
  }
  // The above fetches the corresponding FTC object from the cache

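  // The Set call below is rather puzzling: it sets TargetType.PluralName, TargetType.Group, FederatedType.PluralName and StatusType,
  // and there's even a TODO(marun) will name always be populated?
  // Open question: why is the Set needed at all? Maybe it patches over places that forgot to fill these in, but that should be the webhook's job...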
  // set name and group for the type config target
  corev1b1.SetFederatedTypeConfigDefaults(typeConfig)

  // Below: create a SchedulingPreferenceController from the ST if one isn't running yet, then check whether the plugin for this FTC name is already running, and start it if not
  // Scheduling preference controller is started on demand
  abstractScheduler, ok := c.schedulers.Get(schedulingKind)
  if !ok {
    klog.Infof("Starting schedulingpreference controller for %s", schedulingKind)
    stopChan := make(chan struct{})
    schedulerInterface, err := schedulingpreference.StartSchedulingPreferenceController(c.config, *schedulingType, stopChan)
    if err != nil {
      runtime.HandleError(errors.Wrapf(err, "Error starting schedulingpreference controller for %s", schedulingKind))
      return util.StatusError
    }
    abstractScheduler = newSchedulerWrapper(schedulerInterface, stopChan)
    c.schedulers.Store(schedulingKind, abstractScheduler)
  }

  scheduler := abstractScheduler.(*SchedulerWrapper)
  if scheduler.HasPlugin(typeConfigName) {
    // Scheduler and plugin already running for this target typeConfig
    return util.StatusAllOK
  }

  federatedKind := typeConfig.GetFederatedType().Kind
  klog.Infof("Starting plugin %s for %s", federatedKind, schedulingKind)
  err = scheduler.StartPlugin(typeConfig)
  if err != nil {
    runtime.HandleError(errors.Wrapf(err, "Error starting plugin %s for %s", federatedKind, schedulingKind))
    return util.StatusError
  }
  scheduler.pluginMap.Store(typeConfigName, federatedKind)

  return util.StatusAllOK
}

That's basically all there is to SchedulingManager. Next, let's look at the Factory and Controller it starts.
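The control flow of reconcile boils down to "one scheduler per scheduling kind, one plugin per FTC, each started at most once". A sketch of just that idempotence, with hypothetical types: a sync.Map stands in for kubefed's thread-safe map, counters stand in for actually starting controllers, and the example runs on a single goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// schedulerWrapper is a hypothetical stand-in for SchedulerWrapper: it only records
// which FederatedTypeConfig names already have a running plugin.
type schedulerWrapper struct {
	plugins sync.Map // typeConfigName -> federated kind
}

func (w *schedulerWrapper) HasPlugin(typeConfigName string) bool {
	_, ok := w.plugins.Load(typeConfigName)
	return ok
}

type manager struct {
	schedulers      sync.Map // scheduling kind -> *schedulerWrapper
	schedulerStarts int
	pluginStarts    int
}

// reconcile mirrors the shape of SchedulingManager.reconcile: start the scheduler
// for the scheduling kind on first use, then start this FTC's plugin at most once.
func (m *manager) reconcile(schedulingKind, typeConfigName, federatedKind string) {
	abstract, ok := m.schedulers.Load(schedulingKind)
	if !ok {
		m.schedulerStarts++
		abstract = &schedulerWrapper{}
		m.schedulers.Store(schedulingKind, abstract)
	}
	scheduler := abstract.(*schedulerWrapper)
	if scheduler.HasPlugin(typeConfigName) {
		return // scheduler and plugin already running for this FTC
	}
	m.pluginStarts++
	scheduler.plugins.Store(typeConfigName, federatedKind)
}

func main() {
	m := &manager{}
	m.reconcile("ReplicaSchedulingPreference", "deployments.apps", "FederatedDeployment")
	m.reconcile("ReplicaSchedulingPreference", "deployments.apps", "FederatedDeployment")
	m.reconcile("ReplicaSchedulingPreference", "replicasets.apps", "FederatedReplicaSet")
	fmt.Println(m.schedulerStarts, m.pluginStarts) // 1 2
}
```

Deployments and ReplicaSets share one scheduler (same scheduling kind) but each gets its own plugin, and a repeated reconcile of the same FTC is a no-op.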

SchedulingPreferenceController

// SchedulingPreferenceController starts a new controller for given type of SchedulingPreferences
func StartSchedulingPreferenceController(config *util.ControllerConfig, schedulingType schedulingtypes.SchedulingType, stopChannel <-chan struct{}) (schedulingtypes.Scheduler, error) {
	controller, err := newSchedulingPreferenceController(config, schedulingType)
	if err != nil {
		return nil, err
	}
	if config.MinimizeLatency {
		controller.minimizeLatency()
	}
	klog.Infof("Starting replicaschedulingpreferences controller")
	controller.Run(stopChannel)
	return controller.scheduler, nil
}

// newSchedulingPreferenceController returns a new SchedulingPreference Controller for the given type
func newSchedulingPreferenceController(config *util.ControllerConfig, schedulingType schedulingtypes.SchedulingType) (*SchedulingPreferenceController, error) {
	userAgent := fmt.Sprintf("%s-controller", schedulingType.Kind)
	kubeConfig := restclient.CopyConfig(config.KubeConfig)
	restclient.AddUserAgent(kubeConfig, userAgent)
	kubeClient, err := kubeclientset.NewForConfig(kubeConfig)
	if err != nil {
		return nil, err
	}

	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "replicaschedulingpreference-controller"})

	s := &SchedulingPreferenceController{
		clusterAvailableDelay:   config.ClusterAvailableDelay,
		clusterUnavailableDelay: config.ClusterUnavailableDelay,
		smallDelay:              time.Second * 3,
		eventRecorder:           recorder,
	}

	s.worker = util.NewReconcileWorker(s.reconcile, util.WorkerTiming{
		ClusterSyncDelay: s.clusterAvailableDelay,
	})

	eventHandlers := schedulingtypes.SchedulerEventHandlers{
		KubeFedEventHandler: s.worker.EnqueueObject,
		ClusterEventHandler: func(obj pkgruntime.Object) {
			qualifiedName := util.NewQualifiedName(obj)
			s.worker.EnqueueForRetry(qualifiedName)
		},
		ClusterLifecycleHandlers: &util.ClusterLifecycleHandlerFuncs{
			ClusterAvailable: func(cluster *fedv1b1.KubeFedCluster) {
				// When new cluster becomes available process all the target resources again.
				s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
			},
			// When a cluster becomes unavailable process all the target resources again.
			ClusterUnavailable: func(cluster *fedv1b1.KubeFedCluster, _ []interface{}) {
				s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay))
			},
		},
	}
  // Everything up to here is the usual routine (client, informer, ClusterLifecycleHandler), so I won't go over it again

  // Use ST.SchedulerFactory to create a scheduler
	scheduler, err := schedulingType.SchedulerFactory(config, eventHandlers)
	if err != nil {
		return nil, err
	}
	s.scheduler = scheduler

	// Build deliverer for triggering cluster reconciliations.
	s.clusterDeliverer = util.NewDelayingDeliverer()

  // The ObjectType below is one of kubefed's own CRDs; for ReplicaScheduler it is currently always `&fedschedulingv1a1.ReplicaSchedulingPreference{}`
	s.store, s.controller, err = util.NewGenericInformer(
		config.KubeConfig,
		config.TargetNamespace,
		s.scheduler.ObjectType(),
		util.NoResyncPeriod,
		s.worker.EnqueueObject,
	)
	if err != nil {
		return nil, err
	}

	return s, nil
}

// Then start it up
func (s *SchedulingPreferenceController) Run(stopChan <-chan struct{}) {
	go s.controller.Run(stopChan)
	s.scheduler.Start()

	s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
		s.reconcileOnClusterChange()
	})

	s.worker.Run(stopChan)

	// Ensure all goroutines are cleaned up when the stop channel closes
	go func() {
		<-stopChan
		s.clusterDeliverer.Stop()
		s.scheduler.Stop()
	}()
}

reconcile

The cluster-side reconcile simply enqueues every object whenever a cluster's state changes. Brute force, but simple.

func (s *SchedulingPreferenceController) reconcileOnClusterChange() {
	if !s.isSynced() {
		s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
	}
	for _, obj := range s.store.List() {
		qualifiedName := util.NewQualifiedName(obj.(pkgruntime.Object))
		s.worker.EnqueueWithDelay(qualifiedName, s.smallDelay)
	}
}
func (s *SchedulingPreferenceController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
	defer metrics.UpdateControllerReconcileDurationFromStart("schedulingpreferencecontroller", time.Now())

	if !s.isSynced() {
		return util.StatusNotSynced
	}

	kind := s.scheduler.SchedulingKind()
	key := qualifiedName.String()

	klog.V(4).Infof("Starting to reconcile %s controller triggered key named %v", kind, key)
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished reconciling %s controller triggered key named %v (duration: %v)", kind, key, time.Since(startTime))
	}()

	obj, err := s.objFromCache(s.store, kind, key)
	if err != nil {
		return util.StatusAllOK
	}
	if obj == nil {
		// Nothing to do
		return util.StatusAllOK
	}

	return s.scheduler.Reconcile(obj, qualifiedName)
}

In the end the real work is done by the Scheduler's Reconcile, as expected.

Scheduler

There is currently only one implementation, ReplicaScheduler, so let's start there.

// pkg/schedulingtypes/interface.go

type Scheduler interface {
  // the Kind of the corresponding ST
	SchedulingKind() string
  // the corresponding SchedulingPreference object type
	ObjectType() pkgruntime.Object

	Start()
	HasSynced() bool
	Stop()
	Reconcile(obj pkgruntime.Object, qualifiedName util.QualifiedName) util.ReconciliationStatus

	StartPlugin(typeConfig typeconfig.Interface) error
	StopPlugin(kind string)
}

type SchedulerEventHandlers struct {
	KubeFedEventHandler      func(pkgruntime.Object)
	ClusterEventHandler      func(pkgruntime.Object)
	ClusterLifecycleHandlers *util.ClusterLifecycleHandlerFuncs
}

type SchedulerFactory func(controllerConfig *util.ControllerConfig, eventHandlers SchedulerEventHandlers) (Scheduler, error)

ReplicaScheduler

Initialization holds no surprises: it starts a pod informer, injects the ClusterLifecycleHandler from the externally supplied eventHandlers, and ignores pod change events (they are deliberately not handled).

StartPlugin & StopPlugin

When the upper-level controller finds a new FTC with a registered ST, it starts a corresponding plugin via ReplicaScheduler.StartPlugin. So what is this oddly named thing for? Let's look.

func (s *ReplicaScheduler) StartPlugin(typeConfig typeconfig.Interface) error {
	kind := typeConfig.GetFederatedType().Kind
	// TODO(marun) Return an error if the kind is not supported

	plugin, err := NewPlugin(s.controllerConfig, s.eventHandlers, typeConfig)
	if err != nil {
		return errors.Wrapf(err, "Failed to initialize replica scheduling plugin for %q", kind)
	}

	plugin.Start()
	s.plugins.Store(kind, plugin)

	return nil
}

func (s *ReplicaScheduler) StopPlugin(kind string) {
	plugin, ok := s.plugins.Get(kind)
	if !ok {
		return
	}

	plugin.(*Plugin).Stop()
	s.plugins.Delete(kind)
}

My first reaction to the code above was confusion: the comments and variable names reveal almost nothing. Let's dig into the Plugin implementation.

// pkg/schedulingtypes/plugin.go

func NewPlugin(controllerConfig *util.ControllerConfig, eventHandlers SchedulerEventHandlers, typeConfig typeconfig.Interface) (*Plugin, error) {
	targetAPIResource := typeConfig.GetTargetType()
	userAgent := fmt.Sprintf("%s-replica-scheduler", strings.ToLower(targetAPIResource.Kind))
	client := genericclient.NewForConfigOrDieWithUserAgent(controllerConfig.KubeConfig, userAgent)

	targetInformer, err := util.NewFederatedInformer(
		controllerConfig,
		client,
		&targetAPIResource,
		eventHandlers.ClusterEventHandler,
		eventHandlers.ClusterLifecycleHandlers,
	)
	if err != nil {
		return nil, err
	}

  // The above starts an informer for the target type (e.g. Deployment), passing in the handlers

	p := &Plugin{
		targetInformer: targetInformer,
		typeConfig:     typeConfig,
		stopChannel:    make(chan struct{}),
	}

	targetNamespace := controllerConfig.TargetNamespace
	kubeFedEventHandler := eventHandlers.KubeFedEventHandler

	federatedTypeAPIResource := typeConfig.GetFederatedType()
	p.federatedTypeClient, err = util.NewResourceClient(controllerConfig.KubeConfig, &federatedTypeAPIResource)
	if err != nil {
		return nil, err
	}
  // Start a FederatedType informer
	p.federatedStore, p.federatedController = util.NewResourceInformer(p.federatedTypeClient, targetNamespace, &federatedTypeAPIResource, kubeFedEventHandler)

	return p, nil
}

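// Start runs the two informers initialized above
func (p *Plugin) Start() {
	p.targetInformer.Start()
	go p.federatedController.Run(p.stopChannel)
}
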
func (p *Plugin) Reconcile(qualifiedName util.QualifiedName, result map[string]int64) error {
	fedObject, err := p.federatedTypeClient.Resources(qualifiedName.Namespace).Get(context.Background(), qualifiedName.Name, metav1.GetOptions{})
	if err != nil && apierrors.IsNotFound(err) {
		// Federated resource has been deleted - no further action required
		return nil
	}
	if err != nil {
		return err
	}

	isDirty := false

	newClusterNames := []string{}
	for name := range result {
		newClusterNames = append(newClusterNames, name)
	}
	clusterNames, err := util.GetClusterNames(fedObject)
	if err != nil {
		return err
	}
  // Compare the old and new cluster names to decide whether placement changed
	if PlacementUpdateNeeded(clusterNames, newClusterNames) {
		if err := util.SetClusterNames(fedObject, newClusterNames); err != nil {
			return err
		}

		isDirty = true
	}

	overridesMap, err := util.GetOverrides(fedObject)
	if err != nil {
		return errors.Wrapf(err, "Error reading cluster overrides for %s %q", p.typeConfig.GetFederatedType().Kind, qualifiedName)
	}
	if OverrideUpdateNeeded(overridesMap, result) {
		err := setOverrides(fedObject, overridesMap, result)
		if err != nil {
			return err
		}
		isDirty = true
	}

	if isDirty {
    // If anything changed, write it back to the FederatedType object
		_, err := p.federatedTypeClient.Resources(qualifiedName.Namespace).Update(context.Background(), fedObject, metav1.UpdateOptions{})
		if err != nil {
			return err
		}
	}

	return nil
}

So the plugin doesn't do much after all: it checks whether an update is needed and, if so, writes it back.
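The two "is an update needed?" checks can be sketched as follows. placementUpdateNeeded and replicasUpdateNeeded are hypothetical stand-ins for PlacementUpdateNeeded and OverrideUpdateNeeded, whose real implementations are not shown here; in particular, real kubefed overrides are per-cluster JSON patches, reduced here to a map of replica counts. Sorting matters because newClusterNames is built by ranging over a map, so its order is random.

```go
package main

import (
	"fmt"
	"sort"
)

// placementUpdateNeeded reports whether two cluster-name lists differ as sets.
// Both inputs are copied and sorted so that ordering differences are ignored.
func placementUpdateNeeded(current, desired []string) bool {
	a := append([]string(nil), current...)
	b := append([]string(nil), desired...)
	sort.Strings(a)
	sort.Strings(b)
	if len(a) != len(b) {
		return true
	}
	for i := range a {
		if a[i] != b[i] {
			return true
		}
	}
	return false
}

// replicasUpdateNeeded reports whether any per-cluster replica count differs
// from the scheduling result.
func replicasUpdateNeeded(currentOverrides, result map[string]int64) bool {
	if len(currentOverrides) != len(result) {
		return true
	}
	for cluster, replicas := range result {
		if cur, ok := currentOverrides[cluster]; !ok || cur != replicas {
			return true
		}
	}
	return false
}

func main() {
	result := map[string]int64{"cluster-a": 3, "cluster-b": 2}
	// Prints false: same set of clusters, different order.
	fmt.Println(placementUpdateNeeded([]string{"cluster-b", "cluster-a"}, []string{"cluster-a", "cluster-b"}))
	// Prints true: cluster-b's replica count changed from 1 to 2.
	fmt.Println(replicasUpdateNeeded(map[string]int64{"cluster-a": 3, "cluster-b": 1}, result))
}
```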

ReplicaScheduler Start & Stop


func (s *ReplicaScheduler) Start() {
	s.podInformer.Start()
}

func (s *ReplicaScheduler) Stop() {
	for _, plugin := range s.plugins.GetAll() {
		plugin.(*Plugin).Stop()
	}
	s.plugins.DeleteAll()
	s.podInformer.Stop()
}

Nothing much here, so let's move on.

ReplicaScheduler.Reconcile

func (s *ReplicaScheduler) Reconcile(obj pkgruntime.Object, qualifiedName ctlutil.QualifiedName) ctlutil.ReconciliationStatus {
	rsp, ok := obj.(*fedschedulingv1a1.ReplicaSchedulingPreference)
	if !ok {
		// log obj rather than rsp: after a failed type assertion rsp is always nil
		runtime.HandleError(errors.Errorf("Incorrect runtime object for RSP: %v", obj))
		return ctlutil.StatusError
	}

	clusterNames, err := s.clusterNames()
	if err != nil {
		runtime.HandleError(errors.Wrap(err, "Failed to get cluster list"))
		return ctlutil.StatusError
	}
	if len(clusterNames) == 0 {
		// no joined clusters, nothing to do
		return ctlutil.StatusAllOK
	}
  // Use the federated informer to get all ready clusters; if none are available, do nothing for now.

  // Explicitly filter the target kind
	kind := rsp.Spec.TargetKind
	if kind != "FederatedDeployment" && kind != "FederatedReplicaSet" {
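		// Errorf instead of Wrapf(err, ...): err is nil here and Wrapf(nil, ...) returns nil, which would drop the message
		runtime.HandleError(errors.Errorf("RSP target kind: %s is incorrect", kind))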
		return ctlutil.StatusNeedsRecheck
	}
  // Get the plugin for this kind
	plugin, ok := s.plugins.Get(kind)
	if !ok {
		return ctlutil.StatusAllOK
	}

	if !plugin.(*Plugin).FederatedTypeExists(qualifiedName.String()) {
		// target FederatedType does not exist, nothing to do
		return ctlutil.StatusAllOK
	}

	key := qualifiedName.String()
  // Compute the scheduling result
	result, err := s.GetSchedulingResult(rsp, qualifiedName, clusterNames)
	if err != nil {
		runtime.HandleError(errors.Wrapf(err, "Failed to compute the schedule information while reconciling RSP named %q", key))
		return ctlutil.StatusError
	}
  // Hand the result to the plugin so it updates the Federated object
	err = plugin.(*Plugin).Reconcile(qualifiedName, result)
	if err != nil {
		runtime.HandleError(errors.Wrapf(err, "Failed to reconcile federated targets for RSP named %q", key))
		return ctlutil.StatusError
	}

	return ctlutil.StatusAllOK
}

ReplicaScheduler.GetSchedulingResult

This part computes how replicas are distributed, and it is also the messiest part (allocation plus balancing).

// Note: the clusterNames passed in here contain only ready clusters
func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSchedulingPreference, qualifiedName ctlutil.QualifiedName, clusterNames []string) (map[string]int64, error) {
	key := qualifiedName.String()

  // The target here is a Deployment or a ReplicaSet
	objectGetter := func(clusterName, key string) (interface{}, bool, error) {
		plugin, ok := s.plugins.Get(rsp.Spec.TargetKind)
		if !ok {
			return nil, false, nil
		}
		return plugin.(*Plugin).targetInformer.GetTargetStore().GetByKey(clusterName, key)
	}
  // Use the Deployment's or ReplicaSet's selector to list the pods in its namespace
	podsGetter := func(clusterName string, unstructuredObj *unstructured.Unstructured) (*corev1.PodList, error) {
		client, err := s.podInformer.GetClientForCluster(clusterName)
		if err != nil {
			return nil, err
		}
    // Risk: the path is hard-coded; if it ever changes, per-type handling will be needed
		selectorLabels, ok, err := unstructured.NestedStringMap(unstructuredObj.Object, "spec", "selector", "matchLabels")
		// check err first: when err != nil, ok is also false and the real error would be masked
		if err != nil {
			return nil, errors.Wrap(err, "error retrieving selector from object")
		}
		if !ok {
			return nil, errors.New("missing selector on object")
		}

		podList := &corev1.PodList{}
		err = client.List(context.Background(), podList, unstructuredObj.GetNamespace(), crclient.MatchingLabels(selectorLabels))
		if err != nil {
			return nil, err
		}
		return podList, nil
	}
  // Get the current pod state in each cluster
	currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
	if err != nil {
		return nil, err
	}

	// TODO: Move this to API defaulting logic
  // If the user specifies no clusters, every cluster gets the same weight, i.e. an even split
	if len(rsp.Spec.Clusters) == 0 {
		rsp.Spec.Clusters = map[string]fedschedulingv1a1.ClusterPreferences{
			"*": {Weight: 1},
		}
	}

	plnr := planner.NewPlanner(rsp)
	return schedule(plnr, key, clusterNames, currentReplicasPerCluster, estimatedCapacity)
}

clustersReplicaState


// clustersReplicaState returns information about the scheduling state of the pods running in the federated clusters.
func clustersReplicaState(
	clusterNames []string,
	key string,
	objectGetter func(clusterName string, key string) (interface{}, bool, error),
	podsGetter func(clusterName string, obj *unstructured.Unstructured) (*corev1.PodList, error)) (currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64, err error) {
	currentReplicasPerCluster = make(map[string]int64)
	estimatedCapacity = make(map[string]int64)

	for _, clusterName := range clusterNames {
    // Fetch the Deployment or ReplicaSet
		obj, exists, err := objectGetter(clusterName, key)
		if err != nil {
			return nil, nil, err
		}
		if !exists {
			continue
		}

		unstructuredObj := obj.(*unstructured.Unstructured)
		replicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "spec", "replicas")
		if err != nil {
			return nil, nil, errors.Wrap(err, "Error retrieving 'replicas' field")
		}
		if !ok {
			replicas = int64(0)
		}
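    // Both Deployment and ReplicaSet have this field. The JSON key is camelCase (readyReplicas);
    // a lowercase "readyreplicas" lookup never matches, so ok would always be false
		readyReplicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "status", "readyReplicas")
		if err != nil {
			return nil, nil, errors.Wrap(err, "Error retrieving 'readyReplicas' field")
		}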
		if !ok {
			readyReplicas = int64(0)
		}
    // Converged state: ready == desired
		if replicas == readyReplicas {
			currentReplicasPerCluster[clusterName] = readyReplicas
		} else {
			currentReplicasPerCluster[clusterName] = int64(0)
			podList, err := podsGetter(clusterName, unstructuredObj)
			if err != nil {
				return nil, nil, err
			}

      // Count Ready and Unschedulable pods from the existing pod statuses
			podStatus := podanalyzer.AnalyzePods(podList, time.Now())
			currentReplicasPerCluster[clusterName] = int64(podStatus.RunningAndReady) // include pending as well?
      // This cannot account for pods that were never created (e.g. rejected by a webhook because of a ResourceQuota)
			unschedulable := int64(podStatus.Unschedulable)
			if unschedulable > 0 {
        // Risk: so estimatedCapacity = ready + scheduled-but-not-ready + never-created? Pods that were never created shouldn't count; counting them overestimates the real capacity
				estimatedCapacity[clusterName] = replicas - unschedulable
			}
		}
	}
	return currentReplicasPerCluster, estimatedCapacity, nil
}

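So the function above yields two pieces of information:

  1. currentReplicasPerCluster: for each ready cluster that has the object, the replicas currently running and ready (the ready count if the workload has converged, otherwise the RunningAndReady pod count)
  2. estimatedCapacity: only for clusters with unschedulable pods, an estimate of how many replicas the cluster can hold (spec.replicas minus the unschedulable pods)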

With this information in hand, the Scheduler hands off to the Planner and func schedule(...) to allocate the replicas.
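To make the per-cluster branch of clustersReplicaState and its capacity risk concrete, here is a sketch. clusterState and estimate are hypothetical; the pod-analysis numbers are passed in directly instead of coming from podanalyzer.AnalyzePods.

```go
package main

import "fmt"

// clusterState holds the numbers clustersReplicaState reads for one cluster.
type clusterState struct {
	specReplicas    int64 // spec.replicas of the Deployment/ReplicaSet
	readyReplicas   int64 // status.readyReplicas
	runningAndReady int64 // RunningAndReady from the pod analysis
	unschedulable   int64 // Unschedulable from the pod analysis
}

// estimate mirrors the branches of clustersReplicaState: a converged workload reports
// its ready count; otherwise the pod analysis decides, and a capacity estimate is
// produced only when some pods are unschedulable.
func estimate(s clusterState) (current, capacity int64, hasCapacity bool) {
	if s.specReplicas == s.readyReplicas {
		return s.readyReplicas, 0, false
	}
	if s.unschedulable > 0 {
		return s.runningAndReady, s.specReplicas - s.unschedulable, true
	}
	return s.runningAndReady, 0, false
}

func main() {
	cur, capacity, ok := estimate(clusterState{specReplicas: 5, readyReplicas: 2, runningAndReady: 2, unschedulable: 2})
	fmt.Println(cur, capacity, ok) // 2 3 true
}
```

Here 5 replicas are desired, 2 are ready and 2 are unschedulable, so capacity is estimated at 5 - 2 = 3. The fifth pod, neither ready nor unschedulable (pending or never created), still counts toward that capacity, which is the overestimation risk noted in the code comments.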

Planner & schedule function

func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSchedulingPreference, qualifiedName ctlutil.QualifiedName, clusterNames []string) (map[string]int64, error) {
  ...
	currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
	if err != nil {
		return nil, err
	}

	// TODO: Move this to API defaulting logic
	if len(rsp.Spec.Clusters) == 0 {
		rsp.Spec.Clusters = map[string]fedschedulingv1a1.ClusterPreferences{
			"*": {Weight: 1},
		}
	}

  // The preferences are passed in when the planner is created
	plnr := planner.NewPlanner(rsp)
	return schedule(plnr, key, clusterNames, currentReplicasPerCluster, estimatedCapacity)
}

As we saw earlier, the last step of GetSchedulingResult is calling schedule with the planner, so let's first see what the Planner does.

Planner

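The Planner's only function is Plan, which is one pile of calculations after another. Honestly, I'd rather not read it... but there's no way around it: skip this code and it may well bite me later (though if it comes to that, I can just swap the planner out).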
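Before wading into Plan, here is a sketch of its first step: hash each cluster with FNV-32 over the cluster name followed by the key, then order by weight descending and hash ascending (the ordering the comments below attribute to byWeight). namedPreference, hashFor and sortPreferences are hypothetical simplifications.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

type namedPreference struct {
	clusterName string
	weight      int64
	hash        uint32
}

// hashFor follows the Plan code: FNV-32 over the cluster name, then the key.
func hashFor(clusterName, key string) uint32 {
	h := fnv.New32()
	h.Write([]byte(clusterName)) // hash.Hash's Write never returns an error
	h.Write([]byte(key))
	return h.Sum32()
}

// sortPreferences orders by weight descending, breaking ties by hash ascending, so
// clusters with equal weight keep a stable order for a given key.
func sortPreferences(prefs []namedPreference) {
	sort.Slice(prefs, func(i, j int) bool {
		if prefs[i].weight != prefs[j].weight {
			return prefs[i].weight > prefs[j].weight
		}
		return prefs[i].hash < prefs[j].hash
	})
}

func main() {
	key := "default/my-app"
	var prefs []namedPreference
	for name, w := range map[string]int64{"cluster-a": 1, "cluster-b": 2, "cluster-c": 1} {
		prefs = append(prefs, namedPreference{clusterName: name, weight: w, hash: hashFor(name, key)})
	}
	sortPreferences(prefs)
	for _, p := range prefs {
		fmt.Println(p.clusterName, p.weight)
	}
}
```

cluster-b always comes first; the two weight-1 clusters follow in hash order, which is the same on every run even though map iteration order is random. Because the key is part of the hash, different workloads can tie-break differently.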

// replicaSetKey is really qualifiedName.String(); no idea why it's called replicaSetKey, something like replicatedKey would fit better
func (p *Planner) Plan(availableClusters []string, currentReplicaCount map[string]int64,
	estimatedCapacity map[string]int64, replicaSetKey string) (map[string]int64, map[string]int64, error) {
	preferences := make([]*namedClusterPreferences, 0, len(availableClusters))
	plan := make(map[string]int64, len(preferences))
	overflow := make(map[string]int64, len(preferences))

	named := func(name string, pref fedschedulingv1a1.ClusterPreferences) (*namedClusterPreferences, error) {
		// Seems to work better than addler for our case.
		hasher := fnv.New32()
		if _, err := hasher.Write([]byte(name)); err != nil {
			return nil, err
		}
		if _, err := hasher.Write([]byte(replicaSetKey)); err != nil {
			return nil, err
		}

		return &namedClusterPreferences{
			clusterName:        name,
			hash:               hasher.Sum32(),  // 后面排序用
			ClusterPreferences: pref,
		}, nil
	}

	for _, cluster := range availableClusters {
		if localRSP, found := p.preferences.Spec.Clusters[cluster]; found {
			preference, err := named(cluster, localRSP)
			if err != nil {
				return nil, nil, err
			}

			preferences = append(preferences, preference)
		} else {
			if localRSP, found := p.preferences.Spec.Clusters["*"]; found {
				preference, err := named(cluster, localRSP)
				if err != nil {
					return nil, nil, err
				}

				preferences = append(preferences, preference)
			} else {
				plan[cluster] = int64(0)
			}
		}
	}
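  // In short: if the cluster has its own ClusterPreferences, or a "*" wildcard exists, compute a hash and append a namedClusterPreferences; otherwise set its plan to 0 (reset)
  // Sort by weight descending, then hash ascending (the hash mainly keeps clusters with the same weight from swapping order)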
	sort.Sort(byWeight(preferences))

	// This is the requested total replicas in preferences
	remainingReplicas := int64(p.preferences.Spec.TotalReplicas)

  // Give each cluster its guaranteed minimum, capped by what remains and by the estimated cluster capacity
	// Assign each cluster the minimum number of replicas it requested.
	for _, preference := range preferences {
		min := minInt64(preference.MinReplicas, remainingReplicas)
		if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
			min = minInt64(min, capacity)
		}
		remainingReplicas -= min
		plan[preference.clusterName] = min
	}

	// This map contains information how many replicas were assigned to
	// the cluster based only on the current replica count and
	// rebalance=false preference. It will be later used in remaining replica
	// distribution code.
	preallocated := make(map[string]int64)
  // Only when rebalancing is disabled
	if !p.preferences.Spec.Rebalance {
		for _, preference := range preferences {
			planned := plan[preference.clusterName]
			count, hasSome := currentReplicaCount[preference.clusterName]
      // The cluster already has more ready replicas than planned so far
			if hasSome && count > planned {
        // Start from the ready count as the target
				target := count
        // Not above MaxReplicas
				if preference.MaxReplicas != nil {
					target = minInt64(*preference.MaxReplicas, target)
				}
        // Not above the estimated cluster capacity
				if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
					target = minInt64(capacity, target)
				}
        // If the target exceeds the plan, take the difference from remaining (but never more than remains)
				extra := minInt64(target-planned, remainingReplicas)
				if extra < 0 {
					extra = 0
				}
				remainingReplicas -= extra
        // So preallocated only holds the amount given on top of the earlier plan
				preallocated[preference.clusterName] = extra
				plan[preference.clusterName] = extra + planned
			}
		}
	}

	modified := true

	// It is possible single pass of the loop is not enough to distribute all replicas among clusters due
	// to weight, max and rounding corner cases. In such case we iterate until either
	// there is no replicas or no cluster gets any more replicas or the number
	// of attempts is less than available cluster count. If there is no preallocated pods
	// every loop either distributes all remainingReplicas or maxes out at least one cluster.
	// If there are preallocated then the replica spreading may take longer.
	// We reduce the number of pending preallocated replicas by at least half with each iteration so
	// we may need log(replicasAtStart) iterations.
	// TODO: Prove that clusterCount * log(replicas) iterations solves the problem or adjust the number.
	// TODO: This algorithm is O(clusterCount^2 * log(replicas)) which is good for up to 100 clusters.
	// Find something faster.
  // Whatever remains is distributed by weight
	for trial := 0; modified && remainingReplicas > 0; trial++ {
		modified = false
		weightSum := int64(0)
		for _, preference := range preferences {
			weightSum += preference.Weight
		}
		newPreferences := make([]*namedClusterPreferences, 0, len(preferences))

    // Snapshot: every share in this round is computed from the remainder at the start of the round, not the live remainder
		distributeInThisLoop := remainingReplicas

		for _, preference := range preferences {
			if weightSum > 0 {
				start := plan[preference.clusterName]
				// Distribute the remaining replicas, rounding fractions always up.
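        // extra = min((distributeInThisLoop * weight + weightSum - 1) / weightSum, remaining)
        // With integer division, (x + weightSum - 1) / weightSum == ceil(x / weightSum) for x >= 0,
        // so the `+ weightSum - 1` is what implements "rounding fractions always up" from the comment above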
				extra := (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum
        // Never more than what remains
				extra = minInt64(extra, remainingReplicas)

				// Account preallocated.
				prealloc := preallocated[preference.clusterName]
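        // My reading: in the rebalance=false pass above, preallocated replicas were already added to plan and taken out of remaining,
        // so this cluster's weighted share is offset by them first; usedPrealloc is the part of the share they already cover, so nothing is counted twice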
				usedPrealloc := minInt64(extra, prealloc)
				preallocated[preference.clusterName] = prealloc - usedPrealloc
				extra -= usedPrealloc
				if usedPrealloc > 0 {
					modified = true
				}
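        // In short, the calculation above:
        // 1. computes how many more replicas this cluster should get in this round
        // 2. offsets that by the preallocated amount (with Rebalance enabled nothing is preallocated, so usedPrealloc is always 0 and this branch never sets modified; another round then depends on the total > start check below)
        // 3. subtracts the used preallocation from extra (extra never drops below 0)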

				// In total there should be the amount that was there at start plus whatever is due
				// in this iteration
				total := start + extra
        // The following clamps the replica count
				// Check if we don't overflow the cluster, and if yes don't consider this cluster
				// in any of the following iterations.
				full := false
        // Must not exceed MaxReplicas
				if preference.MaxReplicas != nil && total > *preference.MaxReplicas {
					total = *preference.MaxReplicas
					full = true
				}
        // Must not exceed the estimated cluster capacity
				if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity && total > capacity {
					overflow[preference.clusterName] = total - capacity
					total = capacity
					full = true
				}

				if !full {
					newPreferences = append(newPreferences, preference)
				}

				// Only total-start replicas were actually taken.
				remainingReplicas -= (total - start)
				plan[preference.clusterName] = total

				// Something extra got scheduled on this cluster.
				if total > start {
					modified = true
				}
			} else {
				break
			}
		}
		preferences = newPreferences
	}
  // Return the plan and the overflow (relative to the capacity estimated earlier)
	if p.preferences.Spec.Rebalance {
		return plan, overflow, nil
	} else {
		// If rebalance = false then overflow is trimmed at the level
		// of replicas that it failed to place somewhere.
		newOverflow := make(map[string]int64)
		for key, value := range overflow {
			value = minInt64(value, remainingReplicas)
			if value > 0 {
				newOverflow[key] = value
			}
		}
		return plan, newOverflow, nil
	}
}
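To check the two points above that confused me, here is a reduced, self-contained version of the distribution loop. It is a sketch, not kubefed's Planner: pref, plan and ceilDiv are hypothetical names, maxReplicas == 0 stands for "unbounded" (kubefed uses a nil pointer), and capacity, overflow and preallocation are left out. It keeps the min-replica pass, the per-round snapshot of the remainder, ceiling rounding, and dropping clusters that hit their max from later rounds.

```go
package main

import "fmt"

type pref struct {
	name        string
	minReplicas int64
	maxReplicas int64 // 0 means unbounded in this sketch
	weight      int64
}

func minInt64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

// ceilDiv is the integer ceiling trick: for x >= 0 and w > 0, (x + w - 1) / w == ceil(x / w).
func ceilDiv(x, w int64) int64 {
	return (x + w - 1) / w
}

// plan distributes total replicas over prefs, which must already be sorted.
func plan(prefs []pref, total int64) map[string]int64 {
	result := make(map[string]int64, len(prefs))
	remaining := total
	for _, p := range prefs { // guaranteed minimums first
		m := minInt64(p.minReplicas, remaining)
		remaining -= m
		result[p.name] = m
	}
	modified := true
	for modified && remaining > 0 {
		modified = false
		weightSum := int64(0)
		for _, p := range prefs {
			weightSum += p.weight
		}
		if weightSum == 0 {
			break
		}
		distribute := remaining // shares use the remainder at the start of the round
		var next []pref
		for _, p := range prefs {
			start := result[p.name]
			extra := minInt64(ceilDiv(distribute*p.weight, weightSum), remaining)
			totalHere := start + extra
			full := false
			if p.maxReplicas > 0 && totalHere > p.maxReplicas {
				totalHere = p.maxReplicas
				full = true // capped clusters drop out of later rounds
			}
			if !full {
				next = append(next, p)
			}
			remaining -= totalHere - start
			result[p.name] = totalHere
			if totalHere > start {
				modified = true
			}
		}
		prefs = next
	}
	return result
}

func main() {
	prefs := []pref{
		{name: "a", weight: 1},
		{name: "b", weight: 1},
		{name: "c", weight: 1, maxReplicas: 1},
	}
	fmt.Println(plan(prefs, 9)) // map[a:4 b:4 c:1]
}
```

With 9 replicas and three equal weights, round one gives each cluster ceil(9/3) = 3, but c is capped at 1 and drops out; round two splits the 2 leftovers between a and b (ceil(2/2) = 1 each), ending at a=4, b=4, c=1.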
schedule function
func schedule(planner *planner.Planner, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) (map[string]int64, error) {
	scheduleResult, overflow, err := planner.Plan(clusterNames, currentReplicasPerCluster, estimatedCapacity, key)
	if err != nil {
		return nil, err
	}
  // Get the computed result and the overflowing replicas from the planner
	// TODO: Check if we really need to place the federated type in clusters
	// with 0 replicas. Override replicas would be set to 0 in this case.
	result := make(map[string]int64)
	for clusterName := range currentReplicasPerCluster {
		result[clusterName] = 0
	}

	for clusterName, replicas := range scheduleResult {
		result[clusterName] = replicas
	}
	for clusterName, replicas := range overflow {
		result[clusterName] += replicas
	}
  // Assemble the final replica counts

	if klog.V(4) {
		buf := bytes.NewBufferString(fmt.Sprintf("Schedule - %q\n", key))
		sort.Strings(clusterNames)
		for _, clusterName := range clusterNames {
			cur := currentReplicasPerCluster[clusterName]
			target := scheduleResult[clusterName]
			fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target)
			if over, found := overflow[clusterName]; found {
				fmt.Fprintf(buf, " overflow: %d", over)
			}
			if capacity, found := estimatedCapacity[clusterName]; found {
				fmt.Fprintf(buf, " capacity: %d", capacity)
			}
			fmt.Fprintf(buf, "\n")
		}
		klog.V(4).Infof(buf.String())
	}
	return result, nil
}
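The merge at the end of schedule() is easy to misread, so here is the same three-step merge in isolation. mergeSchedule and the cluster names are hypothetical; the logic follows the loops above: clusters in currentReplicasPerCluster start at 0 (per the TODO, this yields explicit 0 overrides), planned counts overwrite them, and overflow is added on top.

```go
package main

import (
	"fmt"
	"sort"
)

// mergeSchedule reproduces the merge step of schedule().
func mergeSchedule(current, planned, overflow map[string]int64) map[string]int64 {
	result := make(map[string]int64)
	for cluster := range current {
		result[cluster] = 0 // clusters that currently have the object get an explicit 0
	}
	for cluster, replicas := range planned {
		result[cluster] = replicas
	}
	for cluster, replicas := range overflow {
		result[cluster] += replicas // overflow is added on top of the plan
	}
	return result
}

func main() {
	current := map[string]int64{"cluster-a": 2, "cluster-old": 1}
	planned := map[string]int64{"cluster-a": 3, "cluster-b": 2}
	overflow := map[string]int64{"cluster-b": 1}
	result := mergeSchedule(current, planned, overflow)
	names := make([]string, 0, len(result))
	for name := range result {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%s: %d\n", name, result[name])
	}
}
```

This prints cluster-a: 3, cluster-b: 3 and cluster-old: 0.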

Summary

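I honestly didn't fully understand the Planner. If I had to sum it up: it lays replicas down layer by layer, first by the min-replica policy and then by weight, until nothing remains. When rebalance is disabled, it also starts from the number of replicas each cluster already has ready (clamped by MaxReplicas, capacity and what remains), which guarantees each cluster a floor based on what it is already running.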
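The rebalance=false preallocation step in Plan, isolated as a sketch. preallocate is a hypothetical helper that uses -1 for "no bound" instead of kubefed's nil MaxReplicas pointer and missing capacity entry; the clamping order (current count, then max, then capacity, then remaining, floored at 0) follows the Plan code.

```go
package main

import "fmt"

// preallocate keeps a cluster's already-running replicas when they exceed its planned
// minimum, bounded by maxReplicas, capacity (-1 = no bound) and the replicas remaining.
// It returns the new planned count and the extra amount recorded as preallocated.
func preallocate(planned, current, maxReplicas, capacity, remaining int64) (newPlanned, extra int64) {
	if current <= planned {
		return planned, 0
	}
	target := current
	if maxReplicas >= 0 && maxReplicas < target {
		target = maxReplicas
	}
	if capacity >= 0 && capacity < target {
		target = capacity
	}
	extra = target - planned
	if extra > remaining {
		extra = remaining
	}
	if extra < 0 {
		extra = 0
	}
	return planned + extra, extra
}

func main() {
	// min 1 already planned, 4 running, no bounds, 6 left to hand out: keep all 4.
	planned, extra := preallocate(1, 4, -1, -1, 6)
	fmt.Println(planned, extra) // 4 3
}
```

The 3 preallocated replicas are what the weighted loop later subtracts via usedPrealloc, so the cluster is not credited twice.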

EOF © 2024