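Kubernetes multi-cluster has been worked on for years, yet it doesn't seem to have produced a truly stable project. Let's see how kubefed is doing now. This is the first post in a series on kubefed.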

FBI WARNING: if you don't enjoy reading code analysis, skip straight to the last post for the conclusions.


When do we actually have to think about Kubernetes multi-cluster, and what do those scenarios usually look like?

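Kubernetes has been on this SIG's road for quite a while: from the original Kubernetes Federation to the ground-up rewrite as kubefed. Despite all the fanfare, it still hasn't reached GA. Some vendors offer kubefed capabilities to their users, and a few big players build their own multi-cluster solutions, such as Anthos and Rancher. So what exactly is kubefed, and is it worth using? I don't know, but there is certainly something in it worth learning.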

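This analysis is based on kubefed v0.5.1 (commit 8a2f4a47). Because it walks through the code, it is long and dry; if you don't want to read code, just scroll to the end.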





These concepts are also covered in the repo README, but for easier reading I've copied them over and translated them here.

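Type configuration: corresponds to the FederatedTypeConfig CRD and describes which API types should be federated (the key field in this CRD is Propagation, which indicates whether propagation is enabled).

Cluster configuration: loosely corresponds to the KubeFedCluster CRD. It is essentially a collection of clusters, but conceptually it also includes information such as the KubeConfig needed to operate on each cluster.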

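Federated Type: a CRD that kubefed generates when we enable federation for a type. For example, federating Deployment generates a FederatedDeployment. What every FederatedType has in common is the same trio of fields: template, placement and overrides.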

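Scheduling: SchedulingPreference controls how workloads are scheduled across clusters. Currently it supports replica-count scheduling for Deployment and ReplicaSet, which controls how replicas are split among the clusters.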
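To make "controlling how replicas are split" concrete, here is a minimal sketch that divides a replica count across clusters by weight using the largest-remainder method. The helper `distributeReplicas` and the pure weight-based scheme are assumptions for illustration; kubefed's actual planner lives in pkg/controller/util/planner and is more involved.

```go
package main

import (
	"fmt"
	"sort"
)

// distributeReplicas splits total replicas across clusters in proportion to
// their weights. Leftovers from integer division go to the largest remainders,
// so the parts always sum to total. Hypothetical helper, not kubefed's planner.
func distributeReplicas(total int, weights map[string]int) map[string]int {
	result := make(map[string]int, len(weights))
	names := make([]string, 0, len(weights))
	sumWeights := 0
	for name, w := range weights {
		names = append(names, name)
		sumWeights += w
		result[name] = 0
	}
	if sumWeights == 0 || total <= 0 {
		return result
	}
	// Sort names so that ties are broken deterministically.
	sort.Strings(names)

	type remainder struct {
		name string
		rem  int
	}
	rems := make([]remainder, 0, len(names))
	assigned := 0
	for _, name := range names {
		share := total * weights[name]
		result[name] = share / sumWeights
		assigned += result[name]
		rems = append(rems, remainder{name, share % sumWeights})
	}
	// Hand out the leftover replicas, largest remainder first.
	sort.SliceStable(rems, func(i, j int) bool { return rems[i].rem > rems[j].rem })
	for i := 0; assigned < total; i++ {
		result[rems[i].name]++
		assigned++
	}
	return result
}

func main() {
	fmt.Println(distributeReplicas(10, map[string]int{"cluster-a": 2, "cluster-b": 1}))
	// map[cluster-a:7 cluster-b:3]
}
```

With weights 2:1, ten replicas come out as 7 and 3: integer division gives 6 and 3, and the one leftover replica goes to cluster-a, which has the larger remainder.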

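The current multi-cluster architecture has one Host Cluster in charge, with any number of Member Clusters registering with it to be managed; the Host Cluster can, of course, also register itself as a Member Cluster. In a nutshell: one primary cluster, many secondaries, and if the primary goes down, everything goes down.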


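  1. Implement FederatedCRDs (FCRDs) that wrap the API type we actually want to create
  2. When we create such an FCRD, schedule it and publish it to the corresponding clusters
  3. Collect some status back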




├── cmd  entry points for all binaries
│   ├── controller-manager
│   │   └── app
│   │       ├── leaderelection
│   │       └── options
│   ├── hyperfed
│   ├── kubefedctl
│   └── webhook  starts the webhook
│       └── app
├── pkg
│   ├── apis  where the CRDs are defined
│   │   ├── core
│   │   │   ├── common
│   │   │   ├── typeconfig
│   │   │   ├── v1alpha1
│   │   │   └── v1beta1
│   │   │       ├── defaults
│   │   │       └── validation
│   │   ├── multiclusterdns
│   │   │   └── v1alpha1
│   │   └── scheduling
│   │       └── v1alpha1
│   ├── client  a Kubernetes client built around runtime.Object receivers
│   │   └── generic
│   │       └── scheme
│   ├── controller  the main components of the ControllerManager
│   │   ├── dnsendpoint
│   │   ├── federatedtypeconfig
│   │   ├── ingressdns
│   │   ├── kubefedcluster
│   │   ├── schedulingmanager
│   │   ├── schedulingpreference
│   │   ├── servicedns
│   │   ├── status
│   │   ├── sync  core sync logic
│   │   │   ├── dispatch
│   │   │   ├── status
│   │   │   └── version
│   │   ├── testdata
│   │   │   └── fixtures
│   │   ├── util
│   │   │   ├── finalizers
│   │   │   ├── planner
│   │   │   └── podanalyzer
│   │   └── webhook
│   │       ├── federatedtypeconfig
│   │       ├── kubefedcluster
│   │       └── kubefedconfig
│   ├── features  basic information and on/off switches for features
│   ├── kubefedctl  the actual implementation package of kubefedctl
│   │   ├── enable
│   │   ├── federate
│   │   ├── options
│   │   ├── orphaning
│   │   └── util
│   ├── metrics  the exposed monitoring metrics
│   └── schedulingtypes  helpers that let the SchedulingManager compute SchedulingPreference for the various API types


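We won't go into kubefedctl or the webhook; let's jump straight to the controllers, i.e. the so-called kubefed control plane. Before that, though, let's look at kubefed's data structures and a few terms (including abbreviations, to save typing).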



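This is the basic metadata that describes a type in Kubernetes, metav1.APIResource from apimachinery (the upstream comments are left as they are):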

// APIResource specifies the name of a resource and whether it is namespaced.
type APIResource struct {
    // name is the plural name of the resource.
    Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
    // singularName is the singular name of the resource.  This allows clients to handle plural and singular opaquely.
    // The singularName is more correct for reporting status on a single item and both singular and plural are allowed
    // from the kubectl CLI interface.
    SingularName string `json:"singularName" protobuf:"bytes,6,opt,name=singularName"`
    // namespaced indicates if a resource is namespaced or not.
    Namespaced bool `json:"namespaced" protobuf:"varint,2,opt,name=namespaced"`
    // group is the preferred group of the resource.  Empty implies the group of the containing resource list.
    // For subresources, this may have a different value, for example: Scale".
    Group string `json:"group,omitempty" protobuf:"bytes,8,opt,name=group"`
    // version is the preferred version of the resource.  Empty implies the version of the containing resource list
    // For subresources, this may have a different value, for example: v1 (while inside a v1beta1 version of the core resource's group)".
    Version string `json:"version,omitempty" protobuf:"bytes,9,opt,name=version"`
    // kind is the kind for the resource (e.g. 'Foo' is the kind for a resource 'foo')
    Kind string `json:"kind" protobuf:"bytes,3,opt,name=kind"`
    // verbs is a list of supported kube verbs (this includes get, list, watch, create,
    // update, patch, delete, deletecollection, and proxy)
    Verbs Verbs `json:"verbs" protobuf:"bytes,4,opt,name=verbs"`
    // shortNames is a list of suggested short names of the resource.
    ShortNames []string `json:"shortNames,omitempty" protobuf:"bytes,5,rep,name=shortNames"`
    // categories is a list of the grouped resources this resource belongs to (e.g. 'all')
    Categories []string `json:"categories,omitempty" protobuf:"bytes,7,rep,name=categories"`
    // The hash value of the storage version, the version this resource is
    // converted to when written to the data store. Value must be treated
    // as opaque by clients. Only equality comparison on the value is valid.
    // This is an alpha feature and may change or be removed in the future.
    // The field is populated by the apiserver only if the
    // StorageVersionHash feature gate is enabled.
    // This field will remain optional even if it graduates.
    // +optional
    StorageVersionHash string `json:"storageVersionHash,omitempty" protobuf:"bytes,10,opt,name=storageVersionHash"`
}

kubefed then rolls its own APIResource; the two differ little in the key information:

// APIResource defines how to configure the dynamic client for an API resource.
type APIResource struct {
    // metav1.GroupVersion is not used since the json annotation of
    // the fields enforces them as mandatory.

    // Group of the resource.
    // +optional
    Group string `json:"group,omitempty"`
    // Version of the resource.
    Version string `json:"version"`
    // Camel-cased singular name of the resource (e.g. ConfigMap)
    Kind string `json:"kind"`
    // Lower-cased plural name of the resource (e.g. configmaps).  If
    // not provided, it will be computed by lower-casing the kind and
    // suffixing an 's'.
    PluralName string `json:"pluralName"`
    // Scope of the resource.
    Scope apiextv1b1.ResourceScope `json:"scope"`
}

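The PluralName comment says that, if the field is not provided, the plural is computed by lower-casing the kind and suffixing an "s". A minimal sketch of that fallback rule (the helper name `defaultPluralName` is hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

// defaultPluralName applies the fallback described in kubefed's APIResource
// comment: an explicit plural wins; otherwise lower-case the kind and add "s".
// Hypothetical helper for illustration; it knows nothing about irregular plurals.
func defaultPluralName(kind, pluralName string) string {
	if pluralName != "" {
		return pluralName
	}
	return strings.ToLower(kind) + "s"
}

func main() {
	fmt.Println(defaultPluralName("ConfigMap", ""))        // configmaps
	fmt.Println(defaultPluralName("Ingress", "ingresses")) // ingresses
}
```

The naive rule would produce "ingresss" for Ingress, which is presumably why the field can be set explicitly.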

FederatedTypeConfig, abbreviated FTC, is used to generate a FederatedType:

type FederatedTypeConfig struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec FederatedTypeConfigSpec `json:"spec"`
    // +optional
    Status FederatedTypeConfigStatus `json:"status,omitempty"`
}

// The main definitions live here
// FederatedTypeConfigSpec defines the desired state of FederatedTypeConfig.
type FederatedTypeConfigSpec struct {
    // The configuration of the target type. If not set, the pluralName and
    // groupName fields will be set from the metadata.name of this resource. The
    // kind field must be set.
    // The target type we want to federate
    TargetType APIResource `json:"targetType"`
    // Whether or not propagation to member clusters should be enabled.
    // Whether sync is needed; currently the only values are Enabled and Disabled
    Propagation PropagationMode `json:"propagation"`
    // Configuration for the federated type that defines (via
    // template, placement and overrides fields) how the target type
    // should appear in multiple cluster.
    // What the generated APIResource looks like, i.e. the name, group, version, etc. of the generated FederatedCRD
    FederatedType APIResource `json:"federatedType"`
    // Configuration for the status type that holds information about which type
    // holds the status of the federated resource. If not provided, the group
    // and version will default to those provided for the federated type api
    // resource.
    // +optional
    // The status type of the target type
    StatusType *APIResource `json:"statusType,omitempty"`
    // Whether or not Status object should be populated.
    // +optional
    // Whether to collect the target type's status
    StatusCollection *StatusCollectionMode `json:"statusCollection,omitempty"`
}


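This struct (KubeFedCluster) is what a member cluster produces when it registers with the host cluster. Its Spec mainly stores the cluster's APIServer endpoint and access credentials, while its Status mainly stores the cluster's health along with information such as the Region and Zones it spans.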

type KubeFedCluster struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec KubeFedClusterSpec `json:"spec"`
    // +optional
    Status KubeFedClusterStatus `json:"status,omitempty"`
}

// KubeFedClusterSpec defines the desired state of KubeFedCluster
type KubeFedClusterSpec struct {
    // The API endpoint of the member cluster. This can be a hostname,
    // hostname:port, IP or IP:port.
    APIEndpoint string `json:"apiEndpoint"`

    // CABundle contains the certificate authority information.
    // +optional
    CABundle []byte `json:"caBundle,omitempty"`

    // Name of the secret containing the token required to access the
    // member cluster. The secret needs to exist in the same namespace
    // as the control plane and should have a "token" key.
    SecretRef LocalSecretReference `json:"secretRef"`

    // DisabledTLSValidations defines a list of checks to ignore when validating
    // the TLS connection to the member cluster.  This can be any of *, SubjectName, or ValidityPeriod.
    // If * is specified, it is expected to be the only option in list.
    // +optional
    DisabledTLSValidations []TLSValidation `json:"disabledTLSValidations,omitempty"`
}

// KubeFedClusterStatus contains information about the current status of a
// cluster updated periodically by cluster controller.
type KubeFedClusterStatus struct {
    // Conditions is an array of current cluster conditions.
    Conditions []ClusterCondition `json:"conditions"`
    // Zones are the names of availability zones in which the nodes of the cluster exist, e.g. 'us-east1-a'.
    // +optional
    Zones []string `json:"zones,omitempty"`
    // Region is the name of the region in which all of the nodes in the cluster exist.  e.g. 'us-east1'.
    // +optional
    Region *string `json:"region,omitempty"`
}

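The DisabledTLSValidations comment above lists the allowed values (*, SubjectName, ValidityPeriod) and says that * must be the only entry when present. A minimal validation sketch of that rule follows; modeling TLSValidation as a string type, the constant names and `validateDisabledTLS` are all assumptions, not kubefed's webhook code.

```go
package main

import (
	"errors"
	"fmt"
)

// TLSValidation is modeled here as a string type holding the values named in
// the KubeFedClusterSpec comment. Hypothetical constants for illustration.
type TLSValidation string

const (
	TLSAll            TLSValidation = "*"
	TLSSubjectName    TLSValidation = "SubjectName"
	TLSValidityPeriod TLSValidation = "ValidityPeriod"
)

// validateDisabledTLS enforces the rule from the field comment: every entry
// must be a known value, and "*" must be the only entry if it appears.
func validateDisabledTLS(list []TLSValidation) error {
	for _, v := range list {
		switch v {
		case TLSAll:
			if len(list) != 1 {
				return errors.New(`"*" must be the only entry`)
			}
		case TLSSubjectName, TLSValidityPeriod:
			// Known value, nothing else to check.
		default:
			return fmt.Errorf("unknown TLS validation %q", v)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateDisabledTLS([]TLSValidation{TLSSubjectName}))         // <nil>
	fmt.Println(validateDisabledTLS([]TLSValidation{TLSAll, TLSSubjectName})) // "*" must be the only entry
}
```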

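This one is kubefed's own configuration (KubeFedConfig). I don't want to go deep into it; read the code if you're interested, since it isn't involved much in what follows.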

How a FederatedType is created

It all starts when we create a FederatedTypeConfig, whether by hand, through the kubefed client or some other way, like the one below:

apiVersion: core.kubefed.io/v1beta1
kind: FederatedTypeConfig
metadata:
  annotations:
    meta.helm.sh/release-name: kubefed
    meta.helm.sh/release-namespace: kube-federation-system
  finalizers:
  - core.kubefed.io/federated-type-config
  generation: 1
  labels:
    app.kubernetes.io/managed-by: Helm
  name: deployments.apps
  namespace: kube-federation-system
spec:
  federatedType:
    group: types.kubefed.io
    kind: FederatedDeployment
    pluralName: federateddeployments
    scope: Namespaced
    version: v1beta1
  propagation: Enabled
  targetType:
    group: apps
    kind: Deployment
    pluralName: deployments
    scope: Namespaced
    version: v1

This FTC federates apps/v1.Deployment: federatedType describes the FCRD that will be generated, targetType is the type we want to federate, and propagation is set to Enabled. When such an object is created or updated, the FederatedTypeConfigController (pkg/controller/federatedtypeconfig in the repo) gets to work, provided it is running; it is started by the controller-manager in cmd, which I won't go into.
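Judging from the example above, the federatedType fields follow a simple pattern relative to targetType. The sketch below reproduces that pattern, assuming the conventions seen in this one example: the kind gets a "Federated" prefix, the group is types.kubefed.io, the version is v1beta1, and the scope is copied. `federatedAPIResource` is a hypothetical helper, and copying the scope is only an assumption (a FederatedNamespace, for instance, is namespaced even though Namespace is not).

```go
package main

import (
	"fmt"
	"strings"
)

// APIResource is a trimmed copy of kubefed's APIResource, with Scope as a string.
type APIResource struct {
	Group      string
	Version    string
	Kind       string
	PluralName string
	Scope      string
}

// federatedAPIResource derives a federated type from a target type using the
// pattern visible in the example FederatedTypeConfig. Illustration only.
func federatedAPIResource(target APIResource) APIResource {
	kind := "Federated" + target.Kind
	return APIResource{
		Group:      "types.kubefed.io",
		Version:    "v1beta1",
		Kind:       kind,
		PluralName: strings.ToLower(kind) + "s",
		Scope:      target.Scope, // assumption: matches this example only
	}
}

func main() {
	deploy := APIResource{Group: "apps", Version: "v1", Kind: "Deployment", PluralName: "deployments", Scope: "Namespaced"}
	fmt.Printf("%+v\n", federatedAPIResource(deploy))
}
```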

Creating a FederatedTypeConfig

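Note: an FTC's name is made up of the target type's plural name and group, joined as name.group (e.g. deployments.apps). For the core group the group part can be omitted, since it is the empty string anyway.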
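The naming rule can be written down in a few lines. `ftcName` is a hypothetical helper that only encodes the convention described in the note above:

```go
package main

import "fmt"

// ftcName builds a FederatedTypeConfig name from the target type's plural name
// and group (name.group). For the core group, whose name is the empty string,
// the group part is dropped. Hypothetical helper for illustration.
func ftcName(pluralName, group string) string {
	if group == "" {
		return pluralName
	}
	return pluralName + "." + group
}

func main() {
	fmt.Println(ftcName("deployments", "apps")) // deployments.apps
	fmt.Println(ftcName("configmaps", ""))      // configmaps
}
```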

At startup, the controller-manager in cmd initializes the FederatedTypeConfigController:

// newController returns a new controller to manage FederatedTypeConfig objects.
func newController(config *util.ControllerConfig) (*Controller, error) {
    // ... create genericclient ...

    c := &Controller{
        controllerConfig: config,
        client:           genericclient,
        stopChannels:     make(map[string]chan struct{}),
    }
    // Wrap c.reconcile in a delaying queue: whatever we hand to the worker is
    // queued first and then processed by c.reconcile.
    c.worker = util.NewReconcileWorker(c.reconcile, util.WorkerTiming{})

    // Start the FTC informer only for the user-configured `KubeFedNamespace`
    // rather than for all namespaces. kubefed can run as a namespace-scoped
    // control plane, and watching every namespace would then run into
    // permission problems.
    // When the informer sees something new, c.worker.EnqueueObject hands it
    // over to c.reconcile.
    // **The other controllers follow the same pattern; I won't repeat it unless something differs.**
    // c.store is a `cache.Store`, i.e. where the native cache lives.
    // c.controller is the informer's controller and needs little attention.
    c.store, c.controller, err = util.NewGenericInformer(
        // ... arguments elided ...
    )
    if err != nil {
        return nil, err
    }

    return c, nil
}

After initialization, the controller is started with Controller.Run(stopCh).
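As the comments in newController describe, util.NewReconcileWorker wraps a reconcile function in a queue: informer events enqueue a key, and the worker pops keys and calls reconcile, retrying on failure. The stdlib-only sketch below shows that shape with a deduplicating FIFO and a bounded retry count. The type names, the two `Status` values and the retry policy are assumptions; the real worker uses a delaying queue with backoff timings.

```go
package main

import "fmt"

// Status is a simplified stand-in for util.ReconciliationStatus.
type Status int

const (
	StatusAllOK Status = iota
	StatusError
)

// worker is a hypothetical, single-threaded reconcile worker: keys are
// deduplicated while queued, and failed keys are requeued up to maxRetries.
type worker struct {
	queue      []string
	queued     map[string]bool
	retries    map[string]int
	maxRetries int
	reconcile  func(key string) Status
}

func newWorker(reconcile func(string) Status, maxRetries int) *worker {
	return &worker{
		queued:     map[string]bool{},
		retries:    map[string]int{},
		maxRetries: maxRetries,
		reconcile:  reconcile,
	}
}

// Enqueue adds a key unless it is already waiting in the queue.
func (w *worker) Enqueue(key string) {
	if w.queued[key] {
		return
	}
	w.queued[key] = true
	w.queue = append(w.queue, key)
}

// Drain processes keys until the queue is empty.
func (w *worker) Drain() {
	for len(w.queue) > 0 {
		key := w.queue[0]
		w.queue = w.queue[1:]
		delete(w.queued, key)
		if w.reconcile(key) == StatusError && w.retries[key] < w.maxRetries {
			w.retries[key]++
			w.Enqueue(key)
			continue
		}
		delete(w.retries, key)
	}
}

func main() {
	attempts := map[string]int{}
	w := newWorker(func(key string) Status {
		attempts[key]++
		if key == "flaky" && attempts[key] < 3 {
			return StatusError
		}
		return StatusAllOK
	}, 5)
	w.Enqueue("deployments.apps")
	w.Enqueue("deployments.apps") // deduplicated while still queued
	w.Enqueue("flaky")
	w.Drain()
	fmt.Println(attempts) // map[deployments.apps:1 flaky:3]
}
```

Deduplicating keys while they sit in the queue is what lets many informer events for the same object collapse into a single reconcile call.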

When we create or modify a FederatedTypeConfig, the informer notices and uses EnqueueObject to hand it to FederatedTypeConfigController.reconcile. So let's see how that function handles an FTC.

func (c *Controller) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
    // ... fetching the FTC from the cache is omitted; nothing special there ...

    syncEnabled := typeConfig.GetPropagationEnabled()
    statusEnabled := typeConfig.GetStatusEnabled()

    limitedScope := c.controllerConfig.TargetNamespace != metav1.NamespaceAll
    // Sync is enabled and kubefed is limited to a specific namespace, yet the
    // target type is not namespaced. This is unexpected, so its sync has to
    // be turned off.
    if limitedScope && syncEnabled && !typeConfig.GetNamespaced() {
        // If no stop channel is found, construct one.
        // Open question: I don't see why one needs to be built. Semantically,
        // if it can be found, the corresponding sync controller is running.
        _, ok := c.getStopChannel(typeConfig.Name)
        if !ok {
            // ... put a new channel into stopChannels ...
        }

        // ... update the FTC status and return, ending the flow ...
    }

    statusKey := typeConfig.Name + "/status"
    syncStopChan, syncRunning := c.getStopChannel(typeConfig.Name)
    statusStopChan, statusRunning := c.getStopChannel(statusKey)

    deleted := typeConfig.DeletionTimestamp != nil
    if deleted {
        // ... stop the sync controller and the status controller if they are running ...

        // If the FTC for namespaces (the one that produces FederatedNamespace)
        // is being deleted then, by design, the namespaced FederatedTypes in
        // those namespaces all have to be reprocessed.
        if typeConfig.IsNamespace() {
            // ... (call elided) re-enqueue every FederatedType that is
            // namespaced but is not Namespace itself for reconcile ...
        }

        // Remove the finalizer.
        err := c.removeFinalizer(typeConfig)
        // ... error handling elided ...
        return util.StatusAllOK
    }
    // Make sure the finalizer is in place.
    updated, err := c.ensureFinalizer(typeConfig)
    if err != nil {
        return util.StatusError
    } else if updated && typeConfig.IsNamespace() {
        // This path is taken when a Namespace FTC is newly created: all the
        // existing namespaced FTCs are run through again. Without the
        // Namespace FTC none of them could start a sync controller, so now
        // they all need to be started.
        // Open question: using this as the condition is odd, because manually
        // removing the FTC's finalizer would also make `updated` true...
        c.reconcileOnNamespaceFTCUpdate()
    }

    startNewSyncController := !syncRunning && syncEnabled
    // Stop the sync controller if it is running but sync is no longer needed,
    // or if the type is namespaced and the namespace FTC does not exist.
    stopSyncController := syncRunning && (!syncEnabled || (typeConfig.GetNamespaced() && !c.namespaceFTCExists()))
    if startNewSyncController {
        // ... start the sync controller via c.startSyncController(typeConfig) ...
    } else if stopSyncController {
        // ... stop it via c.stopController(typeConfig.Name, syncStopChan) ...
    }

    startNewStatusController := !statusRunning && statusEnabled
    stopStatusController := statusRunning && !statusEnabled
    if startNewStatusController {
        // ... start the status controller via c.startStatusController(statusKey, typeConfig) ...
    } else if stopStatusController {
        c.stopController(statusKey, statusStopChan)
    }
    // The sync controller is already running, but the ObservedGeneration in
    // the FTC status differs from the FTC's Generation, so things may be out
    // of sync: restart the sync controller.
    if !startNewSyncController && !stopSyncController &&
        typeConfig.Status.ObservedGeneration != typeConfig.Generation {
        if err := c.refreshSyncController(typeConfig); err != nil {
            return util.StatusError
        }
    }

    // ... record the SyncController and StatusController state in the FTC status ...
    return util.StatusAllOK
}

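The start/stop decisions in this reconcile boil down to a few booleans. The sketch below isolates them so they can be checked in a table; `ftcState`, `decideSync` and the action names are hypothetical. One detail it makes visible: the start branch does not look at whether the namespace FTC exists, so for a namespaced type without it a start is attempted, and startSyncController then returns an error.

```go
package main

import "fmt"

// ftcState holds the inputs the FTC reconcile looks at when deciding what to
// do with a type's sync controller. Hypothetical type for illustration.
type ftcState struct {
	limitedScope       bool // kubefed is restricted to a single namespace
	syncEnabled        bool // spec.propagation is Enabled
	namespaced         bool // the target type is namespaced
	syncRunning        bool // a stop channel is already registered for this FTC
	namespaceFTCExists bool // the FTC for namespaces exists
}

type syncAction string

const (
	disableSync syncAction = "disable-sync" // invalid combination: turn sync off
	startSync   syncAction = "start"
	stopSync    syncAction = "stop"
	noChange    syncAction = "none"
)

// decideSync mirrors the boolean conditions of the reconcile excerpt.
func decideSync(s ftcState) syncAction {
	// A namespace-scoped kubefed cannot propagate cluster-scoped types.
	if s.limitedScope && s.syncEnabled && !s.namespaced {
		return disableSync
	}
	start := !s.syncRunning && s.syncEnabled
	stop := s.syncRunning && (!s.syncEnabled || (s.namespaced && !s.namespaceFTCExists))
	switch {
	case start:
		return startSync
	case stop:
		return stopSync
	default:
		return noChange
	}
}

func main() {
	fmt.Println(decideSync(ftcState{syncEnabled: true, namespaced: true, namespaceFTCExists: true})) // start
	fmt.Println(decideSync(ftcState{syncRunning: true, syncEnabled: true, namespaced: true}))        // stop
	fmt.Println(decideSync(ftcState{limitedScope: true, syncEnabled: true}))                         // disable-sync
}
```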

As shown above, when an FTC exists and needs sync, startSyncController(tc *corev1b1.FederatedTypeConfig) is called to start a SyncController. Next, let's look at how a SyncController is started and what it does.

func (c *Controller) startSyncController(tc *corev1b1.FederatedTypeConfig) error {
    ftc := tc.DeepCopyObject().(*corev1b1.FederatedTypeConfig)
    kind := ftc.Spec.FederatedType.Kind

    // For a namespaced target type, FederatedNamespace must already be
    // enabled before we start; otherwise we cannot proceed.
    var fedNamespaceAPIResource *metav1.APIResource
    if ftc.GetNamespaced() {
        var err error
        fedNamespaceAPIResource, err = c.getFederatedNamespaceAPIResource()
        if err != nil {
            return errors.Wrapf(err, "Unable to start sync controller for %q due to missing FederatedTypeConfig for namespaces", kind)
        }
    }

    stopChan := make(chan struct{})
    // Actually start a SyncController.
    err := synccontroller.StartKubeFedSyncController(c.controllerConfig, stopChan, ftc, fedNamespaceAPIResource)
    if err != nil {
        return errors.Wrapf(err, "Error starting sync controller for %q", kind)
    }
    klog.Infof("Started sync controller for %q", kind)
    // Register the channel in stopChannels to mark that this FTC now has a
    // running SyncController.
    c.lock.Lock()
    defer c.lock.Unlock()
    c.stopChannels[ftc.Name] = stopChan
    return nil
}

// pkg/controller/sync/controller.go
func StartKubeFedSyncController(controllerConfig *util.ControllerConfig, stopChan <-chan struct{}, typeConfig typeconfig.Interface, fedNamespaceAPIResource *metav1.APIResource) error {
    controller, err := newKubeFedSyncController(controllerConfig, typeConfig, fedNamespaceAPIResource)
    if err != nil {
        return err
    }
    // Open question: this is a very odd knob; judging from its comment, it
    // exists only for testing...
    if controllerConfig.MinimizeLatency {
        // ... elided ...
    }
    klog.Infof(fmt.Sprintf("Starting sync controller for %q", typeConfig.GetFederatedType().Kind))
    // ... elided ...
    return nil
}

// newKubeFedSyncController returns a new sync controller for the configuration
func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfig typeconfig.Interface, fedNamespaceAPIResource *metav1.APIResource) (*KubeFedSyncController, error) {
    // ... initialize the generic client and an event recorder ...

    s := &KubeFedSyncController{
        clusterAvailableDelay:   controllerConfig.ClusterAvailableDelay,
        clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
        smallDelay:              time.Second * 3,
        eventRecorder:           recorder,
        typeConfig:              typeConfig,
        hostClusterClient:       client,
        skipAdoptingResources:   controllerConfig.SkipAdoptingResources,
        limitedScope:            controllerConfig.LimitedScope(),
    }

    s.worker = util.NewReconcileWorker(s.reconcile, util.WorkerTiming{
        ClusterSyncDelay: s.clusterAvailableDelay,
    })

    // Build deliverer for triggering cluster reconciliations.
    s.clusterDeliverer = util.NewDelayingDeliverer()

    targetAPIResource := typeConfig.GetTargetType()

    // Start an informer that watches the TargetType in the member clusters
    // (the cached store is keyed by cluster name, see syncToClusters) and
    // enqueues into the worker queue as appropriate.
    // Key point: it also handles cluster health changes. Whenever a member
    // cluster's state changes, every Target Type object is processed again.
    var err error
    s.informer, err = util.NewFederatedInformer(
        // ... leading arguments elided ...
        func(obj pkgruntime.Object) {
            qualifiedName := util.NewQualifiedName(obj)
            // ... enqueue qualifiedName (elided) ...
        },
        &util.ClusterLifecycleHandlerFuncs{
            ClusterAvailable: func(cluster *fedv1b1.KubeFedCluster) {
                // When new cluster becomes available process all the target resources again.
                s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
            },
            // When a cluster becomes unavailable process all the target resources again.
            ClusterUnavailable: func(cluster *fedv1b1.KubeFedCluster, _ []interface{}) {
                s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay))
            },
        },
    )
    if err != nil {
        return nil, err
    }

    // Key point
    s.fedAccessor, err = NewFederatedResourceAccessor(
        controllerConfig, typeConfig, fedNamespaceAPIResource,
        client, s.worker.EnqueueObject, recorder)
    if err != nil {
        return nil, err
    }

    return s, nil
}

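The SyncController start-up logic in the FTC controller is not hard to follow. In short: for a namespaced type, FederatedNamespace must exist, otherwise nothing is created; an ordinary type is started right away; finally, the controller's channel is registered.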
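The stopChannels map is what tells the FTC controller whether a sync (or "<name>/status") controller is running: one channel per controller, closed to stop it, all behind a mutex. A stdlib-only sketch of that bookkeeping follows; `stopRegistry` and its methods are hypothetical names, not kubefed's API.

```go
package main

import (
	"fmt"
	"sync"
)

// stopRegistry keeps one stop channel per running controller, keyed by FTC
// name (or "<name>/status" for a status controller), guarded by a mutex.
// Hypothetical sketch for illustration.
type stopRegistry struct {
	lock     sync.RWMutex
	channels map[string]chan struct{}
}

func newStopRegistry() *stopRegistry {
	return &stopRegistry{channels: make(map[string]chan struct{})}
}

// get reports whether a controller is registered under key.
func (r *stopRegistry) get(key string) (chan struct{}, bool) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	ch, ok := r.channels[key]
	return ch, ok
}

// start launches run with a fresh stop channel and registers that channel.
func (r *stopRegistry) start(key string, run func(stop <-chan struct{})) {
	stopCh := make(chan struct{})
	go run(stopCh)
	r.lock.Lock()
	defer r.lock.Unlock()
	r.channels[key] = stopCh
}

// stop closes the channel, signalling the controller to exit, and unregisters
// it. Stopping an unknown key is a no-op.
func (r *stopRegistry) stop(key string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	if ch, ok := r.channels[key]; ok {
		close(ch)
		delete(r.channels, key)
	}
}

func main() {
	r := newStopRegistry()
	done := make(chan struct{})
	r.start("deployments.apps", func(stop <-chan struct{}) {
		<-stop // a real controller would run its informers until this closes
		close(done)
	})
	_, running := r.get("deployments.apps")
	fmt.Println("running:", running) // running: true
	r.stop("deployments.apps")
	<-done
	_, running = r.get("deployments.apps")
	fmt.Println("running:", running) // running: false
}
```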

// pkg/controller/sync/accessor.go

type FederatedResourceAccessor interface {
    Run(stopChan <-chan struct{})
    HasSynced() bool
    FederatedResource(qualifiedName util.QualifiedName) (federatedResource FederatedResource, possibleOrphan bool, err error)
    VisitFederatedResources(visitFunc func(obj interface{}))
}

func NewFederatedResourceAccessor(
    controllerConfig *util.ControllerConfig,
    typeConfig typeconfig.Interface,
    fedNamespaceAPIResource *metav1.APIResource,
    client genericclient.Client,
    enqueueObj func(pkgruntime.Object),
    eventRecorder record.EventRecorder) (FederatedResourceAccessor, error) {
    // The basic flow of this function:
    // 1. Get the client and informer for the FederatedType.
    // 2. If the TargetType is Namespace, also initialize the client and
    //    informer for namespaces.
    // 3. If the TargetType is namespaced, also initialize the client and
    //    informer for FederatedNamespace: whenever a FederatedNamespace
    //    object changes, every FederatedResource in that namespace must be
    //    re-enqueued for processing.
    // 4. Start a Version Manager.
    // ... body elided ...
}

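I won't walk through the Version Manager code for now. Its main job is tracking which version of each resource has been propagated to each member cluster; as the comment in syncToClusters puts it, this is an optimization to avoid unnecessary updates.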
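The version-skipping idea can be sketched like this: per cluster, remember the template version that was propagated and the version of the object in that cluster after the write, and skip the next update when neither has changed. This is a hypothetical simplification (`versionTracker` and its fields are made up); kubefed's VersionManager persists this information through the API, and its exact bookkeeping is not reproduced here.

```go
package main

import "fmt"

// propagated records, per member cluster, what was last propagated there.
type propagated struct {
	templateVersion string // version of the federated template that was sent
	clusterVersion  string // version of the object as last seen in the cluster
}

// versionTracker is a hypothetical sketch of version-based update skipping.
type versionTracker struct {
	byCluster map[string]propagated
}

func newVersionTracker() *versionTracker {
	return &versionTracker{byCluster: map[string]propagated{}}
}

// needsUpdate is true unless both the template and the cluster object are
// unchanged since the last recorded propagation.
func (t *versionTracker) needsUpdate(cluster, templateVersion, clusterVersion string) bool {
	last, ok := t.byCluster[cluster]
	return !ok || last != (propagated{templateVersion, clusterVersion})
}

// record stores the versions after a successful create or update; the
// cluster version should be the one returned by that write.
func (t *versionTracker) record(cluster, templateVersion, clusterVersion string) {
	t.byCluster[cluster] = propagated{templateVersion, clusterVersion}
}

func main() {
	t := newVersionTracker()
	fmt.Println(t.needsUpdate("cluster-a", "t1", "100")) // true: never propagated
	t.record("cluster-a", "t1", "101")                   // the write bumped the resourceVersion
	fmt.Println(t.needsUpdate("cluster-a", "t1", "101")) // false: nothing changed
	fmt.Println(t.needsUpdate("cluster-a", "t2", "101")) // true: template changed
	fmt.Println(t.needsUpdate("cluster-a", "t1", "105")) // true: object edited in the cluster
}
```

Recording the version returned by the write, rather than the one seen before it, is what keeps the controller's own update from triggering another update.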

Back to the Accessor: after initialization, the versionManager and the related informers are started. Here are a few of the Accessor's methods.

// HasSynced reports whether a FederatedType has synced.
func (a *resourceAccessor) HasSynced() bool {
    kind := a.typeConfig.GetFederatedType().Kind
    // ... (the code that uses kind is elided) ...
    if !a.versionManager.HasSynced() {
        return false
    }
    if !a.federatedController.HasSynced() {
        return false
    }
    if a.namespaceController != nil && !a.namespaceController.HasSynced() {
        return false
    }
    if a.fedNamespaceController != nil && !a.fedNamespaceController.HasSynced() {
        return false
    }
    return true
}

// Get the FederatedResource for eventSource, i.e. the actual object of the FederatedType.
func (a *resourceAccessor) FederatedResource(eventSource util.QualifiedName) (FederatedResource, bool, error) {
    // Open question: system namespaces are ignored; I haven't figured out why yet.
    if a.targetIsNamespace && a.isSystemNamespace(eventSource.Name) {
        klog.V(7).Infof("Ignoring system namespace %q", eventSource.Name)
        return nil, false, nil
    }
    // Most federated resources have the same name as their targets.
    // NamespaceForResource currently just returns eventSource.Namespace, so
    // the two names are identical.
    targetName := util.QualifiedName{
        Namespace: eventSource.Namespace,
        Name:      eventSource.Name,
    }
    federatedName := util.QualifiedName{
        Namespace: util.NamespaceForResource(eventSource.Namespace, a.fedNamespace),
        Name:      eventSource.Name,
    }
    // Special handling for the namespace FederatedType.
    // A federated type for namespace "foo" is namespaced
    // (e.g. "foo/foo"). An event sourced from a namespace in the host
    // or member clusters will have the name "foo", and an event
    // sourced from a federated resource will have the name "foo/foo".
    // In order to ensure object retrieval from the informers, it is
    // necessary to derive the target name and federated name from the
    // event source.
    if a.targetIsNamespace {
        // Is eventSource a core Namespace (the non-federated object)?
        // Namespaces are not namespaced, so an empty namespace means yes.
        eventSourceIsTarget := eventSource.Namespace == ""
        if eventSourceIsTarget {
            // Ensure the federated name is namespace qualified.
            // eventSource is a core Namespace, so its federated object lives
            // inside that very namespace.
            federatedName.Namespace = federatedName.Name
        } else {
            // Ensure the target name is not namespace qualified.
            // eventSource is not a core Namespace but a FederatedNamespace.
            // Its target, a core Namespace, is not namespaced, so the
            // namespace has to be cleared.
            targetName.Namespace = ""
        }
        // Why not just overwrite the names directly? To pad out the code?
    }
    key := federatedName.String()

    resource, err := util.ObjFromCache(a.federatedStore, kind, key)
    if err != nil {
        return nil, false, err
    }
    // If the object can't be found in the federatedType cache and the event
    // does not come from a FederatedNamespace, it may be an orphan.
    // Question: why can an FNS never be an orphan???
    if resource == nil {
        sourceIsFederatedNamespace := a.targetIsNamespace && eventSource.Namespace != ""

        possibleOrphan := !sourceIsFederatedNamespace
        return nil, possibleOrphan, nil
    }

    var namespace *unstructured.Unstructured
    if a.targetIsNamespace {
        // ... check that the namespace to propagate exists in the host cluster ...
    }

    var fedNamespace *unstructured.Unstructured
    if a.typeConfig.GetNamespaced() {
        // For namespaced types, look up the FederatedNamespace; without it
        // propagation cannot go ahead.
        fedNamespaceName := util.QualifiedName{Namespace: federatedName.Namespace, Name: federatedName.Namespace}
        fedNamespace, err = util.ObjFromCache(a.fedNamespaceStore, a.fedNamespaceAPIResource.Kind, fedNamespaceName.String())
        if err != nil {
            return nil, false, err
        }
        // If fedNamespace is nil, the resources in member clusters
        // will be removed.
    }

    return &federatedResource{...}, false, nil
}

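The namespace special case and the orphan check are easiest to see on concrete names. This sketch reproduces just the name derivation; `QualifiedName` here is a simplified copy, `deriveNames` is hypothetical, and it assumes (as noted above) that NamespaceForResource returns the event namespace unchanged.

```go
package main

import "fmt"

// QualifiedName is a simplified copy of util.QualifiedName.
type QualifiedName struct {
	Namespace, Name string
}

func (n QualifiedName) String() string {
	if n.Namespace == "" {
		return n.Name
	}
	return n.Namespace + "/" + n.Name
}

// deriveNames returns the target name, the federated name, and whether a
// cache miss for the federated object may indicate an orphan.
func deriveNames(eventSource QualifiedName, targetIsNamespace bool) (target, federated QualifiedName, orphanOnMiss bool) {
	target = eventSource
	federated = eventSource
	if targetIsNamespace {
		if eventSource.Namespace == "" {
			// Event from core Namespace "foo": its FederatedNamespace is "foo/foo".
			federated.Namespace = federated.Name
		} else {
			// Event from FederatedNamespace "foo/foo": the target is just "foo".
			target.Namespace = ""
		}
	}
	sourceIsFederatedNamespace := targetIsNamespace && eventSource.Namespace != ""
	return target, federated, !sourceIsFederatedNamespace
}

func main() {
	t, f, orphan := deriveNames(QualifiedName{Name: "foo"}, true)
	fmt.Println(t, f, orphan) // foo foo/foo true
	t, f, orphan = deriveNames(QualifiedName{Namespace: "foo", Name: "foo"}, true)
	fmt.Println(t, f, orphan) // foo foo/foo false
}
```

Both events map onto the same pair of names, which is exactly what lets either informer trigger a reconcile of the same federated object.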
Finally we're back. Let's carry on with the sync controller's reconcile.

func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
    if !s.isSynced() {
        return util.StatusNotSynced
    }

    kind := s.typeConfig.GetFederatedType().Kind

    fedResource, possibleOrphan, err := s.fedAccessor.FederatedResource(qualifiedName)
    if err != nil {
        runtime.HandleError(errors.Wrapf(err, "Error creating FederatedResource helper for %s %q", kind, qualifiedName))
        return util.StatusError
    }
    // Possibly an orphan: remove the kubefed.io/managed=true label in all member clusters.
    if possibleOrphan {
        apiResource := s.typeConfig.GetTargetType()
        gvk := apiResourceToGVK(&apiResource)
        klog.V(2).Infof("Ensuring the removal of the label %q from %s %q in member clusters.", util.ManagedByKubeFedLabelKey, gvk.Kind, qualifiedName)
        err = s.removeManagedLabel(gvk, qualifiedName)
        if err != nil {
            wrappedErr := errors.Wrapf(err, "failed to remove the label %q from %s %q in member clusters", util.ManagedByKubeFedLabelKey, gvk.Kind, qualifiedName)
            // ... report wrappedErr (elided) ...
            return util.StatusError
        }

        return util.StatusAllOK
    }
    if fedResource == nil {
        return util.StatusAllOK
    }

    key := fedResource.FederatedName().String()

    klog.V(4).Infof("Starting to reconcile %s %q", kind, key)
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished reconciling %s %q (duration: %v)", kind, key, time.Since(startTime))
    }()

    if fedResource.Object().GetDeletionTimestamp() != nil {
        return s.ensureDeletion(fedResource)
    }
    // Make sure the FederatedObject has its finalizer.
    err = s.ensureFinalizer(fedResource)
    if err != nil {
        fedResource.RecordError("EnsureFinalizerError", errors.Wrap(err, "Failed to ensure finalizer"))
        return util.StatusError
    }
    // Push to every cluster.
    return s.syncToClusters(fedResource)
}

The deletion path deserves a special note, because it involves the kubefed.io/orphan annotation.

func (s *KubeFedSyncController) ensureDeletion(fedResource FederatedResource) util.ReconciliationStatus {

    key := fedResource.FederatedName().String()
    kind := fedResource.FederatedKind()

    klog.V(2).Infof("Ensuring deletion of %s %q", kind, key)

    obj := fedResource.Object()

    // No finalizer means there is nothing for us to do; leave it be.
    finalizers := sets.NewString(obj.GetFinalizers()...)
    if !finalizers.Has(FinalizerSyncController) {
        klog.V(2).Infof("%s %q does not have the %q finalizer. Nothing to do.", kind, key, FinalizerSyncController)
        return util.StatusAllOK
    }

    // If the kubefed.io/orphan=true annotation is set, nothing is actually
    // deleted; only the finalizer and the managed label are removed.
    if util.IsOrphaningEnabled(obj) {
        klog.V(2).Infof("Found %q annotation on %s %q. Removing the finalizer.",
            util.OrphanManagedResourcesAnnotation, kind, key)
        err := s.removeFinalizer(fedResource)
        if err != nil {
            wrappedErr := errors.Wrapf(err, "failed to remove finalizer %q from %s %q", FinalizerSyncController, kind, key)
            // ... report wrappedErr (elided) ...
            return util.StatusError
        }
        klog.V(2).Infof("Initiating the removal of the label %q from resources previously managed by %s %q.", util.ManagedByKubeFedLabelKey, kind, key)
        err = s.removeManagedLabel(fedResource.TargetGVK(), fedResource.TargetName())
        if err != nil {
            wrappedErr := errors.Wrapf(err, "failed to remove the label %q from all resources previously managed by %s %q", util.ManagedByKubeFedLabelKey, kind, key)
            // ... report wrappedErr (elided) ...
            return util.StatusError
        }
        return util.StatusAllOK
    }
    // The resources really do need to be deleted.
    // A recheck can be needed because resources that are already being
    // deleted (DeletionTimestamp != nil) are not deleted again, so we have to
    // come back later.
    // deleteFromClusters also has a special case: deleting other namespaced
    // types never needs a recheck; only deleting a FederatedNamespace does.
    klog.V(2).Infof("Deleting resources managed by %s %q from member clusters.", kind, key)
    recheckRequired, err := s.deleteFromClusters(fedResource)
    if err != nil {
        wrappedErr := errors.Wrapf(err, "failed to delete %s %q", kind, key)
        // ... report wrappedErr (elided) ...
        return util.StatusError
    }
    if recheckRequired {
        return util.StatusNeedsRecheck
    }
    return util.StatusAllOK
}

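ensureDeletion has exactly three outcomes. The sketch below reduces it to a pure decision function. The finalizer string is a placeholder (kubefed's real FinalizerSyncController value is not reproduced), `decideDeletion` is hypothetical, and it assumes orphaning counts as enabled only when the annotation value is "true", as the text above describes.

```go
package main

import "fmt"

// syncFinalizer is a placeholder, not kubefed's real FinalizerSyncController value.
const syncFinalizer = "example.io/sync-finalizer"

// orphanAnnotation is the annotation named in the text above.
const orphanAnnotation = "kubefed.io/orphan"

// decideDeletion mirrors the three outcomes of ensureDeletion.
func decideDeletion(finalizers []string, annotations map[string]string) string {
	hasFinalizer := false
	for _, f := range finalizers {
		if f == syncFinalizer {
			hasFinalizer = true
			break
		}
	}
	switch {
	case !hasFinalizer:
		return "nothing to do"
	case annotations[orphanAnnotation] == "true":
		return "remove finalizer and managed label, keep member resources"
	default:
		return "delete from member clusters"
	}
}

func main() {
	fmt.Println(decideDeletion(nil, nil))
	fmt.Println(decideDeletion([]string{syncFinalizer}, map[string]string{orphanAnnotation: "true"}))
	fmt.Println(decideDeletion([]string{syncFinalizer}, nil))
}
```

The orphan path is the one to remember operationally: deleting a federated object with that annotation leaves the member-cluster copies running, just no longer managed.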

// syncToClusters ensures that the state of the given object is
// synchronized to member clusters.
func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) util.ReconciliationStatus {
    clusters, err := s.informer.GetClusters()
    if err != nil {
        fedResource.RecordError(string(status.ClusterRetrievalFailed), errors.Wrap(err, "Failed to retrieve list of clusters"))
        return s.setFederatedStatus(fedResource, status.ClusterRetrievalFailed, nil)
    }

    selectedClusterNames, err := fedResource.ComputePlacement(clusters)
    if err != nil {
        fedResource.RecordError(string(status.ComputePlacementFailed), errors.Wrap(err, "Failed to compute placement"))
        return s.setFederatedStatus(fedResource, status.ComputePlacementFailed, nil)
    }
    // The two steps above fetch all clusters and the clusters selected by
    // the FederatedObject's placement.

    kind := fedResource.TargetKind()
    key := fedResource.TargetName().String()
    klog.V(4).Infof("Ensuring %s %q in clusters: %s", kind, key, strings.Join(selectedClusterNames.List(), ","))

    dispatcher := dispatch.NewManagedDispatcher(s.informer.GetClientForCluster, fedResource, s.skipAdoptingResources)

    // Range over all clusters, not just the selected ones, because objects in
    // unselected clusters may need to be cleaned up.
    for _, cluster := range clusters {
        clusterName := cluster.Name
        selectedCluster := selectedClusterNames.Has(clusterName)

        // Oops, the cluster is NotReady; record an error if it was selected.
        if !util.IsClusterReady(&cluster.Status) {
            if selectedCluster {
                // Cluster state only needs to be reported in resource
                // status for clusters selected for placement.
                err := errors.New("Cluster not ready")
                dispatcher.RecordClusterError(status.ClusterNotReady, clusterName, err)
            }
            continue
        }

        rawClusterObj, _, err := s.informer.GetTargetStore().GetByKey(clusterName, key)
        if err != nil {
            wrappedErr := errors.Wrap(err, "Failed to retrieve cached cluster object")
            dispatcher.RecordClusterError(status.CachedRetrievalFailed, clusterName, wrappedErr)
            continue
        }

        var clusterObj *unstructured.Unstructured
        if rawClusterObj != nil {
            clusterObj = rawClusterObj.(*unstructured.Unstructured)
        }

        // If an unselected cluster contains the target object, remove it.
        // Potential risk: if a cluster that already holds a matching object
        // happens to be joined to kubefed, that object is game over.
        // Resource should not exist in the named cluster
        if !selectedCluster {
            if clusterObj == nil {
                // Resource does not exist in the cluster
                continue
            }
            if clusterObj.GetDeletionTimestamp() != nil {
                // Resource is marked for deletion
                dispatcher.RecordStatus(clusterName, status.WaitingForRemoval)
                continue
            }
            if fedResource.IsNamespaceInHostCluster(clusterObj) {
                // Host cluster namespace needs to have the managed
                // label removed so it won't be cached anymore.
                dispatcher.RemoveManagedLabel(clusterName, clusterObj)
            } else {
                dispatcher.Delete(clusterName)
            }
            continue
        }

        // Resource should appear in the named cluster
        // Create or update the object.
        // TODO(marun) Consider waiting until the result of resource
        // creation has reached the target store before attempting
        // subsequent operations.  Otherwise the object won't be found
        // but an add operation will fail with AlreadyExists.
        if clusterObj == nil {
            dispatcher.Create(clusterName)
        } else {
            dispatcher.Update(clusterName, clusterObj)
        }
    }
    // Wait for the work just handed to the dispatcher to finish, then update the version map.
    _, timeoutErr := dispatcher.Wait()
    if timeoutErr != nil {
        fedResource.RecordError("OperationTimeoutError", timeoutErr)
    }

    // Write updated versions to the API.
    updatedVersionMap := dispatcher.VersionMap()
    err = fedResource.UpdateVersions(selectedClusterNames.List(), updatedVersionMap)
    if err != nil {
        // Versioning of federated resources is an optimization to
        // avoid unnecessary updates, and failure to record version
        // information does not indicate a failure of propagation.
        // ... (handling elided) ...
    }

    // Update the propagation status (PropagationStatus).
    collectedStatus := dispatcher.CollectedStatus()
    return s.setFederatedStatus(fedResource, status.AggregateSuccess, &collectedStatus)
}

Judging from the code above, this path does not handle failover when a cluster goes down (for example, for replica-based objects such as Deployment and ReplicaSet). That is left to SchedulingPreference.
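For a ready member cluster, the branching in the loop above comes down to a small decision table. The Go sketch below restates it as a pure function so the cases are easy to compare. `clusterAction` and `decideAction` are hypothetical names; kubefed itself calls the dispatcher methods directly instead of returning a value.

```go
package main

import "fmt"

// clusterAction is a hypothetical label for what the sync loop does
// with one member cluster. kubefed has no such type.
type clusterAction string

const (
	actionSkip               clusterAction = "skip"
	actionWaitForRemoval     clusterAction = "wait-for-removal"
	actionRemoveManagedLabel clusterAction = "remove-managed-label"
	actionDelete             clusterAction = "delete"
	actionCreate             clusterAction = "create"
	actionUpdate             clusterAction = "update"
)

// decideAction mirrors the branches of syncToClusters for a ready cluster.
func decideAction(selected, exists, beingDeleted, hostNamespace bool) clusterAction {
	if !selected {
		switch {
		case !exists:
			return actionSkip // nothing to clean up
		case beingDeleted:
			return actionWaitForRemoval // deletion already in progress
		case hostNamespace:
			return actionRemoveManagedLabel // never delete the host cluster's own namespace
		default:
			return actionDelete
		}
	}
	// Selected clusters: create if missing, otherwise update.
	if !exists {
		return actionCreate
	}
	return actionUpdate
}

func main() {
	fmt.Println(decideAction(false, true, false, false)) // delete
	fmt.Println(decideAction(true, false, false, false)) // create
}
```

Note that a selected cluster whose object is already being deleted still goes down the update branch. This matches the loop above, which only checks the deletion timestamp for unselected clusters.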


// ManagedDispatcher dispatches operations to member clusters for resources
// managed by a federated resource.
type ManagedDispatcher interface {
    UnmanagedDispatcher

    Create(clusterName string)
    Update(clusterName string, clusterObj *unstructured.Unstructured)
    VersionMap() map[string]string
    CollectedStatus() status.CollectedPropagationStatus

    RecordClusterError(propStatus status.PropagationStatus, clusterName string, err error)
    RecordStatus(clusterName string, propStatus status.PropagationStatus)
}

// UnmanagedDispatcher dispatches operations to member clusters for
// resources that are no longer managed by a federated resource.
type UnmanagedDispatcher interface {
    OperationDispatcher

    Delete(clusterName string)
    RemoveManagedLabel(clusterName string, clusterObj *unstructured.Unstructured)
}

// OperationDispatcher provides an interface to wait for operations
// dispatched to member clusters.
type OperationDispatcher interface {
    // Wait returns true for ok if all operations completed
    // successfully and false if only some operations completed
    // successfully.  An error is returned on timeout.
    Wait() (ok bool, timeoutErr error)
}

Above are the funcs the Dispatcher interfaces define.

We'll focus mainly on the write paths (Create, Update) and Wait, and skip the rest.


func (d *managedDispatcherImpl) Create(clusterName string) {
    // Default the status to an operation-specific timeout.  Otherwise
    // when a timeout occurs it won't be possible to determine which
    // operation timed out.  The timeout status will be cleared by
    // Wait() if a timeout does not occur.
    // A slick move right at the start, and worth borrowing: when reporting the
    // status of something that can time out, record "timed out" by default and
    // change it once the operation succeeds. If the change is ever missed,
    // no harm done; it gets fixed on the next pass.
    d.RecordStatus(clusterName, status.CreationTimedOut)
    start := time.Now()
    const op = "create"
    go d.dispatcher.clusterOperation(clusterName, op, func(client generic.Client) util.ReconciliationStatus {
        d.recordEvent(clusterName, op, "Creating")

        obj, err := d.fedResource.ObjectForCluster(clusterName)
        if err != nil {
            return d.recordOperationError(status.ComputeResourceFailed, clusterName, op, err)
        }

        err = d.fedResource.ApplyOverrides(obj, clusterName)
        if err != nil {
            return d.recordOperationError(status.ApplyOverridesFailed, clusterName, op, err)
        }

        // The two steps above: build the object for this cluster from the
        // FederatedObject's spec, then apply that cluster's overrides.
        // Next, create the object we want.

        err = client.Create(context.Background(), obj)
        if err == nil {
            version := util.ObjectVersion(obj)
            d.recordVersion(clusterName, version)
            metrics.DispatchOperationDurationFromStart("create", start)
            return util.StatusAllOK
        }

        // TODO(marun) Figure out why attempting to create a namespace that
        // already exists indicates ServerTimeout instead of AlreadyExists.
        // Presumably due to how the controller-runtime client behaves, creating
        // an existing namespace comes back as a timeout, so for namespaces
        // ServerTimeout is treated as AlreadyExists too.
        alreadyExists := apierrors.IsAlreadyExists(err) || d.fedResource.TargetKind() == util.NamespaceKind && apierrors.IsServerTimeout(err)
        if !alreadyExists {
            return d.recordOperationError(status.CreationFailed, clusterName, op, err)
        }

        // Attempt to update the existing resource to ensure that it
        // is labeled as a managed resource.
        err = client.Get(context.Background(), obj, obj.GetNamespace(), obj.GetName())
        if err != nil {
            wrappedErr := errors.Wrapf(err, "failed to retrieve object potentially requiring adoption")
            return d.recordOperationError(status.RetrievalFailed, clusterName, op, wrappedErr)
        }

        // At first I had no idea what "adopting resources" meant... blind guess: resources that already exist.
        // After discussing it with others: a more accurate reading is "resources taken over (adopted) from elsewhere".
        if d.skipAdoptingResources && !d.fedResource.IsNamespaceInHostCluster(obj) {
            _ = d.recordOperationError(status.AlreadyExists, clusterName, op, errors.Errorf("Resource pre-exist in cluster"))
            return util.StatusAllOK
        }

        d.recordError(clusterName, op, errors.Errorf("An update will be attempted instead of a creation due to an existing resource"))
        d.Update(clusterName, obj)
        metrics.DispatchOperationDurationFromStart("update", start)
        return util.StatusAllOK
    })
}
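Stripped of the kubefed types, Create's fallback path works like this. It first tries to create the object. If the object already exists, it either reports that and stops (when `skipAdoptingResources` is set) or adopts the object by updating it so that it carries the managed label.

The sketch below models this with a hypothetical in-memory `fakeCluster` in place of a member cluster. The label key is illustrative. The `client.Get` step and the host-cluster namespace exception are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// managedLabel is an illustrative label key for this sketch.
const managedLabel = "kubefed.io/managed"

var errAlreadyExists = errors.New("already exists")

// fakeCluster is a hypothetical stand-in for a member cluster's API server:
// object name -> labels.
type fakeCluster struct {
	objects map[string]map[string]string
}

func (c *fakeCluster) create(name string, labels map[string]string) error {
	if _, ok := c.objects[name]; ok {
		return errAlreadyExists
	}
	c.objects[name] = labels
	return nil
}

func (c *fakeCluster) update(name string, labels map[string]string) {
	c.objects[name] = labels
}

// createOrAdopt follows the shape of Create: try to create; on AlreadyExists
// either stop (skipAdopting) or adopt the object by updating it.
func createOrAdopt(c *fakeCluster, name string, skipAdopting bool) string {
	desired := map[string]string{managedLabel: "true"}
	err := c.create(name, desired)
	if err == nil {
		return "created"
	}
	if !errors.Is(err, errAlreadyExists) {
		return "failed"
	}
	if skipAdopting {
		return "already-exists" // object left untouched; kubefed records an error status
	}
	c.update(name, desired) // adoption: the existing object becomes managed
	return "adopted"
}

func main() {
	c := &fakeCluster{objects: map[string]map[string]string{"existing": {}}}
	fmt.Println(createOrAdopt(c, "fresh", false))    // created
	fmt.Println(createOrAdopt(c, "existing", true))  // already-exists
	fmt.Println(createOrAdopt(c, "existing", false)) // adopted
}
```

This is also why "adopting" reads best as "taking over": with adoption allowed, an object that kubefed did not create ends up managed by it.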

func (d *managedDispatcherImpl) Update(clusterName string, clusterObj *unstructured.Unstructured) {
    d.RecordStatus(clusterName, status.UpdateTimedOut)

    const op = "update"
    go d.dispatcher.clusterOperation(clusterName, op, func(client generic.Client) util.ReconciliationStatus {
        if util.IsExplicitlyUnmanaged(clusterObj) {
            err := errors.Errorf("Unable to manage the object which has label %s: %s", util.ManagedByKubeFedLabelKey, util.UnmanagedByKubeFedLabelValue)
            return d.recordOperationError(status.ManagedLabelFalse, clusterName, op, err)
        }

        obj, err := d.fedResource.ObjectForCluster(clusterName)
        if err != nil {
            return d.recordOperationError(status.ComputeResourceFailed, clusterName, op, err)
        }

        // Keep the cluster's existing resourceVersion, finalizers and annotations,
        // the replica count (when the federated object sets retainReplicas),
        // and some Service and ServiceAccount fields.
        err = RetainClusterFields(d.fedResource.TargetKind(), obj, clusterObj, d.fedResource.Object())
        if err != nil {
            wrappedErr := errors.Wrapf(err, "failed to retain fields")
            return d.recordOperationError(status.FieldRetentionFailed, clusterName, op, wrappedErr)
        }

        err = d.fedResource.ApplyOverrides(obj, clusterName)
        if err != nil {
            return d.recordOperationError(status.ApplyOverridesFailed, clusterName, op, err)
        }

        // Compare versions to decide whether an update is needed, instead of updating blindly.
        version, err := d.fedResource.VersionForCluster(clusterName)
        if err != nil {
            return d.recordOperationError(status.VersionRetrievalFailed, clusterName, op, err)
        }
        if !util.ObjectNeedsUpdate(obj, clusterObj, version) {
            // Resource is current
            return util.StatusAllOK
        }

        // Only record an event if the resource is not current
        d.recordEvent(clusterName, op, "Updating")

        err = client.Update(context.Background(), obj)
        if err != nil {
            return d.recordOperationError(status.UpdateFailed, clusterName, op, err)
        }
        version = util.ObjectVersion(obj)
        d.recordVersion(clusterName, version)
        return util.StatusAllOK
    })
}
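The update path boils down to two steps:

1. Copy over the fields that controllers in the member cluster own.
2. Skip the write if nothing has changed since kubefed last wrote the object.

The sketch below shows both steps in simplified form. The `object` type is hypothetical, and `needsUpdate` compares versions only, whereas kubefed's `ObjectNeedsUpdate` may check more (for example metadata).

The sketch also assumes that the recorded version comes back empty when the federated object itself has changed. Under that assumption, an unknown version always forces an update.

```go
package main

import "fmt"

// object is a hypothetical, heavily simplified view of an unstructured object.
type object struct {
	ResourceVersion string
	Finalizers      []string
	Annotations     map[string]string
	Spec            map[string]string
}

// retainClusterFields copies fields owned by member-cluster controllers from
// the live object onto the desired one. Loosely modeled on RetainClusterFields;
// Service, ServiceAccount and replica handling are omitted.
func retainClusterFields(desired, live *object) {
	desired.ResourceVersion = live.ResourceVersion // required for the update to be accepted
	desired.Finalizers = live.Finalizers
	desired.Annotations = live.Annotations
}

// needsUpdate is a simplified ObjectNeedsUpdate: write only if the version
// recorded after our last write differs from the live object's version.
// An empty recordedVersion (unknown) always differs, forcing an update.
func needsUpdate(live *object, recordedVersion string) bool {
	return live.ResourceVersion != recordedVersion
}

func main() {
	live := &object{ResourceVersion: "42", Finalizers: []string{"example.com/cleanup"}}
	desired := &object{Spec: map[string]string{"image": "nginx:1.19"}}

	retainClusterFields(desired, live)
	fmt.Println(desired.ResourceVersion, len(desired.Finalizers)) // 42 1

	fmt.Println(needsUpdate(live, "42")) // false: unchanged since our last write
	fmt.Println(needsUpdate(live, "41")) // true: modified in the member cluster
}
```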

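func (d *managedDispatcherImpl) Wait() (bool, error) {
    // Body omitted. Wait mainly does two things: it waits for the dispatched
    // operations (create, update, delete, ...) to finish, then updates the status.
}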



That basically covers how kubefed propagates resources across clusters. Scheduling, potential pitfalls, use cases and some impressions are left for the next post.

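Overall, apart from some baffling code, it holds up reasonably well. Reconciling the underlying namespace-level and cluster-level objects is rather painful, but there isn't really a much better way to do it.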