扩展插件(Plugins)

emqttd消息服务器通过模块注册和钩子(Hooks)机制,支持用户开发扩展插件定制服务器认证鉴权与业务功能。

emqtt项目组开发维护的插件包括:

emqttd_plugin_template: 插件开发模版

emqttd插件实际是一个Erlang应用,带自身的配置文件'etc/plugin.config',置于'emqttd/plugins'目录下。

plugins/emqttd_plugin_template是一个模版插件,典型目录结构:

目录/文件

说明

etc/plugin.config

插件配置文件

ebin/

插件程序文件目录

加载、卸载插件

通过管理命令行'./bin/emqttd_ctl'加载、卸载插件。

加载插件:

  1. ./bin/emqttd_ctl plugins load <PluginName>

卸载插件:

  1. ./bin/emqttd_ctl plugins unload <PluginName>

查询插件:

  1. ./bin/emqttd_ctl plugins list

emqttd_dashboard: Dashboard插件

emqttd消息服务器的Web管理控制台。插件项目地址: https://github.com/emqtt/emqttd_dashboard

emqttd消息服务器默认加载Dashboard插件。URL地址: http://localhost:18083 ,缺省用户名/密码: admin/public。

Dashboard插件可查询emqttd基本信息、统计数据、度量数据,查询系统客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)。

_images/dashboard.png

Dashboard插件设置

plugins/emqttd_dashboard/etc/plugin.config:

  1. [
  2. {emqttd_dashboard, [
  3. {listener,
  4. {emqttd_dashboard, 18083, [
  5. {acceptors, 4},
  6. {max_clients, 512}]}
  7. }
  8. ]}
  9. ].

emqttd_auth_http: HTTP认证/访问控制插件

HTTP认证/访问控制插件: https://github.com/emqtt/emqttd_auth_http

Note

1.1版本支持

配置插件

plugins/emqttd_auth_http/etc/plugin.config:

  1. [
  2. {emqttd_auth_http, [
  3. %% Variables: %u = username, %c = clientid, %a = ipaddress, %t = topic, %P = password, %A = access
  4. {super_req, [
  5. {method, post},
  6. {url, "http://localhost:8080/mqtt/superuser"},
  7. {params, [
  8. {username, "%u"},
  9. {clientid, "%c"}
  10. ]}
  11. ]},
  12. {auth_req, [
  13. {method, post},
  14. {url, "http://localhost:8080/mqtt/auth"},
  15. {params, [
  16. {clientid, "%c"},
  17. {username, "%u"},
  18. {password, "%P"}
  19. ]}
  20. ]},
  21. %% 'access' parameter: sub = 1, pub = 2
  22. {acl_req, [
  23. {method, post},
  24. {url, "http://localhost:8080/mqtt/acl"},
  25. {params, [
  26. {access, "%A"},
  27. {username, "%u"},
  28. {clientid, "%c"},
  29. {ipaddr, "%a"},
  30. {topic, "%t"}
  31. ]}
  32. ]}
  33. ]}
  34. ].

HTTP API

认证/ACL成功,API返回200

认证/ACL失败,API返回4xx

加载插件

./bin/emqttd_ctl plugins load emqttd_auth_http

emqttd_plugin_mysql: MySQL认证/访问控制插件

MySQL认证/访问控制插件,基于MySQL库表认证鉴权: https://github.com/emqtt/emqttd_plugin_mysql

MQTT用户表

  1. CREATE TABLE `mqtt_user` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  3. `username` varchar(100) DEFAULT NULL,
  4. `password` varchar(100) DEFAULT NULL,
  5. `salt` varchar(20) DEFAULT NULL,
  6. `is_superuser` tinyint(1) DEFAULT 0,
  7. `created` datetime DEFAULT NULL,
  8. PRIMARY KEY (`id`),
  9. UNIQUE KEY `mqtt_username` (`username`)
  10. ) ENGINE=MyISAM DEFAULT CHARSET=utf8;

Note

MySQL插件可使用系统自有的用户表,通过'authquery'配置查询语句。

MQTT访问控制表

  1. CREATE TABLE `mqtt_acl` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  3. `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
  4. `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
  5. `username` varchar(100) DEFAULT NULL COMMENT 'Username',
  6. `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
  7. `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
  8. `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
  9. PRIMARY KEY (`id`)
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  11. INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
  12. VALUES
  13. (1,1,NULL,'$all',NULL,2,'#'),
  14. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
  15. (3,0,NULL,'$all',NULL,1,'eq #'),
  16. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
  17. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
  18. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

配置插件

plugins/emqttd_plugin_mysql/etc/plugin.config:

  1. [
  2. {emqttd_plugin_mysql, [
  3. {mysql_pool, [
  4. %% ecpool options
  5. {pool_size, 8},
  6. {auto_reconnect, 3},
  7. %% mysql options
  8. {host, "localhost"},
  9. {port, 3306},
  10. {user, ""},
  11. {password, ""},
  12. {database, "mqtt"},
  13. {encoding, utf8}
  14. ]},
  15. %% Variables: %u = username, %c = clientid, %a = ipaddress
  16. %% Superuser Query
  17. {superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},
  18. %% Authentication Query: select password only
  19. {authquery, "select password from mqtt_user where username = '%u' limit 1"},
  20. %% hash algorithm: plain, md5, sha, sha256, pbkdf2?
  21. {password_hash, sha256},
  22. %% select password with salt
  23. %% {authquery, "select password, salt from mqtt_user where username = '%u'"},
  24. %% sha256 with salt prefix
  25. %% {password_hash, {salt, sha256}},
  26. %% sha256 with salt suffix
  27. %% {password_hash, {sha256, salt}},
  28. %% '%a' = ipaddress, '%u' = username, '%c' = clientid
  29. %% Comment this query, the acl will be disabled
  30. {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
  31. %% If no ACL rules matched, return...
  32. {acl_nomatch, allow}
  33. ]}
  34. ].

加载插件

./bin/emqttd_ctl plugins load emqttd_plugin_mysql

emqttd_plugin_pgsql: PostgreSQL认证/访问控制插件

PostgreSQL认证/访问控制插件,基于PostgreSQL库表认证鉴权: https://github.com/emqtt/emqttd_plugin_pgsql

MQTT用户表

  1. CREATE TABLE mqtt_user (
  2. id SERIAL primary key,
  3. is_superuser boolean,
  4. username character varying(100),
  5. password character varying(100),
  6. salt character varying(40)
  7. );

MQTT访问控制表

  1. CREATE TABLE mqtt_acl (
  2. id SERIAL primary key,
  3. allow integer,
  4. ipaddr character varying(60),
  5. username character varying(100),
  6. clientid character varying(100),
  7. access integer,
  8. topic character varying(100)
  9. );
  10. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
  11. VALUES
  12. (1,1,NULL,'$all',NULL,2,'#'),
  13. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
  14. (3,0,NULL,'$all',NULL,1,'eq #'),
  15. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
  16. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
  17. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

配置插件

plugins/emqttd_plugin_pgsql/etc/plugin.config:

  1. [
  2. {emqttd_plugin_pgsql, [
  3. {pgsql_pool, [
  4. %% ecpool options
  5. {pool_size, 8},
  6. {auto_reconnect, 3},
  7. %% pgsql options
  8. {host, "localhost"},
  9. {port, 5432},
  10. {ssl, false},
  11. {username, "feng"},
  12. {password, ""},
  13. {database, "mqtt"},
  14. {encoding, utf8}
  15. ]},
  16. %% Variables: %u = username, %c = clientid, %a = ipaddress
  17. %% Superuser Query
  18. {superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},
  19. %% Authentication Query: select password only
  20. {authquery, "select password from mqtt_user where username = '%u' limit 1"},
  21. %% hash algorithm: plain, md5, sha, sha256, pbkdf2?
  22. {password_hash, sha256},
  23. %% select password with salt
  24. %% {authquery, "select password, salt from mqtt_user where username = '%u'"},
  25. %% sha256 with salt prefix
  26. %% {password_hash, {salt, sha256}},
  27. %% sha256 with salt suffix
  28. %% {password_hash, {sha256, salt}},
  29. %% Comment this query, the acl will be disabled. Notice: don't edit this query!
  30. {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl
  31. where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
  32. %% If no rules matched, return...
  33. {acl_nomatch, allow}
  34. ]}
  35. ].

加载插件

  1. ./bin/emqttd_ctl plugins load emqttd_plugin_pgsql

emqttd_plugin_redis: Redis认证/访问控制插件

基于Redis认证/访问控制: https://github.com/emqtt/emqttd_plugin_redis

配置插件

plugins/emqttd_plugin_redis/etc/plugin.config:

  1. [
  2. {emqttd_plugin_redis, [
  3. {eredis_pool, [
  4. %% ecpool options
  5. {pool_size, 8},
  6. {auto_reconnect, 2},
  7. %% eredis options
  8. {host, "127.0.0.1"},
  9. {port, 6379},
  10. {database, 0},
  11. {password, ""}
  12. ]},
  13. %% Variables: %u = username, %c = clientid
  14. %% HGET mqtt_user:%u is_superuser
  15. {supercmd, ["HGET", "mqtt_user:%u", "is_superuser"]},
  16. %% HGET mqtt_user:%u password
  17. {authcmd, ["HGET", "mqtt_user:%u", "password"]},
  18. %% Password hash algorithm: plain, md5, sha, sha256, pbkdf2?
  19. {password_hash, sha256},
  20. %% SMEMBERS mqtt_acl:%u
  21. {aclcmd, ["SMEMBERS", "mqtt_acl:%u"]},
  22. %% If no rules matched, return...
  23. {acl_nomatch, deny},
  24. %% Load Subscriptions from Redis when client connected.
  25. {subcmd, ["HGETALL", "mqtt_subs:%u"]}
  26. ]}
  27. ].

用户Hash

默认基于用户Hash认证:

  1. HSET mqtt_user:<username> is_superuser 1
  2. HSET mqtt_user:<username> password "passwd"

ACL规则SET

默认采用SET存储ACL规则:

  1. SADD mqtt_acl:<username> "publish topic1"
  2. SADD mqtt_acl:<username> "subscribe topic2"
  3. SADD mqtt_acl:<username> "pubsub topic3"

订阅Hash

插件还支持Redis中创建MQTT订阅。当MQTT客户端连接成功,会自动从Redis加载订阅:

  1. HSET mqtt_subs:<username> topic1 0
  2. HSET mqtt_subs:<username> topic2 1
  3. HSET mqtt_subs:<username> topic3 2

加载插件

  1. ./bin/emqttd_ctl plugins load emqttd_plugin_redis

emqttd_plugin_mongo: MongoDB认证/访问控制插件

基于MongoDB认证/访问控制: https://github.com/emqtt/emqttd_plugin_mongo

配置插件

plugins/emqttd_plugin_mongo/etc/plugin.config:

  1. [
  2. {emqttd_plugin_mongo, [
  3. {mongo_pool, [
  4. {pool_size, 8},
  5. {auto_reconnect, 3},
  6. %% Mongodb Driver Opts
  7. {host, "localhost"},
  8. {port, 27017},
  9. %% {login, ""},
  10. %% {password, ""},
  11. {database, "mqtt"}
  12. ]},
  13. %% Variables: %u = username, %c = clientid
  14. %% Superuser Query
  15. {superquery, [
  16. {collection, "mqtt_user"},
  17. {super_field, "is_superuser"},
  18. {selector, {"username", "%u"}}
  19. ]},
  20. %% Authentication Query
  21. {authquery, [
  22. {collection, "mqtt_user"},
  23. {password_field, "password"},
  24. %% Hash Algorithm: plain, md5, sha, sha256, pbkdf2?
  25. {password_hash, sha256},
  26. {selector, {"username", "%u"}}
  27. ]},
  28. %% ACL Query: "%u" = username, "%c" = clientid
  29. {aclquery, [
  30. {collection, "mqtt_acl"},
  31. {selector, {"username", "%u"}}
  32. ]},
  33. %% If no ACL rules matched, return...
  34. {acl_nomatch, deny}
  35. ]}
  36. ].

MongoDB数据库

  1. use mqtt
  2. db.createCollection("mqtt_user")
  3. db.createCollection("mqtt_acl")
  4. db.mqtt_user.ensureIndex({"username":1})

Note

数据库、集合名称可自定义

用户集合(User Collection)

  1. {
  2. username: "user",
  3. password: "password hash",
  4. is_superuser: boolean (true, false),
  5. created: "datetime"
  6. }

示例:

  1. db.mqtt_user.insert({username: "test", password: "password hash", is_superuser: false})
  2. db.mqtt_user.insert({username: "root", is_superuser: true})

ACL集合(ACL Collection)

  1. {
  2. username: "username",
  3. clientid: "clientid",
  4. publish: ["topic1", "topic2", ...],
  5. subscribe: ["subtop1", "subtop2", ...],
  6. pubsub: ["topic/#", "topic1", ...]
  7. }

示例:

  1. db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
  2. db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})

加载插件

  1. ./bin/emqttd_ctl plugins load emqttd_plugin_mongo

emqttd_stomp: Stomp协议插件

Stomp协议插件。支持STOMP 1.0/1.1/1.2协议客户端连接emqttd,发布订阅MQTT消息。

配置插件

Note

Stomp协议端口: 61613

plugins/emqttd_stomp/etc/plugin.config:

  1. [
  2. {emqttd_stomp, [
  3. {default_user, [
  4. {login, "guest"},
  5. {passcode, "guest"}
  6. ]},
  7. {allow_anonymous, true},
  8. %%TODO: unused...
  9. {frame, [
  10. {max_headers, 10},
  11. {max_header_length, 1024},
  12. {max_body_length, 8192}
  13. ]},
  14. {listeners, [
  15. {emqttd_stomp, 61613, [
  16. {acceptors, 4},
  17. {max_clients, 512}
  18. ]}
  19. ]}
  20. ]}
  21. ].

加载插件

  1. ./bin/emqttd_ctl plugins load emqttd_stomp

emqttd_sockjs: Stomp/Sockjs插件

配置插件

Note

缺省端口: 61616

  1. [
  2. {emqttd_sockjs, [
  3. {sockjs, []},
  4. {cowboy_listener, {stomp_sockjs, 61616, 4}}
  5. ]}
  6. ].

加载插件

Note

需先加载emqttd_stomp插件

  1. ./bin/emqttd_ctl plugins load emqttd_stomp
  2. ./bin/emqttd_ctl plugins load emqttd_sockjs

插件演示页面

http://localhost:61616/index.html

emqttd_recon: Recon性能调试插件

emqttd_recon插件集成recon性能调测库,并在'./bin/emqttd_ctl'命令行中注册recon命令。

加载插件

  1. ./bin/emqttd_ctl plugins load emqttd_recon

recon命令

  1. ./bin/emqttd_ctl recon
  2. recon memory #recon_alloc:memory/2
  3. recon allocated #recon_alloc:memory(allocated_types, current|max)
  4. recon bin_leak #recon:bin_leak(100)
  5. recon node_stats #recon:node_stats(10, 1000)
  6. recon remote_load Mod #recon:remote_load(Mod)

emqttd_reloader: 代码热加载插件

用于开发调试的代码热升级插件。加载该插件后,emqttd会自动热升级更新代码。

Note

产品部署环境不建议使用该插件

加载插件

  1. ./bin/emqttd_ctl plugins load emqttd_reloader

reload命令

  1. ./bin/emqttd_ctl reload
  2. reload <Module> # Reload a Module

emqttd插件开发

创建插件项目

从GitHub下载emqttd源码库,在plugins/目录下创建插件应用。

模版代码请参考: emqttd_plugin_template

注册认证/访问控制模块

认证演示模块 - emqttd_auth_demo.erl

  1. -module(emqttd_auth_demo).
  2. -behaviour(emqttd_auth_mod).
  3. -include("../../../include/emqttd.hrl").
  4. -export([init/1, check/3, description/0]).
  5. init(Opts) -> {ok, Opts}.
  6. check(#mqtt_client{client_id = ClientId, username = Username}, Password, _Opts) ->
  7. io:format("Auth Demo: clientId=~p, username=~p, password=~p~n",
  8. [ClientId, Username, Password]),
  9. ok.
  10. description() -> "Demo Auth Module".

访问控制演示模块 - emqttd_acl_demo.erl

  1. -module(emqttd_acl_demo).
  2. -include("../../../include/emqttd.hrl").
  3. %% ACL callbacks
  4. -export([init/1, check_acl/2, reload_acl/1, description/0]).
  5. init(Opts) ->
  6. {ok, Opts}.
  7. check_acl({Client, PubSub, Topic}, Opts) ->
  8. io:format("ACL Demo: ~p ~p ~p~n", [Client, PubSub, Topic]),
  9. allow.
  10. reload_acl(_Opts) ->
  11. ok.
  12. description() -> "ACL Module Demo".

注册认证、访问控制模块 - emqttd_plugin_template_app.erl

  1. ok = emqttd_access_control:register_mod(auth, emqttd_auth_demo, []),
  2. ok = emqttd_access_control:register_mod(acl, emqttd_acl_demo, []),

注册扩展钩子(Hooks)

通过钩子(Hook)处理客户端上下线、主题订阅、消息收发。

emqttd_plugin_template.erl:

  1. %% Called when the plugin application start
  2. load(Env) ->
  3. emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
  4. emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
  5. emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
  6. emqttd:hook('client.subscribe.after', fun ?MODULE:on_client_subscribe_after/3, [Env]),
  7. emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
  8. emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
  9. emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/3, [Env]),
  10. emqttd:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]).

扩展钩子(Hook):

钩子

说明

client.connected

客户端上线

client.subscribe

客户端订阅主题前

client.subscribe.after

客户端订阅主题后

client.unsubscribe

客户端取消订阅主题

message.publish

MQTT消息发布

message.delivered

MQTT消息送达

message.acked

MQTT消息回执

client.disconnected

客户端连接断开

注册扩展命令行

扩展命令行演示模块 - emqttd_cli_demo.erl

  1. -module(emqttd_cli_demo).
  2. -include("../../../include/emqttd_cli.hrl").
  3. -export([cmd/1]).
  4. cmd(["arg1", "arg2"]) ->
  5. ?PRINT_MSG("ok");
  6. cmd(_) ->
  7. ?USAGE([{"cmd arg1 arg2", "cmd demo"}]).

注册命令行模块 - emqttd_plugin_template_app.erl

  1. emqttd_ctl:register_cmd(cmd, {emqttd_cli_demo, cmd}, []).

插件加载后,'./bin/emqttd_ctl'新增命令行:

  1. ./bin/emqttd_ctl cmd arg1 arg2