Flow IO

.post() is the core method for sending data to a Flow object. It provides multiple callbacks for fetching results from the Flow.

    from jina import Flow

    f = Flow().add(...)
    with f:
        f.post(...)

Caution

.post() must be called inside the with context.

Hint

You can also use the CRUD methods (index, search, update, delete), which are just syntactic sugar for post with on='/index', on='/search', etc. Precisely, they are defined as:

    index = partialmethod(post, '/index')
    search = partialmethod(post, '/search')
    update = partialmethod(post, '/update')
    delete = partialmethod(post, '/delete')
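
To make the aliasing concrete, here is a minimal stdlib-only sketch of how functools.partialmethod pre-fills the endpoint. MiniFlow is a hypothetical stand-in that only records its arguments; it is not the Jina Flow class.

```python
from functools import partialmethod


class MiniFlow:
    """Hypothetical stand-in for a Flow; it only records what was posted."""

    def post(self, on, inputs=None, **kwargs):
        # Return the call as a tuple so the routing is easy to inspect.
        return (on, inputs, kwargs)

    # Each alias pre-fills the first positional argument of post (`on`).
    index = partialmethod(post, '/index')
    search = partialmethod(post, '/search')


flow = MiniFlow()
via_alias = flow.index(['doc-1'])
via_post = flow.post('/index', ['doc-1'])
print(via_alias == via_post)
```

Calling the alias and calling post with the endpoint spelled out produce the same call, which is all the sugar amounts to.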

Request data

Request data can be a single Document object, an iterator of Document objects, a generator of Document objects, a DocumentArray object, or None.

For example:

    from jina import Flow, DocumentArray, Document, DocumentArrayMemmap

    d1 = Document(content='hello')
    d2 = Document(content='world')


    def doc_gen():
        for j in range(10):
            yield Document(content=f'hello {j}')


    with Flow() as f:
        f.post('/', d1)  # a single Document
        f.post('/', [d1, d2])  # a list of Documents
        f.post('/', doc_gen)  # a Document generator
        f.post('/', DocumentArray([d1, d2]))  # a DocumentArray
        f.post('/', DocumentArrayMemmap('./my-mmap'))  # a DocumentArrayMemmap
        f.post('/')  # empty request
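
One way to picture how such different input shapes can be handled uniformly is to normalize them into a single iterator. The sketch below is an illustration only: normalize_inputs and the Doc dataclass are hypothetical names, and this is not how Jina itself processes inputs.

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class Doc:
    """Hypothetical stand-in for a Document."""
    content: str


def normalize_inputs(inputs: Any) -> Iterator[Any]:
    """Turn any of the accepted input shapes into a plain iterator (hypothetical helper)."""
    if inputs is None:
        return iter(())          # empty request
    if callable(inputs):
        return iter(inputs())    # generator function passed without calling it
    try:
        return iter(inputs)      # list, tuple, generator object, ...
    except TypeError:
        return iter((inputs,))   # a single, non-iterable item


def doc_gen():
    for j in range(3):
        yield Doc(content=f'hello {j}')


single = list(normalize_inputs(Doc('hello')))
several = list(normalize_inputs([Doc('hello'), Doc('world')]))
generated = list(normalize_inputs(doc_gen))
empty = list(normalize_inputs(None))
print(len(single), len(several), len(generated), len(empty))
```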

The Document module provides some methods that let you build Document generators, e.g. from_csv, from_files, from_ndarray, from_ndjson. They can be used in conjunction with .post(), e.g.

    from jina import Flow
    from jina.types.document.generators import from_csv

    f = Flow()
    with f, open('my.csv') as fp:
        f.index(from_csv(fp, field_resolver={'question': 'text'}))
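
As a rough picture of what a CSV generator with a field_resolver does, the stdlib sketch below yields one dict per row and renames columns along the way. It assumes field_resolver maps CSV column names to target field names; rows_from_csv is a hypothetical helper that yields plain dicts rather than Documents, and the sample data is made up.

```python
import csv
import io
from typing import Dict, Iterator, Optional, TextIO


def rows_from_csv(fp: TextIO,
                  field_resolver: Optional[Dict[str, str]] = None) -> Iterator[Dict[str, str]]:
    """Yield one dict per CSV row, renaming columns via field_resolver (hypothetical helper)."""
    field_resolver = field_resolver or {}
    for row in csv.DictReader(fp):
        # Columns without an entry in field_resolver keep their original name.
        yield {field_resolver.get(key, key): value for key, value in row.items()}


sample = io.StringIO('question,answer\nhello?,world\n')
rows = list(rows_from_csv(sample, field_resolver={'question': 'text'}))
print(rows)
```

Because the helper is a generator, rows are read lazily, which is what makes this pattern suitable for feeding large files to .post().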

Request parameters

To send parameters to the Executor, use the parameters argument:

    from jina import Document, Executor, Flow, requests


    class MyExecutor(Executor):
        @requests
        def foo(self, parameters, **kwargs):
            print(parameters['hello'])


    f = Flow().add(uses=MyExecutor)
    with f:
        f.post('/', Document(), parameters={'hello': 'world'})

Note

You can send a parameters-only data request via:

    with f:
        f.post('/', parameters={'hello': 'world'})

This is useful for controlling Executor objects at runtime.

If you want different Executors to receive different values for the same parameter, add a dictionary inside parameters with the Executor name as its key. Before calling the Executor method, Jina takes these Executor-specific parameters and copies them to the root of the parameters dictionary.

    from typing import Optional

    from jina import Executor, requests, DocumentArray, Flow


    class MyExecutor(Executor):
        def __init__(self, default_param: int = 1, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.default_param = default_param

        @requests
        def foo(self, docs: Optional[DocumentArray], parameters: dict, **kwargs):
            param = parameters.get('param', self.default_param)
            # param may be overridden for this specific request.
            # The first instance will receive 10, and the second one will receive 5
            if self.metas.name == 'my-executor-1':
                assert param == 10
            elif self.metas.name == 'my-executor-2':
                assert param == 5


    with (Flow().
          add(uses={'jtype': 'MyExecutor', 'metas': {'name': 'my-executor-1'}}).
          add(uses={'jtype': 'MyExecutor', 'metas': {'name': 'my-executor-2'}})) as f:
        f.post(on='/endpoint', inputs=DocumentArray([]), parameters={'param': 5, 'my-executor-1': {'param': 10}})
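
The copy-to-root behaviour described above can be sketched in plain Python. This is an illustration of the described rule only, not Jina's implementation: resolve_parameters is a hypothetical helper, and whether Jina keeps or drops the nested dictionary afterwards is not covered here.

```python
from typing import Any, Dict


def resolve_parameters(parameters: Dict[str, Any], executor_name: str) -> Dict[str, Any]:
    """Copy the entry keyed by executor_name to the root level (hypothetical helper)."""
    resolved = dict(parameters)          # leave the caller's dict untouched
    specific = resolved.get(executor_name)
    if isinstance(specific, dict):
        resolved.update(specific)        # Executor-specific values override root values
    return resolved


request_parameters = {'param': 5, 'my-executor-1': {'param': 10}}
first = resolve_parameters(request_parameters, 'my-executor-1')
second = resolve_parameters(request_parameters, 'my-executor-2')
print(first['param'], second['param'])
```

The first Executor sees its own override, while the second falls back to the root value, matching the assertions in the Flow example.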

Note that because parameters does not have a fixed schema, it is declared with type google.protobuf.Struct in the RequestProto protobuf declaration. However, google.protobuf.Struct follows the JSON specification and does not differentiate int from float. Therefore, data of type int in parameters is cast to float when the request is sent to the Executor.

As a result, users need to be explicit and cast the data to the expected type, as follows.

✅ Do

    from jina import DocumentArray, Executor, Flow, requests


    class MyExecutor(Executor):
        animals = ['cat', 'dog', 'turtle']

        @requests
        def foo(self, docs, parameters: dict, **kwargs):
            # need to cast to int since list indices must be integers, not floats
            index = int(parameters.get('index', 0))
            assert self.animals[index] == 'dog'


    with Flow().add(uses=MyExecutor) as f:
        f.post(on='/endpoint', inputs=DocumentArray([]), parameters={'index': 1})

😔 Don’t

    from jina import DocumentArray, Executor, Flow, requests


    class MyExecutor(Executor):
        animals = ['cat', 'dog', 'turtle']

        @requests
        def foo(self, docs, parameters: dict, **kwargs):
            # ERROR: list indices must be integers, not floats
            index = parameters.get('index', 0)
            assert self.animals[index] == 'dog'


    with Flow().add(uses=MyExecutor) as f:
        f.post(on='/endpoint', inputs=DocumentArray([]), parameters={'index': 1})
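
The effect can be reproduced without Jina or protobuf. In the sketch below, struct_round_trip is a hypothetical, simplified stand-in that mimics the int-to-float conversion described above; it is not the protobuf library itself.

```python
from typing import Any


def struct_round_trip(value: Any) -> Any:
    """Mimic the int-to-float conversion described above (simplified stand-in)."""
    if isinstance(value, bool):
        return value                     # booleans stay booleans
    if isinstance(value, int):
        return float(value)              # every number arrives as a float
    if isinstance(value, dict):
        return {key: struct_round_trip(item) for key, item in value.items()}
    if isinstance(value, list):
        return [struct_round_trip(item) for item in value]
    return value


animals = ['cat', 'dog', 'turtle']
received = struct_round_trip({'index': 1})

try:
    animals[received['index']]           # indexing with a float
    failed = False
except TypeError:
    failed = True                        # list indices must be integers

picked = animals[int(received['index'])] # explicit cast restores the int
print(type(received['index']).__name__, failed, picked)
```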

Size of request

You can control how many Documents are sent in each request with request_size. Say your inputs have a length of 100 and request_size is set to 10. Then f.post will send 10 requests and return 10 responses:

    from jina import Flow, Document

    f = Flow()
    with f:
        f.post('/', (Document() for _ in range(100)), request_size=10)

Output:

    [email protected][L]:ready and listening
    [email protected][I]:🎉 Flow is ready to use!
    🔗 Protocol: GRPC
    🏠 Local access: 0.0.0.0:44249
    🔒 Private network: 192.168.1.100:44249
    🌐 Public address: 197.28.126.36:44249

To see that more clearly, you can turn on the progress bar with show_progress.

    with f:
        f.post('/', (Document() for _ in range(100)), request_size=10, show_progress=True)

Output:

    [email protected][L]:ready and listening
    [email protected][I]:🎉 Flow is ready to use!
    🔗 Protocol: GRPC
    🏠 Local access: 0.0.0.0:59109
    🔒 Private network: 192.168.1.100:59109
    🌐 Public address: 197.28.126.36:59109
    |██████████ | ⏱️ 0.0s 🐎 429.0 RPS 10 requests done in 0 seconds 🐎 425.1 RPS
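
The arithmetic behind request_size can be sketched with the standard library: 100 inputs grouped 10 at a time give 10 requests. The batch helper below is hypothetical and uses strings in place of Documents; it only illustrates the grouping, not how Jina builds requests.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')


def batch(items: Iterable[T], request_size: int) -> Iterator[List[T]]:
    """Group items into lists of at most request_size elements (hypothetical helper)."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, request_size))
        if not chunk:
            return                       # input exhausted
        yield chunk


requests_sent = list(batch((f'doc-{i}' for i in range(100)), request_size=10))
print(len(requests_sent), len(requests_sent[0]))
```

If the input length is not a multiple of request_size, the last group in this sketch is simply smaller.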

Limiting outstanding requests

You can control the number of requests fetched at a time from the Client generator into the Executor with the prefetch argument. For example, setting prefetch=2 ensures that only 2 requests reach the Executors at a time, which limits the load on them. By default, prefetch is disabled (set to 0). In cases where an Executor is a slow worker, you can assign a higher value to prefetch.

    from jina import Document, Executor, Flow, requests


    def requests_generator():
        while True:
            yield Document(...)


    class MyExecutor(Executor):
        @requests
        def foo(self, **kwargs):
            slow_operation()  # placeholder for an expensive computation


    # Makes sure only 2 requests reach the Executor at a time.
    with Flow(prefetch=2).add(uses=MyExecutor) as f:
        f.post(on='/', inputs=requests_generator)

Danger

When working with very slow Executors and a large amount of data, you must set prefetch to a small number to prevent out-of-memory (OOM) errors. If you are unsure, always set prefetch=1.
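
The idea of capping outstanding requests can be sketched with an asyncio semaphore: the next input is pulled only once a slot is free. This is a generic illustration that assumes prefetch is at least 1; run_with_prefetch and slow_handler are hypothetical names, and the sketch does not reflect how Jina implements prefetch internally.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List, Tuple


async def run_with_prefetch(inputs: Iterable[Any],
                            handler: Callable[[Any], Awaitable[Any]],
                            prefetch: int) -> Tuple[List[Any], int]:
    """Process inputs with at most `prefetch` in flight; return (results, peak in flight)."""
    slots = asyncio.Semaphore(prefetch)
    results: List[Any] = []
    in_flight = 0
    peak = 0

    async def run_one(request: Any) -> None:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        try:
            results.append(await handler(request))
        finally:
            in_flight -= 1
            slots.release()              # free the slot for the next request

    iterator = iter(inputs)
    tasks = []
    while True:
        await slots.acquire()            # wait for a free slot before pulling more input
        try:
            request = next(iterator)
        except StopIteration:
            slots.release()
            break
        tasks.append(asyncio.create_task(run_one(request)))
    await asyncio.gather(*tasks)
    return results, peak


async def slow_handler(request: int) -> int:
    await asyncio.sleep(0.01)            # stand-in for a slow Executor
    return request * 2


results, peak = asyncio.run(run_with_prefetch(range(6), slow_handler, prefetch=2))
print(sorted(results), peak)
```

Because input is only consumed when a slot is free, an endless generator never piles up in memory, which is the point of the warning above.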

Response result

Once a request is returned, callback functions are fired. Jina Flow implements a Promise-like interface. You can add callback functions on_done, on_error, on_always to hook different events.

In Jina, a callback function's first argument is a jina.types.request.Response object. Hence, you can annotate the callback function as follows:

    from jina.types.request import Response


    def my_callback(rep: Response):
        ...

The Response object has many attributes. Probably the most popular one is Response.docs, which gives you access to all Documents as a DocumentArray.

In the example below, our Flow passes the message then prints the result when successful. If something goes wrong, it beeps. Finally, the result is written to output.txt.

    from jina import Document, Flow


    def beep(*args):
        # make a beep sound
        import sys
        sys.stdout.write('\a')


    with Flow().add() as f, open('output.txt', 'w') as fp:
        f.post('/',
               Document(),
               on_done=print,
               on_error=beep,
               on_always=lambda x: x.docs.save(fp))
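
The Promise-like callback pattern can be sketched in a few lines of plain Python. This is a simplified illustration under stated assumptions: dispatch is a hypothetical helper, the callbacks receive a plain value or exception rather than a Response object, and the ordering (on_always fires last) is an assumption of the sketch.

```python
from typing import Any, Callable, List, Optional, Tuple


def dispatch(handler: Callable[[], Any],
             on_done: Optional[Callable[[Any], None]] = None,
             on_error: Optional[Callable[[Exception], None]] = None,
             on_always: Optional[Callable[[Any], None]] = None) -> None:
    """Run handler and fire the matching callbacks (hypothetical helper)."""
    try:
        outcome = handler()
    except Exception as exc:
        outcome = exc
        if on_error:
            on_error(exc)                # only on failure
    else:
        if on_done:
            on_done(outcome)             # only on success
    if on_always:
        on_always(outcome)               # in both cases


events: List[Tuple[str, str]] = []


def failing() -> None:
    raise ValueError('boom')


for work in (lambda: 'ok', failing):
    dispatch(work,
             on_done=lambda result: events.append(('done', str(result))),
             on_error=lambda error: events.append(('error', str(error))),
             on_always=lambda outcome: events.append(('always', str(outcome))))
print(events)
```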

Get response documents

In some scenarios (e.g. testing), you may want to get all response Documents in a single DocumentArray and then process them, instead of processing responses on the fly. To do that, turn on return_results:

    from jina import Flow, Document

    f = Flow()
    with f:
        docs = f.post('/', (Document() for _ in range(100)), request_size=10, return_results=True, on_done=lambda x: x)
        print(docs)

Output:

    [email protected][I]:🎉 Flow is ready to use!
    🔗 Protocol: GRPC
    🏠 Local access: 0.0.0.0:59275
    🔒 Private network: 192.168.1.100:59275
    🌐 Public address: 197.28.126.36:59275
    <docarray.array.document.DocumentArray (length=100) at 140181102572448>

If no callback is provided to on_done or on_always, return_results=True is set automatically, even if you pass return_results=False:

    from jina import Flow, Document

    f = Flow()
    with f:
        docs = f.post('/', (Document() for _ in range(100)), request_size=10, return_results=False)
        print(docs)

Output:

    [email protected][I]:🎉 Flow is ready to use!
    🔗 Protocol: GRPC
    🏠 Local access: 0.0.0.0:59275
    🔒 Private network: 192.168.1.100:59275
    🌐 Public address: 197.28.126.36:59275
    <docarray.array.document.DocumentArray (length=100) at 140696905030480>

Caution

Turning on return_results breaks the streaming of the system. If you are sending 1000 requests, then return_results=True means you will get nothing until the 1000th response returns. Moreover, if each response takes 10MB of memory, you will consume up to 10GB of memory! In contrast, with a callback and return_results=False, your memory usage stays constant at 10MB.
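
The memory trade-off can be illustrated with a small stdlib sketch that tracks how many megabytes are held at once. It uses the figures from the text (1000 responses of 10MB each); responses, collect_all and stream_with_callback are hypothetical helpers, and sizes are simulated as integers rather than measured.

```python
from typing import Callable, Iterable, Iterator

RESPONSE_MB = 10  # size of one response, taken from the figures above


def responses(count: int) -> Iterator[int]:
    """Yield the size in MB of each response, one at a time (stand-in for real responses)."""
    for _ in range(count):
        yield RESPONSE_MB


def collect_all(stream: Iterable[int]) -> int:
    """Hold every response until the end; return the peak MB held."""
    held = list(stream)                  # nothing is available until the last one arrives
    return sum(held)


def stream_with_callback(stream: Iterable[int], callback: Callable[[int], None]) -> int:
    """Hand each response to callback and drop it; return the peak MB held."""
    peak = 0
    for size in stream:
        callback(size)                   # processed on the fly
        peak = max(peak, size)           # only one response is held at a time
    return peak


peak_collected = collect_all(responses(1000))
peak_streamed = stream_with_callback(responses(1000), callback=lambda size: None)
print(peak_collected, peak_streamed)
```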

Environment variables

In some scenarios, you may want to set environment variables for the Flow and use them inside an Executor. To do that, you can use env:

    import os

    from jina import Flow, Executor, requests


    class MyExecutor(Executor):
        @requests
        def foo(self, **kwargs):
            print('MY_ENV', '->', os.environ.get('MY_ENV'))


    f = Flow().add(uses=MyExecutor, env={'MY_ENV': 'MY_ENV_VALUE'})
    with f:
        f.post('/foo')

Output:

    [email protected][I]:🎉 Flow is ready to use!
    🔗 Protocol: GRPC
    🏠 Local access: 0.0.0.0:51587
    🔒 Private network: 172.18.0.253:51587
    🌐 Public address: 94.135.231.132:51587
    MY_ENV -> MY_ENV_VALUE
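
One way to picture the effect of env is that the given variables are present in the process environment while the Executor code runs. The sketch below mimics that with a hypothetical scoped_env context manager built on os.environ; it is an illustration only and not how Jina applies env.

```python
import os
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def scoped_env(variables: Dict[str, str]) -> Iterator[None]:
    """Set environment variables for the duration of a with-block (hypothetical helper)."""
    previous = {name: os.environ.get(name) for name in variables}
    os.environ.update(variables)
    try:
        yield
    finally:
        # Restore the earlier state, removing variables that did not exist before.
        for name, old_value in previous.items():
            if old_value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old_value


with scoped_env({'MY_ENV': 'MY_ENV_VALUE'}):
    inside = os.environ.get('MY_ENV')
print('MY_ENV', '->', inside)
```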