To develop a Dynamic Provisioner, you use the helper library (the controller package in kubernetes-incubator/external-storage).

1. Dynamic Provisioner

1.1. Provisioner Interface

Developing a Dynamic Provisioner means implementing the Provisioner interface, which has two methods:

  • Provision: creates the storage asset and returns a PV object for it.
  • Delete: removes the storage asset backing a given PV, but does not delete the PV object itself.

The Provisioner interface source:

  // Provisioner is an interface that creates templates for PersistentVolumes
  // and can create the volume as a new resource in the infrastructure provider.
  // It can also remove the volume it created from the underlying storage
  // provider.
  type Provisioner interface {
      // Provision creates a volume i.e. the storage asset and returns a PV object
      // for the volume
      Provision(VolumeOptions) (*v1.PersistentVolume, error)
      // Delete removes the storage asset that was created by Provision backing the
      // given PV. Does not delete the PV object itself.
      //
      // May return IgnoredError to indicate that the call has been ignored and no
      // action taken.
      Delete(*v1.PersistentVolume) error
  }
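As a minimal sketch of what an implementation looks like, consider a hypothetical hostPath-style provisioner where every PV is a directory on the local node. The type hostPathProvisioner, the pvDir field, and the directory layout are assumptions for illustration, not part of nfs-client-provisioner:

  package main

  import (
      "fmt"
      "os"
      "path/filepath"

      "github.com/kubernetes-incubator/external-storage/lib/controller"
      "k8s.io/api/core/v1"
      metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  )

  // hostPathProvisioner is a hypothetical example: each PV is a directory under pvDir.
  type hostPathProvisioner struct {
      pvDir string
  }

  // Compile-time check that the interface is satisfied.
  var _ controller.Provisioner = &hostPathProvisioner{}

  // Provision creates the storage asset (a directory) and returns a PV object for it.
  func (p *hostPathProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
      path := filepath.Join(p.pvDir, options.PVName)
      if err := os.MkdirAll(path, 0777); err != nil {
          return nil, err
      }
      return &v1.PersistentVolume{
          ObjectMeta: metav1.ObjectMeta{Name: options.PVName},
          Spec: v1.PersistentVolumeSpec{
              PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
              AccessModes:                   options.PVC.Spec.AccessModes,
              Capacity: v1.ResourceList{
                  v1.ResourceStorage: options.PVC.Spec.Resources.Requests[v1.ResourceStorage],
              },
              PersistentVolumeSource: v1.PersistentVolumeSource{
                  HostPath: &v1.HostPathVolumeSource{Path: path},
              },
          },
      }, nil
  }

  // Delete removes the storage asset; the PV object itself is deleted by the controller.
  func (p *hostPathProvisioner) Delete(volume *v1.PersistentVolume) error {
      if volume.Spec.HostPath == nil {
          return fmt.Errorf("volume %s was not provisioned by this provisioner", volume.Name)
      }
      return os.RemoveAll(volume.Spec.HostPath.Path)
  }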

1.2. VolumeOptions

The Provision method of the Provisioner interface takes a VolumeOptions object as its parameter. VolumeOptions carries the information needed to build the PV object, for example the PV's reclaim policy, the PV name, the PVC that triggered provisioning, and the parameters from the PVC's StorageClass.

The VolumeOptions source:

  // VolumeOptions contains option information about a volume
  // https://github.com/kubernetes/kubernetes/blob/release-1.4/pkg/volume/plugins.go
  type VolumeOptions struct {
      // Reclamation policy for a persistent volume
      PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy
      // PV.Name of the appropriate PersistentVolume. Used to generate cloud
      // volume name.
      PVName string
      // PV mount options. Not validated - mount of the PVs will simply fail if one is invalid.
      MountOptions []string
      // PVC is reference to the claim that lead to provisioning of a new PV.
      // Provisioners *must* create a PV that would be matched by this PVC,
      // i.e. with required capacity, accessMode, labels matching PVC.Selector and
      // so on.
      PVC *v1.PersistentVolumeClaim
      // Volume provisioning parameters from StorageClass
      Parameters map[string]string
      // Node selected by the scheduler for the volume.
      SelectedNode *v1.Node
      // Topology constraint parameter from StorageClass
      AllowedTopologies []v1.TopologySelectorTerm
  }

1.3. ProvisionController

ProvisionController is a controller that provisions PVs for PVCs; it drives all of the logic around calling the Provision and Delete methods of the Provisioner interface.

1.4. Steps to develop a provisioner

  1. Write a provisioner that implements the Provisioner interface (the Provision and Delete methods).
  2. Build a ProvisionController from that provisioner.
  3. Call the ProvisionController's Run method (see the sketch after this list).
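A minimal sketch of steps 2 and 3, reusing the hypothetical hostPathProvisioner from section 1.1 and assuming an in-cluster deployment; the provisioner name "example.com/hostpath" and the directory are placeholders:

  package main

  import (
      "github.com/golang/glog"
      "github.com/kubernetes-incubator/external-storage/lib/controller"
      "k8s.io/apimachinery/pkg/util/wait"
      "k8s.io/client-go/kubernetes"
      "k8s.io/client-go/rest"
  )

  func main() {
      // Step 1 is the hostPathProvisioner sketched in section 1.1.
      config, err := rest.InClusterConfig()
      if err != nil {
          glog.Fatalf("failed to create config: %v", err)
      }
      clientset, err := kubernetes.NewForConfig(config)
      if err != nil {
          glog.Fatalf("failed to create client: %v", err)
      }
      serverVersion, err := clientset.Discovery().ServerVersion()
      if err != nil {
          glog.Fatalf("error getting server version: %v", err)
      }
      // Step 2: build a ProvisionController from the provisioner.
      pc := controller.NewProvisionController(
          clientset,
          "example.com/hostpath", // placeholder; must match the StorageClass's provisioner field
          &hostPathProvisioner{pvDir: "/tmp/hostpath-pvs"},
          serverVersion.GitVersion,
      )
      // Step 3: run the controller as a long-lived process.
      pc.Run(wait.NeverStop)
  }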

2. NFS Client Provisioner

nfs-client-provisioner is an automatic provisioner that uses NFS as its backing storage: it dynamically creates PVs for PVCs. It does not provide NFS storage itself, so an existing NFS server is required.

  • Provisioned PVs are backed by directories on the NFS server named ${namespace}-${pvcName}-${pvName}. For example, PVC test-claim in namespace default bound to PV pvc-1234 maps to the directory default-test-claim-pvc-1234.
  • When a PV is reclaimed, its directory on the NFS server is renamed to archived-${namespace}-${pvcName}-${pvName}.

The rest of this article walks through the nfs-client-provisioner source to illustrate the whole process of developing a custom provisioner. Most of its code lives in the file provisioner.go.

nfs-client-provisioner source: https://github.com/kubernetes-incubator/external-storage/tree/master/nfs-client

2.1. The main function

2.1.1. Reading environment variables

Source:

  func main() {
      flag.Parse()
      flag.Set("logtostderr", "true")
      server := os.Getenv("NFS_SERVER")
      if server == "" {
          glog.Fatal("NFS_SERVER not set")
      }
      path := os.Getenv("NFS_PATH")
      if path == "" {
          glog.Fatal("NFS_PATH not set")
      }
      provisionerName := os.Getenv(provisionerNameKey)
      if provisionerName == "" {
          glog.Fatalf("environment variable %s is not set! Please set it.", provisionerNameKey)
      }
      ...
  }

main first reads three environment variables: NFS_SERVER, NFS_PATH, and PROVISIONER_NAME. All three must be supplied when deploying nfs-client-provisioner; a Deployment snippet follows the list below.

  • NFS_SERVER: the IP address of the NFS server.
  • NFS_PATH: the exported (shared) directory on the NFS server.
  • PROVISIONER_NAME: the provisioner's name, which must match the provisioner field of the StorageClass object.
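For example, a sketch of the env section of the nfs-client-provisioner Deployment; the server address and path are placeholders to adapt to your environment:

  # fragment of the nfs-client-provisioner Deployment spec (values are placeholders)
  env:
    - name: PROVISIONER_NAME
      value: fuseim.pri/ifs
    - name: NFS_SERVER
      value: 10.10.10.60
    - name: NFS_PATH
      value: /data/nfs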

For example, a StorageClass manifest:

  apiVersion: storage.k8s.io/v1
  kind: StorageClass
  metadata:
    name: managed-nfs-storage
  provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME
  parameters:
    archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.

2.1.2. Creating the clientset

Source:

  // Create an InClusterConfig and use it to create a client for the controller
  // to use to communicate with Kubernetes
  config, err := rest.InClusterConfig()
  if err != nil {
      glog.Fatalf("Failed to create config: %v", err)
  }
  clientset, err := kubernetes.NewForConfig(config)
  if err != nil {
      glog.Fatalf("Failed to create client: %v", err)
  }

This reads the in-cluster Kubernetes configuration and creates a clientset, which is used to call the Kubernetes API, chiefly to create and delete PV and PVC objects.
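When developing a provisioner outside the cluster, a common pattern is to fall back to a local kubeconfig; this is an assumption for illustration, not something nfs-client-provisioner does (clientcmd is from k8s.io/client-go/tools/clientcmd):

  // Hypothetical sketch: prefer in-cluster config, fall back to a local kubeconfig
  // for development outside the cluster.
  config, err := rest.InClusterConfig()
  if err != nil {
      kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
      config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
      if err != nil {
          glog.Fatalf("Failed to create config: %v", err)
      }
  }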

2.1.3. Constructing the nfsProvisioner object

Source:

  // The controller needs to know what the server version is because out-of-tree
  // provisioners aren't officially supported until 1.5
  serverVersion, err := clientset.Discovery().ServerVersion()
  if err != nil {
      glog.Fatalf("Error getting server version: %v", err)
  }
  clientNFSProvisioner := &nfsProvisioner{
      client: clientset,
      server: server,
      path:   path,
  }

An nfsProvisioner is built from the clientset and the server and path values. The server version is also fetched, because out-of-tree provisioners are only officially supported on Kubernetes 1.5 and above.

The nfsProvisioner type is defined as:

  type nfsProvisioner struct {
      client kubernetes.Interface
      server string
      path   string
  }

  var _ controller.Provisioner = &nfsProvisioner{}

nfsProvisioner is the custom provisioner implementing the Provisioner interface. Besides the NFS-specific server and path fields, it holds a client for calling the Kubernetes API.

  var _ controller.Provisioner = &nfsProvisioner{}

This blank-identifier assignment is a compile-time assertion that nfsProvisioner implements the Provisioner interface.

2.1.4. Building and running the ProvisionController

Source:

  // Start the provision controller which will dynamically provision efs NFS
  // PVs
  pc := controller.NewProvisionController(clientset, provisionerName, clientNFSProvisioner, serverVersion.GitVersion)
  pc.Run(wait.NeverStop)

A ProvisionController is constructed from the nfsProvisioner and its Run method is called. ProvisionController implements the actual PV/PVC handling logic, and Run blocks, keeping the controller running as a long-lived process.
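wait.NeverStop is a channel that never closes. As a hedged alternative sketch (an assumption for illustration, not what nfs-client-provisioner does), a stop channel tied to SIGINT/SIGTERM would allow graceful shutdown; this uses os/signal and syscall:

  // Hypothetical: close the stop channel on SIGINT/SIGTERM instead of running forever.
  stopCh := make(chan struct{})
  sigCh := make(chan os.Signal, 1)
  signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
  go func() {
      <-sigCh
      close(stopCh)
  }()
  pc.Run(stopCh)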

2.2. The Provision and Delete methods

2.2.1. The Provision method

nfsProvisionerProvision方法具体源码参考:https://github.com/kubernetes-incubator/external-storage/blob/master/nfs-client/cmd/nfs-client-provisioner/provisioner.go#L56

Provision creates the storage asset and returns a PV object. Its VolumeOptions parameter specifies the attributes of the PV to build.

1. Build the volume name and create the directory

  func (p *nfsProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
      if options.PVC.Spec.Selector != nil {
          return nil, fmt.Errorf("claim Selector is not supported")
      }
      glog.V(4).Infof("nfs provisioner: VolumeOptions %v", options)
      pvcNamespace := options.PVC.Namespace
      pvcName := options.PVC.Name
      pvName := strings.Join([]string{pvcNamespace, pvcName, options.PVName}, "-")
      fullPath := filepath.Join(mountPath, pvName)
      glog.V(4).Infof("creating path %s", fullPath)
      if err := os.MkdirAll(fullPath, 0777); err != nil {
          return nil, errors.New("unable to create directory to provision new pv: " + err.Error())
      }
      os.Chmod(fullPath, 0777)
      path := filepath.Join(p.path, pvName)
      ...
  }

From the VolumeOptions parameter, the method derives the volume's directory name (namespace-pvcName-pvName), creates that directory under the mount point (fullPath), and computes the NFS path for the PV (path).

2. Construct the PV object

  pv := &v1.PersistentVolume{
      ObjectMeta: metav1.ObjectMeta{
          Name: options.PVName,
      },
      Spec: v1.PersistentVolumeSpec{
          PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
          AccessModes:                   options.PVC.Spec.AccessModes,
          MountOptions:                  options.MountOptions,
          Capacity: v1.ResourceList{
              v1.ResourceName(v1.ResourceStorage): options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)],
          },
          PersistentVolumeSource: v1.PersistentVolumeSource{
              NFS: &v1.NFSVolumeSource{
                  Server:   p.server,
                  Path:     path,
                  ReadOnly: false,
              },
          },
      },
  }
  return pv, nil

As shown above, Provision only creates the backing directory and builds the PV object from the VolumeOptions parameter; it does not create or delete the PV API object in Kubernetes itself.

Provisioners for different storage types mostly differ in the PersistentVolumeSource type and its parameters. For nfs-client-provisioner the PersistentVolumeSource is NFS, which takes NFS-specific parameters such as Server and Path.
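For comparison, a hypothetical hostPath-based provisioner would differ only in this part of the spec (a sketch, not from the nfs-client source):

  // Hypothetical: the same PV spec with a hostPath source instead of NFS.
  PersistentVolumeSource: v1.PersistentVolumeSource{
      HostPath: &v1.HostPathVolumeSource{
          Path: path, // a directory on the node instead of an NFS export
      },
  },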

2.2.2. The Delete method

nfsProvisionerdelete方法具体源码参考:https://github.com/kubernetes-incubator/external-storage/blob/master/nfs-client/cmd/nfs-client-provisioner/provisioner.go#L99

1. Get pvName, path, and related values

  func (p *nfsProvisioner) Delete(volume *v1.PersistentVolume) error {
      path := volume.Spec.PersistentVolumeSource.NFS.Path
      pvName := filepath.Base(path)
      oldPath := filepath.Join(mountPath, pvName)
      if _, err := os.Stat(oldPath); os.IsNotExist(err) {
          glog.Warningf("path %s does not exist, deletion skipped", oldPath)
          return nil
      }
      ...
  }

通过pathpvName生成oldPath,其中oldPath是原先NFS服务器上pod对应的数据持久化存储路径。

2. Read the archiveOnDelete parameter and delete the data

  // Get the storage class for this volume.
  storageClass, err := p.getClassForVolume(volume)
  if err != nil {
      return err
  }
  // Determine if the "archiveOnDelete" parameter exists.
  // If it exists and has a falsey value, delete the directory.
  // Otherwise, archive it.
  archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"]
  if exists {
      archiveBool, err := strconv.ParseBool(archiveOnDelete)
      if err != nil {
          return err
      }
      if !archiveBool {
          return os.RemoveAll(oldPath)
      }
  }

If the StorageClass sets the archiveOnDelete parameter to false, everything under oldPath, i.e. the pod's persisted data, is deleted automatically.

archiveOnDelete literally means "archive on delete": false means do not archive (delete the data); true means archive (rename the directory).
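Note that the value goes through strconv.ParseBool, so any of Go's accepted boolean spellings work; a quick illustration:

  // strconv.ParseBool accepts "1", "t", "T", "TRUE", "true", "True",
  // "0", "f", "F", "FALSE", "false", "False"; anything else is an error.
  b, err := strconv.ParseBool("False")
  fmt.Println(b, err) // false <nil>
  _, err = strconv.ParseBool("no")
  fmt.Println(err)    // strconv.ParseBool: parsing "no": invalid syntax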

3. Rename the old data directory

  archivePath := filepath.Join(mountPath, "archived-"+pvName)
  glog.V(4).Infof("archiving path %s to %s", oldPath, archivePath)
  return os.Rename(oldPath, archivePath)

If the StorageClass does not set archiveOnDelete, or sets it to true, the data is archived on delete: oldPath is renamed with an archived- prefix.

3. ProvisionController

3.1. The ProvisionController struct

Source: https://github.com/kubernetes-incubator/external-storage/blob/master/lib/controller/controller.go#L82

ProvisionController is a controller that provisions PVs for PVCs; it drives all of the logic around calling the Provision and Delete methods of the Provisioner interface.

3.1.1. Constructor parameters

  // ProvisionController is a controller that provisions PersistentVolumes for
  // PersistentVolumeClaims.
  type ProvisionController struct {
      client kubernetes.Interface
      // The name of the provisioner for which this controller dynamically
      // provisions volumes. The value of annDynamicallyProvisioned and
      // annStorageProvisioner to set & watch for, respectively
      provisionerName string
      // The provisioner the controller will use to provision and delete volumes.
      // Presumably this implementer of Provisioner carries its own
      // volume-specific options and such that it needs in order to provision
      // volumes.
      provisioner Provisioner
      // Kubernetes cluster server version:
      // * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic
      //   provisioning is not officially supported, though it works
      // * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is
      //   officially supported
      // * 1.6: storage classes enter GA
      kubeVersion *utilversion.Version
      ...
  }

clientprovisionerNameprovisionerkubeVersion等属性作为NewProvisionController的入参。

  • client: the clientset used to call the Kubernetes API.
  • provisionerName: the provisioner's name, which must match the provisioner field of the StorageClass object.
  • provisioner: the concrete Provisioner implementation; here, nfsProvisioner.
  • kubeVersion: the Kubernetes server version.

3.1.2. Controllers and informers

  type ProvisionController struct {
      ...
      claimInformer    cache.SharedInformer
      claims           cache.Store
      claimController  cache.Controller
      volumeInformer   cache.SharedInformer
      volumes          cache.Store
      volumeController cache.Controller
      classInformer    cache.SharedInformer
      classes          cache.Store
      classController  cache.Controller
      ...
  }

ProvisionController holds a Controller, an Informer, and a Store for each of the PV, PVC, and StorageClass types; these drive all operations on the three object kinds.

  • Controller: the generic control-loop framework
  • Informer: the event notifier
  • Store: the generic object cache interface

3.1.3. workqueue

  type ProvisionController struct {
      ...
      claimQueue  workqueue.RateLimitingInterface
      volumeQueue workqueue.RateLimitingInterface
      ...
  }

claimQueuevolumeQueue分别是PVPVC的任务队列。

3.1.4. Other fields

  // Identity of this controller, generated at creation time and not persisted
  // across restarts. Useful only for debugging, for seeing the source of
  // events. controller.provisioner may have its own, different notion of
  // identity which may/may not persist across restarts
  id string
  component string
  eventRecorder record.EventRecorder
  resyncPeriod time.Duration
  exponentialBackOffOnError bool
  threadiness int
  createProvisionedPVRetryCount int
  createProvisionedPVInterval time.Duration
  failedProvisionThreshold, failedDeleteThreshold int
  // The port for metrics server to serve on.
  metricsPort int32
  // The IP address for metrics server to serve on.
  metricsAddress string
  // The path of metrics endpoint path.
  metricsPath string
  // Parameters of leaderelection.LeaderElectionConfig.
  leaseDuration, renewDeadline, retryPeriod time.Duration
  hasRun bool
  hasRunLock *sync.Mutex

3.2. The NewProvisionController function

Source: https://github.com/kubernetes-incubator/external-storage/blob/master/lib/controller/controller.go#L418

NewProvisionController constructs a ProvisionController.

3.2.1. Initializing defaults

  // NewProvisionController creates a new provision controller using
  // the given configuration parameters and with private (non-shared) informers.
  func NewProvisionController(
      client kubernetes.Interface,
      provisionerName string,
      provisioner Provisioner,
      kubeVersion string,
      options ...func(*ProvisionController) error,
  ) *ProvisionController {
      ...
      controller := &ProvisionController{
          client:                        client,
          provisionerName:               provisionerName,
          provisioner:                   provisioner,
          kubeVersion:                   utilversion.MustParseSemantic(kubeVersion),
          id:                            id,
          component:                     component,
          eventRecorder:                 eventRecorder,
          resyncPeriod:                  DefaultResyncPeriod,
          exponentialBackOffOnError:     DefaultExponentialBackOffOnError,
          threadiness:                   DefaultThreadiness,
          createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
          createProvisionedPVInterval:   DefaultCreateProvisionedPVInterval,
          failedProvisionThreshold:      DefaultFailedProvisionThreshold,
          failedDeleteThreshold:         DefaultFailedDeleteThreshold,
          leaseDuration:                 DefaultLeaseDuration,
          renewDeadline:                 DefaultRenewDeadline,
          retryPeriod:                   DefaultRetryPeriod,
          metricsPort:                   DefaultMetricsPort,
          metricsAddress:                DefaultMetricsAddress,
          metricsPath:                   DefaultMetricsPath,
          hasRun:                        false,
          hasRunLock:                    &sync.Mutex{},
      }
      ...
  }

3.2.2. Initializing the work queues

  ratelimiter := workqueue.NewMaxOfRateLimiter(
      workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
      &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  )
  if !controller.exponentialBackOffOnError {
      ratelimiter = workqueue.NewMaxOfRateLimiter(
          workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
          &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
      )
  }
  controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims")
  controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes")

3.2.3. ListWatch

  // PVC
  claimSource := &cache.ListWatch{
      ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
          return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
      },
      WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
          return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
      },
  }
  // PV
  volumeSource := &cache.ListWatch{
      ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
          return client.CoreV1().PersistentVolumes().List(options)
      },
      WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
          return client.CoreV1().PersistentVolumes().Watch(options)
      },
  }
  // StorageClass
  classSource = &cache.ListWatch{
      ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
          return client.StorageV1().StorageClasses().List(options)
      },
      WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
          return client.StorageV1().StorageClasses().Watch(options)
      },
  }

The list-watch mechanism is the core way Kubernetes watches objects for changes. A ListWatch bundles a ListFunc and a WatchFunc, neither of which may be nil; the code above builds ListWatch structs for the PV, PVC, and StorageClass types. The mechanism is implemented in client-go's cache package; see https://godoc.org/k8s.io/client-go/tools/cache.

The ListWatch definitions are:

See: https://github.com/kubernetes-incubator/external-storage/blob/89b0aaf6413b249b37834b124fc314ef7b8ee949/vendor/k8s.io/client-go/tools/cache/listwatch.go#L34

  // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
  type ListerWatcher interface {
      // List should return a list type object; the Items field will be extracted, and the
      // ResourceVersion field will be used to start the watch in the right place.
      List(options metav1.ListOptions) (runtime.Object, error)
      // Watch should begin a watch at the specified version.
      Watch(options metav1.ListOptions) (watch.Interface, error)
  }

  // ListFunc knows how to list resources
  type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

  // WatchFunc knows how to watch resources
  type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

  // ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
  // It is a convenience function for users of NewReflector, etc.
  // ListFunc and WatchFunc must not be nil
  type ListWatch struct {
      ListFunc  ListFunc
      WatchFunc WatchFunc
      // DisableChunking requests no chunking for this list watcher.
      DisableChunking bool
  }

3.2.4. ResourceEventHandlerFuncs

  // PVC
  claimHandler := cache.ResourceEventHandlerFuncs{
      AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
      UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
      DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },
  }
  // PV
  volumeHandler := cache.ResourceEventHandlerFuncs{
      AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
      UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
      DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
  }
  // StorageClass
  classHandler := cache.ResourceEventHandlerFuncs{
      // We don't need an actual event handler for StorageClasses,
      // but we must pass a non-nil one to cache.NewInformer()
      AddFunc:    nil,
      UpdateFunc: nil,
      DeleteFunc: nil,
  }

ResourceEventHandlerFuncs holds the resource event handler functions, which deliver notifications for add, update, and delete events on Kubernetes objects. It implements the ResourceEventHandler interface; the implementation lives in client-go's cache package.

The relevant definitions from client-go:

  // ResourceEventHandler can handle notifications for events that happen to a
  // resource. The events are informational only, so you can't return an
  // error.
  // * OnAdd is called when an object is added.
  // * OnUpdate is called when an object is modified. Note that oldObj is the
  //   last known state of the object-- it is possible that several changes
  //   were combined together, so you can't use this to see every single
  //   change. OnUpdate is also called when a re-list happens, and it will
  //   get called even if nothing changed. This is useful for periodically
  //   evaluating or syncing something.
  // * OnDelete will get the final state of the item if it is known, otherwise
  //   it will get an object of type DeletedFinalStateUnknown. This can
  //   happen if the watch is closed and misses the delete event and we don't
  //   notice the deletion until the subsequent re-list.
  type ResourceEventHandler interface {
      OnAdd(obj interface{})
      OnUpdate(oldObj, newObj interface{})
      OnDelete(obj interface{})
  }

  // ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
  // as few of the notification functions as you want while still implementing
  // ResourceEventHandler.
  type ResourceEventHandlerFuncs struct {
      AddFunc    func(obj interface{})
      UpdateFunc func(oldObj, newObj interface{})
      DeleteFunc func(obj interface{})
  }

3.2.5. Constructing the Stores and Controllers

1. PVC

  if controller.claimInformer != nil {
      controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
      controller.claims, controller.claimController =
          controller.claimInformer.GetStore(),
          controller.claimInformer.GetController()
  } else {
      controller.claims, controller.claimController =
          cache.NewInformer(
              claimSource,
              &v1.PersistentVolumeClaim{},
              controller.resyncPeriod,
              claimHandler,
          )
  }

2. PV

  if controller.volumeInformer != nil {
      controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
      controller.volumes, controller.volumeController =
          controller.volumeInformer.GetStore(),
          controller.volumeInformer.GetController()
  } else {
      controller.volumes, controller.volumeController =
          cache.NewInformer(
              volumeSource,
              &v1.PersistentVolume{},
              controller.resyncPeriod,
              volumeHandler,
          )
  }

3. StorageClass

  if controller.classInformer != nil {
      // no resource event handler needed for StorageClasses
      controller.classes, controller.classController =
          controller.classInformer.GetStore(),
          controller.classInformer.GetController()
  } else {
      controller.classes, controller.classController = cache.NewInformer(
          classSource,
          versionedClassType,
          controller.resyncPeriod,
          classHandler,
      )
  }

When no shared informer is supplied, cache.NewInformer builds each pair; its inputs are the ListWatch struct, the object type, the resync period, and the ResourceEventHandlerFuncs, and it returns a Store and a Controller.

With all of these pieces constructed, a fully initialized ProvisionController object is returned.

3.3. The ProvisionController.Run method

ProvisionControllerRun方法是以常驻进程的方式运行,函数内部再运行其他的controller。

3.3.1. Prometheus metrics collection

  // Run starts all of this controller's control loops
  func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
      run := func(stopCh <-chan struct{}) {
          ...
          if ctrl.metricsPort > 0 {
              prometheus.MustRegister([]prometheus.Collector{
                  metrics.PersistentVolumeClaimProvisionTotal,
                  metrics.PersistentVolumeClaimProvisionFailedTotal,
                  metrics.PersistentVolumeClaimProvisionDurationSeconds,
                  metrics.PersistentVolumeDeleteTotal,
                  metrics.PersistentVolumeDeleteFailedTotal,
                  metrics.PersistentVolumeDeleteDurationSeconds,
              }...)
              http.Handle(ctrl.metricsPath, promhttp.Handler())
              address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
              glog.Infof("Starting metrics server at %s\n", address)
              go wait.Forever(func() {
                  err := http.ListenAndServe(address, nil)
                  if err != nil {
                      glog.Errorf("Failed to listen on %s: %v", address, err)
                  }
              }, 5*time.Second)
          }
          ...
      }

3.3.2. Controller.Run

  // If a SharedInformer has been passed in, this controller should not
  // call Run again
  if ctrl.claimInformer == nil {
      go ctrl.claimController.Run(stopCh)
  }
  if ctrl.volumeInformer == nil {
      go ctrl.volumeController.Run(stopCh)
  }
  if ctrl.classInformer == nil {
      go ctrl.classController.Run(stopCh)
  }

This starts the informers (the event notifiers).

3.3.3. Worker

  for i := 0; i < ctrl.threadiness; i++ {
      go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)
      go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh)
  }

runClaimWorkerrunVolumeWorker分别为PVC和PV的worker,这两个的具体执行体分别是processNextClaimWorkItemprocessNextVolumeWorkItem

The call flows are as follows:

PVC call chain:

  runClaimWorker → processNextClaimWorkItem → syncClaimHandler → syncClaim → provisionClaimOperation

PV call chain:

  runVolumeWorker → processNextVolumeWorkItem → syncVolumeHandler → syncVolume → deleteVolumeOperation

So the functions that ultimately do the work are provisionClaimOperation and deleteVolumeOperation.

3.4. Operation

3.4.1. provisionClaimOperation

1. provisionClaimOperation takes a PVC. It derives the deterministic PV name from the claim and checks whether that PV already exists; if it does, there is nothing left to do and the operation returns.

  // provisionClaimOperation attempts to provision a volume for the given claim.
  // Returns error, which indicates whether provisioning should be retried
  // (requeue the claim) or not
  func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
      // Most code here is identical to that found in controller.go of kube's PV controller...
      claimClass := helper.GetPersistentVolumeClaimClass(claim)
      operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
      glog.Infof(logOperation(operation, "started"))
      // A previous doProvisionClaim may just have finished while we were waiting for
      // the locks. Check that PV (with deterministic name) hasn't been provisioned
      // yet.
      pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
      volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
      if err == nil && volume != nil {
          // Volume has been already provisioned, nothing to do.
          glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
          return nil
      }
      ...
  }

2. It then reads the Provisioner and ReclaimPolicy fields from the StorageClass; if provisionerName does not match the StorageClass's provisioner field, it logs an error and returns.

  provisioner, parameters, err := ctrl.getStorageClassFields(claimClass)
  if err != nil {
      glog.Errorf(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
      return nil
  }
  if provisioner != ctrl.provisionerName {
      // class.Provisioner has either changed since shouldProvision() or
      // annDynamicallyProvisioned contains different provisioner than
      // class.Provisioner.
      glog.Errorf(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", provisioner))
      return nil
  }
  // Check if this provisioner can provision this claim.
  if err = ctrl.canProvision(claim); err != nil {
      ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
      glog.Errorf(logOperation(operation, "failed to provision volume: %v", err))
      return nil
  }
  reclaimPolicy := v1.PersistentVolumeReclaimDelete
  if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.8.0")) {
      reclaimPolicy, err = ctrl.fetchReclaimPolicy(claimClass)
      if err != nil {
          return err
      }
  }

3. It calls the concrete provisioner's Provision method to build the PV object; in this article, that provisioner is nfsProvisioner.

  options := VolumeOptions{
      PersistentVolumeReclaimPolicy: reclaimPolicy,
      PVName:                        pvName,
      PVC:                           claim,
      MountOptions:                  mountOptions,
      Parameters:                    parameters,
      SelectedNode:                  selectedNode,
      AllowedTopologies:             allowedTopologies,
  }
  ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))
  volume, err = ctrl.provisioner.Provision(options)
  if err != nil {
      if ierr, ok := err.(*IgnoredError); ok {
          // Provision ignored, do nothing and hope another provisioner will provision it.
          glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr))
          return nil
      }
      err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
      ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
      return err
  }

4. It creates the PV object in Kubernetes.

  // Try to create the PV object several times
  for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
      glog.Infof(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
      if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
          // Save succeeded.
          if err != nil {
              glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
              err = nil
          } else {
              glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name))
          }
          break
      }
      // Save failed, try again after a while.
      glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
      time.Sleep(ctrl.createProvisionedPVInterval)
  }

5. If creating the PV fails, it cleans up the storage asset.

  if err != nil {
      // Save failed. Now we have a storage asset outside of Kubernetes,
      // but we don't have appropriate PV object for it.
      // Emit some event here and try to delete the storage asset several
      // times.
      ...
      for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
          if err = ctrl.provisioner.Delete(volume); err == nil {
              // Delete succeeded
              glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
              break
          }
          // Delete failed, try again after a while.
          glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
          time.Sleep(ctrl.createProvisionedPVInterval)
      }
      if err != nil {
          // Delete failed several times. There is an orphaned volume and there
          // is nothing we can do about it.
          strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
          glog.Error(logOperation(operation, strerr))
          ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
      }
  }

If creation succeeds, a success message is logged and nil is returned.

3.4.2. deleteVolumeOperation

1. deleteVolumeOperation takes a PV. It re-fetches the PV object and checks whether it still needs to be deleted.

  // deleteVolumeOperation attempts to delete the volume backing the given
  // volume. Returns error, which indicates whether deletion should be retried
  // (requeue the volume) or not
  func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
      ...
      // This method may have been waiting for a volume lock for some time.
      // Our check does not have to be as sophisticated as PV controller's, we can
      // trust that the PV controller has set the PV to Released/Failed and it's
      // ours to delete
      newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
      if err != nil {
          return nil
      }
      if !ctrl.shouldDelete(newVolume) {
          glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
          return nil
      }
      ...
  }

2. It calls the concrete provisioner's Delete method; for nfs-client-provisioner, that is the Delete method analyzed above.

  err = ctrl.provisioner.Delete(volume)
  if err != nil {
      if ierr, ok := err.(*IgnoredError); ok {
          // Delete ignored, do nothing and hope another provisioner will delete it.
          glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr))
          return nil
      }
      // Delete failed, emit an event.
      glog.Errorf(logOperation(operation, "volume deletion failed: %v", err))
      ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
      return err
  }

3. It deletes the PV object from Kubernetes.

  // Delete the volume
  if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
      // Oops, could not delete the volume and therefore the controller will
      // try to delete the volume again on next update.
      glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err))
      return err
  }

4. Summary

  1. The Provisioner interface has two methods, Provision and Delete, and a custom provisioner must implement both. They deal only with the storage backend; they do not create or delete the PV and PVC objects themselves.
  2. Provision builds the PV object. Provisioners for different storage types mostly differ in the PersistentVolumeSource type and its parameters; for nfs-client-provisioner the PersistentVolumeSource is NFS, with NFS-specific parameters such as Server and Path.
  3. Delete archives (backs up) or deletes the data held by the given storage type.
  4. The StorageClass object is created separately and designates the provisioner that executes the related logic.
  5. provisionClaimOperation and deleteVolumeOperation perform the actual creation and deletion of PV objects in Kubernetes, and call the provisioner's Provision and Delete methods to handle the stored data.
