Workflow

Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows operate well with pipelines but can work with any callable object. Workflows are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.

Given that pipelines are callable objects, workflows enable efficient processing of pipeline data. Transformers models typically work with smaller batches of data, so workflows are well suited to feed a series of transformers pipelines.

An example of the most basic workflow:

    from txtai.workflow import Task, Workflow

    workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
    list(workflow([1, 2, 3]))

This example multiplies each input value by 2 and returns transformed elements via a generator.

Since workflows run as generators, output must be consumed for execution to occur. The following snippets show how output can be consumed.

    # Small dataset where output fits in memory
    list(workflow(elements))

    # Large dataset
    for output in workflow(elements):
        function(output)

    # Large dataset where output is discarded
    for _ in workflow(elements):
        pass
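The need to consume output follows from how Python generators behave in general. The sketch below uses a plain generator function, `doubled`, as a hypothetical stand-in for a workflow (it is not part of txtai) to show that nothing runs until the output is iterated.

```python
calls = []


def doubled(elements):
    """Hypothetical stand-in for a workflow: yields each element times 2."""
    for element in elements:
        # Record each processed element so execution can be observed
        calls.append(element)
        yield element * 2


# Calling the generator function does not process anything yet
output = doubled([1, 2, 3])
assert calls == []

# Consuming the generator triggers the processing
results = list(output)
assert results == [2, 4, 6]
assert calls == [1, 2, 3]
```

The same reasoning applies to the `for` loops above: iterating is what drives execution, whether or not the output is kept.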

Workflows are run with Python or configuration. Examples of both methods are shown below.

Example

A full-featured example is shown below in Python. This workflow transcribes a set of audio files, translates the text into French and indexes the data.

    from txtai.embeddings import Embeddings
    from txtai.pipeline import Transcription, Translation
    from txtai.workflow import FileTask, Task, Workflow

    # Embeddings instance
    embeddings = Embeddings({
        "path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
        "content": True
    })

    # Transcription instance
    transcribe = Transcription()

    # Translation instance
    translate = Translation()

    tasks = [
        FileTask(transcribe, r"\.wav$"),
        Task(lambda x: translate(x, "fr"))
    ]

    # List of files to process
    data = [
        "US_tops_5_million.wav",
        "Canadas_last_fully.wav",
        "Beijing_mobilises.wav",
        "The_National_Park.wav",
        "Maine_man_wins_1_mil.wav",
        "Make_huge_profits.wav"
    ]

    # Workflow that transcribes audio and translates the text to French
    workflow = Workflow(tasks)

    # Index data
    embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))

    # Search
    embeddings.search("wildlife", 1)
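The `r"\.wav$"` argument passed to `FileTask` above is a regular expression. How txtai applies it internally is not shown here, so the sketch below only illustrates, with the standard `re` module, which names such a pattern matches. The non-audio file names are hypothetical.

```python
import re

# Same pattern as in the example above: a literal ".wav" at the end of the string
pattern = re.compile(r"\.wav$")

# Hypothetical inputs; only the first name comes from the example above
names = ["US_tops_5_million.wav", "notes.txt", "wav_list.csv", "clip.wav.bak"]

# Keep the names the pattern matches
selected = [name for name in names if pattern.search(name)]
assert selected == ["US_tops_5_million.wav"]
```

The escaped dot matters: an unescaped `.` would match any character, not only a literal period.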

Configuration-driven example

Workflows can be defined using Python as shown above but they can also run with YAML configuration.

    writable: true
    embeddings:
      path: sentence-transformers/paraphrase-MiniLM-L3-v2
      content: true

    # Transcribe audio to text
    transcription:

    # Translate text between languages
    translation:

    workflow:
      index:
        tasks:
          - action: transcription
            select: "\\.wav$"
            task: file
          - action: translation
            args: ["fr"]
          - action: index

The configuration above, saved as workflow.yml, is run with the following code.

    from txtai.app import Application

    # Create and run the workflow
    app = Application("workflow.yml")
    list(app.workflow("index", [
        "US_tops_5_million.wav",
        "Canadas_last_fully.wav",
        "Beijing_mobilises.wav",
        "The_National_Park.wav",
        "Maine_man_wins_1_mil.wav",
        "Make_huge_profits.wav"
    ]))

    # Search
    app.search("wildlife")

The code above executes a workflow defined in the file workflow.yml. The API is used to run the workflow locally, and there is minimal overhead in running workflows this way. Whether to use Python or configuration is a matter of preference.

Methods

Workflows are callable objects. Workflows take an input of iterable data elements and output iterable data elements.

__init__(self, tasks, batch=100, workers=None, name=None)

Creates a new workflow. Workflows are lists of tasks to execute.

Parameters:

| Name | Description | Default |
|---|---|---|
| tasks | list of workflow tasks | required |
| batch | how many items to process at a time, defaults to 100 | 100 |
| workers | number of concurrent workers | None |
| name | workflow name | None |

Source code in txtai/workflow/base.py

    def __init__(self, tasks, batch=100, workers=None, name=None):
        """
        Creates a new workflow. Workflows are lists of tasks to execute.

        Args:
            tasks: list of workflow tasks
            batch: how many items to process at a time, defaults to 100
            workers: number of concurrent workers
            name: workflow name
        """

        self.tasks = tasks
        self.batch = batch
        self.workers = workers
        self.name = name

        # Set default number of executor workers to max number of actions in a task
        self.workers = max(len(task.action) for task in self.tasks) if not self.workers else self.workers
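The last line of the constructor determines the default worker count. The sketch below isolates that expression in a hypothetical helper, `default_workers`, and uses `SimpleNamespace` objects as stand-in tasks. It assumes only what the source line implies, namely that each task has an `action` attribute supporting `len()`.

```python
from types import SimpleNamespace


def default_workers(tasks, workers=None):
    """Hypothetical helper that mirrors the worker default in the constructor above."""
    # Use the given value when it is truthy, otherwise fall back to the
    # largest number of actions found on any single task
    return workers if workers else max(len(task.action) for task in tasks)


# Stand-in tasks that only carry a list of actions
tasks = [
    SimpleNamespace(action=[str.upper]),
    SimpleNamespace(action=[str.lower, str.strip, len]),
]

assert default_workers(tasks) == 3
assert default_workers(tasks, workers=8) == 8
```

Because the check is `not self.workers`, a value of `0` is treated the same as `None` and replaced by the computed default.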

__call__(self, elements)

Executes a workflow for input elements. This method returns a generator that yields transformed data elements.

Parameters:

| Name | Description | Default |
|---|---|---|
| elements | iterable data elements | required |

Returns:

generator that yields transformed data elements

Source code in txtai/workflow/base.py

    def __call__(self, elements):
        """
        Executes a workflow for input elements. This method returns a generator that yields transformed
        data elements.

        Args:
            elements: iterable data elements

        Returns:
            generator that yields transformed data elements
        """

        # Create execute instance for this run
        with Execute(self.workers) as executor:
            # Run task initializers
            self.initialize()

            # Process elements in batches
            for batch in self.chunk(elements):
                yield from self.process(batch, executor)

            # Run task finalizers
            self.finalize()
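The batch loop above can be illustrated without txtai. The following is a minimal sketch, not the library's implementation: the bodies of `self.chunk` and `self.process` are not shown in this section, so the `chunk` and `run` functions below are hypothetical stand-ins that assume each task is a callable mapping a list to a list. The executor, initializers and finalizers are omitted.

```python
from itertools import islice


def chunk(elements, size):
    """Splits any iterable into lists of at most `size` elements."""
    iterator = iter(elements)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def run(tasks, elements, batch=100):
    """Hypothetical, simplified workflow loop over batches of elements."""
    for items in chunk(elements, batch):
        # Pass the batch through each task in order
        for task in tasks:
            items = task(items)

        # Stream the transformed elements back to the caller
        yield from items


# Two stand-in tasks: double each value, then add one
tasks = [lambda x: [y * 2 for y in x], lambda x: [y + 1 for y in x]]

results = list(run(tasks, range(5), batch=2))
assert results == [1, 3, 5, 7, 9]
```

Only one batch is held in memory at a time, which is what allows large inputs to be streamed.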

schedule(self, cron, elements, iterations=None)

Schedules a workflow using a cron expression and elements.

Parameters:

| Name | Description | Default |
|---|---|---|
| cron | cron expression | required |
| elements | iterable data elements passed to workflow each call | required |
| iterations | number of times to run workflow, defaults to run indefinitely | None |

Source code in txtai/workflow/base.py

    def schedule(self, cron, elements, iterations=None):
        """
        Schedules a workflow using a cron expression and elements.

        Args:
            cron: cron expression
            elements: iterable data elements passed to workflow each call
            iterations: number of times to run workflow, defaults to run indefinitely
        """

        # Check that croniter is installed
        if not CRONITER:
            raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')

        logger.info("'%s' scheduler started with schedule %s", self.name, cron)

        maxiterations = iterations
        while iterations is None or iterations > 0:
            # Schedule using localtime
            schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
            logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
            time.sleep(schedule.timestamp() - time.time())

            # Run workflow
            # pylint: disable=W0703
            try:
                for _ in self(elements):
                    pass
            except Exception:
                logger.error(traceback.format_exc())

            # Decrement iterations remaining, if necessary
            if iterations is not None:
                iterations -= 1

        logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)
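Two properties of the loop above are worth noting: a failed run is logged rather than raised, and `iterations` counts runs whether or not they succeed. The sketch below reproduces only that loop control in a hypothetical helper, `run_repeatedly`. It leaves out the cron calculation and the sleep, and `flaky` is an illustrative job that is not part of txtai.

```python
import logging
import traceback

logger = logging.getLogger(__name__)


def run_repeatedly(job, iterations=None):
    """Hypothetical helper: runs `job` until `iterations` is exhausted and returns the run count."""
    runs = 0
    while iterations is None or iterations > 0:
        try:
            job()
        except Exception:  # pylint: disable=W0703
            # A failed run is logged and the loop continues
            logger.error(traceback.format_exc())

        runs += 1

        # Decrement iterations remaining, if necessary
        if iterations is not None:
            iterations -= 1

    return runs


attempts = []


def flaky():
    """Illustrative job that fails on its first call and succeeds afterwards."""
    attempts.append(len(attempts))
    if len(attempts) == 1:
        raise ValueError("first run fails")


# The first run raises, yet all three runs are attempted
assert run_repeatedly(flaky, iterations=3) == 3
assert len(attempts) == 3
```

With `iterations=None` the loop never ends, matching the "defaults to run indefinitely" description of the parameter.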

More examples

See this link for a full list of workflow examples.