钩子(Hook)设计

钩子(Hook)定义

EMQ X 消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook):

钩子说明
client.authenticate客户端认证
client.check_acl客户端 ACL 检查
client.connected客户端上线
client.subscribe客户端订阅主题前
client.unsubscribe客户端取消订阅主题
session.subscribed客户端订阅主题后
session.unsubscribed客户端取消订阅主题后
message.publishMQTT 消息发布
message.deliverMQTT 消息投递前
message.ackedMQTT 消息回执
client.disconnected客户端连接断开

钩子(Hook) 采用职责链设计模式( Chain-of-responsibility_pattern 钩子(Hook)设计 - 图1 (opens new window) ),扩展模块或插件向钩子注册回调函数,系统在客户端上下线、主题订阅或消息发布确认时,触发钩子顺序执行回调函数:

image

不同钩子的回调函数输入参数不同,用户可参考插件模版的 emqx_plugin_template 钩子(Hook)设计 - 图3 (opens new window) 模块,每个回调函数应该返回:

返回说明
ok继续执行
{ok, NewAcc}返回累积参数继续执行
stop停止执行
{stop, NewAcc}返回累积参数停止执行

钩子(Hook)实现

emqx 模块封装了 Hook 接口:

  1. -spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}).
  2. hook(HookPoint, Action) ->
  3. emqx_hooks:add(HookPoint, Action).
  4. -spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter() | integer())
  5. -> ok | {error, already_exists}).
  6. hook(HookPoint, Action, Priority) when is_integer(Priority) ->
  7. emqx_hooks:add(HookPoint, Action, Priority);
  8. hook(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) ->
  9. emqx_hooks:add(HookPoint, Action, Filter);
  10. hook(HookPoint, Action, InitArgs) when is_list(InitArgs) ->
  11. emqx_hooks:add(HookPoint, Action, InitArgs).
  12. -spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer())
  13. -> ok | {error, already_exists}).
  14. hook(HookPoint, Action, Filter, Priority) ->
  15. emqx_hooks:add(HookPoint, Action, Filter, Priority).
  16. -spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok).
  17. unhook(HookPoint, Action) ->
  18. emqx_hooks:del(HookPoint, Action).
  19. -spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
  20. run_hook(HookPoint, Args) ->
  21. emqx_hooks:run(HookPoint, Args).
  22. -spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
  23. run_fold_hook(HookPoint, Args, Acc) ->
  24. emqx_hooks:run_fold(HookPoint, Args, Acc).

钩子(Hook)使用

emqx_plugin_template 钩子(Hook)设计 - 图4 (opens new window) 提供了全部钩子的使用示例,例如端到端的消息处理回调:

  1. -module(emqx_plugin_template).
  2. -export([load/1, unload/0]).
  3. -export([on_message_publish/2, on_message_deliver/3, on_message_acked/3]).
  4. load(Env) ->
  5. emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
  6. emqx:hook('message.deliver', fun ?MODULE:on_message_deliver/3, [Env]),
  7. emqx:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]).
  8. on_message_publish(Message, _Env) ->
  9. io:format("publish ~s~n", [emqx_message:format(Message)]),
  10. {ok, Message}.
  11. on_message_deliver(Credentials, Message, _Env) ->
  12. io:format("deliver to client ~s: ~s~n", [Credentials, emqx_message:format(Message)]),
  13. {ok, Message}.
  14. on_message_acked(Credentials, Message, _Env) ->
  15. io:format("client ~s acked: ~s~n", [Credentials, emqx_message:format(Message)]),
  16. {ok, Message}.
  17. unload() ->
  18. emqx:unhook('message.publish', fun ?MODULE:on_message_publish/2),
  19. emqx:unhook('message.acked', fun ?MODULE:on_message_acked/3),
  20. emqx:unhook('message.deliver', fun ?MODULE:on_message_deliver/3).