Create Executor

An Executor processes a `DocumentArray` in place via functions decorated with `@requests`. To create an Executor, you only need to follow three principles:

  • An Executor should subclass directly from the `jina.Executor` class.

  • An Executor class is a bag of functions with shared state (via `self`); it can contain an arbitrary number of functions with arbitrary names.

  • Functions decorated with `@requests` are invoked according to their `on=` endpoint. These functions can be coroutines (`async def`) or regular functions.

Minimum working example

```python
from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        print(kwargs)
```

Use it in a Flow

```python
from jina import Document, Flow

f = Flow().add(uses=MyExecutor)

with f:
    f.post(on='/random_work', inputs=Document(), on_done=print)
```

Use it as-is

```python
m = MyExecutor()
m.foo()
```

Using Executors with AsyncIO

For I/O-bound Executors, it can help to use Python's AsyncIO API, which lets you await multiple pending Executor function calls concurrently.

```python
import asyncio

from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    async def foo(self, **kwargs):
        await asyncio.sleep(1.0)
        print(kwargs)


async def main():
    m = MyExecutor()
    call1 = asyncio.create_task(m.foo())
    call2 = asyncio.create_task(m.foo())
    await asyncio.gather(call1, call2)


asyncio.run(main())
```
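To see why this helps, the plain-`asyncio` sketch below (no Jina involved) times two calls awaited together. The `slow_call` coroutine is a hypothetical stand-in for an I/O-bound Executor method. Because `asyncio.sleep` yields control to the event loop instead of blocking it, the total wall time should be close to one delay, not two.

```python
import asyncio
import time


async def slow_call(delay: float) -> float:
    # Hypothetical stand-in for an I/O-bound Executor method:
    # asyncio.sleep hands control back to the event loop while waiting.
    await asyncio.sleep(delay)
    return delay


async def run_concurrently(delay: float) -> float:
    start = time.perf_counter()
    # Both coroutines are scheduled together and awaited as a group.
    await asyncio.gather(slow_call(delay), slow_call(delay))
    return time.perf_counter() - start


elapsed = asyncio.run(run_concurrently(0.2))
print(f'two 0.2 s calls took {elapsed:.2f} s')  # roughly 0.2 s, not 0.4 s
```

Replacing `asyncio.sleep` with a blocking `time.sleep` would serialize the two calls, which is why the Executor method itself has to be written as a coroutine.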

Constructor

Subclass

Every new Executor should subclass `jina.Executor`.

You can name your Executor class freely.

__init__

You do not need to implement `__init__` if your Executor has no initial state.

If your Executor defines `__init__`, it must accept `**kwargs` in its signature and call `super().__init__(**kwargs)` in its body:

```python
from jina import Executor


class MyExecutor(Executor):
    def __init__(self, foo: str, bar: int, **kwargs):
        super().__init__(**kwargs)
        self.bar = bar
        self.foo = foo
```

What is inside kwargs?

Here, `kwargs` contains the `metas` and `requests` (the request-to-function mapping) values from the YAML config, plus the `runtime_args` injected on startup.

You can access these values in the `__init__` body via `self.metas`, `self.requests` and `self.runtime_args`, or modify them before passing them to `super().__init__()`.
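The reason for the `**kwargs` rule can be illustrated without Jina. In the sketch below, `ToyBaseExecutor` is a hypothetical stand-in for `jina.Executor` that, by assumption, consumes `metas`, `requests` and `runtime_args` from its keyword arguments; it is not Jina's actual implementation. The subclass takes its own arguments, optionally adjusts the framework-provided ones, and forwards the rest.

```python
from typing import Any, Dict, Optional


class ToyBaseExecutor:
    """Hypothetical stand-in for jina.Executor (not its real implementation)."""

    def __init__(
        self,
        metas: Optional[Dict[str, Any]] = None,
        requests: Optional[Dict[str, str]] = None,
        runtime_args: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ):
        self.metas = metas or {}
        self.requests = requests or {}
        self.runtime_args = runtime_args or {}


class MyExecutor(ToyBaseExecutor):
    def __init__(self, foo: str, bar: int, **kwargs: Any):
        # Optionally adjust a framework-provided value before forwarding it.
        metas = dict(kwargs.pop('metas', None) or {})
        metas.setdefault('name', 'my-executor')
        super().__init__(metas=metas, **kwargs)
        self.foo = foo
        self.bar = bar


e = MyExecutor(foo='hello', bar=1, runtime_args={'workspace': '/tmp/ws'})
print(e.foo, e.bar, e.metas, e.runtime_args)
```

If `MyExecutor.__init__` omitted `**kwargs`, passing `runtime_args=` would raise a `TypeError`, and the base class would never see those values.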

Passing arguments

When using an Executor in a Flow, there are two ways of passing arguments to its __init__.

via uses_with

```python
from jina import Flow

f = Flow().add(uses=MyExecutor, uses_with={'foo': 'hello', 'bar': 1})

with f:
    ...
```

via predefined YAML

my-exec.yml


```yaml
jtype: MyExecutor
with:
  foo: hello
  bar: 1
```

my-flow.py

```python
from jina import Flow

f = Flow().add(uses='my-exec.yml')

with f:
    ...
```

Hint

`uses_with` has higher priority than the predefined `with` config in YAML. When both are present, `uses_with` takes precedence.
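One way to picture this precedence is a key-by-key dictionary merge in which `uses_with` is applied last. The sketch below assumes that merge model; `resolve_with` is a hypothetical helper for illustration, not a Jina function.

```python
from typing import Any, Dict, Optional


def resolve_with(
    yaml_with: Dict[str, Any], uses_with: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Hypothetical helper: merge YAML `with` args with `uses_with` overrides."""
    merged = dict(yaml_with)  # start from the values in my-exec.yml
    merged.update(uses_with or {})  # keys in uses_with win on conflict
    return merged


print(resolve_with({'foo': 'hello', 'bar': 1}, {'bar': 2}))
# {'foo': 'hello', 'bar': 2}
```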
## Methods

Methods of an `Executor` can be named and written freely. Only methods decorated with `@requests` can be used in a `Flow`.

### Method decorator

You can import the `requests` decorator via:

```python
from jina import requests
```

`@requests` defines when a function is invoked in the Flow. Its `on=` keyword defines the endpoint.

To call an Executor's function, use `Flow.post(on=..., ...)`. For example, given:

```python
from jina import Document, Executor, Flow, requests


class MyExecutor(Executor):
    @requests(on='/index')
    def foo(self, **kwargs):
        print(f'foo is called: {kwargs}')

    @requests(on='/random_work')
    def bar(self, **kwargs):
        print(f'bar is called: {kwargs}')


f = Flow().add(uses=MyExecutor)

with f:
    f.post(on='/index', inputs=Document(text='index'))
    f.post(on='/random_work', inputs=Document(text='random_work'))
    f.post(on='/blah', inputs=Document(text='blah'))
```

Then:

- `f.post(on='/index', ...)` triggers `MyExecutor.foo`;
- `f.post(on='/random_work', ...)` triggers `MyExecutor.bar`;
- `f.post(on='/blah', ...)` triggers no function, as no function is bound to `/blah`.

#### Default binding

A class method decorated with plain `@requests` (without `on=`) is the default handler for all endpoints; that is, it is the fallback handler for endpoints that are not found. In the example below, `f.post(on='/blah', ...)` invokes `MyExecutor.foo`:

```python
from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        print(kwargs)

    @requests(on='/index')
    def bar(self, **kwargs):
        print(kwargs)
```
#### Multiple bindings

To bind a method to multiple endpoints, you can use `@requests(on=['/foo', '/bar'])`. Then either `f.post(on='/foo', ...)` or `f.post(on='/bar', ...)` invokes that function.

#### No binding

A class with no `@requests` binding plays no part in the Flow. The request simply passes through without any processing.
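The binding rules above (explicit endpoints, multiple endpoints, a default fallback, and pass-through when nothing matches) can be modeled with a small dispatch table. `endpoint`, `ToyExecutor` and `dispatch` below are hypothetical names for illustration; Jina's real `@requests` is implemented differently, and unlike it, this toy decorator must always be called with parentheses.

```python
from typing import Any, Callable, Dict, List, Optional, Union

DEFAULT = '__default__'


def endpoint(on: Union[str, List[str], None] = None) -> Callable:
    """Hypothetical decorator: record the endpoints a method is bound to."""

    def decorator(fn: Callable) -> Callable:
        if on is None:
            fn._endpoints = [DEFAULT]  # no `on=`: default handler
        elif isinstance(on, str):
            fn._endpoints = [on]
        else:
            fn._endpoints = list(on)  # several endpoints share one method
        return fn

    return decorator


class ToyExecutor:
    def __init__(self) -> None:
        # Build an endpoint -> bound-method table from the decorated methods.
        self._routes: Dict[str, Callable] = {}
        for name in dir(type(self)):
            attr = getattr(type(self), name)
            for ep in getattr(attr, '_endpoints', []):
                self._routes[ep] = getattr(self, name)

    def dispatch(self, on: str, docs: List[Any]) -> Optional[Any]:
        handler = self._routes.get(on) or self._routes.get(DEFAULT)
        if handler is None:
            return None  # no binding: the request passes through unprocessed
        return handler(docs)


class MyToyExecutor(ToyExecutor):
    @endpoint()
    def foo(self, docs):
        return 'foo'

    @endpoint(on=['/index', '/update'])
    def bar(self, docs):
        return 'bar'


e = MyToyExecutor()
print(e.dispatch('/index', []))   # bar
print(e.dispatch('/update', []))  # bar
print(e.dispatch('/blah', []))    # foo (default fallback)
print(ToyExecutor().dispatch('/blah', []))  # None (no binding at all)
```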
### Method signature

A class method decorated with `@requests` follows the signature below (`async` is optional):

```python
async def foo(
    docs: DocumentArray,
    parameters: Dict,
    docs_matrix: List[DocumentArray],
    groundtruths: Optional[DocumentArray],
    groundtruths_matrix: List[DocumentArray],
) -> Optional[DocumentArray]:
    pass
```

If the function uses `async` in its signature, it runs as a coroutine, and the regular `asyncio` functionality is available inside it.

An Executor's methods receive the following arguments, in order:

| Name | Type | Description |
| --- | --- | --- |
| `docs` | `DocumentArray` | `Request.docs`. When multiple requests are available, it is a concatenation of all `Request.docs` as one `DocumentArray`. |
| `parameters` | `Dict` | `Request.parameters`, given by `Flow.post(..., parameters=)` |
| `docs_matrix` | `List[DocumentArray]` | When multiple requests are available, it is a list of all `Request.docs`. For a single request, it is `None`. |
| `groundtruths` | `Optional[DocumentArray]` | `Request.groundtruths`. Same behavior as `docs`. |
| `groundtruths_matrix` | `List[DocumentArray]` | Same behavior as `docs_matrix`, but on `Request.groundtruths`. |

Note

Executor methods not decorated with `@requests` do not receive these arguments.

Note

The argument order is designed as common-usage-first, not by alphabetical order or semantic closeness.

Hint

If you don't need some arguments, you can suppress them into `**kwargs`. For example:

```python
from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    def foo_using_docs_arg(self, docs, **kwargs):
        print(docs)

    @requests
    def foo_using_docs_parameters_arg(self, docs, parameters, **kwargs):
        print(docs)
        print(parameters)

    @requests
    def foo_using_no_arg(self, **kwargs):
        # the args are suppressed into kwargs
        print(kwargs['docs_matrix'])
```
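Suppression works when the caller passes every argument by keyword: a handler names only what it uses, and everything else lands in `**kwargs`. The sketch below assumes that calling convention; `call_handler` is a hypothetical stand-in for the framework side of the call, and plain lists stand in for `DocumentArray`.

```python
from typing import Any, Callable, Dict, List


def call_handler(fn: Callable, docs: List[str], parameters: Dict[str, Any]) -> Any:
    """Hypothetical framework side: every argument is passed by keyword."""
    return fn(
        docs=docs,
        parameters=parameters,
        docs_matrix=None,
        groundtruths=None,
        groundtruths_matrix=None,
    )


def uses_docs(docs, **kwargs):
    return len(docs)


def uses_docs_and_parameters(docs, parameters, **kwargs):
    return docs[: parameters['top_k']]


def uses_nothing(**kwargs):
    # every argument lands in kwargs
    return sorted(kwargs)


print(call_handler(uses_docs, ['a', 'b', 'c'], {}))  # 3
print(call_handler(uses_docs_and_parameters, ['a', 'b', 'c'], {'top_k': 2}))  # ['a', 'b']
print(call_handler(uses_nothing, [], {}))
# ['docs', 'docs_matrix', 'groundtruths', 'groundtruths_matrix', 'parameters']
```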
### Method returns

Methods decorated with `@requests` can return `DocumentArray`, `DocumentArrayMemmap`, `Dict` or `None`.

- If the return is `None`, Jina considers all changes to have happened in place. The next Executor receives the `docs` as modified by the current Executor.
- If the return is a `DocumentArray` or `DocumentArrayMemmap`, the current `docs` field in the `Request` is overridden by the return, which is forwarded to the next Executor in the Flow.
- If the return is a `Dict`, `Request.parameters` is updated by union with the return. The next Executor receives this updated `Request.parameters`. You can use this feature to pass parameters between Executors.

So do I need a return? Most of the time, you don't. Let's see some examples.
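The three return cases can be summarized as a post-processing step applied after each handler call. The sketch below models that rule under simplifying assumptions: `apply_return` and the `ToyRequest` dataclass are hypothetical, a plain `list` of dicts stands in for `DocumentArray`/`DocumentArrayMemmap`, and Jina's actual request handling is more involved.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToyRequest:
    """Hypothetical stand-in for a Jina request."""

    docs: List[Dict[str, Any]]
    parameters: Dict[str, Any] = field(default_factory=dict)


def apply_return(request: ToyRequest, result: Any) -> ToyRequest:
    if result is None:
        pass  # changes were already made in place on request.docs
    elif isinstance(result, dict):
        request.parameters.update(result)  # union with existing parameters
    elif isinstance(result, list):
        request.docs = result  # returned docs replace the current ones
    else:
        raise TypeError(f'unsupported return type: {type(result).__name__}')
    return request


req = ToyRequest(docs=[{'id': 'a', 'text': 'hello'}], parameters={'top_k': 5})

# Case 1: edit in place and return None.
for d in req.docs:
    d['embedding'] = [0.1, 0.2]
apply_return(req, None)

# Case 2: return new docs that keep only the ids.
apply_return(req, [{'id': d['id']} for d in req.docs])

# Case 3: return a dict that is merged into the parameters.
apply_return(req, {'top_k': 10, 'mode': 'fast'})

print(req.docs)        # [{'id': 'a'}]
print(req.parameters)  # {'top_k': 10, 'mode': 'fast'}
```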
#### Embed Documents `blob`

In this example, `encode()` uses a neural network to get the embedding for each `Document.blob`, then assigns it to `Document.embedding`. The whole procedure happens in place, so there is no need to return anything.

```python
import numpy as np
from jina import DocumentArray, Executor, requests

from my_model import get_predict_model


class PNEncoder(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.model = get_predict_model()

    @requests
    def encode(self, docs: DocumentArray, *args, **kwargs) -> None:
        _blob, _docs = docs.traverse_flat(['c']).get_attributes_with_docs('blob')
        embeds = self.model.predict(np.stack(_blob))
        for d, b in zip(_docs, embeds):
            d.embedding = b
```
#### Add Chunks by segmenting Document

In this example, each `Document` is segmented by `get_mesh` and the results are added to `.chunks`. After that, `.buffer` and `.uri` are removed from each `Document`. All changes happen in place, so there is no need to return anything.

```python
from jina import Document, DocumentArray, Executor, requests


class ConvertSegmenter(Executor):
    @requests
    def segment(self, docs: DocumentArray, **kwargs) -> None:
        for d in docs:
            d.load_uri_to_buffer()
            # `get_mesh` is a user-defined helper (not shown) that yields
            # dicts with 'blob' and 'tags' keys
            d.chunks = [Document(blob=_r['blob'], tags=_r['tags']) for _r in get_mesh(d.content)]
            d.pop('buffer', 'uri')
```
#### Preserve Document `id` only

In this example, a simple indexer stores incoming `docs` in a `DocumentArray`. It then builds a new `DocumentArray` that preserves only the `id` of each original Document and drops everything else, because the developer does not want to carry all the rich information over the network. This requires a return.

```python
from jina import Document, DocumentArray, Executor, requests


class MyIndexer(Executor):
    """Simple indexer class"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._docs = DocumentArray()

    @requests(on='/index')
    def index(self, docs: DocumentArray, **kwargs):
        self._docs.extend(docs)
        return DocumentArray([Document(id=d.id) for d in docs])
```
#### Pass/change request parameters

In this example, when the Flow contains `MyExec1 -> MyExec2` in that order, `MyExec2` receives the parameters `{'top_k': 10}` from `MyExec1`.

```python
from jina import Executor, requests


class MyExec1(Executor):
    @requests(on='/index')
    def index(self, **kwargs):
        # a returned dict is merged into Request.parameters
        return {'top_k': 10}


class MyExec2(Executor):
    @requests(on='/index')
    def index(self, docs, parameters, **kwargs):
        # keep only the first `top_k` Documents, as set by MyExec1
        return docs[: int(parameters['top_k'])]
```