Retrieve Task


The Retrieve Task connects to a URL and downloads the content to a local directory. This task is helpful when downstream actions require data to be available locally.

Example

The following shows a simple example using this task as part of a workflow.

    from txtai.workflow import RetrieveTask, Workflow

    workflow = Workflow([RetrieveTask(directory="/tmp")])

    # Calling a workflow returns a generator; list() consumes it so the task runs
    list(workflow(["https://file.to.download", "/local/file/to/copy"]))
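
To make the idea of "downloading content locally" concrete, here is a minimal standard-library sketch. It is not txtai's implementation: the helper name `retrieve_to_directory` and the file naming rule are assumptions made for illustration, and the demo uses a `file://` URL so it runs without network access.

```python
import os
import tempfile
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import urlretrieve


def retrieve_to_directory(url, directory):
    """Hypothetical helper: download url into directory and return the local path."""
    os.makedirs(directory, exist_ok=True)

    # Name the local copy after the last segment of the URL path (an assumption)
    name = os.path.basename(urlparse(url).path) or "download"
    path = os.path.join(directory, name)

    urlretrieve(url, path)
    return path


# Demo with a file:// URL so the sketch runs without network access
source = Path(tempfile.mkdtemp()) / "sample.txt"
source.write_text("hello")

target_dir = tempfile.mkdtemp()
local = retrieve_to_directory(source.as_uri(), target_dir)
```

After the call, `local` points at a copy of the content inside `target_dir`, which is the kind of local path a later workflow step could read from.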

Configuration-driven example

This task can also be created with workflow configuration.

    workflow:
      tasks:
        - task: retrieve
          directory: /tmp

Methods

Python documentation for the task.

Source code in txtai/workflow/task/base.py

    def __init__(
        self,
        action=None,
        select=None,
        unpack=True,
        column=None,
        merge="hstack",
        initialize=None,
        finalize=None,
        concurrency=None,
        onetomany=True,
        **kwargs,
    ):
        """
        Creates a new task. A task defines two methods, type of data it accepts and the action to execute
        for each data element. Action is a callable function or list of callable functions.

        Args:
            action: action(s) to execute on each data element
            select: filter(s) used to select data to process
            unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
            column: column index to select if element is a tuple, defaults to all
            merge: merge mode for joining multi-action outputs, defaults to hstack
            initialize: action to execute before processing
            finalize: action to execute after processing
            concurrency: sets concurrency method when execute instance available
                         valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
            onetomany: if one-to-many data transformations should be enabled, defaults to True
            kwargs: additional keyword arguments
        """

        # Standardize into list of actions
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]

        self.action = action
        self.select = select
        self.unpack = unpack
        self.column = column
        self.merge = merge
        self.initialize = initialize
        self.finalize = finalize
        self.concurrency = concurrency
        self.onetomany = onetomany

        # Check for custom registration. Adds additional instance members and validates required dependencies available.
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            # Raise error if additional keyword arguments passed in without register method
            kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")
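
The way extra keyword arguments reach a `register` method can be shown with a reduced, self-contained sketch. `BaseTask` and `DirectoryTask` are hypothetical stand-ins written for this example, not txtai classes; only the forwarding logic mirrors the constructor above.

```python
class BaseTask:
    """Illustrative stand-in for a task base class (not txtai's Task)."""

    def __init__(self, action=None, **kwargs):
        # Standardize into a list of actions
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]
        self.action = action

        # Subclasses that define register() receive the extra keyword arguments
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            names = ", ".join(f"'{kw}'" for kw in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {names}")


class DirectoryTask(BaseTask):
    """Hypothetical subclass that accepts one extra parameter."""

    def register(self, directory=None):
        self.directory = directory


# Extra keyword arguments are forwarded to register()
task = DirectoryTask(action=len, directory="/tmp")

# Without a register() method, the same keyword argument is rejected
try:
    BaseTask(directory="/tmp")
    error = None
except TypeError as e:
    error = str(e)
```

With this pattern a subclass only declares the parameters it adds, and the base constructor keeps rejecting unknown arguments for tasks that declare none.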

The register method adds retrieve parameters to the task.

Parameters:

    Name       Type  Description                                          Default
    directory        local directory used to store retrieved files        None
    flatten          flatten input directory structure, defaults to True  True
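
One plausible reading of the `flatten` parameter is sketched below: with flattening, only the file name from the URL path is kept; without it, the URL's directory structure is recreated under the target directory. The helper `target_path` and the example.com URL are assumptions for illustration and are not taken from txtai's source.

```python
import os
from urllib.parse import urlparse


def target_path(url, directory, flatten=True):
    """Hypothetical helper: choose where a retrieved file is stored."""
    path = urlparse(url).path

    if flatten:
        # Keep only the file name, dropping any directories in the URL path
        return os.path.join(directory, os.path.basename(path))

    # Keep the directory structure from the URL path under the target directory
    return os.path.join(directory, os.path.normpath(path.lstrip("/")))


url = "https://example.com/data/2024/report.pdf"
flat = target_path(url, "/tmp")
nested = target_path(url, "/tmp", flatten=False)
```

Under this reading, flattening risks name collisions when two URLs end in the same file name, while the nested form keeps them apart at the cost of deeper paths.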

Source code in txtai/workflow/task/retrieve.py

    def register(self, directory=None, flatten=True):
        """
        Adds retrieve parameters to task.

        Args:
            directory: local directory used to store retrieved files
            flatten: flatten input directory structure, defaults to True
        """

        # pylint: disable=W0201
        # Create default temporary directory if not specified
        if not directory:
            # Save tempdir to prevent content from being deleted until this task is out of scope
            # pylint: disable=R1732
            self.tempdir = tempfile.TemporaryDirectory()
            directory = self.tempdir.name

        # Create output directory if necessary
        os.makedirs(directory, exist_ok=True)

        self.directory = directory
        self.flatten = flatten
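
The comment about saving `tempdir` reflects how `tempfile.TemporaryDirectory` behaves: the directory and its contents are removed when the object is cleaned up. The standalone sketch below demonstrates that with an explicit `cleanup()` call standing in for the object going out of scope; the file name `retrieved.txt` is made up for the demo.

```python
import os
import tempfile

# Keep a reference to the TemporaryDirectory object, as register does with self.tempdir
tempdir = tempfile.TemporaryDirectory()
directory = tempdir.name

# Write a file into the temporary directory, as a retrieved download would be
path = os.path.join(directory, "retrieved.txt")
with open(path, "w", encoding="utf-8") as handle:
    handle.write("content")

exists_before = os.path.exists(path)

# Explicit cleanup stands in for the object going out of scope
tempdir.cleanup()
exists_after = os.path.exists(path)
```

This suggests that files retrieved into the default temporary directory should be consumed while the task is still alive, or that `directory` should be set explicitly when the files need to persist.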