Workflow

Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows operate well with pipelines but can work with any callable object. Workflows are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.

Given that pipelines are callable objects, workflows enable efficient processing of pipeline data. Large language models typically work with smaller batches of data, so workflows are well suited to feeding a series of transformers pipelines.
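
To make the batching idea concrete, here is a minimal pure-Python sketch of splitting a stream into fixed-size batches. The helper name `batches` is hypothetical and is not part of txtai; it only illustrates how an input of any length can be handed to a model a few items at a time.

```python
from itertools import islice


def batches(elements, size):
    """Yield lists of at most `size` items from any iterable (hypothetical helper)."""
    iterator = iter(elements)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# A generator stands in for a dataset too large to hold in memory
stream = (x for x in range(7))
result = list(batches(stream, 3))
```

Only one batch is held in memory at a time, which is why this pattern scales to large inputs.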

An example of the most basic workflow:

    from txtai.workflow import Task, Workflow

    workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
    list(workflow([1, 2, 3]))

This example multiplies each input value by 2 and returns transformed elements via a generator.

Since workflows run as generators, output must be consumed for execution to occur. The following snippets show how output can be consumed.

    # Small dataset where output fits in memory
    list(workflow(elements))

    # Large dataset
    for output in workflow(elements):
        function(output)

    # Large dataset where output is discarded
    for _ in workflow(elements):
        pass
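
The reason output must be consumed can be seen with a plain Python generator. The sketch below does not use txtai; `double_all` and `calls` are illustrative names, and the list simply records when work actually happens.

```python
calls = []


def double_all(elements):
    """Generator that records each element it processes, then yields it doubled."""
    for element in elements:
        calls.append(element)
        yield element * 2


output = double_all([1, 2, 3])
calls_before = list(calls)   # nothing has run yet, so this is empty

results = list(output)       # consuming the generator triggers the work
calls_after = list(calls)
```

Creating the generator does no work; the recorded calls only appear once `list(...)` iterates over it.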

Workflows are run with Python or configuration. Examples of both methods are shown below.

Example

A full-featured example is shown below in Python. This workflow transcribes a set of audio files, translates the text into French and indexes the data.

    from txtai.embeddings import Embeddings
    from txtai.pipeline import Transcription, Translation
    from txtai.workflow import FileTask, Task, Workflow

    # Embeddings instance
    embeddings = Embeddings({
        "path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
        "content": True
    })

    # Transcription instance
    transcribe = Transcription()

    # Translation instance
    translate = Translation()

    tasks = [
        FileTask(transcribe, r"\.wav$"),
        Task(lambda x: translate(x, "fr"))
    ]

    # List of files to process
    data = [
        "US_tops_5_million.wav",
        "Canadas_last_fully.wav",
        "Beijing_mobilises.wav",
        "The_National_Park.wav",
        "Maine_man_wins_1_mil.wav",
        "Make_huge_profits.wav"
    ]

    # Workflow that translates text to French
    workflow = Workflow(tasks)

    # Index data
    embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))

    # Search
    embeddings.search("wildlife", 1)
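
In the example above, `FileTask` is given the regular expression `\.wav$`. The sketch below uses only Python's `re` module to show which names such a pattern matches. Using `re.search` here is an assumption made for illustration, and how txtai handles non-matching elements is not covered.

```python
import re

# Same pattern as the FileTask above: names ending in ".wav"
pattern = re.compile(r"\.wav$")

candidates = ["US_tops_5_million.wav", "notes.txt", "wav_readme.md", "clip.wav.bak"]
selected = [name for name in candidates if pattern.search(name)]
```

The escaped dot and the `$` anchor matter: without them, names that merely contain "wav" would also match.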

Configuration-driven example

Workflows can also be defined with YAML configuration.

    writable: true
    embeddings:
      path: sentence-transformers/paraphrase-MiniLM-L3-v2
      content: true

    # Transcribe audio to text
    transcription:

    # Translate text between languages
    translation:

    workflow:
      index:
        tasks:
          - action: transcription
            select: "\\.wav$"
            task: file
          - action: translation
            args: ["fr"]
          - action: index

This configuration is loaded and run from Python as follows.

    from txtai.app import Application

    # Create and run the workflow
    app = Application("workflow.yml")
    list(app.workflow("index", [
        "US_tops_5_million.wav",
        "Canadas_last_fully.wav",
        "Beijing_mobilises.wav",
        "The_National_Park.wav",
        "Maine_man_wins_1_mil.wav",
        "Make_huge_profits.wav"
    ]))

    # Search
    app.search("wildlife")

The code above executes a workflow defined in the file `workflow.yml`.

LLM workflow example

Workflows can connect multiple LLM prompting tasks together.

    llm:
      path: google/flan-t5-xl

    workflow:
      llm:
        tasks:
          - task: template
            template: |
              Extract keywords for the following text.
              {text}
            action: llm
          - task: template
            template: |
              Translate the following text into French.
              {text}
            action: llm

Assuming this configuration is saved as workflow.yml, it runs from Python as follows.

    from txtai.app import Application

    app = Application("workflow.yml")
    list(app.workflow("llm", [
        """
        txtai is an open-source platform for semantic search
        and workflows powered by language models.
        """
    ]))
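
The chaining of template tasks can be pictured without a model. The sketch below is not txtai code: `fake_llm` is a hypothetical stand-in that just upper-cases the last line of the prompt, and `str.format` is assumed as a simple way to fill the `{text}` placeholder.

```python
def fake_llm(prompt):
    """Stand-in for an LLM pipeline: returns the last line of the prompt in upper case."""
    return prompt.strip().splitlines()[-1].upper()


templates = [
    "Extract keywords for the following text.\n{text}",
    "Translate the following text into French.\n{text}",
]

prompts = []
text = "txtai is an open-source platform"
for template in templates:
    prompt = template.format(text=text)   # fill the {text} placeholder
    prompts.append(prompt)
    text = fake_llm(prompt)               # output feeds the next template
```

Each task's output becomes the `{text}` value of the next prompt, which is how the two prompting steps are connected.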

Any txtai pipeline/workflow task can be connected in workflows with LLMs.

    llm:
      path: google/flan-t5-xl

    translation:

    workflow:
      llm:
        tasks:
          - task: template
            template: |
              Extract keywords for the following text.
              {text}
            action: llm
          - action: translation
            args:
              - fr

Methods

Workflows are callable objects. Workflows take an input of iterable data elements and output iterable data elements.

__init__(tasks, batch=100, workers=None, name=None, stream=None)

Creates a new workflow. Workflows are lists of tasks to execute.

Parameters:

| Name | Description | Default |
|---|---|---|
| tasks | list of workflow tasks | required |
| batch | how many items to process at a time | 100 |
| workers | number of concurrent workers | None |
| name | workflow name | None |
| stream | workflow stream processor | None |

Source code in txtai/workflow/base.py

    def __init__(self, tasks, batch=100, workers=None, name=None, stream=None):
        """
        Creates a new workflow. Workflows are lists of tasks to execute.

        Args:
            tasks: list of workflow tasks
            batch: how many items to process at a time, defaults to 100
            workers: number of concurrent workers
            name: workflow name
            stream: workflow stream processor
        """

        self.tasks = tasks
        self.batch = batch
        self.workers = workers
        self.name = name
        self.stream = stream

        # Set default number of executor workers to max number of actions in a task
        self.workers = max(len(task.action) for task in self.tasks) if not self.workers else self.workers
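
The last line of the constructor picks a default worker count when none is given. The sketch below isolates that expression; `StubTask` and `default_workers` are hypothetical names used for illustration only, and the stub keeps nothing but an `action` list.

```python
class StubTask:
    """Hypothetical stand-in for a task; only the `action` list matters here."""

    def __init__(self, action):
        self.action = action


def default_workers(tasks, workers=None):
    """Mirror the fallback above: use the largest action count when workers is unset."""
    return workers if workers else max(len(task.action) for task in tasks)


tasks = [StubTask([str.upper]), StubTask([str.lower, str.title, len])]
auto = default_workers(tasks)
explicit = default_workers(tasks, workers=8)
```

With one task holding three actions, the fallback yields three workers, while an explicit value is used as given.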

__call__(elements)

Executes a workflow for input elements. This method returns a generator that yields transformed data elements.

Parameters:

| Name | Description | Default |
|---|---|---|
| elements | iterable data elements | required |

Returns:

generator that yields transformed data elements

Source code in txtai/workflow/base.py

    def __call__(self, elements):
        """
        Executes a workflow for input elements. This method returns a generator that yields transformed
        data elements.

        Args:
            elements: iterable data elements

        Returns:
            generator that yields transformed data elements
        """

        # Create execute instance for this run
        with Execute(self.workers) as executor:
            # Run task initializers
            self.initialize()

            # Process elements with stream processor, if available
            elements = self.stream(elements) if self.stream else elements

            # Process elements in batches
            for batch in self.chunk(elements):
                yield from self.process(batch, executor)

            # Run task finalizers
            self.finalize()
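
The order of steps in this method (initialize, optional stream processor, batched processing, finalize) can be traced with a simplified stand-in. The function `run` and the `events` list below are hypothetical and leave out the executor; this is a sketch of the control flow, not txtai's implementation.

```python
from itertools import islice

events = []


def run(elements, process, batch=2, stream=None):
    """Simplified generator mirroring the order of steps in __call__ (not txtai code)."""
    events.append("initialize")
    # Optional stream processor transforms the input before batching
    elements = iter(stream(elements) if stream else elements)
    while chunk := list(islice(elements, batch)):
        events.append(f"batch of {len(chunk)}")
        yield from process(chunk)
    events.append("finalize")


doubled = run(range(5), lambda xs: [x * 2 for x in xs], stream=lambda xs: (x for x in xs if x != 3))
output = list(doubled)
```

Because `run` is a generator, even the initialize step is deferred until the output is consumed, and finalize only runs once the last batch has been yielded.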

schedule(cron, elements, iterations=None)

Schedules a workflow using a cron expression and elements.

Parameters:

| Name | Description | Default |
|---|---|---|
| cron | cron expression | required |
| elements | iterable data elements passed to workflow each call | required |
| iterations | number of times to run workflow, defaults to run indefinitely | None |

Source code in txtai/workflow/base.py

    def schedule(self, cron, elements, iterations=None):
        """
        Schedules a workflow using a cron expression and elements.

        Args:
            cron: cron expression
            elements: iterable data elements passed to workflow each call
            iterations: number of times to run workflow, defaults to run indefinitely
        """

        # Check that croniter is installed
        if not CRONITER:
            raise ImportError('Workflow scheduling is not available - install workflow extra to enable')

        logger.info("'%s' scheduler started with schedule %s", self.name, cron)

        maxiterations = iterations
        while iterations is None or iterations > 0:
            # Schedule using localtime
            schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
            logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
            time.sleep(schedule.timestamp() - time.time())

            # Run workflow
            # pylint: disable=W0703
            try:
                for _ in self(elements):
                    pass
            except Exception:
                logger.error(traceback.format_exc())

            # Decrement iterations remaining, if necessary
            if iterations is not None:
                iterations -= 1

        logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)
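
The loop structure of `schedule` (wait, run, log failures, count down) can be sketched without croniter. In the example below a fixed interval in seconds replaces the cron expression, and `run_on_interval` with its injectable `sleep` argument is a hypothetical helper, not a txtai function.

```python
import time


def run_on_interval(job, seconds, iterations=None, sleep=time.sleep):
    """Run `job` every `seconds` seconds; a fixed interval replaces the cron expression."""
    completed = 0
    while iterations is None or iterations > 0:
        sleep(seconds)
        try:
            job()
            completed += 1
        except Exception as error:  # keep the schedule alive after a failed run
            print(f"run failed: {error}")
        if iterations is not None:
            iterations -= 1
    return completed


runs = []
waits = []
# Inject a fake sleep so the example finishes instantly
total = run_on_interval(lambda: runs.append("ran"), 60, iterations=3, sleep=waits.append)
```

As in the method above, a failed run is logged instead of raised so later runs still happen, and leaving `iterations` as `None` keeps the loop running indefinitely.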

More examples

See this link for a full list of workflow examples.