Extension Hook

The Extension Hook is supported by the emqx-exhook plugin. It allows users to handle EMQX’s hooks in other programming languages.

In this way, programs written in other languages can handle EMQX events in order to customize and extend EMQX. For example, users can use other programming languages to implement:

  • Authentication for client access
  • ACL authorization for publishing/subscribing
  • Persistence and bridging for messages
  • Handling of client connected/disconnected events

Design

The emqx-exhook plugin uses gRPC as the communication framework for RPC.

The architecture is as illustrated below:

      EMQX
    +========================+                 +========+==========+
    |    ExHook              |                 |        |          |
    |   +----------------+   |      gRPC       | gRPC   |  User's  |
    |   |   gRPC Client  | ------------------> | Server |  Codes   |
    |   +----------------+   |    (HTTP/2)     |        |          |
    |                        |                 |        |          |
    +========================+                 +========+==========+

As the diagram shows, EMQX acts as a gRPC client and sends hook events to the user’s gRPC server.

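Consistent with EMQX’s native hooks, ExHook also supports chained processing: the handlers for an event are invoked in sequence, and each one can compute a result and either pass it along the chain or return it.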

[Figure: chain of responsibility]
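The chained approach can be illustrated with a minimal Python sketch. It is not the plugin's implementation. The three response kinds (CONTINUE, IGNORE, STOP_AND_RETURN) are an assumption about what a valued response may carry and should be checked against exhook.proto. The handler names and the `run_chain` helper are hypothetical.

```python
from enum import Enum
from typing import Callable, List, Tuple


class ResponseType(Enum):
    """Assumed response kinds; verify against exhook.proto."""
    CONTINUE = 0         # adopt this handler's value, then call the next handler
    IGNORE = 1           # keep the value accumulated so far, then call the next handler
    STOP_AND_RETURN = 2  # adopt this handler's value and end the chain


# A handler receives the accumulated value and returns (response type, value).
Handler = Callable[[bool], Tuple[ResponseType, bool]]


def run_chain(handlers: List[Handler], initial: bool) -> bool:
    """Fold the handlers over an accumulated result, honoring early stops."""
    acc = initial
    for handler in handlers:
        kind, value = handler(acc)
        if kind is ResponseType.IGNORE:
            continue
        acc = value
        if kind is ResponseType.STOP_AND_RETURN:
            break
    return acc


# Hypothetical handlers for an authentication-style event.
def no_opinion(acc: bool) -> Tuple[ResponseType, bool]:
    return (ResponseType.IGNORE, acc)


def allow(acc: bool) -> Tuple[ResponseType, bool]:
    return (ResponseType.CONTINUE, True)


def deny_and_stop(acc: bool) -> Tuple[ResponseType, bool]:
    return (ResponseType.STOP_AND_RETURN, False)


if __name__ == "__main__":
    print(run_chain([no_opinion, allow], False))
    print(run_chain([deny_and_stop, allow], True))
```

In this sketch a handler that stops the chain prevents later handlers from overriding its result, which is the property that makes the order of handlers significant.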

APIs

The event handler, i.e. the user-implemented gRPC server, defines the list of hooks that need to be mounted and implements the callback functions that handle each event when it arrives.

These interfaces are defined as a gRPC service called HookProvider:

    syntax = "proto3";
    package emqx.exhook.v1;
    service HookProvider {
      rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
      rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
      rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
      rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
      rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
      rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
      rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
      rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
      rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
      rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
      rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
      rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
      rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
      rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
      rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
      rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
      rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
      rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
      rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
      rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
      rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
    }

The HookProvider part:

  • OnProviderLoaded: Defines how the HookProvider is loaded and returns the list of hooks that need to be mounted. Only the hooks in this list will be called back to the HookProvider service.
  • OnProviderUnloaded: Defines how the HookProvider is unloaded. It serves as a notification only.
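To make the role of the returned hook list concrete, here is a small Python sketch that models it with plain data classes. The `HookSpec` class is only a stand-in for whatever message type the code generated from exhook.proto provides, and its field names, the chosen hook names, and the `is_mounted` helper are assumptions for illustration.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class HookSpec:
    """Stand-in for a generated message type; field names are assumed."""
    name: str
    topics: List[str] = field(default_factory=list)


def on_provider_loaded() -> List[HookSpec]:
    """Return the hooks this provider wants mounted (illustrative choice)."""
    return [
        HookSpec("client.connected"),
        HookSpec("client.disconnected"),
        HookSpec("message.publish", topics=["sensors/#"]),
    ]


def is_mounted(hook_name: str, loaded: List[HookSpec]) -> bool:
    """Only hooks named in the loaded list would be called back."""
    return any(spec.name == hook_name for spec in loaded)


if __name__ == "__main__":
    loaded = on_provider_loaded()
    for name in ("client.connected", "session.created"):
        print(name, is_mounted(name, loaded))
```

A provider that returns a short list keeps the number of gRPC calls low, since events for hooks outside the list are never sent to it.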

The hook events part:

  • The methods prefixed with OnClient, OnSession, and OnMessage correspond to EMQX’s native hooks. They are called at the same points and take similar argument lists.
  • Only OnClientAuthenticate, OnClientCheckAcl, and OnMessagePublish can return values to EMQX. Return values are not supported for the other callbacks.
  • For the message type hooks (message.publish, message.delivered, message.acked, message.dropped), a list of topic filters can be attached to each hook when the list of hooks is returned. When a message event is triggered, only messages whose topic matches at least one filter in that list are sent to the user’s gRPC server. In this way, the gRPC server processes only the topics it cares about and avoids redundant gRPC requests.
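The topic filtering described above can be sketched in Python using the standard MQTT wildcard rules, where `+` matches exactly one level and `#` matches all remaining levels. This is an illustration of the matching idea, not the plugin's code. The function names are hypothetical, the special handling of topics beginning with `$` is omitted, and the behavior for an empty filter list is not modeled.

```python
from typing import List


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # No '#' was seen, so the level counts must agree exactly.
    return len(filter_levels) == len(topic_levels)


def should_forward(filters: List[str], topic: str) -> bool:
    """Forward a message event only if some configured filter matches its topic."""
    return any(topic_matches(f, topic) for f in filters)


if __name__ == "__main__":
    filters = ["sensors/#", "alerts/+/critical"]
    for t in ("sensors/room1/temp", "alerts/fire/critical", "logs/app"):
        print(t, should_forward(filters, t))
```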

For details of the interfaces and parameter data structures, refer to exhook.proto.

TIP

Note that the list of hooks, and the topic filter list configured for the message type hooks, is confirmed only once, when the HookProvider is loaded. All subsequent gRPC requests are based on the configuration established at load time.
If the list of hooks to be mounted or the list of topic filters for the message type hooks changes, the HookProvider must be reloaded. That is, the ExHook plugin/module needs to be restarted in EMQX.

Developing Guide

The user needs to implement the gRPC service of HookProvider to receive callback events from EMQX.

The main development steps are as follows:

  1. Copy the lib/emqx_exhook-<x.y.z>/priv/protos/exhook.proto file to your project.
  2. Generate the code for the gRPC server side of exhook.proto using the gRPC framework for the corresponding programming language.
  3. Implement the interfaces defined in exhook.proto as needed.

Once development is complete, deploy the service to a server that can communicate with EMQX, and make sure the required ports are open.

Then modify the server configuration in etc/plugins/emqx_exhook.conf, for example:

    exhook.server.default.url = http://127.0.0.1:9000
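Before starting the plugin, it can help to confirm that the configured address is actually reachable from the EMQX host. The following Python sketch parses a URL of the form used above and attempts a plain TCP connection. Both helper names are hypothetical, and a successful TCP connection only shows that the port is open, not that a working HookProvider service is listening on it.

```python
import socket
from typing import Tuple
from urllib.parse import urlparse


def parse_server_url(url: str) -> Tuple[str, int]:
    """Split a scheme://host:port URL into (host, port)."""
    parsed = urlparse(url)
    if parsed.hostname is None or parsed.port is None:
        raise ValueError(f"expected scheme://host:port, got {url!r}")
    return parsed.hostname, parsed.port


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    host, port = parse_server_url("http://127.0.0.1:9000")
    print(f"{host}:{port} reachable: {is_reachable(host, port)}")
```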

Start the emqx_exhook plugin and observe the output.

gRPC frameworks for each language can be found at: grpc-ecosystem/awesome-grpc

We also provide sample programs for some common programming languages: emqx-extension-examples