T O P

[资源分享]     k8s自定义controller设计与实现

  • By - 楼主

  • 2021-03-03 00:02:14
  • k8s自定义controller设计与实现

    创建CRD

    1. 登录可以执行kubectl命令的机器,创建student.yaml

      apiVersion: apiextensions.k8s.io/v1beta1
      kind: CustomResourceDefinition
      metadata:
        # metadata.name的内容是由"复数名.分组名"构成,如下,students是复数名,bolingcavalry.k8s.io是分组名
        name: students.bolingcavalry.k8s.io
      spec:
        # 分组名,在REST API中也会用到的,格式是: /apis/分组名/CRD版本
        group: bolingcavalry.k8s.io
        # list of versions supported by this CustomResourceDefinition
        versions:
          - name: v1
            # 是否有效的开关.
            served: true
            # 只有一个版本能被标注为storage
            storage: true
        # 范围是属于namespace的
        scope: Namespaced
        names:
          # 复数名
          plural: students
          # 单数名
          singular: student
          # 类型名
          kind: Student
          # 简称,就像service的简称是svc
          shortNames:
          - stu
      
    2. 在student.yaml所在目录执行命令kubectl apply -f student.yaml,即可在k8s环境创建Student的定义,今后如果发起对类型为Student的对象的处理,k8s的api server就能识别到该对象类型了

    创建Student对象

    前面的步骤使得k8s能识别Student类型了,接下来创建Students对象

    1. 创建object-student.yaml文件

      apiVersion: bolingcavalry.k8s.io/v1
      kind: Student
      metadata:
        name: object-student
      spec:
        name: "张三"
        school: "深圳中学"
      
    2. 在object-student.yaml文件所在目录执行命令kubectl apply -f object-student.yaml,会看到提示创建成功

    3. 执行命令kubectl get stu可见已创建成功的Student对象

    至此,自定义API对象(也就是CRD)就创建成功了,此刻我们只是让k8s能识别到Student这个对象的身份,但是当我们创建Student对象的时候,还没有触发任何业务(相对于创建Pod对象的时候,会触发kubelet在node节点创建docker容器)

    自动生成代码

    为什么要做controller

    如果仅仅是在etcd保存Student对象是没有什么意义的,试想通过deployment创建pod时,如果只在etcd创建pod对象,而不去node节点创建容器,那这个pod对象只是一条数据而已,没有什么实质性作用,其他对象如service、pv也是如此。

    controller的作用就是监听指定对象的新增、删除、修改等变化,针对这些变化做出相应的响应(例如新增pod的响应为创建docker容器)

    在这里插入图片描述

    如上图,API对象的变化会通过Informer存入队列(WorkQueue),在Controller中消费队列的数据做出响应,响应相关的具体代码就是我们要做的真正业务逻辑。

    自动生成代码是什么

    从上图可以发现整个逻辑还是比较复杂的,为了简化我们的自定义controller开发,k8s的大师们利用自动代码生成工具将controller之外的事情都做好了,我们只要专注于controller的开发就好。

    开始实战

    接下来要做的事情就是编写API对象Student相关的声明的定义代码,然后用代码生成工具结合这些代码,自动生成Client、Informet、WorkQueue相关的代码;

    1. 在$GOPATH/src目录下创建一个文件夹k8s_customize_controller

    2. 进入文件夹执行如下命令创建三层目录

      mkdir -p pkg/apis/bolingcavalry
      
    3. 在新建的bolingcavalry目录下创建文件register.go

      package bolingcavalry
      
      const(
        GroupName = "bolingcavalry.k8s.io"
        Version = "v1"
      )
      
    4. 在新建的bolingcavalry目录下创建名为v1的文件夹

    5. 在v1文件夹下创建文件doc.go

      package v1
      

      上述代码中的两行注释,都是代码生成工具会用到的,一个是声明为整个v1包下的类型定义生成DeepCopy方法,另一个声明了这个包对应的API的组名,和CRD中的组名一致;

    6. 在v1文件夹创建文件types.go,里面定义Student对象的具体内容

      package v1
      
      import (
      	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
      )
      
      // +genclient
      // +genclient:noStatus
      // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
      
      type Student struct {
      	metav1.TypeMeta   `json:",inline"`
      	metav1.ObjectMeta `json:"metadata,omitempty"`
      	Spec              StudentSpec `json:"spec"`
      }
      
      type StudentSpec struct {
      	name   string `json:"name"`
      	school string `json:"school"`
      }
      
      // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
      
      // StudentList is a list of Student resources
      type StudentList struct {
      	metav1.TypeMeta `json:",inline"`
      	metav1.ListMeta `json:"metadata"`
      
      	Items []Student `json:"items"`
      }
      

      从上述源码可见,Student对象的内容已经被设定好,主要有name和school这两个字段,表示学生的名字和所在学校,因此创建Student对象的时候内容就要和这里匹配了;

    7. 在v1目录下创建register.go文件,此文件的作用是通过addKnownTypes方法使得client可以知道Student类型的API对象

      package v1
      
      import (
      	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
      	"k8s.io/apimachinery/pkg/runtime"
      	"k8s.io/apimachinery/pkg/runtime/schema"
      
      	"k8s_customize_controller/pkg/apis/bolingcavalry"
      )
      
      var SchemeGroupVersion = schema.GroupVersion{
      	Group:   bolingcavalry.GroupName,
      	Version: bolingcavalry.Version,
      }
      
      var (
      	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
      	AddToScheme   = SchemeBuilder.AddToScheme
      )
      
      func Resource(resource string) schema.GroupResource {
      	return SchemeGroupVersion.WithResource(resource).GroupResource()
      }
      
      func Kind(kind string) schema.GroupKind {
      	return SchemeGroupVersion.WithKind(kind).GroupKind()
      }
      
      func addKnownTypes(scheme *runtime.Scheme) error {
      	scheme.AddKnownTypes(
      		SchemeGroupVersion,
      		&Student{},
      		&StudentList{},
      	)
      
      	// register the type in the scheme
      	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
      	return nil
      }
      
    8. 至此,为自动生成代码做的准备工作已经完成

    9. 执行以下命令,会先下载依赖包,再下载代码生成工具,再执行代码生成工作:

      cd $GOPATH/src \
      && go get -u -v k8s.io/apimachinery/pkg/apis/meta/v1 \
      && go get -u -v k8s.io/code-generator/... \
      && cd $GOPATH/src/k8s.io/code-generator \
      && ./generate-groups.sh all \
      k8s_customize_controller/pkg/client \
      k8s_customize_controller/pkg/apis \
      bolingcavalry:v1
      
      #如果code-generator安装失败(网络原因),可以手动下载代码安装,在执行上面命令
      git clone https://github.com/kubernetes/code-generator
      ./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOME_RESOURCE_VERSION"
      
      
    10. 如果代码没问题,会看到以下输出

      Generating deepcopy funcs
      Generating clientset for bolingcavalry:v1 at k8s_customize_controller/pkg/client/clientset
      Generating listers for bolingcavalry:v1 at k8s_customize_controller/pkg/client/listers
      Generating informers for bolingcavalry:v1 at k8s_customize_controller/pkg/client/informers
      
    11. 此时再去$GOPATH/src/k8s_customize_controller目录下执行tree命令,可见已生成了很多内容

      [root@master k8s_customize_controller]# tree
      .
      └── pkg
          ├── apis
          │   └── bolingcavalry
          │       ├── register.go
          │       └── v1
          │           ├── doc.go
          │           ├── register.go
          │           ├── types.go
          │           └── zz_generated.deepcopy.go
          └── client
              ├── clientset
              │   └── versioned
              │       ├── clientset.go
              │       ├── doc.go
              │       ├── fake
              │       │   ├── clientset_generated.go
              │       │   ├── doc.go
              │       │   └── register.go
              │       ├── scheme
              │       │   ├── doc.go
              │       │   └── register.go
              │       └── typed
              │           └── bolingcavalry
              │               └── v1
              │                   ├── bolingcavalry_client.go
              │                   ├── doc.go
              │                   ├── fake
              │                   │   ├── doc.go
              │                   │   ├── fake_bolingcavalry_client.go
              │                   │   └── fake_student.go
              │                   ├── generated_expansion.go
              │                   └── student.go
              ├── informers
              │   └── externalversions
              │       ├── bolingcavalry
              │       │   ├── interface.go
              │       │   └── v1
              │       │       ├── interface.go
              │       │       └── student.go
              │       ├── factory.go
              │       ├── generic.go
              │       └── internalinterfaces
              │           └── factory_interfaces.go
              └── listers
                  └── bolingcavalry
                      └── v1
                          ├── expansion_generated.go
                          └── student.go
      
      21 directories, 27 files
      

      如上所示,zz_generated.deepcopy.go就是DeepCopy代码文件,client目录下的内容都是客户端相关代码,在开发controller时会用到;
      client目录下的clientset、informers、listers的身份和作用可以和前面的图结合来理解

    编写controller代码

    现在已经能监听到Student对象的增删改等事件,接下来就是根据这些事件来做不同的事情,满足个性化的业务需求

    1. 编写的第一个go文件就是controller.go,在k8s_customize_controller目录下创建controller.go

      package main
      
      import (
      	"fmt"
      	"time"
      
      	"github.com/golang/glog"
      	corev1 "k8s.io/api/core/v1"
      	"k8s.io/apimachinery/pkg/api/errors"
      	"k8s.io/apimachinery/pkg/util/runtime"
      	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
      	"k8s.io/apimachinery/pkg/util/wait"
      	"k8s.io/client-go/kubernetes"
      	"k8s.io/client-go/kubernetes/scheme"
      	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
      	"k8s.io/client-go/tools/cache"
      	"k8s.io/client-go/tools/record"
      	"k8s.io/client-go/util/workqueue"
      
      	bolingcavalryv1 "github.com/zq2599/k8s-controller-custom-resource/pkg/apis/bolingcavalry/v1"
      	clientset "github.com/zq2599/k8s-controller-custom-resource/pkg/client/clientset/versioned"
      	studentscheme "github.com/zq2599/k8s-controller-custom-resource/pkg/client/clientset/versioned/scheme"
      	informers "github.com/zq2599/k8s-controller-custom-resource/pkg/client/informers/externalversions/bolingcavalry/v1"
      	listers "github.com/zq2599/k8s-controller-custom-resource/pkg/client/listers/bolingcavalry/v1"
      )
      
      const controllerAgentName = "student-controller"
      
      const (
      	SuccessSynced = "Synced"
      
      	MessageResourceSynced = "Student synced successfully"
      )
      
      // Controller is the controller implementation for Student resources
      type Controller struct {
      	// kubeclientset is a standard kubernetes clientset
      	kubeclientset kubernetes.Interface
      	// studentclientset is a clientset for our own API group
      	studentclientset clientset.Interface
      
      	studentsLister listers.StudentLister
      	studentsSynced cache.InformerSynced
      
      	workqueue workqueue.RateLimitingInterface
      
      	recorder record.EventRecorder
      }
      
      // NewController returns a new student controller
      func NewController(
      	kubeclientset kubernetes.Interface,
      	studentclientset clientset.Interface,
      	studentInformer informers.StudentInformer) *Controller {
      
      	utilruntime.Must(studentscheme.AddToScheme(scheme.Scheme))
      	glog.V(4).Info("Creating event broadcaster")
      	eventBroadcaster := record.NewBroadcaster()
      	eventBroadcaster.StartLogging(glog.Infof)
      	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
      	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
      
      	controller := &Controller{
      		kubeclientset:    kubeclientset,
      		studentclientset: studentclientset,
      		studentsLister:   studentInformer.Lister(),
      		studentsSynced:   studentInformer.Informer().HasSynced,
      		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"),
      		recorder:         recorder,
      	}
      
      	glog.Info("Setting up event handlers")
      	// Set up an event handler for when Student resources change
      	studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      		AddFunc: controller.enqueueStudent,
      		UpdateFunc: func(old, new interface{}) {
      			oldStudent := old.(*bolingcavalryv1.Student)
      			newStudent := new.(*bolingcavalryv1.Student)
      			if oldStudent.ResourceVersion == newStudent.ResourceVersion {
                      //版本一致,就表示没有实际更新的操作,立即返回
      				return
      			}
      			controller.enqueueStudent(new)
      		},
      		DeleteFunc: controller.enqueueStudentForDelete,
      	})
      
      	return controller
      }
      
      //在此处开始controller的业务
      func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
      	defer runtime.HandleCrash()
      	defer c.workqueue.ShutDown()
      
      	glog.Info("开始controller业务,开始一次缓存数据同步")
      	if ok := cache.WaitForCacheSync(stopCh, c.studentsSynced); !ok {
      		return fmt.Errorf("failed to wait for caches to sync")
      	}
      
      	glog.Info("worker启动")
      	for i := 0; i < threadiness; i++ {
      		go wait.Until(c.runWorker, time.Second, stopCh)
      	}
      
      	glog.Info("worker已经启动")
      	<-stopCh
      	glog.Info("worker已经结束")
      
      	return nil
      }
      
      func (c *Controller) runWorker() {
      	for c.processNextWorkItem() {
      	}
      }
      
      // 取数据处理
      func (c *Controller) processNextWorkItem() bool {
      
      	obj, shutdown := c.workqueue.Get()
      
      	if shutdown {
      		return false
      	}
      
      	// We wrap this block in a func so we can defer c.workqueue.Done.
      	err := func(obj interface{}) error {
      		defer c.workqueue.Done(obj)
      		var key string
      		var ok bool
      
      		if key, ok = obj.(string); !ok {
      
      			c.workqueue.Forget(obj)
      			runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
      			return nil
      		}
      		// 在syncHandler中处理业务
      		if err := c.syncHandler(key); err != nil {
      			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
      		}
      
      		c.workqueue.Forget(obj)
      		glog.Infof("Successfully synced '%s'", key)
      		return nil
      	}(obj)
      
      	if err != nil {
      		runtime.HandleError(err)
      		return true
      	}
      
      	return true
      }
      
      // 处理
      func (c *Controller) syncHandler(key string) error {
      	// Convert the namespace/name string into a distinct namespace and name
      	namespace, name, err := cache.SplitMetaNamespaceKey(key)
      	if err != nil {
      		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
      		return nil
      	}
      
      	// 从缓存中取对象
      	student, err := c.studentsLister.Students(namespace).Get(name)
      	if err != nil {
      		// 如果Student对象被删除了,就会走到这里,所以应该在这里加入执行
      		if errors.IsNotFound(err) {
      			glog.Infof("Student对象被删除,请在这里执行实际的删除业务: %s/%s ...", namespace, name)
      
      			return nil
      		}
      
      		runtime.HandleError(fmt.Errorf("failed to list student by: %s/%s", namespace, name))
      
      		return err
      	}
      
      	glog.Infof("这里是student对象的期望状态: %#v ...", student)
      	glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)")
      
      	c.recorder.Event(student, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
      	return nil
      }
      
      // 数据先放入缓存,再入队列
      func (c *Controller) enqueueStudent(obj interface{}) {
      	var key string
      	var err error
      	// 将对象放入缓存
      	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
      		runtime.HandleError(err)
      		return
      	}
      
      	// 将key放入队列
      	c.workqueue.AddRateLimited(key)
      }
      
      // 删除操作
      func (c *Controller) enqueueStudentForDelete(obj interface{}) {
      	var key string
      	var err error
      	// 从缓存中删除指定对象
      	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
      	if err != nil {
      		runtime.HandleError(err)
      		return
      	}
      	//再将key放入队列
      	c.workqueue.AddRateLimited(key)
      }
      
      

      上述代码有以下几处关键点:
      a. 创建controller的NewController方法中,定义了收到Student对象的增删改消息时的具体处理逻辑,除了同步本地缓存,就是将该对象的key放入消息中;
      b. 实际处理消息的方法是syncHandler,这里面可以添加实际的业务代码,来响应Student对象的增删改情况,达到业务目的;

    2. 接下来可以写main.go了,不过在此之前把处理系统信号量的辅助类先写好,然后在main.go中会用到(处理例如ctrl+c的退出),在$GOPATH/src/k8s_customize_controller/pkg目录下新建目录signals;

    3. 在signals目录下新建文件signal_posix.go:

      // +build !windows
      
      package signals
      
      import (
      	"os"
      	"syscall"
      )
      
      var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
      
      
      
    4. 在signals目录下新建文件signal_windows.go

      package signals
      
      import (
      	"os"
      )
      
      var shutdownSignals = []os.Signal{os.Interrupt}
      
      
    5. 在signals目录下新建文件signal.go

      package signals
      
      import (
              "os"
              "os/signal"
      )
      
      var onlyOneSignalHandler = make(chan struct{})
      
      func SetupSignalHandler() (stopCh <-chan struct{}) {
              close(onlyOneSignalHandler) // panics when called twice
      
              stop := make(chan struct{})
              c := make(chan os.Signal, 2)
              signal.Notify(c, shutdownSignals...)
              go func() {
                      <-c
                      close(stop)
                      <-c
                      os.Exit(1) // second signal. Exit directly.
              }()
      
              return stop
      }
      
    6. 接下来可以编写main.go了,在k8s_customize_controller目录下创建main.go文件,内容如下,关键位置已经加了注释,就不再赘述了:

      package main
      
      import (
      	"flag"
      	"time"
      
      	"github.com/golang/glog"
      	"k8s.io/client-go/kubernetes"
      	"k8s.io/client-go/tools/clientcmd"
      	// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
      	// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
      
      	clientset "k8s_customize_controller/pkg/client/clientset/versioned"
      	informers "k8s_customize_controller/pkg/client/informers/externalversions"
      	"k8s_customize_controller/pkg/signals"
      )
      
      var (
      	masterURL  string
      	kubeconfig string
      )
      
      func main() {
      	flag.Parse()
      
      	// 处理信号量
      	stopCh := signals.SetupSignalHandler()
      
          // 处理入参
      	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
      	if err != nil {
      		glog.Fatalf("Error building kubeconfig: %s", err.Error())
      	}
      
      	kubeClient, err := kubernetes.NewForConfig(cfg)
      	if err != nil {
      		glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
      	}
      
      	studentClient, err := clientset.NewForConfig(cfg)
      	if err != nil {
      		glog.Fatalf("Error building example clientset: %s", err.Error())
      	}
      
      	studentInformerFactory := informers.NewSharedInformerFactory(studentClient, time.Second*30)
      
          //得到controller
      	controller := NewController(kubeClient, studentClient,
      		studentInformerFactory.Bolingcavalry().V1().Students())
      
          //启动informer
      	go studentInformerFactory.Start(stopCh)
      
          //controller开始处理消息
      	if err = controller.Run(2, stopCh); err != nil {
      		glog.Fatalf("Error running controller: %s", err.Error())
      	}
      }
      
      func init() {
      	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
      	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
      }
      
      
      

      至此,所有代码已经编写完毕,接下来是编译构建

    编译构建和启动

    1. 在$GOPATH/src/k8s_customize_controller目录下,执行以下命令:

      go get k8s.io/client-go/kubernetes/scheme \
      && go get github.com/golang/glog \
      && go get k8s.io/kube-openapi/pkg/util/proto \
      && go get k8s.io/utils/buffer \
      && go get k8s.io/utils/integer \
      && go get k8s.io/utils/trace
      
    2. 上述脚本将编译过程中依赖的库通过go get方式进行获取,属于笨办法,更好的方法是选用一种包依赖工具,具体的可以参照k8s的官方demo,这个代码中同时提供了godep和vendor两种方式来处理上面的包依赖问题,地址是:https

    3. 解决了包依赖问题后,在$GOPATH/src/k8s_customize_controller目录下执行命令go build,即可在当前目录生成k8s_customize_controller文件;

    4. 将文件k8s_customize_controller复制到k8s环境中,记得通过chmod a+x命令给其可执行权限;

    5. 执行命令./k8s_customize_controller -kubeconfig=$HOME/.kube/config -alsologtostderr=true,会立即启动controller

    总结

    现在小结一下自定义controller开发的整个过程:

    1. 创建CRD(Custom Resource Definition),令k8s明白我们自定义的API对象;

    2. 编写代码,将CRD的情况写入对应的代码中,然后通过自动代码生成工具,将controller之外的informer,client等内容较为固定的代码通过工具生成;

    3. 编写controller,在里面判断实际情况是否达到了API对象的声明情况,如果未达到,就要进行实际业务处理,而这也是controller的通用做法;

    4. 实际编码过程并不负载,动手编写的文件如下:

      ├── controller.go
      ├── main.go
      └── pkg
          ├── apis
          │   └── bolingcavalry
          │       ├── register.go
          │       └── v1
          │           ├── doc.go
          │           ├── register.go
          │           └── types.go
          └── signals
              ├── signal.go
              ├── signal_posix.go
              └── signal_windows.go
      

    原文链接:https://blog.csdn.net/boling_cavalry/article/details/88924194

    本帖子中包含资源

    您需要 登录 才可以下载,没有帐号?立即注册