Add Executor

.add() is the core method to add an Executor to a Flow object. Each add creates a new Executor, and these Executors can be run as a local thread/process, a remote process, inside a Docker container, or even inside a remote Docker container.

Tip

You can see the exhaustive list of options for adding an Executor here.

Chain .add()

Chaining .add()s creates a sequential Flow.

    from jina import Flow

    f = Flow().add().add().add().add()

[Figure: chain-flow.svg]

Define Executor via uses

Use the uses parameter to specify the Executor.

uses accepts several value types, including a class name, a Docker image, and a YAML file (or inline YAML). You can therefore add an Executor in any of the following ways:

Class Name

    from jina import Flow, Executor

    class MyExecutor(Executor):
        ...

    f = Flow().add(uses=MyExecutor)

    with f:
        ...

YAML file

myexec.py:

    from jina import Executor

    class MyExecutor(Executor):
        def __init__(self, bar, *args, **kwargs):
            # Forward the remaining arguments to the base Executor.
            super().__init__(*args, **kwargs)
            self.bar = bar

        ...

myexec.yml:

    jtype: MyExecutor
    with:
      bar: 123
    metas:
      name: awesomeness
      description: my first awesome executor
      py_modules: myexec.py
    requests:
      /random_work: foo

Then pass the YAML file to the Flow:

    from jina import Flow

    f = Flow().add(uses='myexec.yml')

    with f:
        ...

Note

The YAML can also be passed inline:

    from jina import Flow

    f = (Flow()
         .add(uses='''
    jtype: MyExecutor
    with:
      bar: 123
    metas:
      name: awesomeness
      description: my first awesome executor
    requests:
      /random_work: foo
    '''))

Dict

    from jina import Flow, Executor

    class MyExecutor(Executor):
        ...

    f = Flow().add(
        uses={
            'jtype': 'MyExecutor',
            'with': {'bar': 123}
        })
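As a rough illustration of the value forms listed in this section, the sketch below sorts a uses value into the kind of source it names. It is a plain-Python toy and not Jina's own parsing logic: the function describe_uses and its labels are hypothetical, and the string prefixes are the ones that appear in the summary table at the end of this page.

```python
from typing import Union


def describe_uses(uses: Union[type, dict, str]) -> str:
    """Return a label for the kind of value passed as `uses` (illustrative only)."""
    if isinstance(uses, type):
        return 'class'
    if isinstance(uses, dict):
        return 'dict'
    if not isinstance(uses, str):
        raise TypeError(f'unsupported uses value: {uses!r}')
    # Prefixed strings name an Executor from Jina Hub or a Docker image.
    for prefix, label in (
        ('jinahub+docker://', 'hub-docker'),
        ('jinahub://', 'hub'),
        ('docker://', 'docker'),
    ):
        if uses.startswith(prefix):
            return label
    # A YAML file is recognised by its extension, inline YAML by its jtype key.
    if uses.endswith(('.yml', '.yaml')):
        return 'yaml-file'
    if 'jtype:' in uses:
        return 'inline-yaml'
    return 'class-name'


class MyExecutor:
    """Stand-in class; a real Executor would inherit from jina.Executor."""


for value in (MyExecutor, {'jtype': 'MyExecutor'}, 'myexec.yml', 'docker://MyExecutor'):
    print(describe_uses(value))
```

The checks run from the most specific form to the least specific one, so a bare string such as 'MyExecutor' is only treated as a class name once every other form has been ruled out.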

Executor discovery

By default, the Flow will attempt to retrieve the Executors’ source files and YAML config files from the working directory and other paths set in the PATH environment variable. If your Executor’s source files and YAML config are located elsewhere, you can specify their locations using the parameter extra_search_paths.

For example, suppose we have the following project structure where app/ represents the working directory:

    .
    ├── app
    │   └── main.py
    └── executor
        ├── config.yml
        └── my_executor.py

executor/my_executor.py:

    from jina import Executor, DocumentArray, requests

    class MyExecutor(Executor):
        @requests
        def foo(self, docs: DocumentArray, **kwargs):
            pass

executor/config.yml:

    jtype: MyExecutor
    metas:
      py_modules:
        - my_executor.py

Now, in app/main.py, to correctly load the Executor, you can specify the directory of the Executor like so:

    from jina import Flow, Document

    f = Flow(extra_search_paths=['../executor']).add(uses='config.yml')

    with f:
        r = f.post('/', inputs=Document())
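To make the lookup order concrete, here is a minimal sketch of a search that tries the working directory first and then each extra search path. It is an assumption about the general idea, not Jina's actual resolution code: find_config is a hypothetical helper, and the sketch leaves out the PATH environment variable entirely.

```python
import tempfile
from pathlib import Path
from typing import Iterable, Optional


def find_config(name: str, cwd: Path, extra_search_paths: Iterable[str] = ()) -> Optional[Path]:
    """Return the first existing match for `name`, or None (hypothetical helper)."""
    # Relative search paths are interpreted relative to the working directory.
    candidates = [cwd] + [cwd / p for p in extra_search_paths]
    for directory in candidates:
        path = (directory / name).resolve()
        if path.is_file():
            return path
    return None


# Recreate the project layout from the example in a temporary directory.
root = Path(tempfile.mkdtemp())
(root / 'app').mkdir()
(root / 'executor').mkdir()
(root / 'executor' / 'config.yml').write_text('jtype: MyExecutor\n')

app = root / 'app'
print(find_config('config.yml', app))                   # None
print(find_config('config.yml', app, ['../executor']))  # path under executor/
```

With app/ as the working directory, config.yml is only found once '../executor' is added to the search paths, which mirrors what extra_search_paths does in the example above.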

Important

If you create a Flow from a YAML config file located outside the working directory, you only need to pass a correct relative or absolute path to the Flow’s YAML config file and make all paths to Executor config files relative to the Flow config file. The Flow infers the location of its config file and adds it to extra_search_paths:

    .
    ├── app
    │   └── main.py
    ├── flow
    │   └── flow.yml
    └── executor
        ├── config.yml
        └── my_executor.py

flow.yml:

    jtype: Flow
    executors:
      - name: executor
        uses: ../executor/config.yml

main.py:

    from jina import Flow, Document

    f = Flow.load_config('../flow/flow.yml')

    with f:
        r = f.post('/', inputs=Document())
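The path arithmetic behind this layout can be sketched as follows. The helper executor_config_path is hypothetical and only shows how a uses entry written relative to flow.yml maps to a location relative to the working directory. The normalization is purely lexical, so symbolic links are not taken into account.

```python
import posixpath


def executor_config_path(flow_config: str, uses: str) -> str:
    """Resolve `uses` against the directory that holds the Flow config (hypothetical helper)."""
    flow_dir = posixpath.dirname(flow_config)
    # Join with the Flow's directory, then collapse any '..' segments.
    return posixpath.normpath(posixpath.join(flow_dir, uses))


# main.py runs in app/ and loads '../flow/flow.yml', which points at the Executor config.
print(executor_config_path('../flow/flow.yml', '../executor/config.yml'))
# ../executor/config.yml
```

Seen from app/, the Executor config therefore sits at ../executor/config.yml, even though main.py never mentions the executor directory.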

Override Executor config

You can override an Executor’s metas, with, and requests configuration when adding it to a Flow.

Override metas configuration

To override the metas configuration of an executor, use uses_metas:

    from jina import Executor, requests, Flow

    class MyExecutor(Executor):
        @requests
        def foo(self, docs, **kwargs):
            print(self.metas.workspace)

    flow = Flow().add(
        uses=MyExecutor,
        uses_metas={'workspace': 'different_workspace'},
    )

    with flow as f:
        f.post('/')

Output:

    [email protected][L]:ready and listening
    [email protected][L]:ready and listening
    [email protected][I]:🎉 Flow is ready to use!
    🔗 Protocol: GRPC
    🏠 Local access: 0.0.0.0:58827
    🔒 Private network: 192.168.1.101:58827
    different_workspace

Override with configuration

To override the with configuration of an executor, use uses_with. The with configuration refers to user-defined constructor kwargs.

    from jina import Executor, requests, Flow

    class MyExecutor(Executor):
        def __init__(self, param1=1, param2=2, param3=3, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.param1 = param1
            self.param2 = param2
            self.param3 = param3

        @requests
        def foo(self, docs, **kwargs):
            print('param1:', self.param1)
            print('param2:', self.param2)
            print('param3:', self.param3)

    flow = Flow().add(uses=MyExecutor, uses_with={'param1': 10, 'param3': 30})

    with flow as f:
        f.post('/')

Output:

    [email protected][L]:ready and listening
    [email protected][L]:ready and listening
    [email protected][I]:🎉 Flow is ready to use!
    🔗 Protocol: GRPC
    🏠 Local access: 0.0.0.0:32825
    🔒 Private network: 192.168.1.101:32825
    🌐 Public address: 197.28.82.165:32825
    param1: 10
    param2: 2
    param3: 30
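The effect of uses_with can be pictured as a key-by-key merge in which overrides win and everything else keeps its configured or default value. The sketch below is a toy model under that assumption, not Jina's implementation: instantiate and config_with are hypothetical names, and the MyExecutor here is a plain class that does not inherit from jina.Executor.

```python
class MyExecutor:
    """Plain stand-in for the Executor above, keeping only its constructor."""

    def __init__(self, param1=1, param2=2, param3=3):
        self.param1 = param1
        self.param2 = param2
        self.param3 = param3


def instantiate(cls, config_with=None, uses_with=None):
    """Build `cls` from configured kwargs, letting `uses_with` take precedence."""
    # Later keys win: values from uses_with replace same-named configured values.
    kwargs = {**(config_with or {}), **(uses_with or {})}
    return cls(**kwargs)


exe = instantiate(MyExecutor, uses_with={'param1': 10, 'param3': 30})
print(exe.param1, exe.param2, exe.param3)  # 10 2 30
```

param2 is not mentioned in the override, so it keeps its constructor default, which matches the output shown above.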

Override requests configuration

You can override the requests configuration of an Executor and bind its methods to endpoints that you provide. In the following code, we replace the endpoint /foo bound to foo() with /non_foo and add a new endpoint /bar bound to bar(). Note that all_req() is bound to all endpoints except those explicitly bound to other functions, i.e. /non_foo and /bar.

    from jina import Executor, requests, Flow

    class MyExecutor(Executor):
        @requests
        def all_req(self, parameters, **kwargs):
            print(f'all req {parameters.get("recipient")}')

        @requests(on='/foo')
        def foo(self, parameters, **kwargs):
            print(f'foo {parameters.get("recipient")}')

        def bar(self, parameters, **kwargs):
            print(f'bar {parameters.get("recipient")}')

    flow = Flow().add(uses=MyExecutor, uses_requests={'/bar': 'bar', '/non_foo': 'foo'})

    with flow as f:
        f.post('/bar', parameters={'recipient': 'bar()'})
        f.post('/non_foo', parameters={'recipient': 'foo()'})
        f.post('/foo', parameters={'recipient': 'all_req()'})

Output:

    [email protected][L]:ready and listening
    [email protected][L]:ready and listening
    [email protected][I]:🎉 Flow is ready to use!
    🔗 Protocol: GRPC
    🏠 Local access: 0.0.0.0:36507
    🔒 Private network: 192.168.1.101:36507
    🌐 Public address: 197.28.82.165:36507
    bar bar()
    foo foo()
    all req all_req()
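The rebinding described in this section can be modelled with a small endpoint-to-method table. This is a toy model of the behaviour as the text describes it, not Jina's internals: DEFAULT, override_requests and resolve are hypothetical names, and methods are represented by their names only.

```python
DEFAULT = '/default'  # hypothetical key for a method bound without an explicit endpoint


def override_requests(bindings, uses_requests):
    """Return a new endpoint-to-method mapping with the overrides applied."""
    rebound = set(uses_requests.values())
    # Drop the old endpoints of every method that is being rebound...
    result = {ep: fn for ep, fn in bindings.items() if fn not in rebound}
    # ...then bind those methods to the endpoints supplied by the caller.
    result.update(uses_requests)
    return result


def resolve(bindings, endpoint):
    """Pick the method for `endpoint`, falling back to the default handler."""
    return bindings.get(endpoint, bindings.get(DEFAULT))


# Bindings declared by the decorators: all_req is the default, foo handles /foo.
declared = {DEFAULT: 'all_req', '/foo': 'foo'}
effective = override_requests(declared, {'/bar': 'bar', '/non_foo': 'foo'})

for endpoint in ('/bar', '/non_foo', '/foo'):
    print(endpoint, '->', resolve(effective, endpoint))
```

After the override, /foo no longer has a method of its own, so a request to it falls through to all_req, while /bar and /non_foo reach bar and foo.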

Add remote Executor

Add an already spawned Executor

A Flow does not have to be local-only. It can also use Executors that run on remote machines.

The external Executor in the following two use-cases could have been spawned

  • either by another Flow

  • or by the jina executor CLI command

    f.add(host='localhost', port_in=12345, external=True)
    f.add(host='123.45.67.89', port_in=12345, external=True)

Spawn remote Executors using JinaD

Commonly used arguments for deployment in

Name | Default | Description
host | 0.0.0.0 | The host of the machine. Can be an IP address or DNS name (e.g. 0.0.0.0, my_encoder.jina.ai)
port_expose | randomly initialized | Port of JinaD on the remote machine.
port_in | randomly initialized | Port for incoming traffic for the Executor.
port_out | randomly initialized | Port for outgoing traffic for the Executor. This is only used in the remote-local use-case described below.
connect_to_predecessor | False | Forces a Head to connect to the previous Tail. This is only used in the remote-local use-case described below.
external | False | Stops Flow from context managing an Executor. This allows spawning of an external Executor and reusing across multiple Flows.
uses, uses_before and uses_after prefix | No prefix | When prefixing one of the uses arguments with docker or jinahub+docker, the Executor does not run natively, but is spawned inside a container.

See Also

JinaD

JinaHub

Forcing an Executor in the remote-local config

Sometimes you want to use a remote Executor in your local Flow (e.g. an expensive encoder on a remote GPU). In that case, the remote Executor cannot talk back to the next local Executor directly, much like a server cannot talk to a client before the client has opened a connection. The Flow has an auto-detection mechanism for such cases, but it can fail in some networking setups. You can then force the connection by hand: set port_out on the remote Executor in front and connect_to_predecessor on the local Executor that follows it.

    f.add(name='remote', host='123.45.67.89', port_out=23456).add(name='local', connect_to_predecessor=True)

Summary: common patterns

Description | Usage (f = Flow(…))
Local native Executor in the context | f.add(uses=MyExecutor)
Local native Executor from a YAML | f.add(uses='mwu_encoder.yml')
Executor from Jina Hub | f.add(uses='jinahub://MyExecutor')
Dockerized Executor from Jina Hub | f.add(uses='jinahub+docker://MyExecutor')
Generalized dockerized Executor | f.add(uses='docker://MyExecutor')
Existing remote Executor | f.add(host='123.45.67.89', port_in=12345, external=True)
Spawn Remote Executor (via jinad on Remote) | f.add(uses='mwu_encoder.yml', host='123.45.67.89', port_in=12345, port_expose=8080)

For a full list of arguments, please check jina executor --help.