Service Task

The Service Task extracts content from an HTTP service.

Example

The following shows a simple example using this task as part of a workflow.

    from txtai.workflow import ServiceTask, Workflow

    workflow = Workflow([ServiceTask(url="https://service.url/action")])
    workflow(["parameter"])

Configuration-driven example

This task can also be created with workflow configuration.

    workflow:
      tasks:
        - task: service
          url: https://service.url/action

Methods

Python documentation for the task.

__init__(self, action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)

Source code in txtai/workflow/task/base.py

    def __init__(
        self,
        action=None,
        select=None,
        unpack=True,
        column=None,
        merge="hstack",
        initialize=None,
        finalize=None,
        concurrency=None,
        onetomany=True,
        **kwargs,
    ):
        """
        Creates a new task. A task defines two methods, type of data it accepts and the action to execute
        for each data element. Action is a callable function or list of callable functions.

        Args:
            action: action(s) to execute on each data element
            select: filter(s) used to select data to process
            unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
            column: column index to select if element is a tuple, defaults to all
            merge: merge mode for joining multi-action outputs, defaults to hstack
            initialize: action to execute before processing
            finalize: action to execute after processing
            concurrency: sets concurrency method when execute instance available
                         valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
            onetomany: if one-to-many data transformations should be enabled, defaults to True
            kwargs: additional keyword arguments
        """

        # Standardize into list of actions
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]

        self.action = action
        self.select = select
        self.unpack = unpack
        self.column = column
        self.merge = merge
        self.initialize = initialize
        self.finalize = finalize
        self.concurrency = concurrency
        self.onetomany = onetomany

        # Check for custom registration. Adds additional instance members and validates required dependencies available.
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            # Raise error if additional keyword arguments passed in without register method
            kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")
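
The keyword handling at the end of the constructor can be illustrated without txtai installed. The following is a minimal sketch, not the library's code: BaseTask and UrlTask are hypothetical stand-ins that model only the action standardization and the register hook shown in the listing.

```python
class BaseTask:
    """Simplified stand-in for the base task; only keyword handling is modeled."""

    def __init__(self, action=None, **kwargs):
        # Standardize into a list of actions
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]
        self.action = action

        # Subclasses that define register() receive the extra keyword arguments
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            names = ", ".join(f"'{kw}'" for kw in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {names}")


class UrlTask(BaseTask):
    """Hypothetical subclass that accepts one extra keyword argument."""

    def register(self, url=None):
        self.url = url


# Extra keyword arguments are routed to register() when it exists
task = UrlTask(action=str.upper, url="https://service.url/action")

# Without a register() method, the same keyword argument is rejected
message = None
try:
    BaseTask(url="https://service.url/action")
except TypeError as error:
    message = str(error)

print(task.action, task.url)
print(message)
```

Under this pattern, a subclass such as the Service Task only needs to define register to accept its own parameters; the base constructor stays unchanged.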

register(self, url=None, method=None, params=None, batch=True, extract=None)

Adds service parameters to task. Checks if required dependencies are installed.

Parameters:

Name      Default   Description
url       None      url to connect to
method    None      http method, GET or POST
params    None      default query parameters
batch     True      if True, all elements are passed in a single batch request, otherwise a service call is executed per element
extract   None      list of sections to extract from response
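
The effect of the batch parameter can be sketched with a stubbed service. This is an illustration under stated assumptions rather than the task's implementation: fake_service and dispatch are hypothetical names, and the stub stands in for the HTTP request so the example runs without a network connection.

```python
calls = []


def fake_service(payload):
    """Stand-in for an HTTP call: records the payload and upper-cases each item."""
    calls.append(payload)
    return [item.upper() for item in payload]


def dispatch(elements, batch=True):
    """Hypothetical dispatcher modeling the documented meaning of batch."""
    if batch:
        # One request carrying every element
        return fake_service(elements)

    # One request per element
    return [fake_service([element])[0] for element in elements]


batched = dispatch(["a", "b", "c"], batch=True)
batch_calls = len(calls)

calls.clear()
single = dispatch(["a", "b", "c"], batch=False)
single_calls = len(calls)

print(batch_calls, single_calls)  # 1 3
```

Both modes return the same results here; the difference is the number of requests made, which matters when the remote service accepts only one input per call.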

Source code in txtai/workflow/task/service.py

    def register(self, url=None, method=None, params=None, batch=True, extract=None):
        """
        Adds service parameters to task. Checks if required dependencies are installed.

        Args:
            url: url to connect to
            method: http method, GET or POST
            params: default query parameters
            batch: if True, all elements are passed in a single batch request, otherwise a service call is executed per element
            extract: list of sections to extract from response
        """

        if not XML_TO_DICT:
            raise ImportError('ServiceTask is not available - install "workflow" extra to enable')

        # pylint: disable=W0201
        # Save URL, method and parameter defaults
        self.url = url
        self.method = method
        self.params = params

        # If True, all elements are passed in a single batch request, otherwise a service call is executed per element
        self.batch = batch

        # Save sections to extract. Supports both a single string and a hierarchical list of sections.
        self.extract = extract
        if self.extract:
            self.extract = [self.extract] if isinstance(self.extract, str) else self.extract
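
The extract normalization at the end of register can be tried in isolation. In the sketch below, normalize mirrors the string-to-list step from the listing, while select is a hypothetical helper that assumes a hierarchical list means descending one level of a nested response per section name; the listing itself does not show how the sections are applied.

```python
def normalize(extract):
    """Mirrors register(): a single string becomes a one-element list."""
    if extract:
        return [extract] if isinstance(extract, str) else extract
    return extract


def select(data, sections):
    """Hypothetical helper: descend one level into data per section name."""
    for section in sections or []:
        data = data[section]
    return data


# Illustrative response, already parsed into nested dictionaries
response = {"result": {"items": ["a", "b"]}}

print(select(response, normalize("result")))             # {'items': ['a', 'b']}
print(select(response, normalize(["result", "items"])))  # ['a', 'b']
```

Normalizing to a list up front means downstream code can treat a single section name and a path of section names the same way.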