14 min read
「kubefed 00」从入门到入坑

Kubernetes 多集群搞了那么多年了,貌似也没倒腾出比较 stable 的项目,看看现在 kubefed 怎么样。本篇是 kubefed 的开篇。

FBI WARNING: 不喜欢看代码分析的就直接去最后一篇看结论就好

前言

当需要考虑到 Kubernetes 多集群的时候,我们往往是什么样的场景?

  • 集群容灾:当某个集群异常的时候迁移到另一个集群中
  • 多云容灾:当前在业界的另一个解释是不让云厂控制「你」
  • 边缘计算:边缘计算中常常会有多地自治域的存在,所以常常是边缘上有一坨的 Kubernetes 集群(没用 Kubernetes 当我没说)
  • 普普通通的多集群:真就很普通的多集群,服务需要部署多集群(虽然也是容灾,但不是为了 Kubernetes 的容灾而是服务的容灾)

k8s 在这个 SIG 的路确实走的有点久,从最开始的 Kubernetes Federation 再到推倒重写 kubefed,虽然浩浩荡荡但是还没有 GA。有些厂提供了 kubefed 的能力个用户,也有些许大厂自己做多集群,比如 Anthos、Rancher 等等。 所以究竟 kubefed 是怎么样的一个东西,值得去用吗?不知道,但里面一定有值得学的。

本文基于 kubefed v0.5.1 8a2f4a47 进行分析,因为有代码解读所以会又臭又长,如果不想看代码就直接拉到最后

代码解读

以下忽略几个内容

  • DNS 相关部分

概念一口闷

concepts

其实这些概念 repo README 也有拉,不过为了阅读方便就搬过来翻译下

Type configuration:对应 FederatedTypeConfig CRD,主要是用来描述什么 API Type 需要被 federated(然后这个 CRD 里主要有一个字段 Propagation 用来标明是否开启传播) Cluster Configuration:不严格的对应 KubefedCluster CRD,主要是一个集群的集合,但实际上概念中这个还包括了操作对应集群的 KubeConfig 等信息

Federated Type:这货是当我们对某个对象开启 Federate 的时候由 kubefed 生成的一个 CRD,比如给 Deployment 开 Federated,那就会生成一个 FederatedDeployment。在一众的 FederatedType 中不变的就是三件套

  • Template(动态生成):实际我们需要创建的 Object 的 Schema,比如 FederatedDeployment 中的 Template 就是个 Deployment。
  • Placement(固定):指定需要放置的集群,比如通过名字或者通过标签
  • Override(固定):针对于各个集群对 Template 的 patch/override

Scheduling:通过 SchedulingPreference 实现控制各个集群的调度情况,比如当前提供了 Deployment、ReplicaSet 的实例数调度支持,以控制不同集群调度的配比。

当前多集群的结构是:一个 Host Cluster 负责主持大局,然后有无数的 Member Cluster 注册到 Host Cluster 被管理,当然 Host Cluster 可以将自己注册为 Member Cluster。 概括起来就是:一主(Cluster)多从,一挂全挂,横屏 kubefed?

基于上述的概念,咱们可以猜猜是怎么实现的?大概是

  1. 实现 FederatedCRD(FCRD) 里面包括了我们期望创建的 API Type
  2. 当我们创建这个 FCRD 时帮我们调度下然后发布到对应的集群上
  3. 采集点 status

那是事实是?就是这样没错

上代码

咱们看看代码结构吧,不重要的包或者目录我就扔掉了

├── cmd  一切 binary 的入口
│   ├── controller-manager
│   │   └── app
│   │       ├── leaderelection
│   │       └── options
│   ├── hyperfed
│   ├── kubefedctl
│   └── webhook 启动 webhook
│       └── app
├── pkg
│   ├── apis  定义 CRD 的地方
│   │   ├── core
│   │   │   ├── common
│   │   │   ├── typeconfig
│   │   │   ├── v1alpha1
│   │   │   └── v1beta1
│   │   │       ├── defaults
│   │   │       └── validation
│   │   ├── multiclusterdns
│   │   │   └── v1alpha1
│   │   └── scheduling
│   │       └── v1alpha1
│   ├── client  实现的一个基于 runtime.Object receiver 的 Kubernetes client
│   │   └── generic
│   │       └── scheme
│   ├── controller  ControllerManager 的主要组成
│   │   ├── dnsendpoint
│   │   ├── federatedtypeconfig
│   │   ├── ingressdns
│   │   ├── kubefedcluster
│   │   ├── schedulingmanager
│   │   ├── schedulingpreference
│   │   ├── servicedns
│   │   ├── status
│   │   ├── sync 核心同步逻辑
│   │   │   ├── dispatch
│   │   │   ├── status
│   │   │   └── version
│   │   ├── testdata
│   │   │   └── fixtures
│   │   ├── util
│   │   │   ├── finalizers
│   │   │   ├── planner
│   │   │   └── podanalyzer
│   │   └── webhook
│   │       ├── federatedtypeconfig
│   │       ├── kubefedcluster
│   │       └── kubefedconfig
│   ├── features  存放 Feature 的基本信息和开关
│   ├── kubefedctl  kubefedctl 的实际实现包
│   │   ├── enable
│   │   ├── federate
│   │   ├── options
│   │   ├── orphaning
│   │   └── util
│   ├── metrics  各项暴露的监测指标
│   └── schedulingtypes  给 SchedulingManager 提供各种 API Type 做 SchedulingPreference 计算的小弟

以上基本可以看到整个代码组成的结构:

  • kubefedctl:提供 cli 来控制各种 CRD 创建修改
  • webhook:Kubernetes Admission Webhook,当前用了 mutating 和 validating 的 hook,接管的对象有 federatedTypeConfig、kubefedCluster、kuebfedConfig
  • controller manager:kubefed 核心所在

咱们就不扯 kuebfedctlwebhook 了,直接上 controller 部分,也就是所谓的 kubefed control-panel。不过在这之前,先来看下 kubefed 的数据结构和一些名词定义(比如缩写省得写一大串)

主要数据结构

metav1.APIResource

这货是定义一个 Type 的基本元信息(就不翻译了)

// APIResource specifies the name of a resource and whether it is namespaced.
type APIResource struct {
	// name is the plural name of the resource.
	Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
	// singularName is the singular name of the resource.  This allows clients to handle plural and singular opaquely.
	// The singularName is more correct for reporting status on a single item and both singular and plural are allowed
	// from the kubectl CLI interface.
	SingularName string `json:"singularName" protobuf:"bytes,6,opt,name=singularName"`
	// namespaced indicates if a resource is namespaced or not.
	Namespaced bool `json:"namespaced" protobuf:"varint,2,opt,name=namespaced"`
	// group is the preferred group of the resource.  Empty implies the group of the containing resource list.
	// For subresources, this may have a different value, for example: Scale".
	Group string `json:"group,omitempty" protobuf:"bytes,8,opt,name=group"`
	// version is the preferred version of the resource.  Empty implies the version of the containing resource list
	// For subresources, this may have a different value, for example: v1 (while inside a v1beta1 version of the core resource's group)".
	Version string `json:"version,omitempty" protobuf:"bytes,9,opt,name=version"`
	// kind is the kind for the resource (e.g. 'Foo' is the kind for a resource 'foo')
	Kind string `json:"kind" protobuf:"bytes,3,opt,name=kind"`
	// verbs is a list of supported kube verbs (this includes get, list, watch, create,
	// update, patch, delete, deletecollection, and proxy)
	Verbs Verbs `json:"verbs" protobuf:"bytes,4,opt,name=verbs"`
	// shortNames is a list of suggested short names of the resource.
	ShortNames []string `json:"shortNames,omitempty" protobuf:"bytes,5,rep,name=shortNames"`
	// categories is a list of the grouped resources this resource belongs to (e.g. 'all')
	Categories []string `json:"categories,omitempty" protobuf:"bytes,7,rep,name=categories"`
	// The hash value of the storage version, the version this resource is
	// converted to when written to the data store. Value must be treated
	// as opaque by clients. Only equality comparison on the value is valid.
	// This is an alpha feature and may change or be removed in the future.
	// The field is populated by the apiserver only if the
	// StorageVersionHash feature gate is enabled.
	// This field will remain optional even if it graduates.
	// +optional
	StorageVersionHash string `json:"storageVersionHash,omitempty" protobuf:"bytes,10,opt,name=storageVersionHash"`
}

然后 kubefed 自己撸了一个 APIResource 两者在关键信息上差别不大

// APIResource defines how to configure the dynamic client for an API resource.
type APIResource struct {
	// metav1.GroupVersion is not used since the json annotation of
	// the fields enforces them as mandatory.

	// Group of the resource.
	// +optional
	Group string `json:"group,omitempty"`
	// Version of the resource.
	Version string `json:"version"`
	// Camel-cased singular name of the resource (e.g. ConfigMap)
	Kind string `json:"kind"`
	// Lower-cased plural name of the resource (e.g. configmaps).  If
	// not provided, it will be computed by lower-casing the kind and
	// suffixing an 's'.
	PluralName string `json:"pluralName"`
	// Scope of the resource.
	Scope apiextv1b1.ResourceScope `json:"scope"`
}

FederatedTypeConfig

缩写 FTC,用于生成 FederatedType

type FederatedTypeConfig struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec FederatedTypeConfigSpec `json:"spec"`
	// +optional
	Status FederatedTypeConfigStatus `json:"status,omitempty"`
}

// 主要的定义都在这
// FederatedTypeConfigSpec defines the desired state of FederatedTypeConfig.
type FederatedTypeConfigSpec struct {
	// The configuration of the target type. If not set, the pluralName and
	// groupName fields will be set from the metadata.name of this resource. The
	// kind field must be set.
  // 指定我们期望 Federated 的目标类型
	TargetType APIResource `json:"targetType"`
	// Whether or not propagation to member clusters should be enabled.
  // 表明是否需要 sync,当前只有 Enable 和 Disable
	Propagation PropagationMode `json:"propagation"`
	// Configuration for the federated type that defines (via
	// template, placement and overrides fields) how the target type
	// should appear in multiple cluster.
  // 指定我们生成的 APIResource 是什么样的,说白就是生成的 FederatedCRD 中名字、group、version 等信息
	FederatedType APIResource `json:"federatedType"`
	// Configuration for the status type that holds information about which type
	// holds the status of the federated resource. If not provided, the group
	// and version will default to those provided for the federated type api
	// resource.
	// +optional
  // target type 的 Status 类型
	StatusType *APIResource `json:"statusType,omitempty"`
	// Whether or not Status object should be populated.
	// +optional
  // 是否进行 Target Type Status 做采集
	StatusCollection *StatusCollectionMode `json:"statusCollection,omitempty"`
}

KubefedCluster

这个结构体是 member cluster 注册到 Host cluster 时的产物。在 Spec 中主要存储对应集群的 APIServer 和访问凭证, 而在 Status 中主要存储的是集群的健康状态以及如包含的 Region、Zone 等。

type KubeFedCluster struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec KubeFedClusterSpec `json:"spec"`
	// +optional
	Status KubeFedClusterStatus `json:"status,omitempty"`
}

// KubeFedClusterSpec defines the desired state of KubeFedCluster
type KubeFedClusterSpec struct {
	// The API endpoint of the member cluster. This can be a hostname,
	// hostname:port, IP or IP:port.
	APIEndpoint string `json:"apiEndpoint"`

	// CABundle contains the certificate authority information.
	// +optional
	CABundle []byte `json:"caBundle,omitempty"`

	// Name of the secret containing the token required to access the
	// member cluster. The secret needs to exist in the same namespace
	// as the control plane and should have a "token" key.
	SecretRef LocalSecretReference `json:"secretRef"`

	// DisabledTLSValidations defines a list of checks to ignore when validating
	// the TLS connection to the member cluster.  This can be any of *, SubjectName, or ValidityPeriod.
	// If * is specified, it is expected to be the only option in list.
	// +optional
	DisabledTLSValidations []TLSValidation `json:"disabledTLSValidations,omitempty"`
}

// KubeFedClusterStatus contains information about the current status of a
// cluster updated periodically by cluster controller.
type KubeFedClusterStatus struct {
	// Conditions is an array of current cluster conditions.
	Conditions []ClusterCondition `json:"conditions"`
	// Zones are the names of availability zones in which the nodes of the cluster exist, e.g. 'us-east1-a'.
	// +optional
	Zones []string `json:"zones,omitempty"`
	// Region is the name of the region in which all of the nodes in the cluster exist.  e.g. 'us-east1'.
	// +optional
	Region *string `json:"region,omitempty"`
}

KubefedConfig

这个是 kubefed 的配置,不太像深入,这个大家各自看代码就好,本身涉及不多

FederatedType 是怎么创建的

最开始,我们会手动或者通过 kubefed client 或者什么其他姿势,反正就是创建了一个 FederatedTypeConfig 就像下面

apiVersion: core.kubefed.io/v1beta1
kind: FederatedTypeConfig
metadata:
  annotations:
    meta.helm.sh/release-name: kubefed
    meta.helm.sh/release-namespace: kube-federation-system
  finalizers:
  - core.kubefed.io/federated-type-config
  generation: 1
  labels:
    app.kubernetes.io/managed-by: Helm
  name: deployments.apps
  namespace: kube-federation-system
spec:
  federatedType:
    group: types.kubefed.io
    kind: FederatedDeployment
    pluralName: federateddeployments
    scope: Namespaced
    version: v1beta1
  propagation: Enabled
  targetType:
    group: apps
    kind: Deployment
    pluralName: deployments
    scope: Namespaced
    version: v1

apps/v1.Deployment federated 了,federatedType 表明最后生成的 FCRD 的基本信息,targetType 就是我们想要 federated 的 Type 信息,propagation 这边设为开启。当这样一个 Object 被创建或者更新时,FederatedTypeConfigController 也就对应 repo 中的 pkg/controller/federatedtypeconfig 就会开始工作(如果启动了的话,他由 cmd 里的 controller-manager 拉起,不赘述)。

FederatedTypeConfig 的创建

注意:FTC 的 name 是为 targetType 的 name.group 组成(core group 可省略 group,毕竟其可为「空串」)

启动时在 cmd 中 controller-manager 初始化 FederatedTypeConfigController

// newController returns a new controller to manage FederatedTypeConfig objects.
func newController(config *util.ControllerConfig) (*Controller, error) {
  ...创建 genericClient...

	c := &Controller{
		controllerConfig: config,
		client:           genericclient,
		stopChannels:     make(map[string]chan struct{}),
	}
  // 创建了一个延迟队列包裹的 c.reconcile,后续我们通过往 worker 塞东西后入延迟队列再到 c.reconcile 处理
	c.worker = util.NewReconcileWorker(c.reconcile, util.WorkerTiming{})

  // 根据用户设定的 `KubeFedNamespace` 去起 FTC 的 informer,这边之所以是特定而非全部 namespace
  // 主要是因为 kubefed 可以是 namespace level 的 control,如果是直接监听全部的话当是 namespace scope 的时候会有权限问题
  // 当 informer 有新东西的时候就会通过 c.worker.EnqueueObject 入到 c.reconcile 去处理。
  //
  // **其他几个 controller 也是类似的套路,如果没有特别的就不赘述了**
  //
  // 这边的 store 是 `cache.Store` 也就是原生 cache 存储的地方
  // c.controller 实际为 InformerController 可以不用过多关注
	c.store, c.controller, err = util.NewGenericInformer(
		kubeConfig,
		config.KubeFedNamespace,
		&corev1b1.FederatedTypeConfig{},
		util.NoResyncPeriod,
		c.worker.EnqueueObject,
	)
	if err != nil {
		return nil, err
	}

	return c, nil
}

初始化后通过 Controller.Run(stopCh) 启动这个 Controller。

当我们创建或者修改了一个 FederatedTypeConfig 的时候,informer 会发现并通过 EnqueueObject 塞到 FederatedTypeConfigController.concile 做处理,那就看看这货现在怎么处理 FTC 的吧。

func (c *Controller) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
	// 省略一坨从 cache 拿 FTC 的过程,没啥特别的

	syncEnabled := typeConfig.GetPropagationEnabled()
	statusEnabled := typeConfig.GetStatusEnabled()

	limitedScope := c.controllerConfig.TargetNamespace != metav1.NamespaceAll
  // 对于开了 sync 并且当前 kubefed 限制在特定 namespace 的,然后 target type 又是非 namesapced。这种情况就是非预期,需要关闭他的 sync。
	if limitedScope && syncEnabled && !typeConfig.GetNamespaced() {
    // 如果没取到 stop ch 则构造一个
    // 问题点:没有 get 到为什么需要造一个,因为从总体语义来说,如果能 get 到说明对应 sync Contoller是 running 的
		_, ok := c.getStopChannel(typeConfig.Name)
		if !ok {
      // 塞个新的到 StopChannel
		}

    // 更新 FTC 的 status 并 return 终止流程
	}

	statusKey := typeConfig.Name + "/status"
	syncStopChan, syncRunning := c.getStopChannel(typeConfig.Name)
	statusStopChan, statusRunning := c.getStopChannel(statusKey)

	deleted := typeConfig.DeletionTimestamp != nil
	if deleted {
    // 如果 sync controller 和 status controller 开着,就关掉它们

    // 如果 federatedNamespace 要被删除的话,按总体的设计来说,namespace 内的 Namespaced FederatedTypes 需要全部重新回炉下
		if typeConfig.IsNamespace() {
      // 这个函数会将 非 Namespace 对象但是 namespaced 的 FederatedTypes 全部重新入 reconcile 处理下
      c.reconcileOnNamespaceFTCUpdate()
		}

    // 移除 finalizer
		err := c.removeFinalizer(typeConfig)
		return util.StatusAllOK
	}
  // 确保 finalizer 都设置到位了
	updated, err := c.ensureFinalizer(typeConfig)
	if err != nil {
		return util.StatusError
	} else if updated && typeConfig.IsNamespace() {
		// 这边是新建的 Namespace FTC 的时候会进入的流程,这时候会把之前 Namespaced 的 FTC 都过一把,
    // 因为早前因为没有 Namespace 的存在,所以他们都不会创建 sync controller,这时候需要全部拉起来跑。
    // 问题点:但其实这货根据这个作为条件也很奇怪,因为我如果手动去掉 FTC 的 finalizer 也会触发 updated。。。
		c.reconcileOnNamespaceFTCUpdate()
	}

	startNewSyncController := !syncRunning && syncEnabled
  // 如果 sync controller 在运行,但是不需要再开启了或者当前是个 namespaced 的 type 而且对应的 namespace FTC 不存在,也需要把 sync controller 停掉
	stopSyncController := syncRunning && (!syncEnabled || (typeConfig.GetNamespaced() && !c.namespaceFTCExists()))
	if startNewSyncController {
    // 通过 c.startSyncController(typeConfig) 启动 sync controller
	} else if stopSyncController {
		// 通过 c.stopController(typeConfig.Name, syncStopChan) 关停 sync controller
	}

	startNewStatusController := !statusRunning && statusEnabled
	stopStatusController := statusRunning && !statusEnabled
	if startNewStatusController {
		// c.startStatusController(statusKey, typeConfig) 启动 status controller
	} else if stopStatusController {
		c.stopController(statusKey, statusStopChan)
	}
  // sync controller 已经开着了,但是 FTC status 的 ObservedGeneration 跟 FTC 的 Generation 不一致,
  // 那么说明可能 out-of-sync,所以重启下 sync controller
	if !startNewSyncController && !stopSyncController &&
		typeConfig.Status.ObservedGeneration != typeConfig.Generation {
		if err := c.refreshSyncController(typeConfig); err != nil {
			runtime.HandleError(err)
			return util.StatusError
		}
	}

	// 更新 SyncController 和 StatusController 到 FTC
	return util.StatusAllOK
}

KubeFedSyncController

在上文中,当有一个 FTC 存在且需要 sync 的时候,就会通过 startSyncController(tc *corev1b1.FederatedTypeConfig) 去启动一个 SyncController,接下来看看 SyncController 的启动和工作内容。

func (c *Controller) startSyncController(tc *corev1b1.FederatedTypeConfig) error {
	ftc := tc.DeepCopyObject().(*corev1b1.FederatedTypeConfig)
	kind := ftc.Spec.FederatedType.Kind

  // 如果是一个 namespaced target type,那么我们需要在启动前确保 FederatedNamespace 已经开启了,不然无法进行
	var fedNamespaceAPIResource *metav1.APIResource
	if ftc.GetNamespaced() {
		var err error
		fedNamespaceAPIResource, err = c.getFederatedNamespaceAPIResource()
		if err != nil {
			return errors.Wrapf(err, "Unable to start sync controller for %q due to missing FederatedTypeConfig for namespaces", kind)
		}
	}

	stopChan := make(chan struct{})
  // 实际启动一个 SyncController
	err := synccontroller.StartKubeFedSyncController(c.controllerConfig, stopChan, ftc, fedNamespaceAPIResource)
	if err != nil {
		close(stopChan)
		return errors.Wrapf(err, "Error starting sync controller for %q", kind)
	}
	klog.Infof("Started sync controller for %q", kind)
  // 塞到 stopChannels 标明这个 FTC 已经有了 running 的 SyncController
	c.lock.Lock()
	defer c.lock.Unlock()
	c.stopChannels[ftc.Name] = stopChan
	return nil
}


// pkg/controller/sync/controller.go
以下
func StartKubeFedSyncController(controllerConfig *util.ControllerConfig, stopChan <-chan struct{}, typeConfig typeconfig.Interface, fedNamespaceAPIResource *metav1.APIResource) error {
	controller, err := newKubeFedSyncController(controllerConfig, typeConfig, fedNamespaceAPIResource)
	if err != nil {
		return err
	}
	// 问题点:这是一个非常诡异的东西,从注释看仅仅为了测试。。。
	if controllerConfig.MinimizeLatency {
		controller.minimizeLatency()
	}
	klog.Infof(fmt.Sprintf("Starting sync controller for %q", typeConfig.GetFederatedType().Kind))
	controller.Run(stopChan)
	return nil
}

// newKubeFedSyncController returns a new sync controller for the configuration
func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfig typeconfig.Interface, fedNamespaceAPIResource *metav1.APIResource) (*KubeFedSyncController, error) {
	// 初始化 generic client 和一个 event recorder
	...

	s := &KubeFedSyncController{
		clusterAvailableDelay:   controllerConfig.ClusterAvailableDelay,
		clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
		smallDelay:              time.Second * 3,
		eventRecorder:           recorder,
		typeConfig:              typeConfig,
		hostClusterClient:       client,
		skipAdoptingResources:   controllerConfig.SkipAdoptingResources,
		limitedScope:            controllerConfig.LimitedScope(),
	}

	s.worker = util.NewReconcileWorker(s.reconcile, util.WorkerTiming{
		ClusterSyncDelay: s.clusterAvailableDelay,
	})

	// Build deliverer for triggering cluster reconciliations.
	s.clusterDeliverer = util.NewDelayingDeliverer()

	targetAPIResource := typeConfig.GetTargetType()

	// 起一个 informer watch 在 HostCluster 的 TargetType 上,适时的如 worker 队列。
	// 重点:并处理集群的健康状态变更,当 member cluster 有状态变更的时候,直接重新过一遍全部的 Target Type Object
	var err error
	s.informer, err = util.NewFederatedInformer(
		controllerConfig,
		client,
		&targetAPIResource,
		func(obj pkgruntime.Object) {
			qualifiedName := util.NewQualifiedName(obj)
			s.worker.EnqueueForRetry(qualifiedName)
		},
		&util.ClusterLifecycleHandlerFuncs{
			ClusterAvailable: func(cluster *fedv1b1.KubeFedCluster) {
				// When new cluster becomes available process all the target resources again.
				s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
			},
			// When a cluster becomes unavailable process all the target resources again.
			ClusterUnavailable: func(cluster *fedv1b1.KubeFedCluster, _ []interface{}) {
				s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay))
			},
		},
	)
	if err != nil {
		return nil, err
	}

	// 重点
	s.fedAccessor, err = NewFederatedResourceAccessor(
		controllerConfig, typeConfig, fedNamespaceAPIResource,
		client, s.worker.EnqueueObject, recorder)
	if err != nil {
		return nil, err
	}

	return s, nil
}

在 FTCController 部分的 SyncController 启动逻辑不会太难理解,总体可以理解为:如果是 Namespaced Type 那么需要保证 FederatedNamespace 存在,不然就不进行创建。如果只是普通的 Type 则直接开启,最后在 channel 注册下。

FederatedResourceAccessor
// pkg/controller/sync/accessor.go

type FederatedResourceAccessor interface {
	Run(stopChan <-chan struct{})
	HasSynced() bool
	FederatedResource(qualifiedName util.QualifiedName) (federatedResource FederatedResource, possibleOrphan bool, err error)
	VisitFederatedResources(visitFunc func(obj interface{}))
}

func NewFederatedResourceAccessor(
	controllerConfig *util.ControllerConfig,
	typeConfig typeconfig.Interface,
	fedNamespaceAPIResource *metav1.APIResource,
	client genericclient.Client,
	enqueueObj func(pkgruntime.Object),
	eventRecorder record.EventRecorder) (FederatedResourceAccessor, error) {
	// 这个函数的基本流程会是
	// 获取 FederatedType 的 client 和 informer
	// 如果 TargetType 是个 Namespace 就再初始化下 对应 namespace 的 client 和 informer
	// 接着,如果 TargetType 是个 Namespaced Type 那么会再初始化一下 KubeFedNamespace 的 client 和 informer 在 FederatedNamespace 对象变动的时候需要整个 namespace 内的 FederatedResource 全部重新进队列处理
	// 再起一个 Version Manager
}

Version Manager 这边暂时不细展开代码,主要的工作是

  • 当创建 FederatedObject 的时候,会去创建一个 PropagatedVersion 其中记录了对应 resource(target resource) 和 template、override 的 version
  • 基于此 version 我们可以判定一个完整的 FederatedObject 是否需要更新

回到 Accessor,在初始化完后会将 versionManager 及相关的 informer 启动起来。以下是几个 Accessor 的方法

以下判断一个 FederatedType 是否已同步
func (a *resourceAccessor) HasSynced() bool {
	kind := a.typeConfig.GetFederatedType().Kind
	if !a.versionManager.HasSynced() {
		return false
	}
	if !a.federatedController.HasSynced() {
		return false
	}
	if a.namespaceController != nil && !a.namespaceController.HasSynced() {
		return false
	}
	if a.fedNamespaceController != nil && !a.fedNamespaceController.HasSynced() {
		return false
	}
	return true
}

// 从 eventSource 中获取对应的 FederatedSource 也就是 FederatedType 的实际对象
func (a *resourceAccessor) FederatedResource(eventSource util.QualifiedName) (FederatedResource, bool, error) {
	// 问题点:忽略了系统的 namespace,暂时没 get 到为啥
	if a.targetIsNamespace && a.isSystemNamespace(eventSource.Name) {
		klog.V(7).Infof("Ignoring system namespace %q", eventSource.Name)
		return nil, false, nil
	}
	// Most federated resources have the same name as their targets.
	// NamespaceForResource 当前实际是直接 return eventSource.Namespace 所以两个 name 是一样的
	targetName := util.QualifiedName{
		Namespace: eventSource.Namespace,
		Name:      eventSource.Name,
	}
	federatedName := util.QualifiedName{
		Namespace: util.NamespaceForResource(eventSource.Namespace, a.fedNamespace),
		Name:      eventSource.Name,
	}
	// 这边对 namespace 类型的 FederatedType 做了一个特殊处理。
	// A federated type for namespace "foo" is namespaced
	// (e.g. "foo/foo"). An event sourced from a namespace in the host
	// or member clusters will have the name "foo", and an event
	// sourced from a federated resource will have the name "foo/foo".
	// In order to ensure object retrieval from the informers, it is
	// necessary to derive the target name and federated name from the
	// event source.
	if a.targetIsNamespace {
		// eventSource 是否来自 core.Namespace(就是没有 federated 的那个 object),因为那么 namespace 是非 namespaced,所以这边通过判断 namespace 为空确认
		eventSourceIsTarget := eventSource.Namespace == ""
		if eventSourceIsTarget {
			// Ensure the federated name is namespace qualified.
			// 因为当前 eventSource 是 core.Namespace 所以其 federated 的对象所在 namespace 就是其下属
			federatedName.Namespace = federatedName.Name
		} else {
			// Ensure the target name is not namespace qualified.
			// 当前 eventSource 是非 core.Namespace 也就是 FederatedNamespace。他的 target 是 core.Namespace 非 namespaced 所以需要置空
			targetName.Namespace = ""
		}
		// 这边为啥不直接就覆盖掉的了?为了凑代码吗?
	}
	key := federatedName.String()

	resource, err := util.ObjFromCache(a.federatedStore, kind, key)
	if err != nil {
		return nil, false, err
	}
	// 如果从 federatedType Cache 里取不到并且不是 FederatedNamespace 那么则有可能是 orphan
	// 疑问:为啥是 FNS 就不会是 orphan???
	if resource == nil {

		sourceIsFederatedNamespace := a.targetIsNamespace && eventSource.Namespace != ""

		possibleOrphan := !sourceIsFederatedNamespace
		return nil, possibleOrphan, nil
	}

	var namespace *unstructured.Unstructured
	if a.targetIsNamespace {
		// 这边会确认下 host cluster 中对应需要 propagate 的 namespace 存在
	}

	var fedNamespace *unstructured.Unstructured
	if a.typeConfig.GetNamespaced() {
		// 对于 namespaced 的需要确认下 federated namespace 是否开启,不开也没法进行
		fedNamespaceName := util.QualifiedName{Namespace: federatedName.Namespace, Name: federatedName.Namespace}
		fedNamespace, err = util.ObjFromCache(a.fedNamespaceStore, a.fedNamespaceAPIResource.Kind, fedNamespaceName.String())
		if err != nil {
			return nil, false, err
		}
		// If fedNamespace is nil, the resources in member clusters
		// will be removed.
	}

	return &federatedResource{...}, false, nil
}
KubeFedSyncController.reconcile

终于回来了,继续理 sync controller 的 reconcile

func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
	if !s.isSynced() {
		return util.StatusNotSynced
	}

	kind := s.typeConfig.GetFederatedType().Kind

	fedResource, possibleOrphan, err := s.fedAccessor.FederatedResource(qualifiedName)
	if err != nil {
		runtime.HandleError(errors.Wrapf(err, "Error creating FederatedResource helper for %s %q", kind, qualifiedName))
		return util.StatusError
	}
	// 可能是 orphan ,在所有集群理把 kubefed.io/managed=true 的标签删除
	if possibleOrphan {
		apiResource := s.typeConfig.GetTargetType()
		gvk := apiResourceToGVK(&apiResource)
		klog.V(2).Infof("Ensuring the removal of the label %q from %s %q in member clusters.", util.ManagedByKubeFedLabelKey, gvk.Kind, qualifiedName)
		err = s.removeManagedLabel(gvk, qualifiedName)
		if err != nil {
			wrappedErr := errors.Wrapf(err, "failed to remove the label %q from %s %q in member clusters", util.ManagedByKubeFedLabelKey, gvk.Kind, qualifiedName)
			runtime.HandleError(wrappedErr)
			return util.StatusError
		}

		return util.StatusAllOK
	}
	if fedResource == nil {
		return util.StatusAllOK
	}

	key := fedResource.FederatedName().String()

	klog.V(4).Infof("Starting to reconcile %s %q", kind, key)
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished reconciling %s %q (duration: %v)", kind, key, time.Since(startTime))
		metrics.ReconcileFederatedResourcesDurationFromStart(startTime)
	}()

	if fedResource.Object().GetDeletionTimestamp() != nil {
		return s.ensureDeletion(fedResource)
	}
	// 确保下对应的 FederatedObject 有 finalizer
	err = s.ensureFinalizer(fedResource)
	if err != nil {
		fedResource.RecordError("EnsureFinalizerError", errors.Wrap(err, "Failed to ensure finalizer"))
		return util.StatusError
	}
	// 推到各个集群
	return s.syncToClusters(fedResource)
}

删除部分需要特别说明下,因为这边涉及到了 kubefed.io/orphan 标签

func (s *KubeFedSyncController) ensureDeletion(fedResource FederatedResource) util.ReconciliationStatus {
	fedResource.DeleteVersions()

	key := fedResource.FederatedName().String()
	kind := fedResource.FederatedKind()

	klog.V(2).Infof("Ensuring deletion of %s %q", kind, key)

	obj := fedResource.Object()

	// 没有 finalizer 就不管了,丢那了
	finalizers := sets.NewString(obj.GetFinalizers()...)
	if !finalizers.Has(FinalizerSyncController) {
		klog.V(2).Infof("%s %q does not have the %q finalizer. Nothing to do.", kind, key, FinalizerSyncController)
		return util.StatusAllOK
	}

	// 如果设定了 kubefed.io/orphan=true 的 annotation 就不会做实际的删除,但是会去掉 finalizer 和 managed 的标签
	if util.IsOrphaningEnabled(obj) {
		klog.V(2).Infof("Found %q annotation on %s %q. Removing the finalizer.",
			util.OrphanManagedResourcesAnnotation, kind, key)
		err := s.removeFinalizer(fedResource)
		if err != nil {
			wrappedErr := errors.Wrapf(err, "failed to remove finalizer %q from %s %q", FinalizerSyncController, kind, key)
			runtime.HandleError(wrappedErr)
			return util.StatusError
		}
		klog.V(2).Infof("Initiating the removal of the label %q from resources previously managed by %s %q.", util.ManagedByKubeFedLabelKey, kind, key)
		err = s.removeManagedLabel(fedResource.TargetGVK(), fedResource.TargetName())
		if err != nil {
			wrappedErr := errors.Wrapf(err, "failed to remove the label %q from all resources previously managed by %s %q", util.ManagedByKubeFedLabelKey, kind, key)
			runtime.HandleError(wrappedErr)
			return util.StatusError
		}
		return util.StatusAllOK
	}
	// 真的需要删除的。
	// 这边之所以会有 recheck 是因为有些资源如果已经在 deletion 了(DeletionTimestamp != nil) 那就不会再删除。
	// deleteFromClusters 里面还有个特殊处理是,如果是删 namespaced 不会需要 recheck,也就是只有删 federatedNamespace 需要
	klog.V(2).Infof("Deleting resources managed by %s %q from member clusters.", kind, key)
	recheckRequired, err := s.deleteFromClusters(fedResource)
	if err != nil {
		wrappedErr := errors.Wrapf(err, "failed to delete %s %q", kind, key)
		runtime.HandleError(wrappedErr)
		return util.StatusError
	}
	if recheckRequired {
		return util.StatusNeedsRecheck
	}
	return util.StatusAllOK
}

总算要推到各个集群了,这部分我觉得是写的最干脆的。

// syncToClusters ensures that the state of the given object is
// synchronized to member clusters.
func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) util.ReconciliationStatus {
	clusters, err := s.informer.GetClusters()
	if err != nil {
		fedResource.RecordError(string(status.ClusterRetrievalFailed), errors.Wrap(err, "Failed to retrieve list of clusters"))
		return s.setFederatedStatus(fedResource, status.ClusterRetrievalFailed, nil)
	}

	selectedClusterNames, err := fedResource.ComputePlacement(clusters)
	if err != nil {
		fedResource.RecordError(string(status.ComputePlacementFailed), errors.Wrap(err, "Failed to compute placement"))
		return s.setFederatedStatus(fedResource, status.ComputePlacementFailed, nil)
	}
	// 上述两个步骤取了所有集群和相关 FederatedObject 中 placement 涉及的集群

	kind := fedResource.TargetKind()
	key := fedResource.TargetName().String()
	klog.V(4).Infof("Ensuring %s %q in clusters: %s", kind, key, strings.Join(selectedClusterNames.List(), ","))

	dispatcher := dispatch.NewManagedDispatcher(s.informer.GetClientForCluster, fedResource, s.skipAdoptingResources)

	// Range 所有集群,因为我们需要做回收,所以不能单单 range selectedCluster
	for _, cluster := range clusters {
		clusterName := cluster.Name
		selectedCluster := selectedClusterNames.Has(clusterName)

		// 糟糕,选到的集群 NotReady 了,记个错误
		if !util.IsClusterReady(&cluster.Status) {
			if selectedCluster {
				// Cluster state only needs to be reported in resource
				// status for clusters selected for placement.
				err := errors.New("Cluster not ready")
				dispatcher.RecordClusterError(status.ClusterNotReady, clusterName, err)
			}
			continue
		}

		rawClusterObj, _, err := s.informer.GetTargetStore().GetByKey(clusterName, key)
		if err != nil {
			wrappedErr := errors.Wrap(err, "Failed to retrieve cached cluster object")
			dispatcher.RecordClusterError(status.CachedRetrievalFailed, clusterName, wrappedErr)
			continue
		}

		var clusterObj *unstructured.Unstructured
		if rawClusterObj != nil {
			clusterObj = rawClusterObj.(*unstructured.Unstructured)
		}

		// 如果没选上的集群里有,Target Object 那就干掉
		// 潜在风险:多集群加入 kubefed 的时候如果正好命中,那就 GG 了
		// Resource should not exist in the named cluster
		if !selectedCluster {
			if clusterObj == nil {
				// Resource does not exist in the cluster
				continue
			}
			if clusterObj.GetDeletionTimestamp() != nil {
				// Resource is marked for deletion
				dispatcher.RecordStatus(clusterName, status.WaitingForRemoval)
				continue
			}
			if fedResource.IsNamespaceInHostCluster(clusterObj) {
				// Host cluster namespace needs to have the managed
				// label removed so it won't be cached anymore.
				dispatcher.RemoveManagedLabel(clusterName, clusterObj)
			} else {
				dispatcher.Delete(clusterName)
			}
			continue
		}

		// Resource should appear in the named cluster
		// 创建或者更新对象
		// TODO(marun) Consider waiting until the result of resource
		// creation has reached the target store before attempting
		// subsequent operations.  Otherwise the object won't be found
		// but an add operation will fail with AlreadyExists.
		if clusterObj == nil {
			dispatcher.Create(clusterName)
		} else {
			dispatcher.Update(clusterName, clusterObj)
		}
	}
	// 等 dispatcher 刚刚的工作都完成了,然后更新下 version map
	_, timeoutErr := dispatcher.Wait()
	if timeoutErr != nil {
		fedResource.RecordError("OperationTimeoutError", timeoutErr)
	}

	// Write updated versions to the API.
	updatedVersionMap := dispatcher.VersionMap()
	err = fedResource.UpdateVersions(selectedClusterNames.List(), updatedVersionMap)
	if err != nil {
		// Versioning of federated resources is an optimization to
		// avoid unnecessary updates, and failure to record version
		// information does not indicate a failure of propagation.
		runtime.HandleError(err)
	}

	// 更新下传播状态 -> PropagationStatus
	collectedStatus := dispatcher.CollectedStatus()
	return s.setFederatedStatus(fedResource, status.AggregateSuccess, &collectedStatus)
}

从上述这段代码来说,这边没有处理集群挂掉时候的 failover(比如对于 Deployment、ReplicaSet 这类 ReplicaedObject),这部分会留到 SchedulingPreference 那边处理。

Dispatcher

// ManagedDispatcher dispatches operations to member clusters for resources
// managed by a federated resource.
type ManagedDispatcher interface {
	UnmanagedDispatcher

	Create(clusterName string)
	Update(clusterName string, clusterObj *unstructured.Unstructured)
	VersionMap() map[string]string
	CollectedStatus() status.CollectedPropagationStatus

	RecordClusterError(propStatus status.PropagationStatus, clusterName string, err error)
	RecordStatus(clusterName string, propStatus status.PropagationStatus)
}

// UnmanagedDispatcher dispatches operations to member clusters for
// resources that are no longer managed by a federated resource.
type UnmanagedDispatcher interface {
	OperationDispatcher

	Delete(clusterName string)
	RemoveManagedLabel(clusterName string, clusterObj *unstructured.Unstructured)
}

// OperationDispatcher provides an interface to wait for operations
// dispatched to member clusters.
type OperationDispatcher interface {
	// Wait returns true for ok if all operations completed
	// successfully and false if only some operations completed
	// successfully.  An error is returned on timeout.
	Wait() (ok bool, timeoutErr error)
}

来看看 Dispatcher 都定义了什么 func

  • 增、删、改
  • 采集状态、版本
  • 等待 dispatch 完成
  • 移除 ManagedLabel

咱们主要关注点在增删和 wait ,其他就不看了

Dispatcher.Create

func (d *managedDispatcherImpl) Create(clusterName string) {
	// Default the status to an operation-specific timeout.  Otherwise
	// when a timeout occurs it won't be possible to determine which
	// operation timed out.  The timeout status will be cleared by
	// Wait() if a timeout does not occur.
	// 开篇先来个骚动作。
	// 以后咱们对于超时的这种处理状态反馈,可以先默认超时,后面成了再改回去。万一漏改了没事,下次再改
	d.RecordStatus(clusterName, status.CreationTimedOut)
	start := time.Now()
	d.dispatcher.incrementOperationsInitiated()
	const op = "create"
	go d.dispatcher.clusterOperation(clusterName, op, func(client generic.Client) util.ReconciliationStatus {
		d.recordEvent(clusterName, op, "Creating")

		obj, err := d.fedResource.ObjectForCluster(clusterName)
		if err != nil {
			return d.recordOperationError(status.ComputeResourceFailed, clusterName, op, err)
		}

		err = d.fedResource.ApplyOverrides(obj, clusterName)
		if err != nil {
			return d.recordOperationError(status.ApplyOverridesFailed, clusterName, op, err)
		}

		// 以上两个步骤:根据 FederatedObject 获取 Spec 然后 apply 下对应集群的 overrides。然后创建一波我们要的 Object

		err = client.Create(context.Background(), obj)
		if err == nil {
			version := util.ObjectVersion(obj)
			d.recordVersion(clusterName, version)
			metrics.DispatchOperationDurationFromStart("create", start)
			return util.StatusAllOK
		}

		// TODO(marun) Figure out why attempting to create a namespace that
		// already exists indicates ServerTimeout instead of AlreadyExists.
		// 因为 controller-runtime 的 client 实现关系,所以这边的 namespace 重复创建会反馈为 timeout
		alreadyExists := apierrors.IsAlreadyExists(err) || d.fedResource.TargetKind() == util.NamespaceKind && apierrors.IsServerTimeout(err)
		if !alreadyExists {
			return d.recordOperationError(status.CreationFailed, clusterName, op, err)
		}

		// Attempt to update the existing resource to ensure that it
		// is labeled as a managed resource.
		err = client.Get(context.Background(), obj, obj.GetNamespace(), obj.GetName())
		if err != nil {
			wrappedErr := errors.Wrapf(err, "failed to retrieve object potentially requiring adoption")
			return d.recordOperationError(status.RetrievalFailed, clusterName, op, wrappedErr)
		}
		// 完全不知道 adoptingResource 啥意思啊。。。盲猜:已存在的资源。
		// 跟其他同学聊了下,这货比较准确应该翻译为:从别处接管来的资源。
		if d.skipAdoptingResources && !d.fedResource.IsNamespaceInHostCluster(obj) {
			_ = d.recordOperationError(status.AlreadyExists, clusterName, op, errors.Errorf("Resource pre-exist in cluster"))
			return util.StatusAllOK
		}

		d.recordError(clusterName, op, errors.Errorf("An update will be attempted instead of a creation due to an existing resource"))
		d.Update(clusterName, obj)
		metrics.DispatchOperationDurationFromStart("update", start)
		return util.StatusAllOK
	})
}
Dispatcher.Update

func (d *managedDispatcherImpl) Update(clusterName string, clusterObj *unstructured.Unstructured) {
	d.RecordStatus(clusterName, status.UpdateTimedOut)

	d.dispatcher.incrementOperationsInitiated()
	const op = "update"
	go d.dispatcher.clusterOperation(clusterName, op, func(client generic.Client) util.ReconciliationStatus {
		if util.IsExplicitlyUnmanaged(clusterObj) {
			err := errors.Errorf("Unable to manage the object which has label %s: %s", util.ManagedByKubeFedLabelKey, util.UnmanagedByKubeFedLabelValue)
			return d.recordOperationError(status.ManagedLabelFalse, clusterName, op, err)
		}

		obj, err := d.fedResource.ObjectForCluster(clusterName)
		if err != nil {
			return d.recordOperationError(status.ComputeResourceFailed, clusterName, op, err)
		}

		// 保留了原有的 Version、finalizer、annotation 和实例数,以及部分保留 service 和 service account
		err = RetainClusterFields(d.fedResource.TargetKind(), obj, clusterObj, d.fedResource.Object())
		if err != nil {
			wrappedErr := errors.Wrapf(err, "failed to retain fields")
			return d.recordOperationError(status.FieldRetentionFailed, clusterName, op, wrappedErr)
		}

		err = d.fedResource.ApplyOverrides(obj, clusterName)
		if err != nil {
			return d.recordOperationError(status.ApplyOverridesFailed, clusterName, op, err)
		}
		// 比对版本确认是否需要更新,避免盲目更新
		version, err := d.fedResource.VersionForCluster(clusterName)
		if err != nil {
			return d.recordOperationError(status.VersionRetrievalFailed, clusterName, op, err)
		}
		if !util.ObjectNeedsUpdate(obj, clusterObj, version) {
			// Resource is current
			return util.StatusAllOK
		}

		// Only record an event if the resource is not current
		d.recordEvent(clusterName, op, "Updating")

		err = client.Update(context.Background(), obj)
		if err != nil {
			return d.recordOperationError(status.UpdateFailed, clusterName, op, err)
		}
		d.setResourcesUpdated()
		version = util.ObjectVersion(obj)
		d.recordVersion(clusterName, version)
		return util.StatusAllOK
	})
}

Dispatcher.Wait
func (d *managedDispatcherImpl) Wait() (bool, error) {
	...
	Wait 主要是干两件事情,等待 update、delete 等等操作结束,然后更新下 status
}

这部分没啥坑,不赘述

小结

以上基本是可以 cover 整个 kubefed 的在多集群传播工作(propagation)原理了,然后还剩下调度部分、潜在坑、场景和一些感受留到下篇写。

整体来说除了一些蜜汁代码倒还好,有些 namespace、cluster level 的下层 object reconcile 比较蛋疼不过也确实没有特别好的办法