jina.serve.networking module

class jina.serve.networking.ReplicaList[source]

Bases: object

Maintains a list of connections to replicas and uses round robin for selecting a replica

  • add_connection(address)[source]

    Add connection with address to the connection list :type address: str :param address: Target address of this connection

  • async remove_connection(address)[source]

    Remove connection with address from the connection list :type address: str :param address: Remove connection for this address :returns: The removed connection or None if there was not any for the given address

  • get_next_connection()[source]

    Returns a connection from the list. Strategy is round robin :returns: A connection from the pool

  • get_all_connections()[source]

    Returns all available connections :returns: A complete list of all connections from the pool

  • has_connection(address)[source]

    Checks if a connection for ip exists in the list :type address: str :param address: The address to check :rtype: bool :returns: True if a connection for the ip exists in the list

  • has_connections()[source]

    Checks if this contains any connection :rtype: bool :returns: True if any connection is managed, False otherwise

  • async close()[source]

    Close all connections and clean up internal state

class jina.serve.networking.GrpcConnectionPool(logger=None)[source]

Bases: object

Manages a list of grpc connections.

  • Parameters

    logger (Optional[JinaLogger]) – the logger to use

  • send_request(request, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None)[source]

    Send a single message to target via one or all of the pooled connections, depending on polling_type. Convenience function wrapper around send_request. :type request: Request :param request: a single request to send :type deployment: str :param deployment: name of the Jina deployment to send the message to :type head: bool :param head: If True it is send to the head, otherwise to the worker pods :type shard_id: Optional[int] :param shard_id: Send to a specific shard of the deployment, ignored for polling ALL :type polling_type: PollingType :param polling_type: defines if the message should be send to any or all pooled connections for the target :type endpoint: Optional[str] :param endpoint: endpoint to target with the request :rtype: List[Task] :return: list of asyncio.Task items for each send call

  • send_requests(requests, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None)[source]

    Send a request to target via one or all of the pooled connections, depending on polling_type

    • Parameters

      • requests (List[Request]) – request (DataRequest/ControlRequest) to send

      • deployment (str) – name of the Jina deployment to send the request to

      • head (bool) – If True it is send to the head, otherwise to the worker pods

      • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

      • polling_type (PollingType) – defines if the request should be send to any or all pooled connections for the target

      • endpoint (Optional[str]) – endpoint to target with the requests

      Return type

      List[Task]

      Returns

      list of asyncio.Task items for each send call

  • send_request_once(request, deployment, head=False, shard_id=None)[source]

    Send msg to target via only one of the pooled connections :type request: Request :param request: request to send :type deployment: str :param deployment: name of the Jina deployment to send the message to :type head: bool :param head: If True it is send to the head, otherwise to the worker pods :type shard_id: Optional[int] :param shard_id: Send to a specific shard of the deployment, ignored for polling ALL :rtype: Task :return: asyncio.Task representing the send call

  • send_requests_once(requests, deployment, head=False, shard_id=None, endpoint=None)[source]

    Send a request to target via only one of the pooled connections

    • Parameters

      • requests (List[Request]) – request to send

      • deployment (str) – name of the Jina deployment to send the request to

      • head (bool) – If True it is send to the head, otherwise to the worker pods

      • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

      • endpoint (Optional[str]) – endpoint to target with the requests

      Return type

      Task

      Returns

      asyncio.Task representing the send call

  • add_connection(deployment, address, head=False, shard_id=None)[source]

    Adds a connection for a deployment to this connection pool

    • Parameters

      • deployment (str) – The deployment the connection belongs to, like ‘encoder’

      • head (Optional[bool]) – True if the connection is for a head

      • address (str) – Address used for the grpc connection, format is <host>:<port>

      • shard_id (Optional[int]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads

  • async remove_connection(deployment, address, head=False, shard_id=None)[source]

    Removes a connection to a deployment

    • Parameters

      • deployment (str) – The deployment the connection belongs to, like ‘encoder’

      • address (str) – Address used for the grpc connection, format is <host>:<port>

      • head (Optional[bool]) – True if the connection is for a head

      • shard_id (Optional[int]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads

      Returns

      The removed connection, None if it did not exist

  • start()[source]

    Starts the connection pool

  • async close()[source]

    Closes the connection pool

  • static get_grpc_channel(address, options=None, asyncio=False, https=False, root_certificates=None)[source]

    Creates a grpc channel to the given address

    • Parameters

      • address (str) – The address to connect to, format is <host>:<port>

      • options (Optional[list]) – A list of options to pass to the grpc channel

      • asyncio (Optional[bool]) – If True, use the asyncio implementation of the grpc channel

      • https (Optional[bool]) – If True, use https for the grpc channel

      • root_certificates (Optional[str]) – The path to the root certificates for https, only used if https is True

      Return type

      Channel

      Returns

      A grpc channel or an asyncio channel

  • static activate_worker_sync(worker_host, worker_port, target_head, shard_id=None)[source]

    Register a given worker to a head by sending an activate request

    • Parameters

      • worker_host (str) – the host address of the worker

      • worker_port (int) – the port of the worker

      • target_head (str) – address of the head to send the activate request to

      • shard_id (Optional[int]) – id of the shard the worker belongs to

      Return type

      ControlRequest

      Returns

      the response request

  • async static activate_worker(worker_host, worker_port, target_head, shard_id=None)[source]

    Register a given worker to a head by sending an activate request

    • Parameters

      • worker_host (str) – the host address of the worker

      • worker_port (int) – the port of the worker

      • target_head (str) – address of the head to send the activate request to

      • shard_id (Optional[int]) – id of the shard the worker belongs to

      Return type

      ControlRequest

      Returns

      the response request

  • async static deactivate_worker(worker_host, worker_port, target_head, shard_id=None)[source]

    Remove a given worker to a head by sending a deactivate request

    • Parameters

      • worker_host (str) – the host address of the worker

      • worker_port (int) – the port of the worker

      • target_head (str) – address of the head to send the deactivate request to

      • shard_id (Optional[int]) – id of the shard the worker belongs to

      Return type

      ControlRequest

      Returns

      the response request

  • static send_request_sync(request, target, timeout=100.0, https=False, root_certificates=None, endpoint=None)[source]

    Sends a request synchronically to the target via grpc

    • Parameters

      • request (Request) – the request to send

      • target (str) – where to send the request to, like 127.0.0.1:8080

      • timeout – timeout for the send

      • https – if True, use https for the grpc channel

      • root_certificates (Optional[str]) – the path to the root certificates for https, only used if https is True

      • endpoint (Optional[str]) – endpoint to target with the request

      Return type

      Request

      Returns

      the response request

  • static get_default_grpc_options()[source]

    Returns a list of default options used for creating grpc channels. Documentation is here https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h :returns: list of tuples defining grpc parameters

  • async static send_request_async(request, target, timeout=1.0, https=False, root_certificates=None)[source]

    Sends a request asynchronously to the target via grpc

    • Parameters

      • request (Request) – the request to send

      • target (str) – where to send the request to, like 127.0.0.1:8080

      • timeout (float) – timeout for the send

      • https (bool) – if True, use https for the grpc channel

      • root_certificates (Optional[str]) – the path to the root certificates for https, only u

      Return type

      Request

      Returns

      the response request

  • static create_async_channel_stub(address, https=False, root_certificates=None)[source]

    Creates an async GRPC Channel. This channel has to be closed eventually!

    • Parameters

      • address – the address to create the connection to, like 127.0.0.0.1:8080

      • https – if True, use https for the grpc channel

      • root_certificates (Optional[str]) – the path to the root certificates for https, only u

      Return type

      Tuple[JinaSingleDataRequestRPCStub, JinaDataRequestRPCStub, JinaControlRequestRPCStub, Channel]

      Returns

      DataRequest/ControlRequest stubs and an async grpc channel

class jina.serve.networking.K8sGrpcConnectionPool(namespace, client, logger=None)[source]

Bases: jina.serve.networking.GrpcConnectionPool

Manages grpc connections to replicas in a K8s deployment.

  • Parameters

    • namespace (str) – K8s namespace to operate in

    • client (kubernetes.client.CoreV1Api) – K8s client

    • logger (JinaLogger) – the logger to use

  • K8S_PORT_EXPOSE = 8080

  • K8S_PORT_IN = 8081

  • K8S_PORT_USES_BEFORE = 8082

  • K8S_PORT_USES_AFTER = 8083

  • start()[source]

    Subscribe to the K8s API and watch for changes in Pods

  • run()[source]

    Subscribes on MODIFIED events from list_namespaced_pod AK8s PI

  • async close()[source]

    Closes the connection pool

jina.serve.networking.is_remote_local_connection(first, second)[source]

Decides, whether first is remote host and second is localhost

  • Parameters

    • first (str) – the ip or host name of the first runtime

    • second (str) – the ip or host name of the second runtime

    Returns

    True, if first is remote and second is local

jina.serve.networking.create_connection_pool(k8s_connection_pool=False, k8s_namespace=None, logger=None)[source]

Creates the appropriate connection pool based on parameters :type k8s_namespace: Optional[str] :param k8s_namespace: k8s namespace the pool will live in, None if outside K8s :type k8s_connection_pool: bool :param k8s_connection_pool: flag to indicate if K8sGrpcConnectionPool should be used, defaults to true in K8s :type logger: Optional[JinaLogger] :param logger: the logger to use :rtype: GrpcConnectionPool :return: A connection pool object

jina.serve.networking.host_is_local(hostname)[source]

Check if hostname is point to localhost :param hostname: host to check :return: True if hostname means localhost, False otherwise