Reconciler Implementation and Design

Reconciler Functionality

General steps the reconciliation process needs to cover:

  1. Update the ObservedGeneration and initialize the Status conditions (as defined in samplesource_lifecycle.go and samplesource_types.go)
  1. src.Status.InitializeConditions()
  2. src.Status.ObservedGeneration = src.Generation
  1. Create/reconcile the Receive Adapter (detailed below)
  2. If successful, update the Status and MarkDeployed
  1. src.Status.PropagateDeploymentAvailability(ra)
  1. Create/reconcile the SinkBinding for the Receive Adapter targeting the Sink (detailed below)
  2. MarkSink with the result
  1. src.Status.MarkSink(sb.Status.SinkURI)
  1. Return a new reconciler event stating that the process is done
  1. return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SampleSourceReconciled", "SampleSource reconciled: \"%s/%s\"", namespace, name)

Reconcile/Create The Receive Adapter

As part of the source reconciliation, we have to create and deploy (and update if necessary) the underlying receive adapter.

Verify the specified kubernetes resources are valid, and update the Status accordingly

Assemble the ReceiveAdapterArgs

  1. raArgs := resources.ReceiveAdapterArgs{
  2. EventSource: src.Namespace + "/" + src.Name,
  3. Image: r.ReceiveAdapterImage,
  4. Source: src,
  5. Labels: resources.Labels(src.Name),
  6. AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
  7. }

NB The exact arguments may change based on functional requirements Create the underlying deployment from the arguments provided, matching pod templates, labels, owner references, etc as needed to fill out the deployment Example: pkg/reconciler/sample/resources/receive_adapter.go

  1. Fetch the existing receive adapter deployment
  1. namespace := owner.GetObjectMeta().GetNamespace()
  2. ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(expected.Name, metav1.GetOptions{})
  1. Otherwise, create the deployment
  1. ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Create(expected)
  1. Check if the expected vs existing spec is different, and update the deployment if required
  1. } else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
  2. ra.Spec.Template.Spec = expected.Spec.Template.Spec
  3. if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
  4. return ra, err
  5. }
  1. If updated, record the event
  1. return pkgreconciler.NewEvent(corev1.EventTypeNormal, "DeploymentUpdated", "updated deployment: \"%s/%s\"", namespace, name)

Reconcile/Create The SinkBinding

Instead of directly giving the details of the sink to the receive adapter, use a SinkBinding to bind the receive adapter with the sink.

Steps here are almost the same with the Deployment reconciliation above, but it is for another resource, SinkBinding.

  1. Create a Reference for the receive adapter deployment. This deployment will be SinkBinding’s source:
  1. tracker.Reference{
  2. APIVersion: appsv1.SchemeGroupVersion.String(),
  3. Kind: "Deployment",
  4. Namespace: ra.Namespace,
  5. Name: ra.Name,
  6. }
  1. Fetch the existing SinkBinding
  1. namespace := owner.GetObjectMeta().GetNamespace()
  2. sb, err := r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Get(expected.Name, metav1.GetOptions{})
  1. If it doesn’t exist, create it
  1. sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Create(expected)
  1. Check if the expected vs existing spec is different, and update the SinkBinding if required
  1. else if r.specChanged(sb.Spec, expected.Spec) {
  2. sb.Spec = expected.Spec
  3. if sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Update(sb); err != nil {
  4. return sb, err
  5. }
  1. If updated, record the event
  1. return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SinkBindingUpdated", "updated SinkBinding: \"%s/%s\"", namespace, name)