扩展插件 (Plugins)

EMQ X 消息服务器通过模块注册和钩子(Hooks)机制,支持用户开发扩展插件定制服务器认证鉴权与业务功能。

EMQ X 官方提供的插件包括:

插件

配置文件

说明

emqx_dashboard

etc/plugins/emqx_dashbord.conf

Web 控制台插件(默认加载)

emqx_management

etc/plugins/emqx_management.conf

HTTP API 与 CLI 管理插件

emqx_auth_clientid

etc/plugins/emqx_auth_clientid.conf

ClientId 认证插件

emqx_auth_username

etc/plugins/emqx_auth_username.conf

用户名、密码认证插件

emqx_auth_jwt

etc/plugins/emqx_auth_jwt.conf

JWT 认证/访问控制

emqx_auth_ldap

etc/plugins/emqx_auth_ldap.conf

LDAP 认证/访问控制

emqx_auth_http

etc/plugins/emqx_auth_http.conf

HTTP 认证/访问控制

emqx_auth_mongo

etc/plugins/emqx_auth_mongo.conf

MongoDB 认证/访问控制

emqx_auth_mysql

etc/plugins/emqx_auth_mysql.conf

MySQL 认证/访问控制

emqx_auth_pgsql

etc/plugins/emqx_auth_pgsql.conf

PostgreSQL 认证/访问控制

emqx_auth_redis

etc/plugins/emqx_auth_redis.conf

Redis 认证/访问控制

emqx_psk_file

etc/plugins/emqx_psk_file.conf

PSK 支持

emqx_web_hook

etc/plugins/emqx_web_hook.conf

Web Hook 插件

emqx_lua_hook

etc/plugins/emqx_lua_hook.conf

Lua Hook 插件

emqx_retainer

etc/plugins/emqx_retainer.conf

Retain 消息存储模块

emqx_rule_engine

etc/plugins/emqx_rule_engine.conf

规则引擎

emqx_bridge_mqtt

etc/plugins/emqx_bridge_mqtt.conf

MQTT 消息桥接插件

emqx_delayed_publish

etc/plugins/emqx_delayed_publish.conf

客户端延时发布消息支持

emqx_coap

etc/plugins/emqx_coap.conf

CoAP 协议支持

emqx_lwm2m

etc/plugins/emqx_lwm2m.conf

LwM2M 协议支持

emqx_sn

etc/plugins/emqx_sn.conf

MQTT-SN 协议支持

emqx_stomp

etc/plugins/emqx_stomp.conf

Stomp 协议支持

emqx_recon

etc/plugins/emqx_recon.conf

Recon 性能调试

emqx_reloader

etc/plugins/emqx_reloader.conf

Reloader 代码热加载插件

emqx_plugin_template

etc/plugins/emqx_plugin_template.conf

插件开发模版

其中插件的加载有四种方式:

  1. 默认加载

  2. 命令行启停插件

  3. 使用 Dashboard 启停插件

  4. 调用管理 API 启停插件

开启默认加载

如需在系统启动时就默认启动某插件,则直接在 data/loaded_plugins 配置入需要启动的插件,例如默认开启的加载的插件有:

  1. emqx_management.
  2. emqx_rule_engine.
  3. emqx_recon.
  4. emqx_retainer.
  5. emqx_dashboard.

命令行启停插件

在运行过程中,我们可以通过 CLI 命令的方式查看可用的插件列表、和启停某插件:

  1. ## 显示所有可用的插件列表
  2. ./bin/emqx_ctl plugins list
  3. ## 加载某插件
  4. ./bin/emqx_ctl plugins load emqx_auth_username
  5. ## 卸载某插件
  6. ./bin/emqx_ctl plugins unload emqx_auth_username
  7. ## 重新加载某插件
  8. ./bin/emqx_ctl plugins reload emqx_auth_username

使用 Dashboard 启停插件

如果 EMQ X 开启了 Dashbord 的插件(默认开启) 还可以直接通过访问 http://localhost:18083/plugins 中的插件管理页面启停、或者配置插件。

Dashboard 插件

emqx_dashboardEMQ X 消息服务器的 Web 管理控制台, 该插件默认开启。当 EMQ X 启动成功后,可访问 http://localhost:18083 进行查看,默认用户名/密码: admin/public。

Dashboard 中可查询 EMQ X 消息服务器基本信息、统计数据、负载情况,查询当前客户端列表(Connections)、会话(Sessions)、路由表(Topics)、订阅关系(Subscriptions) 等详细信息。

_images/dashboard.png

除此之外,Dashboard 默认提供了一系列的 REST API 供前端调用。其详情可以参考 Dashboard -> HTTP API 部分。

Dashboard 插件设置

etc/plugins/emqx_dashboard.conf:

  1. ## Dashboard 默认用户名/密码
  2. dashboard.default_user.login = admin
  3. dashboard.default_user.password = public
  4. ## Dashboard HTTP 服务端口配置
  5. dashboard.listener.http = 18083
  6. dashboard.listener.http.acceptors = 2
  7. dashboard.listener.http.max_clients = 512
  8. ## Dashboard HTTPS 服务端口配置
  9. ## dashboard.listener.https = 18084
  10. ## dashboard.listener.https.acceptors = 2
  11. ## dashboard.listener.https.max_clients = 512
  12. ## dashboard.listener.https.handshake_timeout = 15s
  13. ## dashboard.listener.https.certfile = etc/certs/cert.pem
  14. ## dashboard.listener.https.keyfile = etc/certs/key.pem
  15. ## dashboard.listener.https.cacertfile = etc/certs/cacert.pem
  16. ## dashboard.listener.https.verify = verify_peer
  17. ## dashboard.listener.https.fail_if_no_peer_cert = true

HTTP API 与 CLI 管理插件

emqx_managementEMQ X 消息服务器的 HTTP API 与 CLI 管理插件,该插件默认开启。当 EMQ X 启动成功后,用户即可通过该插件提供的 HTTP API 与 CLI 进行查询当前客户端列表等操作,详见 管理监控API (REST API)管理命令 (Commands)

HTTP API 与 CLI 管理设置

etc/plugins/emqx_management.conf:

  1. ## 最多返回多少条数据,用于分页机制
  2. management.max_row_limit = 10000
  3. ## 默认的应用 secret
  4. # management.application.default_secret = public
  5. ## Management HTTP 服务器端口配置
  6. management.listener.http = 8080
  7. management.listener.http.acceptors = 2
  8. management.listener.http.max_clients = 512
  9. management.listener.http.backlog = 512
  10. management.listener.http.send_timeout = 15s
  11. management.listener.http.send_timeout_close = on
  12. ## Management HTTPS 服务器端口配置
  13. ## management.listener.https = 8081
  14. ## management.listener.https.acceptors = 2
  15. ## management.listener.https.max_clients = 512
  16. ## management.listener.https.backlog = 512
  17. ## management.listener.https.send_timeout = 15s
  18. ## management.listener.https.send_timeout_close = on
  19. ## management.listener.https.certfile = etc/certs/cert.pem
  20. ## management.listener.https.keyfile = etc/certs/key.pem
  21. ## management.listener.https.cacertfile = etc/certs/cacert.pem
  22. ## management.listener.https.verify = verify_peer
  23. ## management.listener.https.fail_if_no_peer_cert = true

ClientID 认证插件

emqx_auth_clientid 目前只支持 连接认证,通过 clientidpassword 认证客户端。此插件在存储密码时会按照配置的 hash 算法将明文加密后存入。

ClientID 认证配置

etc/plugins/emqx_auth_clientid.conf:

  1. ## Default usernames Examples
  2. ##auth.client.1.clientid = id
  3. ##auth.client.1.password = passwd
  4. ##auth.client.2.clientid = dev:devid
  5. ##auth.client.2.password = passwd2
  6. ##auth.client.3.clientid = app:appid
  7. ##auth.client.3.password = passwd3
  8. ##auth.client.4.clientid = client~!@#$%^&*()_+
  9. ##auth.client.4.password = passwd~!@#$%^&*()_+
  10. ## 密码加密方式
  11. ## 枚举值: plain | md5 | sha | sha256
  12. auth.client.password_hash = sha256

Username 认证插件

emqx_auth_username 目前只支持 连接认证,通过 usernamepassword 认证客户端。此插件在存储密码时会按照配置的 hash 算法将明文加密后存入。

用户名认证配置

etc/plugins/emqx_auth_username.conf:

  1. ## Default usernames Examples:
  2. ##auth.user.1.username = admin
  3. ##auth.user.1.password = public
  4. ##auth.user.2.username = feng@emqtt.io
  5. ##auth.user.2.password = public
  6. ##auth.user.3.username = name~!@#$%^&*()_+
  7. ##auth.user.3.password = pwsswd~!@#$%^&*()_+
  8. ## 密码加密方式
  9. ## 枚举值: plain | md5 | sha | sha256
  10. auth.user.password_hash = sha256

JWT 认证插件

emqx_auth_jwt 支持基于 JWT 的方式,对连接的客户端进行认证,只支持 连接认证 功能。它会解析并校验 Token 的合理性和时效性、满足则允许连接。

JWT 认证配置

etc/plugins/emqx_auth_jwt.conf:

  1. ## HMAC Hash 算法密钥
  2. auth.jwt.secret = emqxsecret
  3. ## RSA 或 ECDSA 算法的公钥
  4. ## auth.jwt.pubkey = etc/certs/jwt_public_key.pem
  5. ## JWT 串的来源
  6. ## 枚举值: username | password
  7. auth.jwt.from = password

LDAP 认证/访问控制插件

emqx_auth_ldap 支持访问 LDAP 实现 连接认证访问控制 功能。

LDAP 认证插件配置

etc/plugins/emqx_auth_ldap.conf:

  1. auth.ldap.servers = 127.0.0.1
  2. auth.ldap.port = 389
  3. auth.ldap.pool = 8
  4. auth.ldap.bind_dn = cn=root,dc=emqx,dc=io
  5. auth.ldap.bind_password = public
  6. auth.ldap.timeout = 30s
  7. auth.ldap.device_dn = ou=device,dc=emqx,dc=io
  8. auth.ldap.match_objectclass = mqttUser
  9. auth.ldap.username.attributetype = uid
  10. auth.ldap.password.attributetype = userPassword
  11. auth.ldap.ssl = false
  12. ## auth.ldap.ssl.certfile = etc/certs/cert.pem
  13. ## auth.ldap.ssl.keyfile = etc/certs/key.pem
  14. ## auth.ldap.ssl.cacertfile = etc/certs/cacert.pem
  15. ## auth.ldap.ssl.verify = verify_peer
  16. ## auth.ldap.ssl.fail_if_no_peer_cert = true

HTTP 认证/访问控制插件

emqx_auth_http 插件实现 连接认证访问控制 的功能。它会将每个请求发送到指定的 HTTP 服务,通过其返回值来判断是否具有操作权限。

该插件总共支持三个请求分别为:

  1. auth.http.auth_req: 连接认证

  2. auth.http.super_req: 判断是否为超级用户

  3. auth.http.acl_req: 访问控制权限查询

每个请求的参数都支持使用真实的客户端的 Username, IP 地址等进行自定义。

注解

其中在 3.1 版本中新增的 %C %d 的支持。

HTTP 认证插件配置

etc/plugins/emqx_auth_http.conf:

  1. ## http 请求超时时间, 0 为不设置超时
  2. ## auth.http.request.timeout = 0
  3. ## http 建立 tcp 连接的超时时间, 默认与 'request.timeout' 一致
  4. ## auth.http.request.connect_timout = 0
  5. ## http 请求最大重试次数
  6. auth.http.request.retry_times = 3
  7. ## http 重试间隔
  8. auth.http.request.retry_interval = 1s
  9. ## 重试间隔的退避指数, 实际值 = `interval * backoff ^ times`
  10. auth.http.request.retry_backoff = 2.0
  11. ## https 证书配置
  12. ## auth.http.ssl.cacertfile = {{ platform_etc_dir }}/certs/ca.pem
  13. ## auth.http.ssl.certfile = {{ platform_etc_dir }}/certs/client-cert.pem
  14. ## auth.http.ssl.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
  15. ## 占位符:
  16. ## - %u: username
  17. ## - %c: clientid
  18. ## - %a: ipaddress
  19. ## - %P: password
  20. ## - %C: common name of client TLS cert
  21. ## - %d: subject of client TLS cert
  22. auth.http.auth_req = http://127.0.0.1:8080/mqtt/auth
  23. ## AUTH 请求的 HTTP 方法和参数配置
  24. auth.http.auth_req.method = post
  25. auth.http.auth_req.params = clientid=%c,username=%u,password=%P
  26. auth.http.super_req = http://127.0.0.1:8080/mqtt/superuser
  27. auth.http.super_req.method = post
  28. auth.http.super_req.params = clientid=%c,username=%u
  29. ## 占位符:
  30. ## - %A: 1 | 2, 1 = sub, 2 = pub
  31. ## - %u: username
  32. ## - %c: clientid
  33. ## - %a: ipaddress
  34. ## - %t: topic
  35. auth.http.acl_req = http://127.0.0.1:8080/mqtt/acl
  36. auth.http.acl_req.method = get
  37. auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t

HTTP API 返回值处理

连接认证

  1. ## 认证成功
  2. HTTP Status Code: 200
  3. ## 忽略此次认证
  4. HTTP Status Code: 200
  5. Body: ignore
  6. ## 认证失败
  7. HTTP Status Code: Except 200

超级用户

  1. ## 确认为超级用户
  2. HTTP Status Code: 200
  3. ## 非超级用户
  4. HTTP Status Code: Except 200

访问控制

  1. ## 允许 Publish/Subscribe:
  2. HTTP Status Code: 200
  3. ## 忽略此次鉴权:
  4. HTTP Status Code: 200
  5. Body: ignore
  6. ## 拒绝该次 Publish/Subscribe:
  7. HTTP Status Code: Except 200

MySQL 认证/访问控制插件

emqx_auth_mysql 支持访问 MySQL 实现 连接认证访问控制 功能。要实现这些功能,我们需要在 MySQL 中创建两张表,其格式如下:

MQTT 用户表

  1. CREATE TABLE `mqtt_user` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  3. `username` varchar(100) DEFAULT NULL,
  4. `password` varchar(100) DEFAULT NULL,
  5. `salt` varchar(35) DEFAULT NULL,
  6. `is_superuser` tinyint(1) DEFAULT 0,
  7. `created` datetime DEFAULT NULL,
  8. PRIMARY KEY (`id`),
  9. UNIQUE KEY `mqtt_username` (`username`)
  10. ) ENGINE=MyISAM DEFAULT CHARSET=utf8;

注解

插件同样支持使用自定义结构的表,通过 auth_query 配置查询语句即可。

MQTT 访问控制表

  1. CREATE TABLE `mqtt_acl` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  3. `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
  4. `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
  5. `username` varchar(100) DEFAULT NULL COMMENT 'Username',
  6. `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
  7. `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
  8. `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
  9. PRIMARY KEY (`id`)
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  11. INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
  12. VALUES
  13. (1,1,NULL,'$all',NULL,2,'#'),
  14. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
  15. (3,0,NULL,'$all',NULL,1,'eq #'),
  16. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
  17. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
  18. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

配置 MySQL 认证鉴权插件

etc/plugins/emqx_auth_mysql.conf:

  1. ## Mysql 服务器地址
  2. auth.mysql.server = 127.0.0.1:3306
  3. ## Mysql 连接池大小
  4. auth.mysql.pool = 8
  5. ## Mysql 连接用户名
  6. ## auth.mysql.username =
  7. ## Mysql 连接密码
  8. ## auth.mysql.password =
  9. ## Mysql 认证用户表名
  10. auth.mysql.database = mqtt
  11. ## Mysql 查询超时时间
  12. auth.mysql.query_timeout = 5s
  13. ## 可用占位符:
  14. ## - %u: username
  15. ## - %c: clientid
  16. ## - %C: common name of client TLS cert
  17. ## - %d: subject of client TLS cert
  18. ## 注: 该条 SQL 必须且仅需查询 `password` 字段
  19. auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1
  20. ## 密码加密方式: plain, md5, sha, sha256, pbkdf2
  21. auth.mysql.password_hash = sha256
  22. ## 超级用户查询语句
  23. auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
  24. ## ACL 查询语句
  25. ## 注: 可以增加 'ORDER BY' 子句以控制 ACL 规则的生效顺序
  26. auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

此外,为防止密码域过于简单而带来安全的隐患问题,该插件还支持密码加盐操作:

  1. ## 加盐密文格式
  2. ## auth.mysql.password_hash = salt,sha256
  3. ## auth.mysql.password_hash = salt,bcrypt
  4. ## auth.mysql.password_hash = sha256,salt
  5. ## pbkdf2 带 macfun 格式
  6. ## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
  7. ## auth.mysql.password_hash = pbkdf2,sha256,1000,20

注解

3.1 版本新增 %C %d 支持。

Postgres 认证插件

emqx_auth_pgsql 通过访问 Postgres 实现 连接认证访问控制 功能。同样需要定义两张表如下:

Postgres MQTT 用户表

  1. CREATE TABLE mqtt_user (
  2. id SERIAL primary key,
  3. is_superuser boolean,
  4. username character varying(100),
  5. password character varying(100),
  6. salt character varying(40)
  7. );

Postgres MQTT 访问控制表

  1. CREATE TABLE mqtt_acl (
  2. id SERIAL primary key,
  3. allow integer,
  4. ipaddr character varying(60),
  5. username character varying(100),
  6. clientid character varying(100),
  7. access integer,
  8. topic character varying(100)
  9. );
  10. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
  11. VALUES
  12. (1,1,NULL,'$all',NULL,2,'#'),
  13. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
  14. (3,0,NULL,'$all',NULL,1,'eq #'),
  15. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
  16. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
  17. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

配置 Postgres 认证鉴权插件

etc/plugins/emqx_auth_pgsql.conf:

  1. ## PostgreSQL 服务地址
  2. auth.pgsql.server = 127.0.0.1:5432
  3. ## PostgreSQL 连接池大小
  4. auth.pgsql.pool = 8
  5. auth.pgsql.username = root
  6. ## auth.pgsql.password =
  7. auth.pgsql.database = mqtt
  8. auth.pgsql.encoding = utf8
  9. ## 连接认证查询 SQL
  10. ## 占位符:
  11. ## - %u: username
  12. ## - %c: clientid
  13. ## - %C: common name of client TLS cert
  14. ## - %d: subject of client TLS cert
  15. auth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1
  16. ## 加密方式: plain | md5 | sha | sha256 | bcrypt
  17. auth.pgsql.password_hash = sha256
  18. ## 超级用户查询语句 (占位符与认证一致)
  19. auth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
  20. ## ACL 查询语句
  21. ##
  22. ## 占位符:
  23. ## - %a: ipaddress
  24. ## - %u: username
  25. ## - %c: clientid
  26. ## 注: 可以增加 'ORDER BY' 子句以控制 ACL 规则的生效顺序
  27. auth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

同样的 password_hash 可以配置为更为安全的模式:

  1. ## 加盐加密格式
  2. ## auth.pgsql.password_hash = salt,sha256
  3. ## auth.pgsql.password_hash = sha256,salt
  4. ## auth.pgsql.password_hash = salt,bcrypt
  5. ## pbkdf2 macfun 格式
  6. ## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
  7. ## auth.pgsql.password_hash = pbkdf2,sha256,1000,20

开启以下配置,则可支持 TLS 连接到 Postgres:

  1. ## 是否开启 SSL
  2. auth.pgsql.ssl = false
  3. ## 证书配置
  4. ## auth.pgsql.ssl_opts.keyfile =
  5. ## auth.pgsql.ssl_opts.certfile =
  6. ## auth.pgsql.ssl_opts.cacertfile =

注解

3.1 版本新增 %C %d 支持。

Redis 认证/访问控制插件

emqx_auth_redis 通过访问 Redis 数据以实现 连接认证访问控制 的功能。

配置 Redis 认证插件

etc/plugins/emqx_auth_redis.conf:

  1. ## Redis 服务集群类型
  2. ## 枚举值: single | sentinel | cluster
  3. auth.redis.type = single
  4. ## Redis 服务器地址
  5. ##
  6. ## Single Redis Server: 127.0.0.1:6379, localhost:6379
  7. ## Redis Sentinel: 127.0.0.1:26379,127.0.0.2:26379,127.0.0.3:26379
  8. ## Redis Cluster: 127.0.0.1:6379,127.0.0.2:6379,127.0.0.3:6379
  9. auth.redis.server = 127.0.0.1:6379
  10. ## Redis sentinel 名称
  11. ## auth.redis.sentinel = mymaster
  12. ## Redis 连接池大小
  13. auth.redis.pool = 8
  14. ## Redis database 序号
  15. auth.redis.database = 0
  16. ## Redis password.
  17. ## auth.redis.password =
  18. ## Redis 查询超时时间
  19. auth.redis.query_timeout = 5s
  20. ## 认证查询指令
  21. ## 占位符:
  22. ## - %u: username
  23. ## - %c: clientid
  24. ## - %C: common name of client TLS cert
  25. ## - %d: subject of client TLS cert
  26. auth.redis.auth_cmd = HMGET mqtt_user:%u password
  27. ## 密码加密方式.
  28. ## 枚举: plain | md5 | sha | sha256 | bcrypt
  29. auth.redis.password_hash = plain
  30. ## 超级用户查询指令 (占位符与认证一致)
  31. auth.redis.super_cmd = HGET mqtt_user:%u is_superuser
  32. ## ACL 查询指令
  33. ## 占位符:
  34. ## - %u: username
  35. ## - %c: clientid
  36. auth.redis.acl_cmd = HGETALL mqtt_acl:%u

同样,该插件支持更安全的密码格式:

  1. ## 加盐密文格式
  2. ## auth.redis.password_hash = salt,sha256
  3. ## auth.redis.password_hash = sha256,salt
  4. ## auth.redis.password_hash = salt,bcrypt
  5. ## pbkdf2 macfun 格式
  6. ## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
  7. ## auth.redis.password_hash = pbkdf2,sha256,1000,20

注解

3.1 版本新增 %C %d 支持。

Redis 用户 Hash

默认基于用户 Hash 认证:

  1. HSET mqtt_user:<username> is_superuser 1
  2. HSET mqtt_user:<username> password "passwd"
  3. HSET mqtt_user:<username> salt "salt"

Redis ACL 规则 Hash

默认采用 Hash 存储 ACL 规则:

  1. HSET mqtt_acl:<username> topic1 1
  2. HSET mqtt_acl:<username> topic2 2
  3. HSET mqtt_acl:<username> topic3 3

注解

1: subscribe, 2: publish, 3: pubsub

MongoDB 认证/访问控制插件

emqx_auth_mongo 通过访问 MongoDB 实现 连接认证访问控制 功能。

配置 MongoDB 认证插件

etc/plugins/emqx_auth_mongo.conf:

  1. ## MongoDB 拓扑类型
  2. ## 枚举: single | unknown | sharded | rs
  3. auth.mongo.type = single
  4. ## rs 模式下的 `set name`
  5. ## auth.mongo.rs_set_name =
  6. ## MongoDB 服务地址
  7. auth.mongo.server = 127.0.0.1:27017
  8. ## MongoDB 连接池大小
  9. auth.mongo.pool = 8
  10. ## 连接认证信息
  11. ## auth.mongo.login =
  12. ## auth.mongo.password =
  13. ## auth.mongo.auth_source = admin
  14. ## 认证数据表名
  15. auth.mongo.database = mqtt
  16. ## 查询超时时间
  17. auth.mongo.query_timeout = 5s
  18. ## 认证查询配置
  19. auth.mongo.auth_query.collection = mqtt_user
  20. auth.mongo.auth_query.password_field = password
  21. auth.mongo.auth_query.password_hash = sha256
  22. ## 连接认证查询字段列表
  23. ## 占位符:
  24. ## - %u: username
  25. ## - %c: clientid
  26. ## - %C: common name of client TLS cert
  27. ## - %d: subject of client TLS cert
  28. auth.mongo.auth_query.selector = username=%u
  29. ## 超级用户查询
  30. auth.mongo.super_query = on
  31. auth.mongo.super_query.collection = mqtt_user
  32. auth.mongo.super_query.super_field = is_superuser
  33. auth.mongo.super_query.selector = username=%u
  34. ## ACL 查询配置
  35. auth.mongo.acl_query = on
  36. auth.mongo.acl_query.collection = mqtt_acl
  37. auth.mongo.acl_query.selector = username=%u

注解

3.1 版本新增 %C %d 支持。

MongoDB 数据库

  1. use mqtt
  2. db.createCollection("mqtt_user")
  3. db.createCollection("mqtt_acl")
  4. db.mqtt_user.ensureIndex({"username":1})

注解

数据库、集合名称可自定义。

MongoDB 用户集合

  1. {
  2. username: "user",
  3. password: "password hash",
  4. is_superuser: boolean (true, false),
  5. created: "datetime"
  6. }

示例:

  1. db.mqtt_user.insert({username: "test", password: "password hash", is_superuser: false})
  2. db.mqtt_user:insert({username: "root", is_superuser: true})

MongoDB ACL 集合

  1. {
  2. username: "username",
  3. clientid: "clientid",
  4. publish: ["topic1", "topic2", ...],
  5. subscribe: ["subtop1", "subtop2", ...],
  6. pubsub: ["topic/#", "topic1", ...]
  7. }

示例:

  1. db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
  2. db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})

PSK 认证插件

emqx_psk_file 插件主要提供了 PSK 支持。其目的是用于在客户端建立 TLS/DTLS 连接时,通过 PSK 方式实现 连接认证 的功能。

配置 PSK 认证插件

etc/plugins/emqx_psk_file.conf:

  1. psk.file.path = etc/psk.txt

WebHook 插件

emqx_web_hook 插件可以将所有 EMQ X 的事件及消息都发送到指定的 HTTP 服务器。

配置 WebHook 插件

etc/plugins/emqx_web_hook.conf:

  1. ## 回调的 Web Server 地址
  2. web.hook.api.url = http://127.0.0.1:8080
  3. ## 编码 Payload 字段
  4. ## 枚举值: undefined | base64 | base62
  5. ## 默认值: undefined (不进行编码)
  6. ## web.hook.encode_payload = base64
  7. ## 消息、事件配置
  8. web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
  9. web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
  10. web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
  11. web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"}
  12. web.hook.rule.session.created.1 = {"action": "on_session_created"}
  13. web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"}
  14. web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
  15. web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"}
  16. web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
  17. web.hook.rule.message.deliver.1 = {"action": "on_message_deliver"}
  18. web.hook.rule.message.acked.1 = {"action": "on_message_acked"}

Lua 插件

emqx_lua_hook 插件将所有的事件和消息都发送到指定的 Lua 函数上。其具体使用参见其 README。

Retainer 插件

emqx_retainer 该插件设置为默认启动,为 EMQ X 提供 Retained 类型的消息支持。它会将所有主题的 Retained 消息存储在集群的数据库中,并待有客户端订阅该主题的时候将该消息投递出去。

配置 Retainer 插件

etc/plugins/emqx_retainer.conf:

  1. ## retained 消息存储方式
  2. ## - ram: 仅内存
  3. ## - disc: 内存和磁盘
  4. ## - disc_only: 仅磁盘
  5. retainer.storage_type = ram
  6. ## 最大存储数 (0表示未限制)
  7. retainer.max_retained_messages = 0
  8. ## 单条最大可存储消息大小
  9. retainer.max_payload_size = 1MB
  10. ## 过期时间, 0 表示永不过期
  11. ## 单位: h 小时; m 分钟; s 秒。如 60m 表示 60 分钟
  12. retainer.expiry_interval = 0

MQTT 消息桥接插件

桥接 的概念是 EMQ X 支持将自身某类主题的消息通过某种方式转发到另一个 MQTT Broker。

桥接集群 的不同在于:桥接不会复制主题树与路由表,只根据桥接规则转发 MQTT 消息。

目前 MQTT 消息插件支持的桥接方式有:

  • RPC 桥接:RPC 桥接只能在 EMQ X Broker 间使用,且不支持订阅远程节点的主题去同步数据

  • MQTT 桥接:MQTT 桥接同时支持转发和通过订阅主题来实现数据同步两种方式

在 EMQ X 中,通过修改 etc/plugins/emqx_bridge_mqtt.conf 来配置 bridge。EMQ X 根据不同的 name 来区分不同的 bridge。例如:

  1. ## 桥接地址: 使用节点名(nodename@host)则用于 RPC 桥接,使用 host:port 用于 MQTT 连接
  2. bridge.mqtt.aws.address = 127.0.0.1:1883

该项配置声明了一个名为 aws 的 bridge 并指定以 MQTT 的方式桥接到 127.0.0.1:1883 这台 MQTT 服务器

在需要创建多个 bridge 时,可以先复制其全部的配置项,在通过使用不同的 name 来标示(比如 bridge.mqtt.$name.address 其中 $name 指代的为 bridge 的名称)

配置 MQTT 消息桥接插件

etc/plugins/emqx_bridge_mqtt.conf

  1. ## 桥接地址: 使用节点名(nodename@host)则用于 RPC 桥接,使用 host:port 用于 MQTT 连接
  2. bridge.mqtt.aws.address = 192.168.1.2:1883
  3. ## 桥接的协议版本
  4. ## 枚举值: mqttv3 | mqttv4 | mqttv5
  5. bridge.mqtt.aws.proto_ver = mqttv4
  6. ## 客户端的 client_id
  7. bridge.mqtt.aws.client_id = bridge_emq
  8. ## 客户端的 clean_start 字段
  9. ## 注: 有些 MQTT Broker 需要将 clean_start 值设成 `true`
  10. bridge.mqtt.aws.clean_start = true
  11. ## 客户端的 username 字段
  12. bridge.mqtt.aws.username = user
  13. ## 客户端的 password 字段
  14. bridge.mqtt.aws.password = passwd
  15. ## 客户端是否使用 ssl 来连接远程服务器
  16. bridge.mqtt.aws.ssl = off
  17. ## 客户端 SSL 连接的 CA 证书 (PEM格式)
  18. bridge.mqtt.aws.cacertfile = etc/certs/cacert.pem
  19. ## 客户端 SSL 连接的 SSL 证书
  20. bridge.mqtt.aws.certfile = etc/certs/client-cert.pem
  21. ## 客户端 SSL 连接的密钥文件
  22. bridge.mqtt.aws.keyfile = etc/certs/client-key.pem
  23. ## SSL 加密方式
  24. bridge.mqtt.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
  25. ## TLS PSK 的加密套件
  26. ## 注意 'listener.ssl.external.ciphers' 和 'listener.ssl.external.psk_ciphers' 不能同时配置
  27. ##
  28. ## See 'https://tools.ietf.org/html/rfc4279#section-2'.
  29. ## bridge.mqtt.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
  30. ## 客户端的心跳间隔
  31. bridge.mqtt.aws.keepalive = 60s
  32. ## 支持的 TLS 版本
  33. bridge.mqtt.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1
  34. ## 需要被转发的消息的主题
  35. bridge.mqtt.aws.forwards = sensor1/#,sensor2/#
  36. ## 挂载点(mountpoint)
  37. bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/
  38. ## 订阅对端的主题
  39. bridge.mqtt.aws.subscription.1.topic = cmd/topic1
  40. ## 订阅对端主题的 QoS
  41. bridge.mqtt.aws.subscription.1.qos = 1
  42. ## 桥接的重连间隔
  43. ## 默认: 30秒
  44. bridge.mqtt.aws.reconnect_interval = 30s
  45. ## QoS1/QoS2 消息的重传间隔
  46. bridge.mqtt.aws.retry_interval = 20s
  47. ## Inflight 大小.
  48. bridge.mqtt.aws.max_inflight_batches = 32
  49. ## emqx_bridge 内部用于 batch 的消息数量
  50. bridge.mqtt.aws.queue.batch_count_limit = 32
  51. ## emqx_bridge 内部用于 batch 的消息字节数
  52. bridge.mqtt.aws.queue.batch_bytes_limit = 1000MB
  53. ## 放置 replayq 队列的路径,如果没有在配置中指定该项,那么 replayq
  54. ## 将会以 `mem-only` 的模式运行,消息不会缓存到磁盘上。
  55. bridge.mqtt.aws.queue.replayq_dir = data/emqx_aws_bridge/
  56. ## Replayq 数据段大小
  57. bridge.mqtt.aws.queue.replayq_seg_bytes = 10MB

Delayed Publish 插件

emqx_delayed_publish 提供了延迟发送消息的功能。当客户端使用特殊主题前缀 $delayed/<seconds>/ 发布消息到 EMQ X 时,EMQ X 将在 <seconds> 秒后发布该主题消息。

CoAP 协议插件

emqx_coap 提供对 CoAP 协议(RFC 7252)的支持。

配置 CoAP 协议插件

etc/plugins/emqx_coap.conf:

  1. coap.port = 5683
  2. coap.keepalive = 120s
  3. coap.enable_stats = off

若开启以下配置,则可以支持 DTLS:

  1. ## DTLS 监听端口
  2. coap.dtls.port = 5684
  3. coap.dtls.keyfile = {{ platform_etc_dir }}/certs/key.pem
  4. coap.dtls.certfile = {{ platform_etc_dir }}/certs/cert.pem
  5. ## 双向认证相关
  6. ## coap.dtls.verify = verify_peer
  7. ## coap.dtls.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
  8. ## coap.dtls.fail_if_no_peer_cert = false

测试 CoAP 插件

我们可以通过安装 libcoap 来测试 EMQ X 对 CoAP 协议的支持情况。

  1. yum install libcoap
  2. % coap client publish message
  3. coap-client -m put -e "qos=0&retain=0&message=payload&topic=hello" coap://localhost/mqtt

LwM2M 协议插件

emqx_lwm2m 提供对 LwM2M 协议的支持。

配置 LwM2M 插件

etc/plugins/emqx_lwm2m.conf:

  1. ## LwM2M 监听端口
  2. lwm2m.port = 5683
  3. ## Lifetime 限制
  4. lwm2m.lifetime_min = 1s
  5. lwm2m.lifetime_max = 86400s
  6. ## Q Mode 模式下 `time window` 长度, 单位秒。
  7. ## 超过该 window 的消息都将被缓存
  8. #lwm2m.qmode_time_window = 22
  9. ## LwM2M 是否部署在 coaproxy 后
  10. #lwm2m.lb = coaproxy
  11. ## 设备上线后,主动 observe 所有的 objects
  12. #lwm2m.auto_observe = off
  13. # 主题挂载点
  14. # Placeholders supported:
  15. # '%e': Endpoint Name
  16. # '%a': IP Address
  17. lwm2m.mountpoint = lwm2m/%e/
  18. ## client register 成功后主动向 EMQ X 订阅的主题
  19. ## 占位符:
  20. ## '%e': Endpoint Name
  21. ## '%a': IP Address
  22. lwm2m.topics.command = dn/#
  23. ## client 应答消息(response) 到 EMQ X 的主题
  24. lwm2m.topics.response = up/resp
  25. ## client 通知类消息(noify message) 到 EMQ X 的主题
  26. lwm2m.topics.notify = up/notify
  27. ## client 注册类消息(register message) 到 EMQ X 的主题
  28. lwm2m.topics.register = up/resp
  29. # client 更新类消息(update message) 到 EMQ X 的主题
  30. lwm2m.topics.update = %e/up/resp
  31. # Object 定义的 xml 文件位置
  32. lwm2m.xml_dir = etc/lwm2m_xml

同样可以通过以下配置打开 DTLS 支持:

  1. # DTLS 证书配置
  2. lwm2m.certfile = etc/certs/cert.pem
  3. lwm2m.keyfile = etc/certs/key.pem

MQTT-SN 协议插件

emqx_sn 插件提供对 MQTT-SN 协议的支持。

配置 MQTT-SN 协议插件

etc/plugins/emqx_sn.conf:

  1. mqtt.sn.port = 1884

Stomp 协议插件

emqx_stomp 提供对 Stomp 协议的支持。支持客户端通过 Stomp 1.0/1.1/1.2 协议连接 EMQ X,发布订阅 MQTT 消息。

配置 Stomp 插件

注解

Stomp 协议端口: 61613

etc/plugins/emqx_stomp.conf:

  1. stomp.default_user.login = guest
  2. stomp.default_user.passcode = guest
  3. stomp.allow_anonymous = true
  4. stomp.frame.max_headers = 10
  5. stomp.frame.max_header_length = 1024
  6. stomp.frame.max_body_length = 8192
  7. stomp.listener = 61613
  8. stomp.listener.acceptors = 4
  9. stomp.listener.max_clients = 512

Recon 性能调试插件

emqx_recon 插件集成了 recon 性能调测库,可用于查看当前系统的一些状态信息,例如:

  1. ./bin/emqx_ctl recon
  2. recon memory #recon_alloc:memory/2
  3. recon allocated #recon_alloc:memory(allocated_types, current|max)
  4. recon bin_leak #recon:bin_leak(100)
  5. recon node_stats #recon:node_stats(10, 1000)
  6. recon remote_load Mod #recon:remote_load(Mod)

配置 Recon 插件

etc/plugins/emqx_recon.conf:

  1. %% Garbage Collection: 10 minutes
  2. recon.gc_interval = 600

Reloader 热加载插件

emqx_reloader 用于开发调试的代码热升级插件。加载该插件后 EMQ X 会根据配置的时间间隔自动热升级更新代码。

同时,也提供了 CLI 命令来指定 reload 某一个模块:

  1. ./bin/emqx_ctl reload <Module>

注解

产品部署环境不建议使用该插件。

配置 Reloader 插件

etc/plugins/emqx_reloader.conf:

  1. reloader.interval = 60
  2. reloader.logfile = log/reloader.log

插件开发模版

emqx_plugin_template 是一个 EMQ X 插件模板,在功能上并无任何意义。

开发者需要自定义插件时,可以查看该插件的代码和结构,以更快地开发一个标准的 EMQ X 插件。插件实际是一个普通的 Erlang Application,其配置文件为: etc/${PluginName}.config

EMQ X R3.2 插件开发

创建插件项目

参考 emqx_plugin_template 插件模版创建新的插件项目。

注解

<plugin name>_app.erl 文件中必须加上标签 -emqx_plugin(?MODULE). 以表明这是一个 EMQ X 的插件。

创建认证/访问控制模块

认证演示模块 - emqx_auth_demo.erl

  1. -module(emqx_auth_demo).
  2. -export([ init/1
  3. , check/2
  4. , description/0
  5. ]).
  6. init(Opts) -> {ok, Opts}.
  7. check(_Credentials = #{client_id := ClientId, username := Username, password := Password}, _State) ->
  8. io:format("Auth Demo: clientId=~p, username=~p, password=~p~n", [ClientId, Username, Password]),
  9. ok.
  10. description() -> "Auth Demo Module".

访问控制演示模块 - emqx_acl_demo.erl

  1. -module(emqx_acl_demo).
  2. -include_lib("emqx/include/emqx.hrl").
  3. %% ACL callbacks
  4. -export([ init/1
  5. , check_acl/5
  6. , reload_acl/1
  7. , description/0
  8. ]).
  9. init(Opts) ->
  10. {ok, Opts}.
  11. check_acl({Credentials, PubSub, _NoMatchAction, Topic}, _State) ->
  12. io:format("ACL Demo: ~p ~p ~p~n", [Credentials, PubSub, Topic]),
  13. allow.
  14. reload_acl(_State) ->
  15. ok.
  16. description() -> "ACL Demo Module".

注册认证、访问控制模块 - emqx_plugin_template_app.erl

  1. ok = emqx:hook('client.authenticate', fun emqx_auth_demo:check/2, []),
  2. ok = emqx:hook('client.check_acl', fun emqx_acl_demo:check_acl/5, []).

注册钩子(Hooks)

通过钩子(Hook)处理客户端上下线、主题订阅、消息收发。

emqx_plugin_template.erl:

  1. %% Called when the plugin application start
  2. load(Env) ->
  3. emqx:hook('client.authenticate', fun ?MODULE:on_client_authenticate/2, [Env]),
  4. emqx:hook('client.check_acl', fun ?MODULE:on_client_check_acl/5, [Env]),
  5. emqx:hook('client.connected', fun ?MODULE:on_client_connected/4, [Env]),
  6. emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
  7. emqx:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
  8. emqx:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
  9. emqx:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
  10. emqx:hook('session.resumed', fun ?MODULE:on_session_resumed/3, [Env]),
  11. emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
  12. emqx:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
  13. emqx:hook('session.terminated', fun ?MODULE:on_session_terminated/3, [Env]),
  14. emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
  15. emqx:hook('message.deliver', fun ?MODULE:on_message_deliver/3, [Env]),
  16. emqx:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]),
  17. emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, [Env]).

所有可用钩子(Hook)说明:

钩子

说明

client.authenticate

连接认证

client.check_acl

ACL 校验

client.connected

客户端上线

client.disconnected

客户端连接断开

client.subscribe

客户端订阅主题

client.unsubscribe

客户端取消订阅主题

session.created

会话创建

session.resumed

会话恢复

session.subscribed

会话订阅主题后

session.unsubscribed

会话取消订阅主题后

session.terminated

会话终止

message.publish

MQTT 消息发布

message.deliver

MQTT 消息进行投递

message.acked

MQTT 消息回执

message.dropped

MQTT 消息丢弃

注册 CLI 命令

扩展命令行演示模块 - emqx_cli_demo.erl

  1. -module(emqx_cli_demo).
  2. -export([cmd/1]).
  3. cmd(["arg1", "arg2"]) ->
  4. emqx_cli:print("ok");
  5. cmd(_) ->
  6. emqx_cli:usage([{"cmd arg1 arg2", "cmd demo"}]).

注册命令行模块 - emqx_plugin_template_app.erl

  1. ok = emqx_ctl:register_command(cmd, {emqx_cli_demo, cmd}, []),

插件加载后,./bin/emqx_ctl 新增命令行:

  1. ./bin/emqx_ctl cmd arg1 arg2

插件配置文件

插件自带配置文件放置在 etc/${plugin_name}.conf|configEMQ X 支持两种插件配置格式:

  1. Erlang 原生配置文件格式 - ${plugin_name}.config:

    1. [
    2. {plugin_name, [
    3. {key, value}
    4. ]}
    5. ].
  2. sysctl 的 k = v 通用格式 - ${plugin_name}.conf:

    1. plugin_name.key = value

注解

k = v 格式配置需要插件开发者创建 priv/plugin_name.schema 映射文件。

编译发布插件

  1. clone emqx-rel 项目:
  1. git clone https://github.com/emqx/emqx-rel.git
  1. rebar.config 添加依赖:
  1. {deps,
  2. [ {plugin_name, {git, "url_of_plugin", {tag, "tag_of_plugin"}}}
  3. , ....
  4. ....
  5. ]
  6. }
  1. rebar.config 中 relx 段落添加:
  1. {relx,
  2. [...
  3. , ...
  4. , {release, {emqx, git_describe},
  5. [
  6. {plugin_name, load},
  7. ]
  8. }
  9. ]
  10. }