jina package

Subpackages

Submodules

Module contents

Top-level module of Jina.

The primary function of this module is to import all of the public Jina interfaces into a single place. The interfaces themselves are located in sub-modules, as described below.

class jina.AsyncFlow(**, asyncio: Optional[bool] = ‘False’, host: Optional[str] = “‘0.0.0.0’”, https: Optional[bool] = ‘False’, port: Optional[int] = ‘None’, protocol: Optional[str] = “‘GRPC’”, proxy: Optional[bool] = ‘False’, results_as_docarray: Optional[bool] = ‘False’, **kwargs*)[source]

class jina.AsyncFlow(**, compress: Optional[str] = “‘NONE’”, compress_min_bytes: Optional[int] = ‘1024’, compress_min_ratio: Optional[float] = ‘1.1’, connection_list: Optional[str] = ‘None’, cors: Optional[bool] = ‘False’, daemon: Optional[bool] = ‘False’, default_swagger_ui: Optional[bool] = ‘False’, deployments_addresses: Optional[str] = “‘{}’”, description: Optional[str] = ‘None’, env: Optional[dict] = ‘None’, expose_endpoints: Optional[str] = ‘None’, expose_public: Optional[bool] = ‘False’, graph_description: Optional[str] = “‘{}’”, host: Optional[str] = “‘0.0.0.0’”, host_in: Optional[str] = “‘0.0.0.0’”, log_config: Optional[str] = ‘None’, name: Optional[str] = “‘gateway’”, native: Optional[bool] = ‘False’, no_crud_endpoints: Optional[bool] = ‘False’, no_debug_endpoints: Optional[bool] = ‘False’, polling: Optional[str] = “‘ANY’”, port_expose: Optional[int] = ‘None’, port_in: Optional[int] = ‘None’, prefetch: Optional[int] = ‘0’, protocol: Optional[str] = “‘GRPC’”, proxy: Optional[bool] = ‘False’, py_modules: Optional[List[str]] = ‘None’, quiet: Optional[bool] = ‘False’, quiet_error: Optional[bool] = ‘False’, replicas: Optional[int] = ‘1’, runtime_backend: Optional[str] = “‘PROCESS’”, runtime_cls: Optional[str] = “‘GRPCGatewayRuntime’”, shards: Optional[int] = ‘1’, timeout_ctrl: Optional[int] = ‘60’, timeout_ready: Optional[int] = ‘600000’, title: Optional[str] = ‘None’, uses: Optional[Union[str, Type[BaseExecutor], dict]] = “‘BaseExecutor’”, uses_after_address: Optional[str] = ‘None’, uses_before_address: Optional[str] = ‘None’, uses_metas: Optional[dict] = ‘None’, uses_requests: Optional[dict] = ‘None’, uses_with: Optional[dict] = ‘None’, uvicorn_kwargs: Optional[dict] = ‘None’, workspace: Optional[str] = ‘None’, **kwargs*)

class jina.AsyncFlow(**, env: Optional[dict] = ‘None’, inspect: Optional[str] = “‘COLLECT’”, log_config: Optional[str] = ‘None’, name: Optional[str] = ‘None’, polling: Optional[str] = “‘ANY’”, quiet: Optional[bool] = ‘False’, quiet_error: Optional[bool] = ‘False’, timeout_ctrl: Optional[int] = ‘60’, uses: Optional[str] = ‘None’, workspace: Optional[str] = ‘None’, **kwargs*)

Bases: jina.clients.mixin.AsyncPostMixin, jina.orchestrate.flow.base.Flow

AsyncFlow is the asynchronous version of the Flow. They share the same interface, except in AsyncFlow train(), index(), search() methods are coroutines (i.e. declared with the async/await syntax), simply calling them will not schedule them to be executed. To actually run a coroutine, user need to put them in an eventloop, e.g. via asyncio.run(), asyncio.create_task().

AsyncFlow can be very useful in the integration settings, where Jina/Jina Flow is NOT the main logic, but rather served as a part of other program. In this case, users often do not want to let Jina control the asyncio.eventloop. On contrary, Flow is controlling and wrapping the eventloop internally, making the Flow looks synchronous from outside.

In particular, AsyncFlow makes Jina usage in Jupyter Notebook more natural and reliable. For example, the following code will use the eventloop that already spawned in Jupyter/ipython to run Jina Flow (instead of creating a new one).

  1. from jina import AsyncFlow
  2. from jina.types.document.generators import from_ndarray
  3. import numpy as np
  4. with AsyncFlow().add() as f:
  5. await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Notice that the above code will NOT work in standard Python REPL, as only Jupyter/ipython implements “autoawait”.

See also

Asynchronous in REPL: Autoawait

https://ipython.readthedocs.io/en/stable/interactive/autoawait.html

Another example is when using Jina as an integration. Say you have another IO-bounded job heavylifting(), you can use this feature to schedule Jina index() and heavylifting() concurrently.

One can think of Flow as Jina-managed eventloop, whereas AsyncFlow is self-managed eventloop.

jina.Client(args=None, \*kwargs*)[source]

Jina Python client.

  • Parameters

    • args (Optional[ForwardRef]) – Namespace args.

    • kwargs – Additional arguments.

    Return type

    Union[ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef]

    Returns

    An instance of GRPCClient or WebSocketClient.

class jina.Document[source]

class jina.Document(_obj: Optional[Document] = None, copy: bool = False)

class jina.Document(_obj: Optional[Dict], copy: bool = False, field_resolver: Optional[Dict[str, str]] = None, unknown_fields_handler: str = ‘catch’)

class jina.Document(parent_id: Optional[str] = None, granularity: Optional[int] = None, adjacency: Optional[int] = None, blob: Optional[bytes] = None, tensor: Optional[ArrayType] = None, mime_type: Optional[str] = None, text: Optional[str] = None, content: Optional[DocumentContentType] = None, weight: Optional[float] = None, uri: Optional[str] = None, tags: Optional[Dict[str, StructValueType]] = None, offset: Optional[float] = None, location: Optional[List[float]] = None, embedding: Optional[ArrayType] = None, modality: Optional[str] = None, evaluations: Optional[Dict[str, Dict[str, StructValueType]]] = None, scores: Optional[Dict[str, Dict[str, StructValueType]]] = None, chunks: Optional[Sequence[Document]] = None, matches: Optional[Sequence[Document]] = None)

Bases: docarray.document.mixins.AllMixins, docarray.base.BaseDCType

class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, copy: bool = False)[source]

class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = ‘sqlite’, config: Optional[Union[SqliteConfig, Dict]] = None)

class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = ‘weaviate’, config: Optional[Union[WeaviateConfig, Dict]] = None)

class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = ‘pqlite’, config: Optional[Union[PqliteConfig, Dict]] = None)

Bases: docarray.array.mixins.AllMixins, docarray.array.base.BaseDocumentArray

jina.Executor

alias of jina.serve.executors.BaseExecutor

class jina.Flow(**, asyncio: Optional[bool] = ‘False’, host: Optional[str] = “‘0.0.0.0’”, https: Optional[bool] = ‘False’, port: Optional[int] = ‘None’, protocol: Optional[str] = “‘GRPC’”, proxy: Optional[bool] = ‘False’, results_as_docarray: Optional[bool] = ‘False’, **kwargs*)[source]

class jina.Flow(**, compress: Optional[str] = “‘NONE’”, compress_min_bytes: Optional[int] = ‘1024’, compress_min_ratio: Optional[float] = ‘1.1’, connection_list: Optional[str] = ‘None’, cors: Optional[bool] = ‘False’, daemon: Optional[bool] = ‘False’, default_swagger_ui: Optional[bool] = ‘False’, deployments_addresses: Optional[str] = “‘{}’”, description: Optional[str] = ‘None’, env: Optional[dict] = ‘None’, expose_endpoints: Optional[str] = ‘None’, expose_public: Optional[bool] = ‘False’, graph_description: Optional[str] = “‘{}’”, host: Optional[str] = “‘0.0.0.0’”, host_in: Optional[str] = “‘0.0.0.0’”, log_config: Optional[str] = ‘None’, name: Optional[str] = “‘gateway’”, native: Optional[bool] = ‘False’, no_crud_endpoints: Optional[bool] = ‘False’, no_debug_endpoints: Optional[bool] = ‘False’, polling: Optional[str] = “‘ANY’”, port_expose: Optional[int] = ‘None’, port_in: Optional[int] = ‘None’, prefetch: Optional[int] = ‘0’, protocol: Optional[str] = “‘GRPC’”, proxy: Optional[bool] = ‘False’, py_modules: Optional[List[str]] = ‘None’, quiet: Optional[bool] = ‘False’, quiet_error: Optional[bool] = ‘False’, replicas: Optional[int] = ‘1’, runtime_backend: Optional[str] = “‘PROCESS’”, runtime_cls: Optional[str] = “‘GRPCGatewayRuntime’”, shards: Optional[int] = ‘1’, timeout_ctrl: Optional[int] = ‘60’, timeout_ready: Optional[int] = ‘600000’, title: Optional[str] = ‘None’, uses: Optional[Union[str, Type[BaseExecutor], dict]] = “‘BaseExecutor’”, uses_after_address: Optional[str] = ‘None’, uses_before_address: Optional[str] = ‘None’, uses_metas: Optional[dict] = ‘None’, uses_requests: Optional[dict] = ‘None’, uses_with: Optional[dict] = ‘None’, uvicorn_kwargs: Optional[dict] = ‘None’, workspace: Optional[str] = ‘None’, **kwargs*)

class jina.Flow(**, env: Optional[dict] = ‘None’, inspect: Optional[str] = “‘COLLECT’”, log_config: Optional[str] = ‘None’, name: Optional[str] = ‘None’, polling: Optional[str] = “‘ANY’”, quiet: Optional[bool] = ‘False’, quiet_error: Optional[bool] = ‘False’, timeout_ctrl: Optional[int] = ‘60’, uses: Optional[str] = ‘None’, workspace: Optional[str] = ‘None’, **kwargs*)

Bases: jina.clients.mixin.PostMixin, jina.jaml.JAMLCompatible, contextlib.ExitStack

Flow is how Jina streamlines and distributes Executors.

  • property last_deployment

    Last deployment

  • needs(needs, name=’joiner’, \args, **kwargs*)[source]

    Add a blocker to the Flow, wait until all pods defined in needs completed.

    • Parameters

      • needs (Union[Tuple[str], List[str]]) – list of service names to wait

      • name (str) – the name of this joiner, by default is joiner

      • args – additional positional arguments forwarded to the add function

      • kwargs – additional key value arguments forwarded to the add function

      Return type

      Flow

      Returns

      the modified Flow

  • needs_all(name=’joiner’, \args, **kwargs*)[source]

    Collect all hanging Deployments so far and add a blocker to the Flow; wait until all handing pods completed.

    • Parameters

      • name (str) – the name of this joiner (default is joiner)

      • args – additional positional arguments which are forwarded to the add and needs function

      • kwargs – additional key value arguments which are forwarded to the add and needs function

      Return type

      Flow

      Returns

      the modified Flow

  • add(**, connection_list: Optional[str] = ‘None’, daemon: Optional[bool] = ‘False’, docker_kwargs: Optional[dict] = ‘None’, entrypoint: Optional[str] = ‘None’, env: Optional[dict] = ‘None’, expose_public: Optional[bool] = ‘False’, external: Optional[bool] = ‘False’, force_update: Optional[bool] = ‘False’, gpus: Optional[str] = ‘None’, host: Optional[str] = “‘0.0.0.0’”, host_in: Optional[str] = “‘0.0.0.0’”, install_requirements: Optional[bool] = ‘False’, log_config: Optional[str] = ‘None’, name: Optional[str] = ‘None’, native: Optional[bool] = ‘False’, polling: Optional[str] = “‘ANY’”, port_in: Optional[int] = ‘None’, port_jinad: Optional[int] = ‘8000’, pull_latest: Optional[bool] = ‘False’, py_modules: Optional[List[str]] = ‘None’, quiet: Optional[bool] = ‘False’, quiet_error: Optional[bool] = ‘False’, quiet_remote_logs: Optional[bool] = ‘False’, replicas: Optional[int] = ‘1’, runtime_backend: Optional[str] = “‘PROCESS’”, runtime_cls: Optional[str] = “‘WorkerRuntime’”, shards: Optional[int] = ‘1’, timeout_ctrl: Optional[int] = ‘60’, timeout_ready: Optional[int] = ‘600000’, upload_files: Optional[List[str]] = ‘None’, uses: Optional[Union[str, Type[‘BaseExecutor’], dict]] = “‘BaseExecutor’”, uses_after: Optional[Union[str, Type[‘BaseExecutor’], dict]] = ‘None’, uses_after_address: Optional[str] = ‘None’, uses_before: Optional[Union[str, Type[‘BaseExecutor’], dict]] = ‘None’, uses_before_address: Optional[str] = ‘None’, uses_metas: Optional[dict] = ‘None’, uses_requests: Optional[dict] = ‘None’, uses_with: Optional[dict] = ‘None’, volumes: Optional[List[str]] = ‘None’, workspace: Optional[str] = ‘None’, **kwargs*) → Union[‘Flow’, ‘AsyncFlow’][source]

    Add a Deployment to the current Flow object and return the new modified Flow object. The attribute of the Deployment can be later changed with set() or deleted with remove()

    • Parameters

      • needs (Union[str, Tuple[str], List[str], None]) – the name of the Deployment(s) that this Deployment receives data from. One can also use ‘gateway’ to indicate the connection with the gateway.

      • deployment_role (DeploymentRoleType) – the role of the Deployment, used for visualization and route planning

      • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

      • kwargs – other keyword-value arguments that the Deployment CLI supports

      Return type

      Flow

      Returns

      a (new) Flow object with modification

  • inspect(name=’inspect’, \args, **kwargs*)[source]

    Add an inspection on the last changed Deployment in the Flow

    Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

    1. Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
    2. |
    3. -- PUB-SUB -- InspectDeployment (Hanging)

    In this way, InspectDeployment looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

    This function is very handy for introducing an Evaluator into the Flow.

    See also

    gather_inspect()

    • Parameters

      • name (str) – name of the Deployment

      • args – args for .add()

      • kwargs – kwargs for .add()

      Return type

      Flow

      Returns

      the new instance of the Flow

  • gather_inspect(name=’gather_inspect’, include_last_deployment=True, \args, **kwargs*)[source]

    Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned.

    Note

    If --no-inspect is not given, then gather_inspect() is auto called before build(). So in general you don’t need to manually call gather_inspect().

    • Parameters

      • name (str) – the name of the gather Deployment

      • include_last_deployment (bool) – if to include the last modified Deployment in the Flow

      • args – args for .add()

      • kwargs – kwargs for .add()

      Return type

      Flow

      Returns

      the modified Flow or the copy of it

    See also

    inspect()

  • build(copy_flow=False)[source]

    Build the current Flow and make it ready to use

    Note

    No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.

    • Parameters

      copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

      Return type

      Flow

      Returns

      the current Flow (by default)

    Note

    copy_flow=True is recommended if you are building the same Flow multiple times in a row. e.g.

    1. f = Flow()
    2. with f:
    3. f.index()
    4. with f.build(copy_flow=True) as fl:
    5. fl.search()
  • start()[source]

    Start to run all Deployments in this Flow.

    Remember to close the Flow with close().

    Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.orchestrate.pods.Pod

    • Returns

      this instance

  • property num_deployments: int

    Get the number of Deployments in this Flow

    • Return type

      int

  • property num_pods: int

    Get the number of pods (shards count) in this Flow

    • Return type

      int

  • property client: BaseClient

    Return a BaseClient object attach to this Flow.

  • plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)[source]

    Visualize the Flow up to the current point If a file name is provided it will create a jpg image with that name, otherwise it will display the URL for mermaid. If called within IPython notebook, it will be rendered inline, otherwise an image will be created.

    Example,

    1. flow = Flow().add(name='deployment_a').plot('flow.svg')
    • Parameters

      • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

      • vertical_layout (bool) – top-down or left-right layout

      • inline_display (bool) – show image directly inside the Jupyter Notebook

      • build (bool) – build the Flow first before plotting, gateway connection can be better showed

      • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

      Return type

      Flow

      Returns

      the Flow

  • property port_expose: int

    Return the exposed port of the gateway .. # noqa: DAR201

    • Return type

      int

  • property host: str

    Return the local address of the gateway .. # noqa: DAR201

    • Return type

      str

  • property address_private: str

    Return the private IP address of the gateway for connecting from other machine in the same network

    • Return type

      str

  • property address_public: str

    Return the public IP address of the gateway for connecting from other machine in the public network

    • Return type

      str

  • block(stop_event=None)[source]

    Block the Flow until stop_event is set or user hits KeyboardInterrupt

    • Parameters

      stop_event (Union[Event, Event, None]) – a threading event or a multiprocessing event that onces set will resume the control Flow to main thread.

  • property protocol: jina.enums.GatewayProtocolType

    Return the protocol of this Flow

  • property workspace: str

    Return the workspace path of the flow.

    • Return type

      str

  • property workspace_id: Dict[str, str]

    Get all Deployments’ workspace_id values in a dict

    • Return type

      Dict[str, str]

  • property env: Optional[Dict]

    Get all envs to be set in the Flow

    • Return type

      Optional[Dict]

      Returns

      envs as dict

  • expose_endpoint(exec_endpoint: str, path: Optional[str] = None)[source]

    expose_endpoint(exec_endpoint: str, **, path: Optional[str] = ‘None’, status_code: int = ‘200’, tags: Optional[List[str]] = ‘None’, summary: Optional[str] = ‘None’, description: Optional[str] = ‘None’, response_description: str = “‘Successful Response’”, deprecated: Optional[bool] = ‘None’, methods: Optional[List[str]] = ‘None’, operation_id: Optional[str] = ‘None’, response_model_by_alias: bool = ‘True’, response_model_exclude_unset: bool = ‘False’, response_model_exclude_defaults: bool = ‘False’, response_model_exclude_none: bool = ‘False’, include_in_schema: bool = ‘True’, name: Optional[str] = ‘None’*)

    Expose an Executor’s endpoint (defined by @requests(on=…)) to HTTP endpoint for easier access.

    After expose, you can send data request directly to http://hostname:port/endpoint.

    • Parameters

      exec_endpoint (str) – the endpoint string, by convention starts with /

    # noqa: DAR101 # noqa: DAR102

  • join(needs, name=’joiner’, \args, **kwargs*)

    Add a blocker to the Flow, wait until all pods defined in needs completed.

    • Parameters

      • needs (Union[Tuple[str], List[str]]) – list of service names to wait

      • name (str) – the name of this joiner, by default is joiner

      • args – additional positional arguments forwarded to the add function

      • kwargs – additional key value arguments forwarded to the add function

      Return type

      Flow

      Returns

      the modified Flow

  • rolling_update(deployment_name, uses_with=None)[source]

    Reload all replicas of a deployment sequentially

    • Parameters

      • deployment_name (str) – deployment to update

      • uses_with (Optional[Dict]) – a Dictionary of arguments to restart the executor with

  • to_k8s_yaml(output_base_path, k8s_namespace=None, k8s_connection_pool=True)[source]

    Converts the Flow into a set of yaml deployments to deploy in Kubernetes :type output_base_path: str :param output_base_path: The base path where to dump all the yaml files :type k8s_namespace: Optional[str] :param k8s_namespace: The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used. :type k8s_connection_pool: bool :param k8s_connection_pool: Boolean indicating wether the kubernetes connection pool should be used inside the Executor Runtimes.

  • to_docker_compose_yaml(output_path=None, network_name=None)[source]

    Converts the Flow into a yaml file to run with docker-compose up :type output_path: Optional[str] :param output_path: The output path for the yaml file :type network_name: Optional[str] :param network_name: The name of the network that will be used by the deployment name

  • scale(deployment_name, replicas)[source]

    Scale the amount of replicas of a given Executor.

    • Parameters

      • deployment_name (str) – deployment to update

      • replicas (int) – The number of replicas to scale to

  • property client_args: argparse.Namespace

    Get Client settings.

    # noqa: DAR201

    • Return type

      Namespace

  • property gateway_args: argparse.Namespace

    Get Gateway settings.

    # noqa: DAR201

    • Return type

      Namespace

  • update_network_interface(\*kwargs*)[source]

    Update the network interface of this Flow (affects Gateway & Client)

    • Parameters

      kwargs – new network settings

jina.requests(func=None, **, on=None*)[source]

@requests defines when a function will be invoked. It has a keyword on= to define the endpoint.

A class method decorated with plan @requests (without on=) is the default handler for all endpoints. That means, it is the fallback handler for endpoints that are not found.

  • Parameters

    • func (Callable[[ForwardRef, Dict, ForwardRef, List[ForwardRef], List[ForwardRef]], Union[ForwardRef, Dict, None]]) – the method to decorate

    • on (Union[str, Sequence[str], None]) – the endpoint string, by convention starts with /

    Returns

    decorated function