Building Python function-based components

Building your own lightweight pipeline components using Python

A Kubeflow Pipelines component is a self-contained set of code that performs one step in your ML workflow. A pipeline component is composed of:

  • The component code, which implements the logic needed to perform a step in your ML workflow.

  • A component specification, which defines the following:

    • The component’s metadata, such as its name and description.
    • The component’s interface, including the component’s inputs and outputs.
    • The component’s implementation: the Docker container image to run, how to pass inputs to your component code, and how to get the component’s outputs.

Python function-based components make it easier to iterate quickly by letting you build your component code as a Python function and generating the component specification for you. This document describes how to build Python function-based components and use them in your pipeline.

Before you begin

  1. Run the following command to install the Kubeflow Pipelines SDK.

       $ pip3 install kfp --upgrade

  2. Import the kfp and kfp.components packages.

       import kfp
       import kfp.components as comp

  3. Create an instance of the kfp.Client class.

       # If you run this command on a Jupyter notebook running on Kubeflow,
       # you can exclude the host parameter.
       # client = kfp.Client()
       client = kfp.Client(host='<your-kubeflow-pipelines-host-name>')

For more information about the Kubeflow Pipelines SDK, see the SDK reference guide.

Getting started with Python function-based components

This section demonstrates how to get started building Python function-based components by walking through the process of creating a simple component.

  1. Define your component’s code as a standalone Python function. In this example, the function adds two floats and returns their sum.

       def add(a: float, b: float) -> float:
         '''Calculates sum of two arguments'''
         return a + b
  2. Use kfp.components.create_component_from_func to generate the component specification YAML and return a factory function that you can use to create kfp.dsl.ContainerOp class instances for your pipeline. The component specification YAML is a reusable and shareable definition of your component; the sketch after these steps shows how to reload it.

       add_op = comp.create_component_from_func(
           add, output_component_file='add_component.yaml')
  3. Create and run your pipeline. Learn more about creating and running pipelines.

       import kfp.dsl as dsl
       @dsl.pipeline(
         name='Addition pipeline',
         description='An example pipeline that performs addition calculations.'
       )
       def add_pipeline(
         a='3',
         b='7',
       ):
         # Passes a pipeline parameter and a constant value to the `add_op` factory
         # function.
         first_add_task = add_op(a, 4)

         # Passes an output reference from `first_add_task` and a pipeline parameter
         # to the `add_op` factory function. For operations with a single return
         # value, the output reference can be accessed as `task.output` or
         # `task.outputs['output_name']`.
         second_add_task = add_op(first_add_task.output, b)

       # Specify argument values for your pipeline run.
       arguments = {'a': '7', 'b': '8'}

       # Create a pipeline run, using the client you initialized in a prior step.
       client.create_run_from_pipeline_func(add_pipeline, arguments=arguments)
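
Because the previous steps wrote the component specification to add_component.yaml, you can reload the component elsewhere instead of regenerating it from the Python function. A minimal sketch, assuming the YAML file is available locally:

     import kfp.components as comp

     # Load the component specification that create_component_from_func wrote
     # to disk. This returns the same kind of factory function as add_op above.
     add_op_reloaded = comp.load_component_from_file('add_component.yaml')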

Building Python function-based components

Use the following instructions to build a Python function-based component:

  1. Define a stand-alone Python function. This function must meet the following requirements:

     • It should not use any code declared outside of the function definition, so import statements and helper functions must be placed inside the function.
     • If the function operates on numbers, the parameters must have type hints. Supported types are int, float, and bool; arguments of any other type are passed in as strings.
     • To build a component with multiple output values, use Python’s typing.NamedTuple type hint, as the following example demonstrates.

       from typing import NamedTuple
       def multiple_return_values_example(a: float, b: float) -> NamedTuple(
         'ExampleOutputs',
         [
           ('sum', float),
           ('product', float),
           ('mlpipeline_ui_metadata', 'UI_metadata'),
           ('mlpipeline_metrics', 'Metrics')
         ]):
         """Example function that demonstrates how to return multiple values."""
         sum_value = a + b
         product_value = a * b

         # Export a sample tensorboard
         metadata = {
           'outputs' : [{
             'type': 'tensorboard',
             'source': 'gs://ml-pipeline-dataset/tensorboard-train',
           }]
         }

         # Export two metrics
         metrics = {
           'metrics': [
             {
               'name': 'sum',
               'numberValue': float(sum_value),
             },{
               'name': 'product',
               'numberValue': float(product_value),
             }
           ]
         }

         from collections import namedtuple
         example_output = namedtuple(
             'ExampleOutputs',
             ['sum', 'product', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
         return example_output(sum_value, product_value, metadata, metrics)
  2. (Optional.) If your function has complex dependencies, choose or build a container image for your Python function to run in. Learn more about selecting or building your component’s container image.

  3. Call kfp.components.create_component_from_func(func) to convert your function into a pipeline component, as the sketch after this list demonstrates. This function accepts the following arguments:

     • func: The Python function to convert.
     • base_image: (Optional.) Specify the Docker container image to run this function in. Learn more about selecting or building a container image.
     • output_component_file: (Optional.) Writes your component definition to a file. You can use this file to share the component with colleagues or reuse it in different pipelines.
     • packages_to_install: (Optional.) A list of versioned Python packages to install before running your function.
Using and installing Python packages

When Kubeflow Pipelines runs your pipeline, each component runs within a Docker container image on a Kubernetes Pod. To load the packages that your Python function depends on, one of the following must be true:

  • The package must be installed on the container image.
  • The package must be specified using the packages_to_install parameter of the kfp.components.create_component_from_func(func) function.
  • Your function must install the package. For example, your function can use the subprocess module to run a pip install command, as the sketch below shows.
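
A minimal sketch of that last option, installing a dependency from inside the function itself (the function name and the package pin are illustrative):

     def pi_component() -> float:
         import subprocess
         import sys
         # Install the dependency at run time, before importing it.
         subprocess.run(
             [sys.executable, '-m', 'pip', 'install', 'numpy==1.21.6'],
             check=True)
         import numpy as np
         return float(np.pi)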

Selecting or building a container image

Currently, if you do not specify a container image, your Python function-based component uses the python:3.7 container image. If your function has complex dependencies, you may benefit from using a container image that has your dependencies preinstalled, or from building a custom container image. Preinstalling your dependencies reduces your component’s runtime, since your component does not need to download and install packages each time it runs.

Many frameworks, such as TensorFlow and PyTorch, and cloud service providers offer prebuilt container images that have common dependencies installed.

If a prebuilt container is not available, you can build a custom container image with your Python function’s dependencies. For more information about building a custom container, read the Dockerfile reference guide in the Docker documentation.

If you build or select a container image instead of using the default, the image must use Python 3.5 or later.

Example Python function-based component

This section demonstrates how to build a Python function-based component that uses imports, helper functions, and produces multiple outputs.

  1. Define your function. This example function uses the numpy package in a helper function to calculate the quotient and remainder for a given dividend and divisor. In addition to the quotient and remainder, the function also returns metadata for visualization and two metrics.

       from typing import NamedTuple
       def my_divmod(dividend: float, divisor: float) -> NamedTuple(
         'MyDivmodOutput',
         [
           ('quotient', float),
           ('remainder', float),
           ('mlpipeline_ui_metadata', 'UI_metadata'),
           ('mlpipeline_metrics', 'Metrics')
         ]):
         '''Divides two numbers and calculates the quotient and remainder'''

         # Import the numpy package inside the component function
         import numpy as np

         # Define a helper function
         def divmod_helper(dividend, divisor):
           return np.divmod(dividend, divisor)

         (quotient, remainder) = divmod_helper(dividend, divisor)

         import json

         # Export a sample tensorboard
         metadata = {
           'outputs' : [{
             'type': 'tensorboard',
             'source': 'gs://ml-pipeline-dataset/tensorboard-train',
           }]
         }

         # Export two metrics
         metrics = {
           'metrics': [{
             'name': 'quotient',
             'numberValue': float(quotient),
           },{
             'name': 'remainder',
             'numberValue': float(remainder),
           }]}

         from collections import namedtuple
         divmod_output = namedtuple(
             'MyDivmodOutput',
             ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
         return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics))
  2. Test your function by running it directly, or with unit tests.

       my_divmod(100, 7)

     This should return a result like the following:

       MyDivmodOutput(quotient=14, remainder=2, mlpipeline_ui_metadata='{"outputs": [{"type": "tensorboard", "source": "gs://ml-pipeline-dataset/tensorboard-train"}]}', mlpipeline_metrics='{"metrics": [{"name": "quotient", "numberValue": 14.0}, {"name": "remainder", "numberValue": 2.0}]}')
  3. Use kfp.components.create_component_from_func to return a factory function that you can use to create kfp.dsl.ContainerOp class instances for your pipeline. This example also specifies the base container image to run this function in; the TensorFlow image is used because it has the numpy package preinstalled.

       divmod_op = comp.create_component_from_func(
           my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')
  4. Define your pipeline. This example uses the divmod_op factory function and the add_op factory function from an earlier example.

       import kfp.dsl as dsl
       @dsl.pipeline(
         name='Calculation pipeline',
         description='An example pipeline that performs arithmetic calculations.'
       )
       def calc_pipeline(
         a='1',
         b='7',
         c='17',
       ):
         # Passes a pipeline parameter and a constant value as operation arguments.
         # The `add_op` factory function returns a dsl.ContainerOp class instance.
         add_task = add_op(a, 4)

         # Passes the output of `add_task` and a pipeline parameter as operation
         # arguments. For an operation with a single return value, the output
         # reference is accessed using `task.output` or
         # `task.outputs['output_name']`.
         divmod_task = divmod_op(add_task.output, b)

         # For an operation with multiple return values, output references are
         # accessed as `task.outputs['output_name']`.
         result_task = add_op(divmod_task.outputs['quotient'], c)
  5. Create and run your pipeline. Learn more about creating and running pipelines.

       # Specify pipeline argument values
       arguments = {'a': '7', 'b': '8'}

       # Submit a pipeline run
       client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)
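
If you want to share the pipeline or upload it through the Kubeflow Pipelines UI instead of submitting it directly from the client, you can also compile it into a package. A minimal sketch, where the output file name is an arbitrary choice:

     import kfp.compiler

     # Compile the pipeline into a package that can be uploaded to
     # Kubeflow Pipelines.
     kfp.compiler.Compiler().compile(calc_pipeline, 'calc_pipeline.zip')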
