Service Task

task task

The Service Task extracts content from a http service.

Example

The following shows a simple example using this task as part of a workflow.

  1. from txtai.workflow import ServiceTask, Workflow
  2. workflow = Workflow([ServiceTask(url="https://service.url/action)])
  3. workflow(["parameter"])

Configuration-driven example

This task can also be created with workflow configuration.

  1. workflow:
  2. tasks:
  3. - task: service
  4. url: https://service.url/action

Methods

Python documentation for the task.

Creates a new task. A task defines two methods, type of data it accepts and the action to execute for each data element. Action is a callable function or list of callable functions.

Parameters:

NameTypeDescriptionDefault
action

action(s) to execute on each data element

None
select

filter(s) used to select data to process

None
unpack

if data elements should be unpacked or unwrapped from (id, data, tag) tuples

True
column

column index to select if element is a tuple, defaults to all

None
merge

merge mode for joining multi-action outputs, defaults to hstack

‘hstack’
initialize

action to execute before processing

None
finalize

action to execute after processing

None
concurrency

sets concurrency method when execute instance available valid values: “thread” for thread-based concurrency, “process” for process-based concurrency

None
onetomany

if one-to-many data transformations should be enabled, defaults to True

True
kwargs

additional keyword arguments

{}

Source code in txtai/workflow/task/base.py

  1. 21
  2. 22
  3. 23
  4. 24
  5. 25
  6. 26
  7. 27
  8. 28
  9. 29
  10. 30
  11. 31
  12. 32
  13. 33
  14. 34
  15. 35
  16. 36
  17. 37
  18. 38
  19. 39
  20. 40
  21. 41
  22. 42
  23. 43
  24. 44
  25. 45
  26. 46
  27. 47
  28. 48
  29. 49
  30. 50
  31. 51
  32. 52
  33. 53
  34. 54
  35. 55
  36. 56
  37. 57
  38. 58
  39. 59
  40. 60
  41. 61
  42. 62
  43. 63
  44. 64
  45. 65
  46. 66
  47. 67
  48. 68
  49. 69
  50. 70
  51. 71
  52. 72
  53. 73
  54. 74
  1. def init(
  2. self,
  3. action=None,
  4. select=None,
  5. unpack=True,
  6. column=None,
  7. merge=”hstack”,
  8. initialize=None,
  9. finalize=None,
  10. concurrency=None,
  11. onetomany=True,
  12. kwargs,
  13. ):
  14. “””
  15. Creates a new task. A task defines two methods, type of data it accepts and the action to execute
  16. for each data element. Action is a callable function or list of callable functions.
  17. Args:
  18. action: action(s) to execute on each data element
  19. select: filter(s) used to select data to process
  20. unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
  21. column: column index to select if element is a tuple, defaults to all
  22. merge: merge mode for joining multi-action outputs, defaults to hstack
  23. initialize: action to execute before processing
  24. finalize: action to execute after processing
  25. concurrency: sets concurrency method when execute instance available
  26. valid values: thread for thread-based concurrency, process for process-based concurrency
  27. onetomany: if one-to-many data transformations should be enabled, defaults to True
  28. kwargs: additional keyword arguments
  29. “””
  30. # Standardize into list of actions
  31. if not action:
  32. action = []
  33. elif not isinstance(action, list):
  34. action = [action]
  35. self.action = action
  36. self.select = select
  37. self.unpack = unpack
  38. self.column = column
  39. self.merge = merge
  40. self.initialize = initialize
  41. self.finalize = finalize
  42. self.concurrency = concurrency
  43. self.onetomany = onetomany
  44. # Check for custom registration. Adds additional instance members and validates required dependencies available.
  45. if hasattr(self, register”):
  46. self.register(kwargs)
  47. elif kwargs:
  48. # Raise error if additional keyword arguments passed in without register method
  49. kwargs = “, “.join(f”‘{kw}’” for kw in kwargs)
  50. raise TypeError(finit() got unexpected keyword arguments: {kwargs}”)

Adds service parameters to task. Checks if required dependencies are installed.

Parameters:

NameTypeDescriptionDefault
url

url to connect to

None
method

http method, GET or POST

None
params

default query parameters

None
batch

if True, all elements are passed in a single batch request, otherwise a service call is executed per element

True
extract

list of sections to extract from response

None

Source code in txtai/workflow/task/service.py

  1. 22
  2. 23
  3. 24
  4. 25
  5. 26
  6. 27
  7. 28
  8. 29
  9. 30
  10. 31
  11. 32
  12. 33
  13. 34
  14. 35
  15. 36
  16. 37
  17. 38
  18. 39
  19. 40
  20. 41
  21. 42
  22. 43
  23. 44
  24. 45
  25. 46
  26. 47
  27. 48
  28. 49
  1. def register(self, url=None, method=None, params=None, batch=True, extract=None):
  2. “””
  3. Adds service parameters to task. Checks if required dependencies are installed.
  4. Args:
  5. url: url to connect to
  6. method: http method, GET or POST
  7. params: default query parameters
  8. batch: if True, all elements are passed in a single batch request, otherwise a service call is executed per element
  9. extract: list of sections to extract from response
  10. “””
  11. if not XML_TO_DICT:
  12. raise ImportError(‘ServiceTask is not available - install workflow extra to enable’)
  13. # pylint: disable=W0201
  14. # Save URL, method and parameter defaults
  15. self.url = url
  16. self.method = method
  17. self.params = params
  18. # If True, all elements are passed in a single batch request, otherwise a service call is executed per element
  19. self.batch = batch
  20. # Save sections to extract. Supports both a single string and a hierarchical list of sections.
  21. self.extract = extract
  22. if self.extract:
  23. self.extract = [self.extract] if isinstance(self.extract, str) else self.extract