Extension Protocol

Extension Protocol is provided by the emqx-exproto plugin. It allows programs written in other languages (for example Python or Java) to process raw bytes directly in order to parse private protocols, and it provides a Pub/Sub interface for exchanging messages with EMQX.

This lets EMQX handle any private protocol in whichever programming language suits the user, while keeping the very high number of concurrent connections that EMQX supports.

Features

  • Extremely scalable. gRPC is used as the RPC communication framework, so all major programming languages are supported.
  • Fully asynchronous I/O. The connection layer is implemented with fully asynchronous, non-blocking I/O.
  • Transparent connection layer. Connection management is fully supported for TCP/TLS and UDP/DTLS, and a unified API is provided to the upper layers.
  • Connection management capabilities, such as a maximum number of connections, connection and throughput rate limits, and IP blacklisting.

Design

[Figure: Extension Protocol architecture]

The interfaces provided by emqx-exproto:

  1. Connection Layer: This layer maintains the life cycle of the socket and handles sending and receiving data. It:

    • Listens on a port. When a new TCP/UDP connection arrives, a connection process is started to maintain the state of the connection.
    • Calls the OnSocketCreated callback to notify the user’s server that a new connection has been established.
    • Calls the OnSocketClosed callback to notify the user’s server that a connection has been closed.
    • Calls the OnReceivedBytes callback to notify the user’s server that the connection has received new data.
    • Provides the Send interface. Called by the user’s server to send data to the connection.
    • Provides the Close interface. Called by the user’s server to actively close the connection.
  2. Protocol/Session Layer: This layer provides the Pub/Sub interface for exchanging messages with EMQX. It:

    • Provides the Authenticate interface. Called by the user’s server to register a client with EMQX.
    • Provides the StartTimer interface. Called by the user’s server to start a timer, such as a heartbeat timer, for the connection process.
    • Provides the Publish interface. Called by the user’s server to publish messages to EMQX.
    • Provides the Subscribe interface. Called by the user’s server to subscribe to a topic and receive downlink messages from EMQX.
    • Provides the Unsubscribe interface. Called by the user’s server to unsubscribe from a topic.
    • Calls the OnTimerTimeout callback to report timer timeout events.
    • Calls the OnReceivedMessages callback to deliver downlink messages. After a successful subscription, this callback is invoked whenever messages arrive on the subscribed topic.
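
The way the two layers interact can be sketched in Python without any gRPC code. This is only an illustration under stated assumptions: RecordingAdapter is a hypothetical in-memory stand-in for the interfaces EMQX provides, Handler is a hypothetical class with one method per callback, and the client ID scheme, the topic names, and the 30-second interval are made up for the example.

```python
from dataclasses import dataclass, field


@dataclass
class RecordingAdapter:
    """Hypothetical in-memory stand-in for the interfaces EMQX provides.

    A real server would call the generated gRPC client instead; this class
    only records which interface was called, so the flow can be followed.
    """
    calls: list = field(default_factory=list)

    def authenticate(self, conn, clientid):
        self.calls.append(("Authenticate", conn, clientid))

    def start_timer(self, conn, interval_s):
        self.calls.append(("StartTimer", conn, interval_s))

    def subscribe(self, conn, topic):
        self.calls.append(("Subscribe", conn, topic))

    def publish(self, conn, topic, payload):
        self.calls.append(("Publish", conn, topic, payload))

    def send(self, conn, data):
        self.calls.append(("Send", conn, data))

    def close(self, conn):
        self.calls.append(("Close", conn))


class Handler:
    """Hypothetical user server: one method per callback named above."""

    def __init__(self, adapter):
        self.adapter = adapter

    def on_socket_created(self, conn):
        # Register the client, start a heartbeat timer, subscribe to downlink.
        clientid = f"dev-{conn}"  # assumption: client ID derived from conn
        self.adapter.authenticate(conn, clientid)
        self.adapter.start_timer(conn, 30)  # assumption: 30 s heartbeat
        self.adapter.subscribe(conn, f"down/{clientid}")

    def on_received_bytes(self, conn, data):
        # Uplink: forward the received bytes to EMQX as a message.
        self.adapter.publish(conn, f"up/dev-{conn}", data)

    def on_received_messages(self, conn, payloads):
        # Downlink: write each message payload back to the socket.
        for payload in payloads:
            self.adapter.send(conn, payload)

    def on_timer_timeout(self, conn):
        # Assumption: an expired heartbeat means the connection is closed.
        self.adapter.close(conn)
```

Driving a Handler through on_socket_created, on_received_bytes, on_received_messages, and on_timer_timeout in that order leaves adapter.calls holding the sequence of interfaces a real server would invoke over gRPC.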

APIs

From a gRPC perspective, ExProto acts as a client that sends callback requests to the ConnectionHandler service. It also acts as a server that exposes the ConnectionAdapter service, through which the user’s server calls the interfaces described above, as shown in the figure.

[Figure: Extension Protocol gRPC architecture]

For the service definitions, see exproto.proto.

For example:

  syntax = "proto3";
  package emqx.exproto.v1;

  // The Broker side service. It provides a set of APIs to
  // handle a protocol access
  service ConnectionAdapter {
    // -- socket layer
    rpc Send(SendBytesRequest) returns (CodeResponse) {};
    rpc Close(CloseSocketRequest) returns (CodeResponse) {};
    // -- protocol layer
    rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {};
    rpc StartTimer(TimerRequest) returns (CodeResponse) {};
    // -- pub/sub layer
    rpc Publish(PublishRequest) returns (CodeResponse) {};
    rpc Subscribe(SubscribeRequest) returns (CodeResponse) {};
    rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {};
  }

  service ConnectionHandler {
    // -- socket layer
    rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {};
    rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {};
    rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {};
    // -- pub/sub layer
    rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {};
    rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {};
  }

Developing Guide

To receive callback events from EMQX, the user needs to implement the ConnectionHandler gRPC service.
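
On a TCP listener the byte stream has no message boundaries, so a single OnReceivedBytes callback may carry part of a frame or several frames at once, and the handler usually has to buffer bytes per connection before parsing. The sketch below shows one way to do that for a hypothetical private protocol (a 2-byte big-endian length header followed by the payload). FrameBuffer, buffers, and on_received_bytes are illustrative names, not part of exproto.

```python
import struct


class FrameBuffer:
    """Reassembles frames of a hypothetical private protocol:
    a 2-byte big-endian length header followed by that many payload bytes."""

    HEADER = struct.Struct(">H")

    def __init__(self):
        self._buf = bytearray()

    def feed(self, data: bytes) -> list:
        """Append newly received bytes and return every complete payload."""
        self._buf.extend(data)
        frames = []
        while len(self._buf) >= self.HEADER.size:
            (length,) = self.HEADER.unpack_from(self._buf)
            end = self.HEADER.size + length
            if len(self._buf) < end:
                break  # incomplete frame: wait for more bytes
            frames.append(bytes(self._buf[self.HEADER.size:end]))
            del self._buf[:end]
        return frames


# One buffer per connection, keyed by a connection identifier.
buffers = {}


def on_received_bytes(conn: str, data: bytes) -> list:
    """What an OnReceivedBytes implementation might do before parsing."""
    return buffers.setdefault(conn, FrameBuffer()).feed(data)
```

Each returned payload can then be decoded and, for example, passed to the Publish interface. The buffer for a connection should be discarded when its OnSocketClosed callback arrives.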

The main development steps are as follows:

  1. Copy the lib/emqx_exproto-<x.y.z>/priv/protos/exproto.proto file to your project.
  2. Generate the gRPC server and client code for exproto.proto using the gRPC framework for your programming language.
  3. Implement the interfaces defined in exproto.proto as needed.
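
For Python, step 2 can be done with the grpcio-tools package, which exposes the protoc compiler as grpc_tools.protoc. The following is a minimal sketch, assuming grpcio-tools is installed and exproto.proto has been copied into proto_dir. The helper names protoc_args and generate are hypothetical.

```python
def protoc_args(proto_dir: str, out_dir: str) -> list:
    """Build the argument list for grpc_tools.protoc."""
    return [
        "grpc_tools.protoc",             # argv[0], the program name
        f"-I{proto_dir}",                # where to look for .proto files
        f"--python_out={out_dir}",       # message classes
        f"--grpc_python_out={out_dir}",  # service stubs and servicers
        f"{proto_dir}/exproto.proto",
    ]


def generate(proto_dir: str = ".", out_dir: str = ".") -> int:
    """Run the compiler; returns its exit status (0 on success)."""
    # Imported here so this module loads even without grpcio-tools installed.
    from grpc_tools import protoc
    return protoc.main(protoc_args(proto_dir, out_dir))
```

Calling generate() from the project directory should produce exproto_pb2.py and exproto_pb2_grpc.py. Other languages have equivalent code generators in their own gRPC toolchains.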

Once development is complete, deploy the service to a server that can communicate with EMQX and make sure that the required ports are open.

Then modify the server configuration in etc/plugins/emqx_exproto.conf, for example:

  ## The port the ConnectionAdapter service listens on
  exproto.server.http.port = 9100
  ## The address ExProto listens on to accept client connections
  exproto.listener.protoname = tcp://0.0.0.0:7993
  ## The ConnectionHandler callback address
  exproto.listener.protoname.connection_handler_url = http://127.0.0.1:9001
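
Before starting the plugin, it can help to confirm that the ConnectionHandler address in the configuration is reachable from the EMQX host. The sketch below is one way to check this. It assumes the simple "key = value" layout shown above, and parse_conf, handler_endpoint, and is_reachable are hypothetical helpers rather than anything shipped with EMQX.

```python
import socket
from urllib.parse import urlparse


def parse_conf(text: str) -> dict:
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        conf[key.strip()] = value.strip()
    return conf


def handler_endpoint(conf: dict, name: str = "protoname") -> tuple:
    """Return (host, port) of the ConnectionHandler for the given listener."""
    url = urlparse(conf[f"exproto.listener.{name}.connection_handler_url"])
    return url.hostname, url.port


def is_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Try to open a TCP connection to host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For the configuration above, handler_endpoint yields the host 127.0.0.1 and port 9001, and is_reachable reports whether anything is accepting TCP connections there.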

Start the emqx_exproto plugin and observe the output.

gRPC frameworks for each language are listed at: grpc-ecosystem/awesome-grpc

We also provide sample programs for some common programming languages: emqx-extension-examples