本篇是入坑后的填土工作, 主要会放在比如像 SchedulingPreference 这种玩意上。
SchedulingManager
当前主要面向的是 ReplicatedTypes,比如 Deployment、ReplicaSet 这一类对象:它们带有实例数(replicas),在多集群场景下需要决定每个集群分配多少实例,也就是做实例数调度。先来看看相关的新结构有哪些:
// ReplicaSchedulingPreferenceSpec defines the desired state of ReplicaSchedulingPreference
type ReplicaSchedulingPreferenceSpec struct {
// TODO (@irfanurrehman); upgrade this to label selector only if need be.
// The idea of this API is to have a set of preferences which can
// be used for a target FederatedDeployment or FederatedReplicaset.
// Although the set of preferences in question can be applied to multiple
// target objects using label selectors, but there are no clear advantages
// of doing that as of now.
// To keep the implementation and usage simple, matching ns/name of RSP
// resource to the target resource is sufficient and only additional information
// needed in RSP resource is a target kind (FederatedDeployment or FederatedReplicaset).
// ReplicaScheduler.Reconcile only accepts the exact strings "FederatedDeployment" and
// "FederatedReplicaSet" (note the capital S); any other value is rejected.
TargetKind string `json:"targetKind"`
// Total number of pods desired across federated clusters.
// Replicas specified in the spec for target deployment template or replicaset
// template will be discarded/overridden when scheduling preferences are
// specified.
TotalReplicas int32 `json:"totalReplicas"`
// If set to true then already scheduled and running replicas may be moved to other clusters
// in order to match current state to the specified preferences. Otherwise, if set to false,
// up and running replicas will not be moved.
// +optional
Rebalance bool `json:"rebalance,omitempty"`
// A mapping between cluster names and preferences regarding a local workload object (dep, rs, .. ) in
// these clusters.
// "*" (if provided) applies to all clusters if an explicit mapping is not provided.
// If omitted, clusters without explicit preferences should not have any replicas scheduled.
// If the whole map is empty, ReplicaScheduler.GetSchedulingResult substitutes
// {"*": {Weight: 1}}, i.e. every ready cluster gets the same weight.
// +optional
Clusters map[string]ClusterPreferences `json:"clusters,omitempty"`
}
// ClusterPreferences holds the preferences regarding the number of replicas assigned to a cluster
// workload object (dep, rs, ..) within a federated workload object.
type ClusterPreferences struct {
// Minimum number of replicas that should be assigned to this cluster workload object. 0 by default.
// The planner caps it by the replicas still left to distribute and by the cluster's estimated capacity.
// +optional
MinReplicas int64 `json:"minReplicas,omitempty"`
// Maximum number of replicas that should be assigned to this cluster workload object.
// Unbounded if no value provided (default).
// +optional
MaxReplicas *int64 `json:"maxReplicas,omitempty"`
// A number expressing the preference to put an additional replica to this cluster workload object.
// 0 by default. Replicas left after MinReplicas are shared out in proportion to Weight
// (rounding up); a cluster with Weight 0 receives nothing from that weighted share.
Weight int64 `json:"weight,omitempty"`
}
// ReplicaSchedulingPreferenceStatus defines the observed state of ReplicaSchedulingPreference.
// It currently has no fields.
type ReplicaSchedulingPreferenceStatus struct {
}
// ReplicaSchedulingPreference (short name "rsp") describes how the replicas of the federated
// workload with the same namespace/name should be spread across member clusters.
// +kubebuilder:object:root=true
// +kubebuilder:resource:path=replicaschedulingpreferences,shortName=rsp
type ReplicaSchedulingPreference struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ReplicaSchedulingPreferenceSpec `json:"spec,omitempty"`
Status ReplicaSchedulingPreferenceStatus `json:"status,omitempty"`
}
SchedulingManagerController
其实跟其他几个 controller 基本的套路是差不多的
- 初始化 Reconcile Worker
- 初始化 FTC(FederatedTypeConfig)Informer
- Run 将 Informer 和 Worker 拉起
所以上述就不多说了,直接奔向 `func (*SchedulingManager) reconcile`。因为 Scheduling 的部分比较解耦,所以会穿插着看代码,写起来也比较零碎。
reconcile
获取 SchedulingType
// pkg/controller/schedulingmanager/controller.go
// reconcile handles one FederatedTypeConfig (FTC) key. Only the opening step is shown here:
// the FTC name is used to look up a registered SchedulingType, and FTCs without one are
// ignored (StatusAllOK). The rest of the body is elided ("...").
func (c *SchedulingManager) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
key := qualifiedName.String()
klog.V(3).Infof("Running reconcile FederatedTypeConfig %q in scheduling manager", key)
typeConfigName := qualifiedName.Name
schedulingType := schedulingtypes.GetSchedulingType(typeConfigName)
if schedulingType == nil {
// No scheduler supported for this resource
return util.StatusAllOK
}
schedulingKind := schedulingType.Kind
...
}
上面是 reconcile 的开篇:根据 qualifiedName(其实就是 FederatedTypeConfig 的 namespace/name)去取 SchedulingType,也可以理解为「拿 FederatedTypeConfig.name 去取」(下文将 SchedulingType 简写为 ST);如果没有取到,则不对该 FTC 做任何处理。
那这部分 ST 又是怎么来的?是通过代码预注册来的,来看看。
SchedulingType
// SchedulingType pairs a scheduling kind with the factory that builds its Scheduler.
type SchedulingType struct {
// Kind is the scheduling kind; SchedulingManager keys its running schedulers by it.
Kind string
// SchedulerFactory builds the Scheduler for this kind.
SchedulerFactory SchedulerFactory
}
// typeRegistry maps a qualified target name (the FederatedTypeConfig name, e.g. deployments.apps)
// to the SchedulingType used to schedule it. It is written by RegisterSchedulingType and read by
// SchedulingTypes / GetSchedulingType without any locking.
// NOTE(review): presumably all registration happens at init time, before any lookup — confirm.
var typeRegistry = make(map[string]SchedulingType)
// RegisterSchedulingType records schedulingType under kind (the FederatedTypeConfig name,
// e.g. "deployments.apps"). Registering the same kind twice is a programming error and panics.
func RegisterSchedulingType(kind string, schedulingType SchedulingType) {
	if prev, dup := typeRegistry[kind]; dup {
		panic(fmt.Sprintf("Kind %q is already registered for scheduling with %q", kind, prev.Kind))
	}
	typeRegistry[kind] = schedulingType
}
// SchedulingTypes returns a snapshot of every registered SchedulingType keyed by target name.
// The returned map is a fresh copy, so callers may modify it freely.
func SchedulingTypes() map[string]SchedulingType {
	snapshot := make(map[string]SchedulingType, len(typeRegistry))
	for name := range typeRegistry {
		snapshot[name] = typeRegistry[name]
	}
	return snapshot
}
// GetSchedulingType looks up the SchedulingType registered for kind. It returns a pointer to a
// copy of the registry entry, or nil when nothing is registered for kind.
func GetSchedulingType(kind string) *SchedulingType {
	st, found := typeRegistry[kind]
	if !found {
		return nil
	}
	return &st
}
SchedulingType
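注册表以 FederatedTypeConfig 的 name 为 key(RegisterSchedulingType 的参数里写作 kind,形如 `deployments.apps`,即 target type 的「复数资源名.Group」形式),value 就是 SchedulingType 本身。SchedulingType 只有两个字段:Kind(调度类型,SchedulingManager 以它为 key 管理 scheduler)和对应的 SchedulerFactory。外部通过 `RegisterSchedulingType` 注册到包内变量 typeRegistry 中。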
当前有且仅有两种 ST,都是在 `pkg/schedulingtypes/replicasscheduler.go` 里注册的,key 分别是 `deployments.apps` 和 `replicasets.apps`,它们的 factory 创建的都是 ReplicaScheduler(这个咱们放后面说)。
注意点
我们不难发现在 ST 中并不关注 API 的版本,只关注了其名字和所在的 Group。也就是说就 APIResource 升级来说
- 同 Group 内版本升级无感:假如 appsv1.Deployment 升级成 appsv2.Deployment,这里是感知不到的。但如果两个版本对 ReplicaScheduler 来说不兼容,就需要区分处理,只能通过调整注册的 type 和 FTC 的 name 来实现。
- 跨 Group 有感:比如 extensionsv1beta1.Deployment 迁移到 appsv1.Deployment,这时就得另外加注册,没有别的办法。
回到 reconcile
// reconcile (continued; the opening lines are elided with "..."): fetch the FTC from the informer
// cache and call stopScheduler when the FTC is gone, has propagation disabled or is being deleted.
// Otherwise make sure the per-kind scheduler and the per-FTC plugin are both running.
func (c *SchedulingManager) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
key := qualifiedName.String()
...
cachedObj, exist, err := c.store.GetByKey(key)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Failed to query FederatedTypeConfig store for %q in scheduling manager", key))
return util.StatusError
}
// The FTC no longer exists: stop scheduling for it.
if !exist {
c.stopScheduler(schedulingKind, typeConfigName)
return util.StatusAllOK
}
typeConfig := cachedObj.(*corev1b1.FederatedTypeConfig)
// Propagation disabled or FTC being deleted: stop scheduling for it as well.
if !typeConfig.GetPropagationEnabled() || typeConfig.DeletionTimestamp != nil {
c.stopScheduler(schedulingKind, typeConfigName)
return util.StatusAllOK
}
// SetFederatedTypeConfigDefaults fills in defaults on the FTC. Per the article's reading it sets
// TargetType.PluralName, TargetType.Group, FederatedType.PluralName and StatusType, and upstream
// carries a "TODO(marun) will name always be populated?". This code does not show why this is still
// needed when defaulting could happen at admission time.
// NOTE(review): typeConfig is the object held in the informer cache. It is defaulted in place here
// and later handed to StartPlugin. Cached objects are normally treated as read-only, so defaulting
// a DeepCopy would be safer — confirm.
// set name and group for the type config target
corev1b1.SetFederatedTypeConfigDefaults(typeConfig)
// Start one SchedulingPreferenceController per scheduling kind (ST) on first use. Then start the
// plugin for this FTC name unless it is already running.
// Scheduling preference controller is started on demand
abstractScheduler, ok := c.schedulers.Get(schedulingKind)
if !ok {
klog.Infof("Starting schedulingpreference controller for %s", schedulingKind)
stopChan := make(chan struct{})
schedulerInterface, err := schedulingpreference.StartSchedulingPreferenceController(c.config, *schedulingType, stopChan)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Error starting schedulingpreference controller for %s", schedulingKind))
return util.StatusError
}
abstractScheduler = newSchedulerWrapper(schedulerInterface, stopChan)
c.schedulers.Store(schedulingKind, abstractScheduler)
}
scheduler := abstractScheduler.(*SchedulerWrapper)
if scheduler.HasPlugin(typeConfigName) {
// Scheduler and plugin already running for this target typeConfig
return util.StatusAllOK
}
federatedKind := typeConfig.GetFederatedType().Kind
klog.Infof("Starting plugin %s for %s", federatedKind, schedulingKind)
err = scheduler.StartPlugin(typeConfig)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Error starting plugin %s for %s", federatedKind, schedulingKind))
return util.StatusError
}
scheduler.pluginMap.Store(typeConfigName, federatedKind)
return util.StatusAllOK
以上基本就是 SchedulingManager 的全部了,那接下来就是去看看启动的这一坨 Factory 和 Controller。
SchedulingPreferenceController
// StartSchedulingPreferenceController builds a SchedulingPreferenceController for schedulingType,
// starts it against stopChannel and returns the Scheduler that the controller drives.
func StartSchedulingPreferenceController(config *util.ControllerConfig, schedulingType schedulingtypes.SchedulingType, stopChannel <-chan struct{}) (schedulingtypes.Scheduler, error) {
	spc, err := newSchedulingPreferenceController(config, schedulingType)
	if err != nil {
		return nil, err
	}

	// Honor the MinimizeLatency setting of the controller config.
	if config.MinimizeLatency {
		spc.minimizeLatency()
	}

	klog.Infof("Starting replicaschedulingpreferences controller")
	spc.Run(stopChannel)

	return spc.scheduler, nil
}
// newSchedulingPreferenceController builds a SchedulingPreferenceController for schedulingType
// without starting it. It sets up:
//   - an event recorder;
//   - a reconcile worker;
//   - a cluster-change deliverer;
//   - the Scheduler returned by schedulingType.SchedulerFactory;
//   - an informer over the scheduler's preference object type (for ReplicaScheduler,
//     ReplicaSchedulingPreference).
//
// Recorded events are only shipped to the API server once every fallible step has succeeded.
// An error return therefore no longer leaves a recording goroutine behind a discarded controller.
func newSchedulingPreferenceController(config *util.ControllerConfig, schedulingType schedulingtypes.SchedulingType) (*SchedulingPreferenceController, error) {
	userAgent := fmt.Sprintf("%s-controller", schedulingType.Kind)
	kubeConfig := restclient.CopyConfig(config.KubeConfig)
	restclient.AddUserAgent(kubeConfig, userAgent)
	kubeClient, err := kubeclientset.NewForConfig(kubeConfig)
	if err != nil {
		return nil, err
	}

	broadcaster := record.NewBroadcaster()
	recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "replicaschedulingpreference-controller"})

	s := &SchedulingPreferenceController{
		clusterAvailableDelay:   config.ClusterAvailableDelay,
		clusterUnavailableDelay: config.ClusterUnavailableDelay,
		smallDelay:              time.Second * 3,
		eventRecorder:           recorder,
	}

	s.worker = util.NewReconcileWorker(s.reconcile, util.WorkerTiming{
		ClusterSyncDelay: s.clusterAvailableDelay,
	})

	// Build deliverer for triggering cluster reconciliations. It is created before the lifecycle
	// handlers below are handed to the scheduler factory, so those handlers can never observe a
	// nil deliverer.
	s.clusterDeliverer = util.NewDelayingDeliverer()

	eventHandlers := schedulingtypes.SchedulerEventHandlers{
		KubeFedEventHandler: s.worker.EnqueueObject,
		ClusterEventHandler: func(obj pkgruntime.Object) {
			qualifiedName := util.NewQualifiedName(obj)
			s.worker.EnqueueForRetry(qualifiedName)
		},
		ClusterLifecycleHandlers: &util.ClusterLifecycleHandlerFuncs{
			ClusterAvailable: func(cluster *fedv1b1.KubeFedCluster) {
				// When new cluster becomes available process all the target resources again.
				s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
			},
			// When a cluster becomes unavailable process all the target resources again.
			ClusterUnavailable: func(cluster *fedv1b1.KubeFedCluster, _ []interface{}) {
				s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay))
			},
		},
	}

	scheduler, err := schedulingType.SchedulerFactory(config, eventHandlers)
	if err != nil {
		return nil, err
	}
	s.scheduler = scheduler

	// Watch the preference objects handled by this scheduler (for ReplicaScheduler this is
	// ReplicaSchedulingPreference) and feed every change to the worker.
	s.store, s.controller, err = util.NewGenericInformer(
		config.KubeConfig,
		config.TargetNamespace,
		s.scheduler.ObjectType(),
		util.NoResyncPeriod,
		s.worker.EnqueueObject,
	)
	if err != nil {
		return nil, err
	}

	// Every fallible step succeeded: start shipping recorded events to the API server.
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	return s, nil
}
// Run starts, in order: the preference informer, the scheduler, the cluster-change deliverer and
// the reconcile worker. It also arranges for the deliverer and the scheduler to be stopped once
// stopChan is closed.
func (s *SchedulingPreferenceController) Run(stopChan <-chan struct{}) {
	go s.controller.Run(stopChan)
	s.scheduler.Start()

	onClusterChange := func(*util.DelayingDelivererItem) {
		s.reconcileOnClusterChange()
	}
	s.clusterDeliverer.StartWithHandler(onClusterChange)

	s.worker.Run(stopChan)

	// Ensure all goroutines are cleaned up when the stop channel closes.
	shutdown := func() {
		s.clusterDeliverer.Stop()
		s.scheduler.Stop()
	}
	go func() {
		<-stopChan
		shutdown()
	}()
}
reconcile
集群部分的 reconcile 就是在集群状态变更的时候所有 Object 都进一把队列,暴力但是简单
// reconcileOnClusterChange re-enqueues every cached preference object, each with a small delay,
// after a cluster became available or unavailable. If the informers have not synced yet, it also
// schedules another full pass after clusterAvailableDelay.
func (s *SchedulingPreferenceController) reconcileOnClusterChange() {
	if synced := s.isSynced(); !synced {
		// NOTE(review): there is no early return here, so objects are still enqueued while
		// unsynced; reconcile answers StatusNotSynced for them. Confirm this is intended.
		retryAt := time.Now().Add(s.clusterAvailableDelay)
		s.clusterDeliverer.DeliverAt(allClustersKey, nil, retryAt)
	}
	cached := s.store.List()
	for i := range cached {
		name := util.NewQualifiedName(cached[i].(pkgruntime.Object))
		s.worker.EnqueueWithDelay(name, s.smallDelay)
	}
}
// reconcile processes one preference object. It waits for informer sync, loads the object from
// the local cache and hands it to the Scheduler. An object missing from the cache is a no-op.
// A cache lookup error also ends in StatusAllOK (no retry) and is not logged by this function.
func (s *SchedulingPreferenceController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
	defer metrics.UpdateControllerReconcileDurationFromStart("schedulingpreferencecontroller", time.Now())

	if !s.isSynced() {
		return util.StatusNotSynced
	}

	kind, key := s.scheduler.SchedulingKind(), qualifiedName.String()
	klog.V(4).Infof("Starting to reconcile %s controller triggered key named %v", kind, key)
	began := time.Now()
	defer func() {
		klog.V(4).Infof("Finished reconciling %s controller triggered key named %v (duration: %v)", kind, key, time.Since(began))
	}()

	obj, err := s.objFromCache(s.store, kind, key)
	switch {
	case err != nil:
		return util.StatusAllOK
	case obj == nil:
		// Nothing to do
		return util.StatusAllOK
	}
	return s.scheduler.Reconcile(obj, qualifiedName)
}
最后还是靠 Scheduler 的 Reconcile 执行实际的东西,也合乎预期。
Scheduler
当前也就只有一个 ReplicaScheduler,那就从他下手咯
// pkg/schedulingtypes/interface.go
// Scheduler is the contract a scheduling type implements (currently only ReplicaScheduler).
// A SchedulingPreferenceController drives it. SchedulingManager starts one plugin per
// FederatedTypeConfig through StartPlugin.
type Scheduler interface {
// SchedulingKind returns the kind of the SchedulingType (ST) this scheduler serves.
SchedulingKind() string
// ObjectType returns an empty instance of the scheduling preference type to watch; for
// ReplicaScheduler that is &ReplicaSchedulingPreference{}.
ObjectType() pkgruntime.Object
// Start starts the scheduler's own resources (ReplicaScheduler: its pod informer).
Start()
// HasSynced presumably reports whether the scheduler's informers have synced — confirm.
HasSynced() bool
// Stop stops the scheduler (ReplicaScheduler: all plugins, then the pod informer).
Stop()
// Reconcile computes and applies the schedule for one preference object.
Reconcile(obj pkgruntime.Object, qualifiedName util.QualifiedName) util.ReconciliationStatus
// StartPlugin starts handling the federated type described by typeConfig.
StartPlugin(typeConfig typeconfig.Interface) error
// StopPlugin stops handling the federated type with the given kind.
StopPlugin(kind string)
}
// SchedulerEventHandlers are the callbacks a SchedulerFactory receives, so the scheduler and its
// plugins can feed events back into the controller that owns them.
type SchedulerEventHandlers struct {
// KubeFedEventHandler is wired to the informer over the federated type (e.g. FederatedDeployment).
KubeFedEventHandler func(pkgruntime.Object)
// ClusterEventHandler is wired to the federated informer over the target type in member clusters.
ClusterEventHandler func(pkgruntime.Object)
// ClusterLifecycleHandlers are notified when clusters become available or unavailable.
ClusterLifecycleHandlers *util.ClusterLifecycleHandlerFuncs
}
// SchedulerFactory builds a Scheduler from the controller config and from the event handlers of
// the SchedulingPreferenceController that will drive it.
type SchedulerFactory func(controllerConfig *util.ControllerConfig, eventHandlers SchedulerEventHandlers) (Scheduler, error)
ReplicaScheduler
初始化方面没有什么特别,就起了个 pod informer 然后注入了外部提供的 eventHandlers 中的 ClusterLifecycleHandler,然后忽略掉 pod changes 的 trigger(预期不作处理)。
StartPlugin & StopPlugin
当上层 Controller 发现有新的 FTC,且该 FTC 注册了 ST 时,就会通过 `ReplicaScheduler.StartPlugin` 启动一个对应的插件。这名字有点奇怪,这货到底是干啥的?看看咯。
// StartPlugin creates a Plugin for the federated type described by typeConfig, starts it and
// records it under the federated kind (e.g. FederatedDeployment).
func (s *ReplicaScheduler) StartPlugin(typeConfig typeconfig.Interface) error {
	federatedKind := typeConfig.GetFederatedType().Kind
	// TODO(marun) Return an error if the kind is not supported
	p, err := NewPlugin(s.controllerConfig, s.eventHandlers, typeConfig)
	if err != nil {
		return errors.Wrapf(err, "Failed to initialize replica scheduling plugin for %q", federatedKind)
	}
	p.Start()
	s.plugins.Store(federatedKind, p)
	return nil
}
// StopPlugin stops and forgets the plugin registered for the given federated kind.
// Kinds with no running plugin are ignored.
func (s *ReplicaScheduler) StopPlugin(kind string) {
	if p, found := s.plugins.Get(kind); found {
		p.(*Plugin).Stop()
		s.plugins.Delete(kind)
	}
}
看到上面代码的第一眼是懵逼的,从注释和变量名里完全看不出什么东西来。。。继续深入看看 Plugin 的实现。
// pkg/schedulingtypes/plugin.go

// NewPlugin wires up, without starting, the two informers a replica scheduling plugin needs:
//   - a federated informer over the target type (e.g. Deployment), reporting through
//     eventHandlers.ClusterEventHandler and eventHandlers.ClusterLifecycleHandlers;
//   - an informer over the federated type (e.g. FederatedDeployment) built from
//     controllerConfig.KubeConfig, reporting through eventHandlers.KubeFedEventHandler.
func NewPlugin(controllerConfig *util.ControllerConfig, eventHandlers SchedulerEventHandlers, typeConfig typeconfig.Interface) (*Plugin, error) {
	targetAPIResource := typeConfig.GetTargetType()
	ua := fmt.Sprintf("%s-replica-scheduler", strings.ToLower(targetAPIResource.Kind))
	targetInformer, err := util.NewFederatedInformer(
		controllerConfig,
		genericclient.NewForConfigOrDieWithUserAgent(controllerConfig.KubeConfig, ua),
		&targetAPIResource,
		eventHandlers.ClusterEventHandler,
		eventHandlers.ClusterLifecycleHandlers,
	)
	if err != nil {
		return nil, err
	}

	p := &Plugin{
		targetInformer: targetInformer,
		typeConfig:     typeConfig,
		stopChannel:    make(chan struct{}),
	}

	fedAPIResource := typeConfig.GetFederatedType()
	if p.federatedTypeClient, err = util.NewResourceClient(controllerConfig.KubeConfig, &fedAPIResource); err != nil {
		return nil, err
	}
	p.federatedStore, p.federatedController = util.NewResourceInformer(
		p.federatedTypeClient,
		controllerConfig.TargetNamespace,
		&fedAPIResource,
		eventHandlers.KubeFedEventHandler,
	)
	return p, nil
}
// Start starts the two informers created in NewPlugin: the target-type federated informer and the
// federated-type informer. The actual body is elided in this write-up.
func (p *Plugin) Start() {启动前面初始化的两个 informer}
// Reconcile applies a scheduling result (cluster name -> replicas) to the federated object named
// by qualifiedName:
//   - placement is set to exactly the clusters present in result;
//   - the per-cluster replica overrides are updated.
//
// The object is written back only if either of these changed. A federated object that no longer
// exists is not an error.
func (p *Plugin) Reconcile(qualifiedName util.QualifiedName, result map[string]int64) error {
	fedObject, err := p.federatedTypeClient.Resources(qualifiedName.Namespace).Get(context.Background(), qualifiedName.Name, metav1.GetOptions{})
	switch {
	case apierrors.IsNotFound(err):
		// Federated resource has been deleted - no further action required
		return nil
	case err != nil:
		return err
	}

	newClusterNames := make([]string, 0, len(result))
	for clusterName := range result {
		newClusterNames = append(newClusterNames, clusterName)
	}

	currentClusterNames, err := util.GetClusterNames(fedObject)
	if err != nil {
		return err
	}

	// Placement changes when the set of clusters differs from the one in the object.
	placementChanged := PlacementUpdateNeeded(currentClusterNames, newClusterNames)
	if placementChanged {
		if err := util.SetClusterNames(fedObject, newClusterNames); err != nil {
			return err
		}
	}

	overridesMap, err := util.GetOverrides(fedObject)
	if err != nil {
		return errors.Wrapf(err, "Error reading cluster overrides for %s %q", p.typeConfig.GetFederatedType().Kind, qualifiedName)
	}
	overridesChanged := OverrideUpdateNeeded(overridesMap, result)
	if overridesChanged {
		if err := setOverrides(fedObject, overridesMap, result); err != nil {
			return err
		}
	}

	if !placementChanged && !overridesChanged {
		return nil
	}
	_, err = p.federatedTypeClient.Resources(qualifiedName.Namespace).Update(context.Background(), fedObject, metav1.UpdateOptions{})
	return err
}
看起来 plugin 本身没做太多事:把调度结果与联邦对象当前的 placement(集群列表)和 overrides 做对比,有变化才写回联邦对象。
ReplicaScheduler Start & Stop
// Start starts the scheduler's pod informer. Its per-cluster clients are used to list pods when
// computing a schedule. Plugins have their own lifecycle (StartPlugin / StopPlugin).
func (s *ReplicaScheduler) Start() { s.podInformer.Start() }
// Stop stops every running plugin, forgets all of them, and finally stops the pod informer.
func (s *ReplicaScheduler) Stop() {
	running := s.plugins.GetAll()
	for i := range running {
		running[i].(*Plugin).Stop()
	}
	s.plugins.DeleteAll()
	s.podInformer.Stop()
}
好像没啥玩意,那忽略吧
ReplicaScheduler.Reconcile
// Reconcile computes a replica schedule for one ReplicaSchedulingPreference (RSP). It hands the
// schedule to the plugin for the RSP's target kind, which writes it to the federated object with
// the same namespace/name.
//
// Outcomes:
//   - StatusError (error reported): obj is not an RSP, the cluster list cannot be read, or
//     computing/applying the schedule fails.
//   - StatusNeedsRecheck (error reported): TargetKind is neither "FederatedDeployment" nor
//     "FederatedReplicaSet".
//   - StatusAllOK (nothing to do): clusterNames returns no clusters, no plugin is running for the
//     kind, or the matching federated object does not exist.
func (s *ReplicaScheduler) Reconcile(obj pkgruntime.Object, qualifiedName ctlutil.QualifiedName) ctlutil.ReconciliationStatus {
	rsp, ok := obj.(*fedschedulingv1a1.ReplicaSchedulingPreference)
	if !ok {
		// rsp is nil when the assertion fails, so report the type that was actually received.
		runtime.HandleError(errors.Errorf("Incorrect runtime object for RSP: %T", obj))
		return ctlutil.StatusError
	}

	clusterNames, err := s.clusterNames()
	if err != nil {
		runtime.HandleError(errors.Wrap(err, "Failed to get cluster list"))
		return ctlutil.StatusError
	}
	if len(clusterNames) == 0 {
		// no joined clusters, nothing to do
		return ctlutil.StatusAllOK
	}

	kind := rsp.Spec.TargetKind
	if kind != "FederatedDeployment" && kind != "FederatedReplicaSet" {
		// err is nil here, and errors.Wrapf(nil, ...) returns nil, so wrapping it lost the
		// report entirely. Create a fresh error instead.
		runtime.HandleError(errors.Errorf("RSP target kind: %s is incorrect", kind))
		return ctlutil.StatusNeedsRecheck
	}

	rawPlugin, ok := s.plugins.Get(kind)
	if !ok {
		// No plugin has been started for this federated kind.
		return ctlutil.StatusAllOK
	}
	plugin := rawPlugin.(*Plugin)

	key := qualifiedName.String()
	if !plugin.FederatedTypeExists(key) {
		// target FederatedType does not exist, nothing to do
		return ctlutil.StatusAllOK
	}

	result, err := s.GetSchedulingResult(rsp, qualifiedName, clusterNames)
	if err != nil {
		runtime.HandleError(errors.Wrapf(err, "Failed to compute the schedule information while reconciling RSP named %q", key))
		return ctlutil.StatusError
	}

	// Let the plugin write placement and replica overrides onto the federated object.
	if err := plugin.Reconcile(qualifiedName, result); err != nil {
		runtime.HandleError(errors.Wrapf(err, "Failed to reconcile federated targets for RSP named %q", key))
		return ctlutil.StatusError
	}
	return ctlutil.StatusAllOK
}
ReplicaScheduler.GetSchedulingResult
这部分负责计算实例数在各集群间的分布(分配和平衡),也是写得最杂的一个部分。
// GetSchedulingResult computes the desired number of replicas per cluster for rsp.
//
// clusterNames is expected to hold only ready clusters. The current state is read from two places:
//   - the target objects (Deployment or ReplicaSet) named by qualifiedName in each cluster, via the
//     target informer of the plugin registered for rsp.Spec.TargetKind;
//   - for objects that are not fully ready, the pods matched by the object's
//     spec.selector.matchLabels.
//
// An RSP without cluster preferences is treated as {"*": {Weight: 1}}, i.e. equal weight for every
// cluster.
//
// NOTE(review): that default is written into rsp.Spec.Clusters, so the caller's object is mutated.
// This is harmless only if rsp is a private copy — confirm.
func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSchedulingPreference, qualifiedName ctlutil.QualifiedName, clusterNames []string) (map[string]int64, error) {
	key := qualifiedName.String()

	// objectGetter fetches the target object for key from a member cluster's informer store.
	// It reports "not found" when no plugin is running for the target kind.
	objectGetter := func(clusterName, key string) (interface{}, bool, error) {
		plugin, ok := s.plugins.Get(rsp.Spec.TargetKind)
		if !ok {
			return nil, false, nil
		}
		return plugin.(*Plugin).targetInformer.GetTargetStore().GetByKey(clusterName, key)
	}

	// podsGetter lists the pods matched by spec.selector.matchLabels in the object's namespace.
	// NOTE(review): the selector path is hard-coded and matchExpressions are ignored.
	podsGetter := func(clusterName string, unstructuredObj *unstructured.Unstructured) (*corev1.PodList, error) {
		client, err := s.podInformer.GetClientForCluster(clusterName)
		if err != nil {
			return nil, err
		}
		// Check err before found. NestedStringMap reports found == false whenever it fails, so
		// testing found first made the error branch unreachable and hid the real cause.
		selectorLabels, found, err := unstructured.NestedStringMap(unstructuredObj.Object, "spec", "selector", "matchLabels")
		if err != nil {
			return nil, errors.Wrap(err, "error retrieving selector from object")
		}
		if !found {
			return nil, errors.New("missing selector on object")
		}
		podList := &corev1.PodList{}
		if err := client.List(context.Background(), podList, unstructuredObj.GetNamespace(), crclient.MatchingLabels(selectorLabels)); err != nil {
			return nil, err
		}
		return podList, nil
	}

	currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
	if err != nil {
		return nil, err
	}

	// TODO: Move this to API defaulting logic
	if len(rsp.Spec.Clusters) == 0 {
		rsp.Spec.Clusters = map[string]fedschedulingv1a1.ClusterPreferences{
			"*": {Weight: 1},
		}
	}

	plnr := planner.NewPlanner(rsp)
	return schedule(plnr, key, clusterNames, currentReplicasPerCluster, estimatedCapacity)
}
clustersReplicaState
// clustersReplicaState returns information about the scheduling state of the pods running in the federated clusters.
//
// Each cluster in clusterNames whose target object (Deployment/ReplicaSet) exists is handled as follows:
//   - if spec.replicas equals status.readyReplicas, the workload is settled and
//     currentReplicasPerCluster[cluster] is that ready count;
//   - otherwise the cluster's pods are analyzed. currentReplicasPerCluster[cluster] is the number
//     of running-and-ready pods. If any pod is unschedulable, estimatedCapacity[cluster] is
//     spec.replicas minus the unschedulable count.
//
// Clusters without the object appear in neither map. A missing spec.replicas or
// status.readyReplicas counts as 0.
//
// NOTE(review): pods that were never created (e.g. rejected by a ResourceQuota) are not seen as
// unschedulable, so estimatedCapacity can overstate the real capacity.
func clustersReplicaState(
	clusterNames []string,
	key string,
	objectGetter func(clusterName string, key string) (interface{}, bool, error),
	podsGetter func(clusterName string, obj *unstructured.Unstructured) (*corev1.PodList, error)) (currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64, err error) {
	currentReplicasPerCluster = make(map[string]int64)
	estimatedCapacity = make(map[string]int64)
	for _, clusterName := range clusterNames {
		obj, exists, err := objectGetter(clusterName, key)
		if err != nil {
			return nil, nil, err
		}
		if !exists {
			continue
		}
		unstructuredObj := obj.(*unstructured.Unstructured)

		replicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "spec", "replicas")
		if err != nil {
			return nil, nil, errors.Wrap(err, "Error retrieving 'replicas' field")
		}
		if !ok {
			replicas = int64(0)
		}

		// Both Deployment and ReplicaSet expose status.readyReplicas. The lookup is a
		// case-sensitive map lookup, so the previous "readyreplicas" never matched. That left
		// readyReplicas at 0 and the settled fast path below was effectively dead.
		readyReplicas, ok, err := unstructured.NestedInt64(unstructuredObj.Object, "status", "readyReplicas")
		if err != nil {
			return nil, nil, errors.Wrap(err, "Error retrieving 'readyReplicas' field")
		}
		if !ok {
			readyReplicas = int64(0)
		}

		// Settled: every desired replica is ready, no need to look at individual pods.
		if replicas == readyReplicas {
			currentReplicasPerCluster[clusterName] = readyReplicas
			continue
		}

		podList, err := podsGetter(clusterName, unstructuredObj)
		if err != nil {
			return nil, nil, err
		}
		// Count running-and-ready and unschedulable pods from their current status.
		podStatus := podanalyzer.AnalyzePods(podList, time.Now())
		currentReplicasPerCluster[clusterName] = int64(podStatus.RunningAndReady) // include pending as well?
		if unschedulable := int64(podStatus.Unschedulable); unschedulable > 0 {
			estimatedCapacity[clusterName] = replicas - unschedulable
		}
	}
	return currentReplicasPerCluster, estimatedCapacity, nil
}
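所以从上面函数我们可以得到两个信息:
- currentReplicasPerCluster:各个 Ready 集群中目标对象当前可用的实例数。稳定时(replicas == readyReplicas)取 readyReplicas,否则取 Running 且 Ready 的 Pod 数;集群中不存在目标对象的不会出现在这里
- estimatedCapacity:预估的集群容量,只对存在 Unschedulable Pod 的集群给出,值为 replicas - unschedulable
得到这些信息后,Scheduler 就把它们交给 Planner 和 `func schedule(...)` 去做实例分配。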
Planner & schedule function
// GetSchedulingResult (excerpt; the objectGetter/podsGetter setup is elided with "..."):
// gathers per-cluster replica state, defaults empty cluster preferences to {"*": {Weight: 1}},
// and lets a Planner built from the RSP compute the schedule.
func (s *ReplicaScheduler) GetSchedulingResult(rsp *fedschedulingv1a1.ReplicaSchedulingPreference, qualifiedName ctlutil.QualifiedName, clusterNames []string) (map[string]int64, error) {
...
currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
if err != nil {
return nil, err
}
// TODO: Move this to API defaulting logic
if len(rsp.Spec.Clusters) == 0 {
rsp.Spec.Clusters = map[string]fedschedulingv1a1.ClusterPreferences{
"*": {Weight: 1},
}
}
// The Planner is built from the RSP, so it sees the (possibly defaulted) preferences.
plnr := planner.NewPlanner(rsp)
return schedule(plnr, key, clusterNames, currentReplicasPerCluster, estimatedCapacity)
}
从前文可知,GetSchedulingResult 的最后一步是基于 Planner 调用 schedule,那就先来看看 Planner 是干啥的。
Planner
Planner 唯一的方法就是 Plan,里面一坨坨的计算,说实在真不想看它。。。但也无奈啊,这部分代码如果不看,未来很可能翻车(不过如果有必要,到时候直接把 planner 换了)。
// Plan distributes p.preferences.Spec.TotalReplicas over availableClusters and returns
// (plan, overflow, error):
//   - plan maps every available cluster to its planned replica count. Clusters with neither an
//     explicit nor a "*" preference are planned at 0.
//   - overflow maps clusters to the replicas that would have exceeded their estimatedCapacity.
//     When Rebalance is false it is trimmed to the replicas that could not be placed anywhere.
//
// currentReplicaCount is only consulted when Rebalance is false, to keep running replicas in place.
// replicaSetKey is the key of the preference object (schedule passes qualifiedName.String()).
// Despite the name it is not ReplicaSet-specific; replicatedKey would read better. It is only
// mixed into the per-cluster hash used when sorting.
func (p *Planner) Plan(availableClusters []string, currentReplicaCount map[string]int64,
estimatedCapacity map[string]int64, replicaSetKey string) (map[string]int64, map[string]int64, error) {
preferences := make([]*namedClusterPreferences, 0, len(availableClusters))
// NOTE(review): len(preferences) is still 0 here, so these size hints are no-ops;
// len(availableClusters) was probably intended.
plan := make(map[string]int64, len(preferences))
overflow := make(map[string]int64, len(preferences))
// named wraps a cluster preference with the cluster name and an FNV-32 hash of
// name + replicaSetKey.
named := func(name string, pref fedschedulingv1a1.ClusterPreferences) (*namedClusterPreferences, error) {
// Seems to work better than adler32 for our case.
hasher := fnv.New32()
if _, err := hasher.Write([]byte(name)); err != nil {
return nil, err
}
if _, err := hasher.Write([]byte(replicaSetKey)); err != nil {
return nil, err
}
return &namedClusterPreferences{
clusterName: name,
hash: hasher.Sum32(), // used later when sorting
ClusterPreferences: pref,
}, nil
}
for _, cluster := range availableClusters {
if localRSP, found := p.preferences.Spec.Clusters[cluster]; found {
preference, err := named(cluster, localRSP)
if err != nil {
return nil, nil, err
}
preferences = append(preferences, preference)
} else {
if localRSP, found := p.preferences.Spec.Clusters["*"]; found {
preference, err := named(cluster, localRSP)
if err != nil {
return nil, nil, err
}
preferences = append(preferences, preference)
} else {
plan[cluster] = int64(0)
}
}
}
// Above: clusters with an explicit preference, or covered by "*", become candidates (with a
// hash); every other available cluster is planned at 0.
// NOTE(review): byWeight is defined elsewhere. Presumably it orders by descending Weight and breaks
// ties by hash, so equal-weight clusters keep a stable order — confirm.
sort.Sort(byWeight(preferences))
// This is the requested total replicas in preferences
remainingReplicas := int64(p.preferences.Spec.TotalReplicas)
// Step 1: give every candidate its MinReplicas, capped by the replicas still left and by its
// estimated capacity (when known).
// Assign each cluster the minimum number of replicas it requested.
for _, preference := range preferences {
min := minInt64(preference.MinReplicas, remainingReplicas)
if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
min = minInt64(min, capacity)
}
remainingReplicas -= min
plan[preference.clusterName] = min
}
// This map contains information how many replicas were assigned to
// the cluster based only on the current replica count and
// rebalance=false preference. It will be later used in remaining replica
// distribution code.
preallocated := make(map[string]int64)
// Step 2 (only when Rebalance is false): avoid moving replicas that are already running.
if !p.preferences.Spec.Rebalance {
for _, preference := range preferences {
planned := plan[preference.clusterName]
count, hasSome := currentReplicaCount[preference.clusterName]
// The cluster currently runs more replicas than planned so far.
if hasSome && count > planned {
// Aim to keep the current count...
target := count
// ...but never above MaxReplicas...
if preference.MaxReplicas != nil {
target = minInt64(*preference.MaxReplicas, target)
}
// ...nor above the estimated capacity.
if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
target = minInt64(capacity, target)
}
// Take the difference from the remaining pool: never more than what is left, never negative.
extra := minInt64(target-planned, remainingReplicas)
if extra < 0 {
extra = 0
}
remainingReplicas -= extra
// preallocated holds only the replicas added on top of the step-1 plan, and plan already
// includes them. In step 3 they count toward this cluster's weighted share.
preallocated[preference.clusterName] = extra
plan[preference.clusterName] = extra + planned
}
}
}
modified := true
// It is possible single pass of the loop is not enough to distribute all replicas among clusters due
// to weight, max and rounding corner cases. In such case we iterate until either
// there is no replicas or no cluster gets any more replicas or the number
// of attempts is less than available cluster count. If there is no preallocated pods
// every loop either distributes all remainingReplicas or maxes out at least one cluster.
// If there are preallocated then the replica spreading may take longer.
// We reduce the number of pending preallocated replicas by at least half with each iteration so
// we may need log(replicasAtStart) iterations.
// TODO: Prove that clusterCount * log(replicas) iterations solves the problem or adjust the number.
// TODO: This algorithm is O(clusterCount^2 * log(replicas)) which is good for up to 100 clusters.
// Find something faster.
// NOTE(review): despite the comment above, the loop condition never bounds trial. The loop stops
// when a round changes nothing or when nothing is left to distribute.
// Step 3: distribute the remainder in proportion to Weight, round after round.
for trial := 0; modified && remainingReplicas > 0; trial++ {
modified = false
weightSum := int64(0)
for _, preference := range preferences {
weightSum += preference.Weight
}
newPreferences := make([]*namedClusterPreferences, 0, len(preferences))
// Snapshot: every share in this round is computed from the remainder at the start of the
// round, not from the live (shrinking) remainder.
distributeInThisLoop := remainingReplicas
for _, preference := range preferences {
if weightSum > 0 {
start := plan[preference.clusterName]
// Distribute the remaining replicas, rounding fractions always up.
// (a*w + S - 1) / S is integer ceiling division. For non-negative values it equals
// ceil(a*w / S), where a = distributeInThisLoop, w = this cluster's Weight and
// S = weightSum. The "+ S - 1" term is what implements the rounding up.
extra := (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum
// Rounding up can over-allocate, so never hand out more than is actually left.
extra = minInt64(extra, remainingReplicas)
// Account preallocated.
prealloc := preallocated[preference.clusterName]
// Replicas preallocated in step 2 are already in plan, so they count toward this
// cluster's weighted share. Absorb up to extra of them and add only the rest on top.
usedPrealloc := minInt64(extra, prealloc)
preallocated[preference.clusterName] = prealloc - usedPrealloc
extra -= usedPrealloc
if usedPrealloc > 0 {
modified = true
}
// Net effect: extra is the weighted share minus the preallocation it absorbed.
// With Rebalance == true, preallocated is empty and usedPrealloc is always 0. Progress is
// then signalled only by the "total > start" check below, which can still set modified.
// In total there should be the amount that was there at start plus whatever is due
// in this iteration
total := start + extra
// Clamp the new total.
// Check if we don't overflow the cluster, and if yes don't consider this cluster
// in any of the following iterations.
full := false
// Never above MaxReplicas.
if preference.MaxReplicas != nil && total > *preference.MaxReplicas {
total = *preference.MaxReplicas
full = true
}
// Never above the estimated capacity; the excess is recorded as overflow.
if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity && total > capacity {
overflow[preference.clusterName] = total - capacity
total = capacity
full = true
}
if !full {
newPreferences = append(newPreferences, preference)
}
// Only total-start replicas were actually taken.
remainingReplicas -= (total - start)
plan[preference.clusterName] = total
// Something extra got scheduled on this cluster.
if total > start {
modified = true
}
} else {
// weightSum == 0: weights cannot place anything.
break
}
}
// Only clusters that are not full take part in the next round.
preferences = newPreferences
}
// Return the plan and the overflow (replicas beyond a cluster's estimated capacity).
if p.preferences.Spec.Rebalance {
return plan, overflow, nil
} else {
// If rebalance = false then overflow is trimmed at the level
// of replicas that it failed to place somewhere.
newOverflow := make(map[string]int64)
for key, value := range overflow {
value = minInt64(value, remainingReplicas)
if value > 0 {
newOverflow[key] = value
}
}
return plan, newOverflow, nil
}
}
schedule function
// schedule runs planner for key over clusterNames and turns its output into the final
// cluster -> replicas map:
//   - every cluster that currently has the target object (a key of currentReplicasPerCluster)
//     starts at 0, so it is always present in the result;
//   - planned counts overwrite those zeros;
//   - overflow (replicas beyond a cluster's estimated capacity) is added on top.
//
// At klog verbosity 4 a per-cluster summary is logged. clusterNames is not modified.
func schedule(planner *planner.Planner, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) (map[string]int64, error) {
	scheduleResult, overflow, err := planner.Plan(clusterNames, currentReplicasPerCluster, estimatedCapacity, key)
	if err != nil {
		return nil, err
	}

	// TODO: Check if we really need to place the federated type in clusters
	// with 0 replicas. Override replicas would be set to 0 in this case.
	result := make(map[string]int64, len(currentReplicasPerCluster)+len(scheduleResult))
	for clusterName := range currentReplicasPerCluster {
		result[clusterName] = 0
	}
	for clusterName, replicas := range scheduleResult {
		result[clusterName] = replicas
	}
	for clusterName, replicas := range overflow {
		result[clusterName] += replicas
	}

	if klog.V(4) {
		// Sort a copy. Sorting clusterNames in place would reorder the caller's slice as a side
		// effect of enabling debug logging.
		sortedNames := append([]string(nil), clusterNames...)
		sort.Strings(sortedNames)

		buf := bytes.NewBufferString(fmt.Sprintf("Schedule - %q\n", key))
		for _, clusterName := range sortedNames {
			fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, currentReplicasPerCluster[clusterName], scheduleResult[clusterName])
			if over, found := overflow[clusterName]; found {
				fmt.Fprintf(buf, " overflow: %d", over)
			}
			if capacity, found := estimatedCapacity[clusterName]; found {
				fmt.Fprintf(buf, " capacity: %d", capacity)
			}
			buf.WriteByte('\n')
		}
		// The buffer is data, not a format string: cluster names and the key could contain '%'.
		// Use Info rather than Infof.
		klog.V(4).Info(buf.String())
	}
	return result, nil
}
小结
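Planner 这部分初看确实不太好懂。硬要概括的话,大致是:先按 MinReplicas 给各集群分配保底实例数;再按 weight 比例(向上取整)一轮轮地分配剩余实例,直到分完,或者没有集群还能再分(受 MaxReplicas 和预估容量限制)。至于 rebalance:当 rebalance 为 false 时,会先以各集群当前 ready 的实例数为基础做预分配(同样受 MaxReplicas 和预估容量限制),尽量不挪动已经在跑的实例;这部分预分配会在后续按 weight 分配时,优先抵扣该集群应得的份额。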