This article analyzes the source code of csi-provisioner. For how to develop a Dynamic Provisioner in general, see the nfs-client-provisioner source analysis.

1. Dynamic Provisioner

1.1. Provisioner Interface

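Developing a Dynamic Provisioner means implementing the Provisioner interface, which has two methods:

  • Provision: creates the storage resource and returns a PV object.
  • Delete: removes the corresponding storage resource, but does not delete the PV object.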

1.2. Steps to develop a provisioner

  1. Write a provisioner that implements the Provisioner interface (the Provision and Delete methods).
  2. Build a ProvisionController from that provisioner.
  3. Call the ProvisionController's Run method.
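The three steps above can be sketched with the standard library alone. All types here (VolumeOptions, PersistentVolume, fakeProvisioner, toyController) are hypothetical, heavily simplified stand-ins, not the real controller library's types; in particular, the real ProvisionController watches PVCs and PVs through the API server instead of reading claims from a channel.

```go
package main

import (
	"errors"
	"fmt"
)

// VolumeOptions is a hypothetical, simplified stand-in for what the controller passes to Provision.
type VolumeOptions struct {
	PVName   string
	PVCName  string
	Capacity string
}

// PersistentVolume is a hypothetical, simplified PV object.
type PersistentVolume struct {
	Name     string
	Capacity string
}

// Provisioner mirrors the two-method contract described above (step 1).
type Provisioner interface {
	Provision(options VolumeOptions) (*PersistentVolume, error)
	Delete(volume *PersistentVolume) error
}

// fakeProvisioner pretends to manage volumes on a storage backend.
type fakeProvisioner struct {
	backend map[string]bool // volumes that "exist" on the backend
}

func (p *fakeProvisioner) Provision(options VolumeOptions) (*PersistentVolume, error) {
	if options.PVName == "" {
		return nil, errors.New("empty PV name")
	}
	p.backend[options.PVName] = true
	// Only build the PV object; persisting it is the controller's job.
	return &PersistentVolume{Name: options.PVName, Capacity: options.Capacity}, nil
}

func (p *fakeProvisioner) Delete(volume *PersistentVolume) error {
	delete(p.backend, volume.Name) // backend only; the PV object is untouched
	return nil
}

// toyController wraps a Provisioner (step 2) and stores the PV objects it returns.
type toyController struct {
	provisioner Provisioner
	pvs         map[string]*PersistentVolume // stand-in for PV objects in the API server
}

func newToyController(p Provisioner) *toyController {
	return &toyController{provisioner: p, pvs: map[string]*PersistentVolume{}}
}

// Run processes claims until the channel is closed (step 3).
func (c *toyController) Run(claims <-chan VolumeOptions) {
	for opts := range claims {
		pv, err := c.provisioner.Provision(opts)
		if err != nil {
			fmt.Println("provision failed:", err)
			continue
		}
		c.pvs[pv.Name] = pv
	}
}

func main() {
	p := &fakeProvisioner{backend: map[string]bool{}}
	c := newToyController(p)
	claims := make(chan VolumeOptions, 1)
	claims <- VolumeOptions{PVName: "pvc-123", PVCName: "data", Capacity: "1Gi"}
	close(claims)
	c.Run(claims)
	fmt.Println(len(c.pvs), p.backend["pvc-123"]) // 1 true
}
```

The split matters later in the article: the provisioner only touches storage, while the controller owns the PV objects.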

2. CSI Provisioner

For the CSI Provisioner source code, see: https://github.com/kubernetes-csi/external-provisioner.

2.1. The main function

2.1.1. Parsing command-line flags

Source:

    var (
        provisioner = flag.String("provisioner", "", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
        master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
        kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
        csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume")
        connectionTimeout = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
        volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume")
        volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
        showVersion = flag.Bool("version", false, "Show version.")
        provisionController *controller.ProvisionController
        version = "unknown"
    )

    func init() {
        var config *rest.Config
        var err error
        flag.Parse()
        flag.Set("logtostderr", "true")
        if *showVersion {
            fmt.Println(os.Args[0], version)
            os.Exit(0)
        }
        glog.Infof("Version: %s", version)
        ...
    }

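The init function parses the command-line flags. The provisioner flag is the name of the provisioner that supplies PVs for PVCs; it must equal the provisioner field of the StorageClass objects it should serve.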

2.1.2. Creating the clientset objects

Source:

    // get the KUBECONFIG from env if specified (useful for local/debug cluster)
    kubeconfigEnv := os.Getenv("KUBECONFIG")
    if kubeconfigEnv != "" {
        glog.Infof("Found KUBECONFIG environment variable set, using that..")
        kubeconfig = &kubeconfigEnv
    }
    if *master != "" || *kubeconfig != "" {
        glog.Infof("Either master or kubeconfig specified. building kube config from that..")
        config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
    } else {
        glog.Infof("Building kube configs for running in cluster...")
        config, err = rest.InClusterConfig()
    }
    if err != nil {
        glog.Fatalf("Failed to create config: %v", err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        glog.Fatalf("Failed to create client: %v", err)
    }
    // snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1alpha1Client
    snapClient, err := snapclientset.NewForConfig(config)
    if err != nil {
        glog.Fatalf("Failed to create snapshot client: %v", err)
    }
    csiAPIClient, err := csiclientset.NewForConfig(config)
    if err != nil {
        glog.Fatalf("Failed to create CSI API client: %v", err)
    }

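The code builds a Kubernetes client config (a KUBECONFIG environment variable overrides the kubeconfig flag; if master or kubeconfig is set the config is built from them, otherwise the in-cluster config is used) and creates the clientset used to call the Kubernetes API, mainly for creating and deleting objects such as PVs and PVCs. It also creates snapClient for the volume snapshot API and csiAPIClient for the CSI API objects.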
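The config-source decision described above is small enough to isolate as a pure function. This is a sketch only: configSource and its string results are hypothetical, and the real code calls clientcmd.BuildConfigFromFlags or rest.InClusterConfig instead of returning a description.

```go
package main

import "fmt"

// configSource mirrors the excerpt's decision: KUBECONFIG from the environment
// overrides the --kubeconfig flag; if --master or a kubeconfig is set, the config
// is built from them, otherwise the in-cluster (service account) config is used.
func configSource(master, kubeconfigFlag, kubeconfigEnv string) string {
	kubeconfig := kubeconfigFlag
	if kubeconfigEnv != "" {
		kubeconfig = kubeconfigEnv
	}
	if master != "" || kubeconfig != "" {
		return fmt.Sprintf("flags(master=%q, kubeconfig=%q)", master, kubeconfig)
	}
	return "in-cluster"
}

func main() {
	fmt.Println(configSource("", "", ""))                   // in-cluster
	fmt.Println(configSource("", "/a/config", "/b/config")) // flags(master="", kubeconfig="/b/config")
}
```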

2.1.3. Kubernetes version check

    // The controller needs to know what the server version is because out-of-tree
    // provisioners aren't officially supported until 1.5
    serverVersion, err := clientset.Discovery().ServerVersion()
    if err != nil {
        glog.Fatalf("Error getting server version: %v", err)
    }

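This fetches the API server's version, which is later passed to NewProvisionController as serverVersion.GitVersion, because out-of-tree provisioners are only officially supported on Kubernetes 1.5 and later.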
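The excerpt only fetches the version and hands it to the controller; it does not show the comparison itself. As a hypothetical illustration, a "1.5 or later" check on a GitVersion string could look like this; atLeast and leadingDigits are invented helpers that ignore anything after the minor number.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// leadingDigits returns the longest prefix of s made of ASCII digits.
func leadingDigits(s string) string {
	i := 0
	for i < len(s) && s[i] >= '0' && s[i] <= '9' {
		i++
	}
	return s[:i]
}

// atLeast reports whether a GitVersion such as "v1.11.3" or "v1.12.0-gke.1"
// is at least major.minor; patch and suffixes are ignored.
func atLeast(gitVersion string, major, minor int) (bool, error) {
	parts := strings.SplitN(strings.TrimPrefix(gitVersion, "v"), ".", 3)
	if len(parts) < 2 {
		return false, fmt.Errorf("unexpected version %q", gitVersion)
	}
	maj, err := strconv.Atoi(parts[0])
	if err != nil {
		return false, fmt.Errorf("bad major in %q: %v", gitVersion, err)
	}
	mnr, err := strconv.Atoi(leadingDigits(parts[1]))
	if err != nil {
		return false, fmt.Errorf("bad minor in %q: %v", gitVersion, err)
	}
	return maj > major || (maj == major && mnr >= minor), nil
}

func main() {
	ok, err := atLeast("v1.11.3", 1, 5)
	fmt.Println(ok, err) // true <nil>
}
```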

2.1.4. Connecting to the CSI socket

    // Generate a unique ID for this provisioner
    timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
    identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + *provisioner
    // Provisioner will stay in Init until driver opens csi socket, once it's done
    // controller will exit this loop and proceed normally.
    socketDown := true
    grpcClient := &grpc.ClientConn{}
    for socketDown {
        grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
        if err == nil {
            socketDown = false
            continue
        }
        time.Sleep(10 * time.Second)
    }

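The provisioner stays in its initialization phase until the CSI socket connection succeeds, and only then proceeds normally. After each failed attempt it sleeps 10 seconds and retries. Two flags are involved:

  • csiEndpoint: the gRPC endpoint of the CSI volume driver, /run/csi/socket by default.
  • connectionTimeout: the timeout for connecting to the CSI driver socket, 10 seconds by default. It is also passed to NewCSIProvisioner and reused as the per-call timeout (p.timeout) for CSI requests.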

2.1.5. Constructing the csi-provisioner object

    // Create the provisioner: it implements the Provisioner interface expected by
    // the controller
    csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
    provisionController = controller.NewProvisionController(
        clientset,
        *provisioner,
        csiProvisioner,
        serverVersion.GitVersion,
    )

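The csiProvisioner object is constructed from clientset, csiAPIClient, csiEndpoint, connectionTimeout, identity, volumeNamePrefix, volumeNameUUIDLength, grpcClient and snapClient.

The ProvisionController is then constructed from clientset, the provisioner name, csiProvisioner and the server's GitVersion.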

2.1.6. Running the ProvisionController

    func main() {
        provisionController.Run(wait.NeverStop)
    }

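ProvisionController implements the actual PV and PVC handling logic. Run is called with wait.NeverStop, so it runs as a long-lived process that never stops on its own.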

2.2. The Provision and Delete methods

2.2.1. The Provision method

For the source of csiProvisioner's Provision method, see: https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L336

Provision creates the storage resource and returns a PV object. Its input is a VolumeOptions value, which specifies the properties of the PV to create.

1. Build the PV name

    pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
    if err != nil {
        return nil, err
    }
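The excerpt does not show makeVolumeName itself. The sketch below is a hypothetical reimplementation based only on the flag descriptions (the prefix defaults to pvc, and a UUID length of -1 means "do not truncate"); removing the dashes before truncating is an assumption of this sketch, not something the excerpt confirms.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// makeVolumeNameSketch joins prefix and PVC UID. With uuidLength == -1 the UID
// is used verbatim; otherwise dashes are removed (assumed behavior) and the
// result is truncated to uuidLength characters.
func makeVolumeNameSketch(prefix, pvcUID string, uuidLength int) (string, error) {
	if prefix == "" {
		return "", errors.New("volume name prefix cannot be empty")
	}
	if pvcUID == "" {
		return "", errors.New("PVC object is missing its UID")
	}
	if uuidLength == -1 {
		return prefix + "-" + pvcUID, nil
	}
	compact := strings.ReplaceAll(pvcUID, "-", "")
	if uuidLength < 0 || uuidLength > len(compact) {
		return "", fmt.Errorf("invalid UUID length %d for UID %q", uuidLength, pvcUID)
	}
	return prefix + "-" + compact[:uuidLength], nil
}

func main() {
	uid := "0b8a1f3e-6c2d-4e5f-9a7b-1c2d3e4f5a6b"
	full, _ := makeVolumeNameSketch("pvc", uid, -1)
	short, _ := makeVolumeNameSketch("pvc", uid, 8)
	fmt.Println(full)  // pvc-0b8a1f3e-6c2d-4e5f-9a7b-1c2d3e4f5a6b
	fmt.Println(short) // pvc-0b8a1f3e
}
```

Deriving the name from the PVC's UID makes it deterministic, so a retried Provision for the same claim asks the driver for the same volume name.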

2. Resolve the CSIPersistentVolumeSource fields

    driverState, err := checkDriverState(p.grpcClient, p.timeout, needSnapshotSupport)
    if err != nil {
        return nil, err
    }
    ...
    // Resolve controller publish, node stage, node publish secret references
    controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretNameKey, controllerPublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
    if err != nil {
        return nil, err
    }
    nodeStageSecretRef, err := getSecretReference(nodeStageSecretNameKey, nodeStageSecretNamespaceKey, options.Parameters, pvName, options.PVC)
    if err != nil {
        return nil, err
    }
    nodePublishSecretRef, err := getSecretReference(nodePublishSecretNameKey, nodePublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
    if err != nil {
        return nil, err
    }
    ...
    volumeAttributes := map[string]string{provisionerIDKey: p.identity}
    for k, v := range rep.Volume.Attributes {
        volumeAttributes[k] = v
    }
    ...
    fsType := ""
    for k, v := range options.Parameters {
        switch strings.ToLower(k) {
        case "fstype":
            fsType = v
        }
    }
    if len(fsType) == 0 {
        fsType = defaultFSType
    }
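The fstype lookup above matches the StorageClass parameter key case-insensitively and falls back to a default. A standalone version follows; the default "ext4" is an assumption of this sketch, since the excerpt only references the constant defaultFSType without showing its value.

```go
package main

import (
	"fmt"
	"strings"
)

// assumedDefaultFSType stands in for defaultFSType, whose value the excerpt does not show.
const assumedDefaultFSType = "ext4"

// resolveFSType returns the value of the "fstype" parameter, matching the key
// case-insensitively ("fsType", "FSTYPE", ...), or the default if it is absent
// or empty. As in the original loop, if several keys differ only by case, the
// winner depends on map iteration order.
func resolveFSType(params map[string]string) string {
	fsType := ""
	for k, v := range params {
		if strings.ToLower(k) == "fstype" {
			fsType = v
		}
	}
	if fsType == "" {
		fsType = assumedDefaultFSType
	}
	return fsType
}

func main() {
	fmt.Println(resolveFSType(map[string]string{"fsType": "xfs"})) // xfs
	fmt.Println(resolveFSType(map[string]string{"type": "ssd"}))   // ext4
}
```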

3. Create the CSI CreateVolumeRequest

    // Create a CSI CreateVolumeRequest and Response
    req := csi.CreateVolumeRequest{
        Name: pvName,
        Parameters: options.Parameters,
        VolumeCapabilities: volumeCaps,
        CapacityRange: &csi.CapacityRange{
            RequiredBytes: int64(volSizeBytes),
        },
    }
    ...
    glog.V(5).Infof("CreateVolumeRequest %+v", req)
    rep := &csi.CreateVolumeResponse{}
    ...
    opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
    err = wait.ExponentialBackoff(opts, func() (bool, error) {
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        rep, err = p.csiClient.CreateVolume(ctx, &req)
        if err == nil {
            // CreateVolume has finished successfully
            return true, nil
        }
        if status, ok := status.FromError(err); ok {
            if status.Code() == codes.DeadlineExceeded {
                // CreateVolume timed out, give it another chance to complete
                glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
                return false, nil
            }
        }
        // CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
        return false, err
    })
    if err != nil {
        return nil, err
    }
    if rep.Volume != nil {
        glog.V(3).Infof("create volume rep: %+v", *rep.Volume)
    }
    respCap := rep.GetVolume().GetCapacityBytes()
    if respCap < volSizeBytes {
        capErr := fmt.Errorf("created volume capacity %v less than requested capacity %v", respCap, volSizeBytes)
        delReq := &csi.DeleteVolumeRequest{
            VolumeId: rep.GetVolume().GetId(),
        }
        delReq.ControllerDeleteSecrets = provisionerCredentials
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        _, err := p.csiClient.DeleteVolume(ctx, delReq)
        if err != nil {
            capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err)
        }
        return nil, capErr
    }

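The core of the Provision method is the call to p.csiClient.CreateVolume(ctx, &req). It is retried with exponential backoff only when the call times out (DeadlineExceeded); any other error aborts immediately. If the driver returns a volume smaller than requested, the volume is deleted again and an error is returned.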

4. Build the PV object

    pv := &v1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{
            Name: pvName,
        },
        Spec: v1.PersistentVolumeSpec{
            PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
            AccessModes: options.PVC.Spec.AccessModes,
            Capacity: v1.ResourceList{
                v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
            },
            // TODO wait for CSI VolumeSource API
            PersistentVolumeSource: v1.PersistentVolumeSource{
                CSI: &v1.CSIPersistentVolumeSource{
                    Driver: driverState.driverName,
                    VolumeHandle: p.volumeIdToHandle(rep.Volume.Id),
                    FSType: fsType,
                    VolumeAttributes: volumeAttributes,
                    ControllerPublishSecretRef: controllerPublishSecretRef,
                    NodeStageSecretRef: nodeStageSecretRef,
                    NodePublishSecretRef: nodePublishSecretRef,
                },
            },
        },
    }
    if driverState.capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) {
        pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
    }
    glog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)
    return pv, nil

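Apart from calling CreateVolume on the driver, Provision only builds the PV object from the VolumeOptions and the CreateVolume response and returns it; it does not itself create (or delete) the PV object in Kubernetes.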

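Different kinds of provisioners mostly differ in the PersistentVolumeSource type and its parameters. For csi-provisioner the PersistentVolumeSource is CSI (v1.CSIPersistentVolumeSource), which takes the following CSI-specific fields: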

  • Driver
  • VolumeHandle
  • FSType
  • VolumeAttributes
  • ControllerPublishSecretRef
  • NodeStageSecretRef
  • NodePublishSecretRef
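Among these fields, VolumeAttributes is assembled by first recording the provisioner's identity and then copying the attributes returned by the driver, so a driver key with the same name wins. The sketch below illustrates only that merge; csiSource is a hypothetical, trimmed-down stand-in for v1.CSIPersistentVolumeSource, exampleProvisionerIDKey is a placeholder for the real provisionerIDKey constant, and the volume ID is used directly as the handle (skipping volumeIdToHandle).

```go
package main

import "fmt"

// csiSource is a hypothetical, trimmed-down stand-in for v1.CSIPersistentVolumeSource.
type csiSource struct {
	Driver           string
	VolumeHandle     string
	FSType           string
	VolumeAttributes map[string]string
}

// exampleProvisionerIDKey is a placeholder; the real code uses the constant provisionerIDKey.
const exampleProvisionerIDKey = "example/provisioner-identity"

// buildCSISource seeds the attributes with the provisioner identity, then copies
// the driver-returned attributes on top of it.
func buildCSISource(driver, volumeID, fsType, identity string, driverAttrs map[string]string) csiSource {
	attrs := map[string]string{exampleProvisionerIDKey: identity}
	for k, v := range driverAttrs {
		attrs[k] = v
	}
	return csiSource{Driver: driver, VolumeHandle: volumeID, FSType: fsType, VolumeAttributes: attrs}
}

func main() {
	src := buildCSISource("csi-hostpath", "vol-42", "ext4", "1700000000000-42-csi-hostpath",
		map[string]string{"pool": "fast"})
	fmt.Println(src.Driver, src.VolumeHandle, len(src.VolumeAttributes)) // csi-hostpath vol-42 2
}
```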

2.2.2. The Delete method

For the source of csiProvisioner's Delete method, see: https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L606

    func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
        if volume == nil || volume.Spec.CSI == nil {
            return fmt.Errorf("invalid CSI PV")
        }
        volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)
        _, err := checkDriverState(p.grpcClient, p.timeout, false)
        if err != nil {
            return err
        }
        req := csi.DeleteVolumeRequest{
            VolumeId: volumeId,
        }
        // get secrets if StorageClass specifies it
        storageClassName := volume.Spec.StorageClassName
        if len(storageClassName) != 0 {
            if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil {
                // Resolve provision secret credentials.
                // No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
                provisionerSecretRef, err := getSecretReference(provisionerSecretNameKey, provisionerSecretNamespaceKey, storageClass.Parameters, volume.Name, nil)
                if err != nil {
                    return err
                }
                credentials, err := getCredentials(p.client, provisionerSecretRef)
                if err != nil {
                    return err
                }
                req.ControllerDeleteSecrets = credentials
            }
        }
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        _, err = p.csiClient.DeleteVolume(ctx, &req)
        return err
    }

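Delete essentially calls p.csiClient.DeleteVolume(ctx, &req). Before that, if the PV names a StorageClass that can still be fetched, it resolves the provisioner secret from the StorageClass parameters and attaches the credentials as ControllerDeleteSecrets.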

2.3. Summary

csi provisioner implements the Provisioner interface, i.e. the Provision and Delete methods:

  • Provision: calls csiClient.CreateVolume, then builds and returns the PV object.
  • Delete: calls csiClient.DeleteVolume.

Both core methods of csi provisioner delegate the real work to the CSI client.

3. csi-client

For the csi client code, see: https://github.com/container-storage-interface/spec/blob/master/lib/go/csi/v0/csi.pb.go

3.1. Constructing the csi-client

3.1.1. Constructing the grpcClient

    // Provisioner will stay in Init until driver opens csi socket, once it's done
    // controller will exit this loop and proceed normally.
    socketDown := true
    grpcClient := &grpc.ClientConn{}
    for socketDown {
        grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
        if err == nil {
            socketDown = false
            continue
        }
        time.Sleep(10 * time.Second)
    }

This loop connects to the CSI socket; a usable grpcClient is available only after the connection succeeds.

3.1.2. Constructing the csi-client

The csi-client is built from grpcClient inside NewCSIProvisioner:

    // Create the provisioner: it implements the Provisioner interface expected by
    // the controller
    csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)

NewCSIProvisioner

    // NewCSIProvisioner creates new CSI provisioner
    func NewCSIProvisioner(client kubernetes.Interface,
        csiAPIClient csiclientset.Interface,
        csiEndpoint string,
        connectionTimeout time.Duration,
        identity string,
        volumeNamePrefix string,
        volumeNameUUIDLength int,
        grpcClient *grpc.ClientConn,
        snapshotClient snapclientset.Interface) controller.Provisioner {
        csiClient := csi.NewControllerClient(grpcClient)
        provisioner := &csiProvisioner{
            client: client,
            grpcClient: grpcClient,
            csiClient: csiClient,
            csiAPIClient: csiAPIClient,
            snapshotClient: snapshotClient,
            timeout: connectionTimeout,
            identity: identity,
            volumeNamePrefix: volumeNamePrefix,
            volumeNameUUIDLength: volumeNameUUIDLength,
        }
        return provisioner
    }

NewControllerClient

    csiClient := csi.NewControllerClient(grpcClient)
    ...
    type controllerClient struct {
        cc *grpc.ClientConn
    }

    func NewControllerClient(cc *grpc.ClientConn) ControllerClient {
        return &controllerClient{cc}
    }

3.2. csiClient.CreateVolume

The csi provisioner calls csiClient.CreateVolume as follows:

    opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
    err = wait.ExponentialBackoff(opts, func() (bool, error) {
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        rep, err = p.csiClient.CreateVolume(ctx, &req)
        if err == nil {
            // CreateVolume has finished successfully
            return true, nil
        }
        if status, ok := status.FromError(err); ok {
            if status.Code() == codes.DeadlineExceeded {
                // CreateVolume timed out, give it another chance to complete
                glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
                return false, nil
            }
        }
        // CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
        return false, err
    })

Building the CreateVolumeRequest:

    // Create a CSI CreateVolumeRequest and Response
    req := csi.CreateVolumeRequest{
        Name: pvName,
        Parameters: options.Parameters,
        VolumeCapabilities: volumeCaps,
        CapacityRange: &csi.CapacityRange{
            RequiredBytes: int64(volSizeBytes),
        },
    }
    ...
    req.VolumeContentSource = volumeContentSource
    ...
    req.AccessibilityRequirements = requirements
    ...
    req.ControllerCreateSecrets = provisionerCredentials

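The concrete CreateVolume implementation is shown below. csiClient has an interface type (ControllerClient); the implementation behind it is controllerClient.CreateVolume: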

    func (c *controllerClient) CreateVolume(ctx context.Context, in *CreateVolumeRequest, opts ...grpc.CallOption) (*CreateVolumeResponse, error) {
        out := new(CreateVolumeResponse)
        err := grpc.Invoke(ctx, "/csi.v0.Controller/CreateVolume", in, out, c.cc, opts...)
        if err != nil {
            return nil, err
        }
        return out, nil
    }

3.3. csiClient.DeleteVolume

The csi provisioner calls csiClient.DeleteVolume as follows:

    func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
        ...
        req := csi.DeleteVolumeRequest{
            VolumeId: volumeId,
        }
        // get secrets if StorageClass specifies it
        ...
        ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
        defer cancel()
        _, err = p.csiClient.DeleteVolume(ctx, &req)
        return err
    }

Building the DeleteVolumeRequest:

    req := csi.DeleteVolumeRequest{
        VolumeId: volumeId,
    }
    ...
    req.ControllerDeleteSecrets = credentials

The constructed DeleteVolumeRequest is passed to DeleteVolume.

The concrete DeleteVolume implementation is controllerClient.DeleteVolume:

    func (c *controllerClient) DeleteVolume(ctx context.Context, in *DeleteVolumeRequest, opts ...grpc.CallOption) (*DeleteVolumeResponse, error) {
        out := new(DeleteVolumeResponse)
        err := grpc.Invoke(ctx, "/csi.v0.Controller/DeleteVolume", in, out, c.cc, opts...)
        if err != nil {
            return nil, err
        }
        return out, nil
    }

4. ProvisionController.Run

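A custom provisioner implements the Provision and Delete methods of the Provisioner interface. These two methods only create and delete storage on the backend; they do not create or delete PV objects.

The PV objects themselves are handled by ProvisionController's provisionClaimOperation and deleteVolumeOperation, which in turn call the provisioner's Provision and Delete methods to handle the backend storage.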
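The division of labour described above can be sketched as follows: the controller-side operation calls the provisioner, then persists or removes the PV object itself. provisionClaimOperation and deleteVolumeOperation are the real method names, but provisionClaim and deleteVolume below are hypothetical simplifications with no events, retries or API server (a map stands in for PV objects). Keeping the PV when the backend delete fails is a choice of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

type volume struct{ Name string }

// storageBackend is a hypothetical provisioner: it only touches backend storage.
type storageBackend struct{ volumes map[string]bool }

func (b *storageBackend) Provision(name string) (*volume, error) {
	b.volumes[name] = true
	return &volume{Name: name}, nil
}

func (b *storageBackend) Delete(v *volume) error {
	if !b.volumes[v.Name] {
		return errors.New("volume not found on backend")
	}
	delete(b.volumes, v.Name)
	return nil
}

// miniController owns the PV objects (a map standing in for the API server).
type miniController struct {
	backend *storageBackend
	pvs     map[string]*volume
}

// provisionClaim asks the provisioner for storage plus a PV object, then stores the PV.
func (c *miniController) provisionClaim(pvName string) error {
	v, err := c.backend.Provision(pvName)
	if err != nil {
		return err
	}
	c.pvs[v.Name] = v // the controller, not the provisioner, persists the PV
	return nil
}

// deleteVolume asks the provisioner to free the storage, then deletes the PV object.
func (c *miniController) deleteVolume(pvName string) error {
	v, ok := c.pvs[pvName]
	if !ok {
		return fmt.Errorf("PV %s not found", pvName)
	}
	if err := c.backend.Delete(v); err != nil {
		return err // in this sketch the PV is kept so deletion can be retried
	}
	delete(c.pvs, pvName)
	return nil
}

func main() {
	c := &miniController{backend: &storageBackend{volumes: map[string]bool{}}, pvs: map[string]*volume{}}
	_ = c.provisionClaim("pvc-1")
	fmt.Println(len(c.pvs), len(c.backend.volumes)) // 1 1
	_ = c.deleteVolume("pvc-1")
	fmt.Println(len(c.pvs), len(c.backend.volumes)) // 0 0
}
```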

    func main() {
        provisionController.Run(wait.NeverStop)
    }

For the details of this logic, see the nfs-client-provisioner source analysis.
