OpenTSDB

EMQX 支持与 OpenTSDB 集成,因此可以将 MQTT 消息保存到 OpenTSDB 以便后续进行分析和检索。

提示

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用OpenTSDB - 图1 (opens new window)

前置条件

特性

快速开始教程

本节介绍如何配置 OpenTSDB 数据桥接,包括如何设置 OpenTSDB 服务器、创建数据桥接和规则以将数据转发到 OpenTSDB、以及如何测试数据桥接和规则。

本教程假定您在本地机器上同时运行 EMQX 和 OpenTSDB。如果您在远程运行 OpenTSDB 和 EMQX,请相应地调整设置。

本教程将使用如下数据进行演示

  • 主题: t/opents
  • 载荷:
  1. {
  2. "metric": "cpu",
  3. "tags": {
  4. "host": "serverA"
  5. },
  6. "value":12
  7. }

安装 OpenTSDB

通过 Docker 安装并启动 OpenTSDB:

  1. docker pull petergrace/opentsdb-docker
  2. docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker

创建 OpenTSDB 桥接

  1. 转到 Dashboard 数据集成 -> 数据桥接页面。

  2. 点击页面右上角的创建

  3. 在数据桥接类型中选择 OpenTSDB,点击下一步。

  4. 输入数据桥接名称,要求是大小写英文字母或数字组合。

  5. 输入 OpenTSDB 连接信息:

    • 服务器地址填写 http://127.0.0.1:4242,如果您在远程运行 OpenTSDB 服务器,需填写实际地址。
    • 其他选项使用默认值即可。
  6. 高级配置(可选),根据情况配置同步/异步模式,队列与批量等参数,详细请参考数据桥接简介中的配置参数。

  7. 在完成创建之前,您可以点击测试连接来测试桥接可以连接到 OpenTSDB 服务器。

  8. 点击创建按钮完成数据桥接创建。

    在弹出的创建成功对话框中您可以点击创建规则,继续创建规则以指定需要写入 OpenTSDB 的数据。您也可以按照创建 OpenTSDB 数据桥接规则章节的步骤来创建规则。

创建 OpenTSDB 数据桥接规则

至此您已经完成数据桥接创建,接下来将继续创建一条规则来指定需要写入的数据。

  1. 转到 Dashboard 数据集成 -> 规则页面。

  2. 点击页面右上角的创建

  3. 输入规则 ID my_rule,在 SQL 编辑器中输入规则。例如将 t/# 主题的 MQTT 消息存储至 OpenTSDB,需输入以下 SQL 语法:

    注意:如果您希望制定自己的 SQL 语法,需要确保规则选出的字段(SELECT 部分)包含所有 SQL 模板中用到的变量。

    1. SELECT
    2. payload.metric as metric, payload.tags as tags, payload.value as value
    3. FROM
    4. "t/#"
  4. 点击添加动作,在动作下拉框中选择使用数据桥接转发选项,选择先前创建好的 OpenTSDB 数据桥接。点击添加

  5. 点击最下方创建按钮完成规则创建。

至此您已经完成整个创建过程,可以前往 数据集成 -> Flows 页面查看拓扑图,此时应当看到 t/# 主题的消息经过名为 my_rule 的规则处理,处理结果交由 OpenTSDB 存储。

测试桥接和规则

使用 MQTTX 向 t/opents 主题发布一条消息:

  1. mqttx pub -i emqx_c -t t/opents -m '{"metric":"cpu","tags":{"host":"serverA"},"value":12}'

查看 OpenTSDB 的数据桥接中的运行统计,命中、发送成功次数均 +1。

查看数据是否已经写入 OpenTSDB 中:

  1. curl -X POST -H "Accept: Application/json" -H "Content-Type: application/json" http://localhost:4242/api/query -d '{
  2. "start": "1h-ago",
  3. "queries": [
  4. {
  5. "aggregator": "last",
  6. "metric": "cpu",
  7. "tags": {
  8. "host": "*"
  9. }
  10. }
  11. ],
  12. "showTSUIDs": "true",
  13. "showQuery": "true",
  14. "delete": "false"
  15. }'

查询结果经格式化输出后如下:

  1. [
  2. {
  3. "metric": "cpu",
  4. "tags": {
  5. "host": "serverA"
  6. },
  7. "aggregateTags": [],
  8. "query": {
  9. "aggregator": "last",
  10. "metric": "cpu",
  11. "tsuids": null,
  12. "downsample": null,
  13. "rate": false,
  14. "filters": [
  15. {
  16. "tagk": "host",
  17. "filter": "*",
  18. "group_by": true,
  19. "type": "wildcard"
  20. }
  21. ],
  22. "percentiles": null,
  23. "index": 0,
  24. "rateOptions": null,
  25. "filterTagKs": [
  26. "AAAB"
  27. ],
  28. "explicitTags": false,
  29. "useFuzzyFilter": true,
  30. "preAggregate": false,
  31. "rollupUsage": null,
  32. "rollupTable": "raw",
  33. "showHistogramBuckets": false,
  34. "useMultiGets": true,
  35. "tags": {
  36. "host": "wildcard(*)"
  37. },
  38. "histogramQuery": false
  39. },
  40. "tsuids": [
  41. "000001000001000001"
  42. ],
  43. "dps": {
  44. "1683532519": 12
  45. }
  46. }
  47. ]%