Remarks

Joining/Reducing

If you have multiple Executors running in parallel and one Executor that needs them, the resulting Documents are merged automatically, as long as you don’t specify uses or uses_before on the joining Executor. You can also customize the default reducing logic by using a Merger Executor in uses_before or uses.

Documents from multiple requests are combined before they are passed to the Executor’s function, so simple joining just means returning these docs. For more complex logic, your endpoint can instead expect a docs_matrix argument: a list of DocumentArrays, one coming from each upstream Executor. You can then process them and return the resulting DocumentArray.

  from jina import Executor, requests, Flow, Document

  def print_fn(resp):
      for doc in resp.docs:
          print(doc.text)

  class B(Executor):
      @requests
      def foo(self, docs, **kwargs):
          # 3 docs
          for idx, d in enumerate(docs):
              d.text = f'hello {idx}'

  class A(Executor):
      @requests
      def foo(self, docs, **kwargs):
          # 3 docs
          for idx, d in enumerate(docs):
              d.text = f'world {idx}'

Default Reduce

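  # executor0 (A) and executor1 (B) both read from the gateway, so they run in parallel;
  # the third Executor has no uses, so the default reduce merges their results
  f = Flow().add(uses=A).add(uses=B, needs='gateway').add(needs=['executor0', 'executor1'])
  with f:
      f.post(on='/some_endpoint',
             inputs=[Document() for _ in range(3)],
             on_done=print_fn)

  hello 0
  hello 1
  hello 2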

Simple Custom Reduce

  class C(Executor):
      @requests
      def foo(self, docs, **kwargs):
          # 6 docs: 3 from each upstream Executor
          return docs

  f = Flow().add(uses=A).add(uses=B, needs='gateway').add(uses=C, needs=['executor0', 'executor1'])
  with f:
      f.post(on='/some_endpoint',
             inputs=[Document() for _ in range(3)],
             on_done=print_fn)

  world 0
  world 1
  world 2
  hello 0
  hello 1
  hello 2

Custom Reduce with docs_matrix

  from typing import List

  from jina import Document, DocumentArray, Executor, Flow, requests

  class C(Executor):
      @requests
      def foo(self, docs_matrix: List[DocumentArray], **kwargs):
          # docs_matrix holds one DocumentArray per upstream Executor
          da = DocumentArray()
          for doc1, doc2 in zip(docs_matrix[0], docs_matrix[1]):
              da.append(Document(text=f'{doc1.text}-{doc2.text}'))
          return da

  f = Flow().add(uses=A).add(uses=B, needs='gateway').add(uses=C, needs=['executor0', 'executor1'])
  with f:
      f.post(on='/some_endpoint',
             inputs=[Document() for _ in range(3)],
             on_done=print_fn)

  world 0-hello 0
  world 1-hello 1
  world 2-hello 2
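The pairing logic in C does not itself depend on Jina. The following plain-Python sketch uses strings as stand-ins for Documents, an assumption made only so that it runs without Jina. It shows how zip(*docs_matrix) generalizes the two-input join to any number of upstream Executors. join_texts is a hypothetical helper, and the order of entries in docs_matrix is assumed here rather than guaranteed.

```python
from typing import List


def join_texts(docs_matrix: List[List[str]], sep: str = '-') -> List[str]:
    """Join the i-th entry of every upstream result into one string (hypothetical helper)."""
    # zip(*docs_matrix) yields tuples (entry_from_first_upstream, entry_from_second, ...).
    # It stops at the shortest list, so unmatched trailing entries are silently dropped.
    return [sep.join(row) for row in zip(*docs_matrix)]


if __name__ == '__main__':
    docs_matrix = [
        ['world 0', 'world 1', 'world 2'],  # stand-in for the first upstream result (assumed A)
        ['hello 0', 'hello 1', 'hello 2'],  # stand-in for the second upstream result (assumed B)
    ]
    for text in join_texts(docs_matrix):
        print(text)
```

Because zip truncates to the shortest input, upstream DocumentArrays of different lengths would lose their extra Documents. If that matters, itertools.zip_longest is the usual alternative.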

Custom Reduce with modification

You can also modify the Documents while merging:

  class C(Executor):
      @requests
      def foo(self, docs, **kwargs):
          # 6 docs
          for d in docs:
              d.text += '!!!'
          return docs

  f = Flow().add(uses=A).add(uses=B, needs='gateway').add(uses=C, needs=['executor0', 'executor1'])
  with f:
      f.post(on='/some_endpoint',
             inputs=[Document() for _ in range(3)],
             on_done=print_fn)

  hello 0!!!
  hello 1!!!
  hello 2!!!
  world 0!!!
  world 1!!!
  world 2!!!

Multiprocessing Spawn

A few cases require the spawn start method for multiprocessing. For example, CUDA fails in a forked subprocess with "Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method".
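In the Python standard library, the start method is chosen through multiprocessing.get_context. The sketch below maps an environment variable such as JINA_MP_START_METHOD to a multiprocessing context. get_mp_context is a hypothetical helper, and the sketch is only an assumption about how such a variable could be consumed, not Jina's actual implementation.

```python
import multiprocessing
import os


def get_mp_context(env_var: str = 'JINA_MP_START_METHOD'):
    """Hypothetical helper: pick a multiprocessing context from an environment variable."""
    method = os.environ.get(env_var)  # e.g. 'spawn'; None if the variable is unset
    # get_context(None) returns the platform's default context;
    # an unknown method name raises ValueError.
    return multiprocessing.get_context(method)


if __name__ == '__main__':
    os.environ['JINA_MP_START_METHOD'] = 'spawn'
    ctx = get_mp_context()
    print(ctx.get_start_method())  # spawn
    # Processes created via ctx.Process(...) would start in a fresh interpreter
    # that re-imports the main module, hence the __main__ guard below.
```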

  • Please set JINA_MP_START_METHOD=spawn before starting the Python script to enable this.

    Hint

    There’s no need to set this on Windows, as it only supports the spawn method for multiprocessing.

  • Define and start the Flow via an explicit function call inside if __name__ == '__main__'. For example:

    ✅ Do

    from jina import Flow, Executor, requests

    class CustomExecutor(Executor):
        @requests
        def foo(self, **kwargs):
            ...

    def main():
        f = Flow().add(uses=CustomExecutor)
        with f:
            ...

    if __name__ == '__main__':
        main()

    😔 Don’t

    from jina import Flow, Executor, requests

    class CustomExecutor(Executor):
        @requests
        def foo(self, **kwargs):
            ...

    f = Flow().add(uses=CustomExecutor)
    with f:
        ...

    # error:
    # This probably means that you are not using fork to start your
    # child processes and you have forgotten to use the proper idiom
    # in the main module:
    #
    #     if __name__ == '__main__':
    #         freeze_support()
    #         ...
    #
    # The "freeze_support()" line can be omitted if the program
    # is not going to be frozen to produce an executable.
  • Declare Executors at the top level of the module

    ✅ Do

    from jina import Flow, Executor, requests

    class CustomExecutor(Executor):
        @requests
        def foo(self, **kwargs):
            ...

    def main():
        f = Flow().add(uses=CustomExecutor)
        with f:
            ...

    😔 Don’t

    from jina import Flow, Executor, requests

    def main():
        # CustomExecutor is local to main(), so it cannot be pickled by reference
        class CustomExecutor(Executor):
            @requests
            def foo(self, **kwargs):
                ...

        f = Flow().add(uses=CustomExecutor)
        with f:
            ...
  • Avoid un-picklable objects

    The Python pickle documentation lists the types that can be pickled. Since spawn relies on pickling, avoid code that cannot be pickled.

    Hint

    Here are a few errors which indicate that you are using code that is not picklable:

    pickle.PicklingError: Can't pickle: it's not the same object
    AssertionError: can only join a started process

    Inline functions, such as nested or lambda functions, are not picklable. Use functools.partial instead.

  • Always provide absolute path

    When passing file paths to Jina arguments (e.g. uses, py_modules), always pass absolute paths.
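To check whether an object will survive the pickling that spawn depends on, you can call pickle.dumps on it directly. The sketch below uses plain-Python stand-ins. TopLevelExecutor, make_nested_class, scale and is_picklable are hypothetical names, not Jina classes. It shows that a top-level class and a functools.partial pickle fine, while a nested class, a lambda and a lock do not. The exact exception type varies across Python versions, so the helper catches several.

```python
import functools
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if obj can be serialized with pickle.dumps."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # TypeError: objects such as locks; PicklingError or AttributeError
        # (depending on Python version): objects not findable by qualified name.
        return False
    return True


class TopLevelExecutor:
    """Stand-in for an Executor declared at module top level."""


def make_nested_class():
    class NestedExecutor:  # its qualname contains '<locals>', so pickle can't find it
        pass
    return NestedExecutor


def scale(x, factor):
    return x * factor


if __name__ == '__main__':
    print(is_picklable(TopLevelExecutor))                    # True
    print(is_picklable(make_nested_class()))                 # False
    print(is_picklable(lambda x: scale(x, 2)))               # False
    print(is_picklable(functools.partial(scale, factor=2)))  # True
    print(is_picklable(threading.Lock()))                    # False
```

Classes and functions are pickled by reference (module plus qualified name). This is why the advice above is to declare Executors at the top level and to prefer functools.partial over lambdas.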

Debugging Executor in a Flow

Standard Python breakpoints will not work inside Executor methods when they are called inside a Flow context manager. However, import epdb; epdb.set_trace() works just like a native Python breakpoint. Note that you need to pip install epdb to have access to this type of breakpoint.

✅ Do

  from jina import Flow, Executor, requests

  class CustomExecutor(Executor):
      @requests
      def foo(self, **kwargs):
          a = 25
          import epdb; epdb.set_trace()
          print(f'\n\na={a}\n\n')

  def main():
      f = Flow().add(uses=CustomExecutor)
      with f:
          f.post(on='')

  if __name__ == '__main__':
      main()

😔 Don’t

  from jina import Flow, Executor, requests

  class CustomExecutor(Executor):
      @requests
      def foo(self, **kwargs):
          a = 25
          breakpoint()
          print(f'\n\na={a}\n\n')

  def main():
      f = Flow().add(uses=CustomExecutor)
      with f:
          f.post(on='')

  if __name__ == '__main__':
      main()