Overview of Kubeflow Pipelines

Understanding the goals and main concepts of Kubeflow Pipelines

Beta

This Kubeflow component has beta status. See the Kubeflow versioning policies. The Kubeflow team is interested in your feedback about the usability of the feature.

Kubeflow Pipelines is a platform for building and deploying portable, scalable machine learning (ML) workflows based on Docker containers.

Quickstart

Run your first pipeline by following the pipelines quickstart guide.

What is Kubeflow Pipelines?

The Kubeflow Pipelines platform consists of:

  • A user interface (UI) for managing and tracking experiments, jobs, and runs.
  • An engine for scheduling multi-step ML workflows.
  • An SDK for defining and manipulating pipelines and components.
  • Notebooks for interacting with the system using the SDK.

The following are the goals of Kubeflow Pipelines:

  • End-to-end orchestration: enabling and simplifying the orchestration of machine learning pipelines.
  • Easy experimentation: making it easy for you to try numerous ideas and techniques and manage your various trials/experiments.
  • Easy re-use: enabling you to re-use components and pipelines to quickly create end-to-end solutions without having to rebuild each time.

In Kubeflow v0.1.3 and later, Kubeflow Pipelines is one of the Kubeflow core components. It’s automatically deployed during Kubeflow deployment.

Due to kubeflow/pipelines#1700, the container builder in Kubeflow Pipelines currently prepares credentials for Google Cloud Platform (GCP) only. As a result, the container builder supports only Google Container Registry. However, you can store the container images on other registries, provided you set up the credentials correctly to fetch the image.

What is a pipeline?

A pipeline is a description of an ML workflow, including all of the components in the workflow and how they combine in the form of a graph. (See the screenshot below showing an example of a pipeline graph.) The pipeline includes the definition of the inputs (parameters) required to run the pipeline and the inputs and outputs of each component.

After developing your pipeline, you can upload and share it on the Kubeflow Pipelines UI.

A pipeline component is a self-contained set of user code, packaged as a Docker image, that performs one step in the pipeline. For example, a component can be responsible for data preprocessing, data transformation, model training, and so on.
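As a rough mental model of these two definitions, a pipeline can be treated as a directed graph whose nodes are components. The sketch below uses only the Python standard library (Python 3.9 or later for `graphlib`). It is not the Kubeflow Pipelines SDK: the `Component` and `Pipeline` classes, the image names, and the step names are all hypothetical.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class Component:
    """One pipeline step: user code packaged as a container image."""
    name: str
    image: str                                    # hypothetical image reference
    inputs: list = field(default_factory=list)    # parameter or upstream output names
    outputs: list = field(default_factory=list)
    after: list = field(default_factory=list)     # names of upstream components


@dataclass
class Pipeline:
    """A workflow description: run parameters plus a graph of components."""
    name: str
    parameters: dict
    components: list

    def execution_order(self):
        # Map each component to the components it depends on, then sort.
        graph = {c.name: set(c.after) for c in self.components}
        return list(TopologicalSorter(graph).static_order())


pipeline = Pipeline(
    name="example",
    parameters={"train_data": "gs://your-gcs-bucket/train.csv", "rounds": 200},
    components=[
        # Declared out of order on purpose: the graph decides the order.
        Component("train", "example.com/train:latest",
                  inputs=["features", "rounds"], outputs=["model"],
                  after=["transform"]),
        Component("preprocess", "example.com/preprocess:latest",
                  inputs=["train_data"], outputs=["clean_data"]),
        Component("transform", "example.com/transform:latest",
                  inputs=["clean_data"], outputs=["features"],
                  after=["preprocess"]),
    ],
)

print(pipeline.execution_order())  # ['preprocess', 'transform', 'train']
```

The declaration order of the components does not matter here. Only the dependencies between them determine when each step can run.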

See the conceptual guides to pipelines and components.

Example of a pipeline

The screenshots and code below show the xgboost-training-cm.py pipeline, which creates an XGBoost model using structured data in CSV format. You can see the source code and other information about the pipeline on GitHub.

The runtime execution graph of the pipeline

The screenshot below shows the example pipeline’s runtime execution graph in the Kubeflow Pipelines UI:

XGBoost results on the pipelines UI

The Python code that represents the pipeline

Below is an extract from the Python code that defines the xgboost-training-cm.py pipeline. You can see the full code on GitHub.

    # Extract only: the component ops (dataproc_*_op, confusion_matrix_op, roc_op)
    # and _PYSRC_PREFIX are defined elsewhere in the full source file.
    import os

    from kfp import dsl
    from kfp import gcp


    @dsl.pipeline(
        name='XGBoost Trainer',
        description='A trainer that does end-to-end distributed training for XGBoost models.'
    )
    def xgb_train_pipeline(
        output='gs://your-gcs-bucket',
        project='your-gcp-project',
        cluster_name='xgb-%s' % dsl.RUN_ID_PLACEHOLDER,
        region='us-central1',
        train_data='gs://ml-pipeline-playground/sfpd/train.csv',
        eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
        schema='gs://ml-pipeline-playground/sfpd/schema.json',
        target='resolution',
        rounds=200,
        workers=2,
        true_label='ACTION',
    ):
        output_template = str(output) + '/' + dsl.RUN_ID_PLACEHOLDER + '/data'

        # The current GCP PySpark/Spark ops do not provide outputs as return
        # values, so the URIs are passed around as strings instead.
        analyze_output = output_template
        transform_output_train = os.path.join(output_template, 'train', 'part-*')
        transform_output_eval = os.path.join(output_template, 'eval', 'part-*')
        train_output = os.path.join(output_template, 'train_output')
        predict_output = os.path.join(output_template, 'predict_output')

        # The exit handler deletes the cluster even if a step inside it fails.
        with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op(
            project_id=project,
            region=region,
            name=cluster_name
        )):
            _create_cluster_op = dataproc_create_cluster_op(
                project_id=project,
                region=region,
                name=cluster_name,
                initialization_actions=[
                    os.path.join(_PYSRC_PREFIX,
                                 'initialization_actions.sh'),
                ],
                image_version='1.2'
            )

            _analyze_op = dataproc_analyze_op(
                project=project,
                region=region,
                cluster_name=cluster_name,
                schema=schema,
                train_data=train_data,
                output=output_template
            ).after(_create_cluster_op).set_display_name('Analyzer')

            _transform_op = dataproc_transform_op(
                project=project,
                region=region,
                cluster_name=cluster_name,
                train_data=train_data,
                eval_data=eval_data,
                target=target,
                analysis=analyze_output,
                output=output_template
            ).after(_analyze_op).set_display_name('Transformer')

            _train_op = dataproc_train_op(
                project=project,
                region=region,
                cluster_name=cluster_name,
                train_data=transform_output_train,
                eval_data=transform_output_eval,
                target=target,
                analysis=analyze_output,
                workers=workers,
                rounds=rounds,
                output=train_output
            ).after(_transform_op).set_display_name('Trainer')

            _predict_op = dataproc_predict_op(
                project=project,
                region=region,
                cluster_name=cluster_name,
                data=transform_output_eval,
                model=train_output,
                target=target,
                analysis=analyze_output,
                output=predict_output
            ).after(_train_op).set_display_name('Predictor')

            _cm_op = confusion_matrix_op(
                predictions=os.path.join(predict_output, 'part-*.csv'),
                output_dir=output_template
            ).after(_predict_op)

            _roc_op = roc_op(
                predictions_dir=os.path.join(predict_output, 'part-*.csv'),
                true_class=true_label,
                true_score_column=true_label,
                output_dir=output_template
            ).after(_predict_op)

        # Apply the GCP service account secret to every op in the pipeline.
        dsl.get_pipeline_conf().add_op_transformer(
            gcp.use_gcp_secret('user-gcp-sa'))
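The comment in the extract notes that the steps share locations as URI strings rather than as return values. A minimal sketch of how those strings are derived follows. It assumes a stand-in string for `dsl.RUN_ID_PLACEHOLDER`, whose real value is substituted at run time, and the `derive_paths` helper is hypothetical. It uses `posixpath.join` so that the result does not depend on the local operating system, whereas the extract itself uses `os.path.join`.

```python
import posixpath

# Stand-in for dsl.RUN_ID_PLACEHOLDER; assumed for illustration only.
RUN_ID_PLACEHOLDER = "<run-id>"


def derive_paths(output):
    """Build the URI strings that the pipeline steps pass to each other."""
    output_template = str(output) + "/" + RUN_ID_PLACEHOLDER + "/data"
    return {
        "analyze_output": output_template,
        "transform_output_train": posixpath.join(output_template, "train", "part-*"),
        "transform_output_eval": posixpath.join(output_template, "eval", "part-*"),
        "train_output": posixpath.join(output_template, "train_output"),
        "predict_output": posixpath.join(output_template, "predict_output"),
    }


paths = derive_paths("gs://your-gcs-bucket")
for name, uri in paths.items():
    print(name, "=", uri)
```

Because every path hangs off a template that contains the run ID, each run writes to its own directory, and a downstream step needs only the string to find the output of the step before it.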

Pipeline input data on the Kubeflow Pipelines UI

The partial screenshot below shows the Kubeflow Pipelines UI for kicking off a run of the pipeline. The pipeline definition in your code determines which parameters appear in the UI form. The pipeline definition can also set default values for the parameters:
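One way to picture how a function signature can drive a parameter form is with Python's `inspect` module. The sketch below is an illustration only, not a description of how the Kubeflow Pipelines UI is implemented. `form_fields` is a hypothetical helper, and the function is a plain stand-in that carries a subset of the example pipeline's parameters.

```python
import inspect


def xgb_train_pipeline(
    output='gs://your-gcs-bucket',
    project='your-gcp-project',
    region='us-central1',
    target='resolution',
    rounds=200,
    workers=2,
    true_label='ACTION',
):
    """Plain stand-in for the pipeline function; the body is not needed here."""


def form_fields(pipeline_func):
    """Return (name, default) pairs in declaration order."""
    fields = []
    for name, param in inspect.signature(pipeline_func).parameters.items():
        # Parameters without a default would show up as empty form fields.
        default = None if param.default is inspect.Parameter.empty else param.default
        fields.append((name, default))
    return fields


for name, default in form_fields(xgb_train_pipeline):
    print(f"{name}: {default!r}")
```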

Starting the XGBoost run on the pipelines UI

Outputs from the pipeline

The following screenshots show examples of the pipeline output visible on the Kubeflow Pipelines UI.

Prediction results:

Prediction output

Confusion matrix:

Confusion matrix

Receiver operating characteristics (ROC) curve:

ROC

Architectural overview

Pipelines architectural diagram

At a high level, the execution of a pipeline proceeds as follows:

  • Python SDK: You create components or specify a pipeline using the Kubeflow Pipelines domain-specific language (DSL).
  • DSL compiler: The DSL compiler transforms your pipeline’s Python code into a static configuration (YAML).
  • Pipeline Service: You call the Pipeline Service to create a pipeline run from the static configuration.
  • Kubernetes resources: The Pipeline Service calls the Kubernetes API server to create the necessary Kubernetes resources (CRDs) to run the pipeline.
  • Orchestration controllers: A set of orchestration controllers execute the containers needed to complete the pipeline execution specified by the Kubernetes resources (CRDs). The containers execute within Kubernetes Pods on virtual machines. An example controller is the Argo Workflow controller, which orchestrates task-driven workflows.
  • Artifact storage: The Pods store two kinds of data:

    • Metadata: Experiments, jobs, runs, etc. Also single scalar metrics, generally aggregated for the purposes of sorting and filtering. Kubeflow Pipelines stores the metadata in a MySQL database.
    • Artifacts: Pipeline packages, views, etc. Also large-scale metrics like time series, usually used for investigating an individual run’s performance and for debugging. Kubeflow Pipelines stores the artifacts in an artifact store like Minio server or Cloud Storage.

    The MySQL database and the Minio server are both backed by the Kubernetes PersistentVolume (PV) subsystem.

  • Persistence agent and ML metadata: The Pipeline Persistence Agent watches the Kubernetes resources created by the Pipeline Service and persists the state of these resources in the ML Metadata Service. The Pipeline Persistence Agent records the set of containers that executed as well as their inputs and outputs. The input/output consists of either container parameters or data artifact URIs.

  • Pipeline web server: The Pipeline web server gathers data from various services to display relevant views: the list of pipelines currently running, the history of pipeline execution, the list of data artifacts, debugging information about individual pipeline runs, and the execution status of individual pipeline runs.
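The split between metadata and artifacts in the list above can be sketched with the standard library alone. This illustration rests on several assumptions: an in-memory SQLite database stands in for MySQL, a temporary directory stands in for an artifact store such as Minio or Cloud Storage, and the table layout, run names, and metric values are invented for the example.

```python
import json
import sqlite3
import tempfile
from pathlib import Path

# Metadata store stand-in: small, queryable records about each run.
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE runs ("
    "name TEXT PRIMARY KEY, experiment TEXT, accuracy REAL, artifact_uri TEXT)"
)

# Artifact store stand-in: a directory that holds larger files.
artifact_root = Path(tempfile.mkdtemp())


def record_run(name, experiment, accuracy, loss_curve):
    """Store the time series as an artifact and the scalar metric as metadata."""
    artifact_path = artifact_root / f"{name}-loss.json"
    artifact_path.write_text(json.dumps(loss_curve))
    db.execute(
        "INSERT INTO runs VALUES (?, ?, ?, ?)",
        (name, experiment, accuracy, str(artifact_path)),
    )


record_run("run-a", "xgboost", 0.81, [0.9, 0.6, 0.45])
record_run("run-b", "xgboost", 0.87, [0.8, 0.5, 0.35])

# Scalar metrics in the metadata store support sorting and filtering.
best_name, best_uri = db.execute(
    "SELECT name, artifact_uri FROM runs ORDER BY accuracy DESC LIMIT 1"
).fetchone()

# Investigating one run means fetching its artifact through the stored URI.
loss_curve = json.loads(Path(best_uri).read_text())
print(best_name, loss_curve)
```

The metadata row holds only a pointer to the artifact. This mirrors the description above, in which inputs and outputs are recorded as container parameters or data artifact URIs.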

Last modified 12.02.2020: fix link in pipeline overview (#1679) (c380e917)