Tasks

task task

Workflows execute tasks. Tasks are callable objects with a number of parameters to control the processing of data at a given step. While similar to pipelines, tasks encapsulate processing and don’t perform signficant transformations on their own. Tasks perform logic to prepare content for the underlying action(s).

A simple task is shown below.

  1. Task(lambda x: [y * 2 for y in x])

The task above executes the function above for all input elements.

Tasks work well with pipelines, since pipelines are callable objects. The example below will summarize each input element.

  1. summary = Summary()
  2. Task(summary)

Tasks can operate independently but work best with workflows, as workflows add large-scale stream processing.

  1. summary = Summary()
  2. task = Task(summary)
  3. task(["Very long text here"])
  4. workflow = Workflow([task])
  5. list(workflow(["Very long text here"]))

Tasks can also be created with configuration as part of a workflow.

  1. workflow:
  2. tasks:
  3. - action: summary

__init__(self, action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs) special

Creates a new task. A task defines two methods, type of data it accepts and the action to execute for each data element. Action is a callable function or list of callable functions.

Parameters:

NameTypeDescriptionDefault
action

action(s) to execute on each data element

None
select

filter(s) used to select data to process

None
unpack

if data elements should be unpacked or unwrapped from (id, data, tag) tuples

True
column

column index to select if element is a tuple, defaults to all

None
merge

merge mode for joining multi-action outputs, defaults to hstack

‘hstack’
initialize

action to execute before processing

None
finalize

action to execute after processing

None
concurrency

sets concurrency method when execute instance available valid values: “thread” for thread-based concurrency, “process” for process-based concurrency

None
onetomany

if one-to-many data transformations should be enabled, defaults to True

True
kwargs

additional keyword arguments

{}

Source code in txtai/workflow/task/base.py

  1. def __init__(
  2. self,
  3. action=None,
  4. select=None,
  5. unpack=True,
  6. column=None,
  7. merge="hstack",
  8. initialize=None,
  9. finalize=None,
  10. concurrency=None,
  11. onetomany=True,
  12. **kwargs,
  13. ):
  14. """
  15. Creates a new task. A task defines two methods, type of data it accepts and the action to execute
  16. for each data element. Action is a callable function or list of callable functions.
  17. Args:
  18. action: action(s) to execute on each data element
  19. select: filter(s) used to select data to process
  20. unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
  21. column: column index to select if element is a tuple, defaults to all
  22. merge: merge mode for joining multi-action outputs, defaults to hstack
  23. initialize: action to execute before processing
  24. finalize: action to execute after processing
  25. concurrency: sets concurrency method when execute instance available
  26. valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
  27. onetomany: if one-to-many data transformations should be enabled, defaults to True
  28. kwargs: additional keyword arguments
  29. """
  30. # Standardize into list of actions
  31. if not action:
  32. action = []
  33. elif not isinstance(action, list):
  34. action = [action]
  35. self.action = action
  36. self.select = select
  37. self.unpack = unpack
  38. self.column = column
  39. self.merge = merge
  40. self.initialize = initialize
  41. self.finalize = finalize
  42. self.concurrency = concurrency
  43. self.onetomany = onetomany
  44. # Check for custom registration. Adds additional instance members and validates required dependencies available.
  45. if hasattr(self, "register"):
  46. self.register(**kwargs)
  47. elif kwargs:
  48. # Raise error if additional keyword arguments passed in without register method
  49. kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
  50. raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")

Multi-action task concurrency

The default processing mode is to run actions sequentially. Multiprocessing support is already built in at a number of levels. Any of the GPU models will maximize GPU utilization for example and even in CPU mode, concurrency is utilized. But there are still use cases for task action concurrency. For example, if the system has multiple GPUs, the task runs external sequential code, or the task has a large number of I/O tasks.

In addition to sequential processing, multi-action tasks can run either multithreaded or with multiple processes. The advantages of each approach are discussed below.

  • multithreading - no overhead of creating separate processes or pickling data. But Python can only execute a single thread due the GIL, so this approach won’t help with CPU bound actions. This method works well with I/O bound actions and GPU actions.

  • multiprocessing - separate subprocesses are created and data is exchanged via pickling. This method can fully utilize all CPU cores since each process runs independently. This method works well with CPU bound actions.

More information on multiprocessing can be found in the Python documentation.

Multi-action task merges

Multi-action tasks will generate parallel outputs for the input data. The task output can be merged together in a couple different ways.

hstack(self, outputs)

Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.

Column-wise merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Column Merge => [(a1, a2), (b1, b2), (c1, c2)]

Parameters:

NameTypeDescriptionDefault
outputs

task outputs

required

Returns:

TypeDescription

list of aggregated/zipped outputs as tuples (column-wise)

Source code in txtai/workflow/task/base.py

  1. def hstack(self, outputs):
  2. """
  3. Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.
  4. Column-wise merge example (2 actions)
  5. Inputs: [a, b, c]
  6. Outputs => [[a1, b1, c1], [a2, b2, c2]]
  7. Column Merge => [(a1, a2), (b1, b2), (c1, c2)]
  8. Args:
  9. outputs: task outputs
  10. Returns:
  11. list of aggregated/zipped outputs as tuples (column-wise)
  12. """
  13. # If all outputs are numpy arrays, use native method
  14. if all(isinstance(output, np.ndarray) for output in outputs):
  15. return np.stack(outputs, axis=1)
  16. # If all outputs are torch tensors, use native method
  17. # pylint: disable=E1101
  18. if all(torch.is_tensor(output) for output in outputs):
  19. return torch.stack(outputs, axis=1)
  20. return list(zip(*outputs))

vstack(self, outputs)

Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.

Row-wise merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]

Parameters:

NameTypeDescriptionDefault
outputs

task outputs

required

Returns:

TypeDescription

list of aggregated/zipped outputs as one to many transforms (row-wise)

Source code in txtai/workflow/task/base.py

  1. def vstack(self, outputs):
  2. """
  3. Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.
  4. Row-wise merge example (2 actions)
  5. Inputs: [a, b, c]
  6. Outputs => [[a1, b1, c1], [a2, b2, c2]]
  7. Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]
  8. Args:
  9. outputs: task outputs
  10. Returns:
  11. list of aggregated/zipped outputs as one to many transforms (row-wise)
  12. """
  13. # If all outputs are numpy arrays, use native method
  14. if all(isinstance(output, np.ndarray) for output in outputs):
  15. return np.concatenate(np.stack(outputs, axis=1))
  16. # If all outputs are torch tensors, use native method
  17. # pylint: disable=E1101
  18. if all(torch.is_tensor(output) for output in outputs):
  19. return torch.cat(tuple(torch.stack(outputs, axis=1)))
  20. # Flatten into lists of outputs per input row. Wrap as one to many transformation.
  21. merge = []
  22. for x in zip(*outputs):
  23. combine = []
  24. for y in x:
  25. if isinstance(y, list):
  26. combine.extend(y)
  27. else:
  28. combine.append(y)
  29. merge.append(OneToMany(combine))
  30. return merge

concat(self, outputs)

Merges outputs column-wise and concats values together into a string. Returns a list of strings.

Concat merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => [“a1. a2”, “b1. b2”, “c1. c2”]

Parameters:

NameTypeDescriptionDefault
outputs

task outputs

required

Returns:

TypeDescription

list of concat outputs

Source code in txtai/workflow/task/base.py

  1. def concat(self, outputs):
  2. """
  3. Merges outputs column-wise and concats values together into a string. Returns a list of strings.
  4. Concat merge example (2 actions)
  5. Inputs: [a, b, c]
  6. Outputs => [[a1, b1, c1], [a2, b2, c2]]
  7. Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]
  8. Args:
  9. outputs: task outputs
  10. Returns:
  11. list of concat outputs
  12. """
  13. return [". ".join([str(y) for y in x if y]) for x in self.hstack(outputs)]

Extract task output columns

With column-wise merging, each output row will be a tuple of output values for each task action. This can be fed as input to a downstream task and that task can have separate tasks work with each element.

A simple example:

  1. workflow = Workflow([Task(lambda x: [y * 3 for y in x], unpack=False, column=0)])
  2. list(workflow([(2, 8)]))

For the example input tuple of (2, 2), the workflow will only select the first element (2) and run the task against that element.

  1. workflow = Workflow([Task([lambda x: [y * 3 for y in x],
  2. lambda x: [y - 1 for y in x]],
  3. unpack=False, column={0:0, 1:1})])
  4. list(workflow([(2, 8)]))

The example above applies a separate action to each input column. This simple construct can help build extremely powerful workflow graphs!