数据订阅

基于数据天然的时间序列特性,TDengine 的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine 在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine 中每一张表均可视为一个标准的消息队列。

TDengine 内嵌支持轻量级的消息订阅与推送服务。使用系统提供的 API,用户可使用普通查询语句订阅数据库中的一张或多张表。订阅的逻辑和操作状态的维护均是由客户端完成,客户端定时轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户。

TDengine 的订阅与推送服务的状态是由客户端维持,TDengine 服务端并不维持。因此如果应用重启,从哪个时间点开始获取最新数据,由应用决定。

TDengine 的 API 中,与订阅相关的主要有以下三个:

  1. taos_subscribe
  2. taos_consume
  3. taos_unsubscribe

这些 API 的文档请见 C/C++ Connector,下面仍以智能电表场景为例介绍一下它们的具体用法(超级表和子表结构请参考上一节“连续查询”),完整的示例代码可以在 这里 找到。

如果我们希望当某个电表的电流超过一定限制(比如 10A)后能得到通知并进行一些处理, 有两种方法:一是分别对每张子表进行查询,每次查询后记录最后一条数据的时间戳,后续只查询这个时间戳之后的数据:

  1. select * from D1001 where ts > {last_timestamp1} and current > 10;
  2. select * from D1002 where ts > {last_timestamp2} and current > 10;
  3. ...

这确实可行,但随着电表数量的增加,查询数量也会增加,客户端和服务端的性能都会受到影响,当电表数增长到一定的程度,系统就无法承受了。

另一种方法是对超级表进行查询。这样,无论有多少电表,都只需一次查询:

  1. select * from meters where ts > {last_timestamp} and current > 10;

但是,如何选择 last_timestamp 就成了一个新的问题。因为,一方面数据的产生时间(也就是数据时间戳)和数据入库的时间一般并不相同,有时偏差还很大;另一方面,不同电表的数据到达 TDengine 的时间也会有差异。所以,如果我们在查询中使用最慢的那台电表的数据的时间戳作为 last_timestamp,就可能重复读入其它电表的数据;如果使用最快的电表的时间戳,其它电表的数据就可能被漏掉。

TDengine 的订阅功能为上面这个问题提供了一个彻底的解决方案。

首先是使用 taos_subscribe 创建订阅:

  1. TAOS_SUB* tsub = NULL;
  2. if (async) {
  3.   // create an asynchronized subscription, the callback function will be called every 1s
  4.   tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
  5. } else {
  6.   // create an synchronized subscription, need to call 'taos_consume' manually
  7.   tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
  8. }

TDengine 中的订阅既可以是同步的,也可以是异步的,上面的代码会根据从命令行获取的参数 async 的值来决定使用哪种方式。这里,同步的意思是用户程序要直接调用 taos_consume 来拉取数据,而异步则由 API 在内部的另一个线程中调用 taos_consume,然后把拉取到的数据交给回调函数 subscribe_callback去处理。(注意,subscribe_callback 中不宜做较为耗时的操作,否则有可能导致客户端阻塞等不可控的问题。)

参数 taos 是一个已经建立好的数据库连接,在同步模式下无特殊要求。但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误,因为回调函数在 API 的内部线程中被调用,而 TDengine 的部分 API 不是线程安全的。

参数 sql 是查询语句,可以在其中使用 where 子句指定过滤条件。在我们的例子中,如果只想订阅电流超过 10A 时的数据,可以这样写:

  1. select * from meters where current > 10;

注意,这里没有指定起始时间,所以会读到所有时间的数据。如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以再加上一个时间条件:

  1. select * from meters where ts > now - 1d and current > 10;

订阅的 topic 实际上是它的名字,因为订阅功能是在客户端 API 中实现的,所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。

如果名为 topic 的订阅不存在,参数 restart 没有意义;但如果用户程序创建这个订阅后退出,当它再次启动并重新使用这个 topic 时,restart 就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。本例中,如果 restarttrue(非零值),用户程序肯定会读到所有数据。但如果这个订阅之前就存在了,并且已经读取了一部分数据,且 restartfalse0),用户程序就不会读到之前已经读取的数据了。

taos_subscribe的最后一个参数是以毫秒为单位的轮询周期。在同步模式下,如果前后两次调用 taos_consume 的时间间隔小于此时间,taos_consume 会阻塞,直到间隔超过此时间。异步模式下,这个时间是两次调用回调函数的最小时间间隔。

taos_subscribe 的倒数第二个参数用于用户程序向回调函数传递附加参数,订阅 API 不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。

订阅创建以后,就可以消费其数据了,同步模式下,示例代码是下面的 else 部分:

  1. if (async) {
  2.   getchar();
  3. } else while(1) {
  4.   TAOS_RES* res = taos_consume(tsub);
  5.   if (res == NULL) {
  6.     printf("failed to consume data.");
  7.     break;
  8.   } else {
  9.     print_result(res, blockFetch);
  10.     getchar();
  11.   }
  12. }

这里是一个 while 循环,用户每按一次回车键就调用一次 taos_consume,而 taos_consume 的返回值是查询到的结果集,与 taos_use_result 完全相同,例子中使用这个结果集的代码是函数 print_result

  1. void print_result(TAOS_RES* res, int blockFetch) {
  2.   TAOS_ROW row = NULL;
  3.   int num_fields = taos_num_fields(res);
  4.   TAOS_FIELD* fields = taos_fetch_fields(res);
  5.   int nRows = 0;
  6.   if (blockFetch) {
  7.     nRows = taos_fetch_block(res, &row);
  8.     for (int i = 0; i < nRows; i++) {
  9.       char temp[256];
  10.       taos_print_row(temp, row + i, fields, num_fields);
  11.       puts(temp);
  12.     }
  13.   } else {
  14.     while ((row = taos_fetch_row(res))) {
  15.       char temp[256];
  16.       taos_print_row(temp, row, fields, num_fields);
  17.       puts(temp);
  18.       nRows++;
  19.     }
  20.   }
  21.   printf("%d rows consumed.\n", nRows);
  22. }

其中的 taos_print_row 用于处理订阅到数据,在我们的例子中,它会打印出所有符合条件的记录。而异步模式下,消费订阅到的数据则显得更为简单:

  1. void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  2.   print_result(res, *(int*)param);
  3. }

当要结束一次数据订阅时,需要调用 taos_unsubscribe

  1. taos_unsubscribe(tsub, keep);

其第二个参数,用于决定是否在客户端保留订阅的进度信息。如果这个参数是false0),那无论下次调用 taos_subscribe 时的 restart 参数是什么,订阅都只能重新开始。另外,进度信息的保存位置是 {DataDir}/subscribe/ 这个目录下,每个订阅有一个与其 topic 同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。

代码介绍完毕,我们来看一下实际的运行效果。假设:

  • 示例代码已经下载到本地
  • TDengine 也已经在同一台机器上安装好
  • 示例所需的数据库、超级表、子表已经全部创建好

则可以在示例代码所在目录执行以下命令来编译并启动示例程序:

  1. make
  2. ./subscribe -sql='select * from meters where current > 10;'

示例程序启动后,打开另一个终端窗口,启动 TDengine CLI 向 D1001 插入一条电流为 12A 的数据:

  1. $ taos
  2. > use test;
  3. > insert into D1001 values(now, 12, 220, 1);

这时,因为电流超过了 10A,您应该可以看到示例程序将它输出到了屏幕上。您可以继续插入一些数据观察示例程序的输出。

示例程序

下面的示例程序展示是如何使用连接器订阅所有电流超过 10A 的记录。

准备数据

  1. # create database "power"
  2. taos> create database power;
  3. # use "power" as the database in following operations
  4. taos> use power;
  5. # create super table "meters"
  6. taos> create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int);
  7. # create tabes using the schema defined by super table "meters"
  8. taos> create table d1001 using meters tags ("California.SanFrancisco", 2);
  9. taos> create table d1002 using meters tags ("California.LosAngeles", 2);
  10. # insert some rows
  11. taos> insert into d1001 values("2020-08-15 12:00:00.000", 12, 220, 1),("2020-08-15 12:10:00.000", 12.3, 220, 2),("2020-08-15 12:20:00.000", 12.2, 220, 1);
  12. taos> insert into d1002 values("2020-08-15 12:00:00.000", 9.9, 220, 1),("2020-08-15 12:10:00.000", 10.3, 220, 1),("2020-08-15 12:20:00.000", 11.2, 220, 1);
  13. # filter out the rows in which current is bigger than 10A
  14. taos> select * from meters where current > 10;
  15. ts | current | voltage | phase | location | groupid |
  16. ===========================================================================================================
  17. 2020-08-15 12:10:00.000 | 10.30000 | 220 | 1 | California.LosAngeles | 2 |
  18. 2020-08-15 12:20:00.000 | 11.20000 | 220 | 1 | California.LosAngeles | 2 |
  19. 2020-08-15 12:00:00.000 | 12.00000 | 220 | 1 | California.SanFrancisco | 2 |
  20. 2020-08-15 12:10:00.000 | 12.30000 | 220 | 2 | California.SanFrancisco | 2 |
  21. 2020-08-15 12:20:00.000 | 12.20000 | 220 | 1 | California.SanFrancisco | 2 |
  22. Query OK, 5 row(s) in set (0.004896s)

示例代码

  • Java
  • Python
  • Rust
  • C
  1. package com.taos.example;
  2. import com.taosdata.jdbc.TSDBConnection;
  3. import com.taosdata.jdbc.TSDBDriver;
  4. import com.taosdata.jdbc.TSDBResultSet;
  5. import com.taosdata.jdbc.TSDBSubscribe;
  6. import java.sql.Connection;
  7. import java.sql.DriverManager;
  8. import java.sql.ResultSetMetaData;
  9. import java.sql.SQLException;
  10. import java.util.Properties;
  11. import java.util.concurrent.TimeUnit;
  12. public class SubscribeDemo {
  13. private static final String topic = "topic-meter-current-bg-10";
  14. private static final String sql = "select * from meters where current > 10";
  15. public static void main(String[] args) {
  16. Connection connection = null;
  17. TSDBSubscribe subscribe = null;
  18. try {
  19. Class.forName("com.taosdata.jdbc.TSDBDriver");
  20. Properties properties = new Properties();
  21. properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
  22. properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
  23. String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata";
  24. connection = DriverManager.getConnection(jdbcUrl, properties);
  25. // create subscribe
  26. subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true);
  27. int count = 0;
  28. while (count < 10) {
  29. // wait 1 second to avoid frequent calls to consume
  30. TimeUnit.SECONDS.sleep(1);
  31. // consume
  32. TSDBResultSet resultSet = subscribe.consume();
  33. if (resultSet == null) {
  34. continue;
  35. }
  36. ResultSetMetaData metaData = resultSet.getMetaData();
  37. while (resultSet.next()) {
  38. int columnCount = metaData.getColumnCount();
  39. for (int i = 1; i <= columnCount; i++) {
  40. System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t");
  41. }
  42. System.out.println();
  43. count++;
  44. }
  45. }
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. } finally {
  49. try {
  50. if (null != subscribe)
  51. // close subscribe
  52. subscribe.close(true);
  53. if (connection != null)
  54. connection.close();
  55. } catch (SQLException throwable) {
  56. throwable.printStackTrace();
  57. }
  58. }
  59. }
  60. }

查看源码

数据订阅 - 图1note

目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 TimerTask 等方式达到同样的效果。

  1. """
  2. Python asynchronous subscribe demo.
  3. run on Linux system with: python3 subscribe_demo.py
  4. """
  5. from ctypes import c_void_p
  6. import taos
  7. import time
  8. def query_callback(p_sub, p_result, p_param, code):
  9. """
  10. :param p_sub: pointer returned by native API -- taos_subscribe
  11. :param p_result: pointer to native TAOS_RES
  12. :param p_param: None
  13. :param code: error code
  14. :return: None
  15. """
  16. print("in callback")
  17. result = taos.TaosResult(c_void_p(p_result))
  18. # raise exception if error occur
  19. result.check_error(code)
  20. for row in result.rows_iter():
  21. print(row)
  22. print(f"{result.row_count} rows consumed.")
  23. if __name__ == '__main__':
  24. conn = taos.connect()
  25. restart = True
  26. topic = "topic-meter-current-bg"
  27. sql = "select * from power.meters where current > 10" # Error sql
  28. interval = 2000 # consumption interval in microseconds.
  29. _ = conn.subscribe(restart, topic, sql, interval, query_callback)
  30. # Note: we received the return value as _ above, to avoid the TaosSubscription object to be deleted by gc.
  31. while True:
  32. time.sleep(10) # use Ctrl + C to interrupt

查看源码

  1. fn main() {
  2. }

查看源码

  1. // A simple demo for asynchronous subscription.
  2. // compile with:
  3. // gcc -o subscribe_demo subscribe_demo.c -ltaos
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <taos.h>
  8. int nTotalRows;
  9. /**
  10. * @brief callback function of subscription.
  11. *
  12. * @param tsub
  13. * @param res
  14. * @param param. the additional parameter passed to taos_subscribe
  15. * @param code. error code
  16. */
  17. void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) {
  18. if (code != 0) {
  19. printf("error: %d\n", code);
  20. exit(EXIT_FAILURE);
  21. }
  22. TAOS_ROW row = NULL;
  23. int num_fields = taos_num_fields(res);
  24. TAOS_FIELD* fields = taos_fetch_fields(res);
  25. int nRows = 0;
  26. while ((row = taos_fetch_row(res))) {
  27. char buf[4096] = {0};
  28. taos_print_row(buf, row, fields, num_fields);
  29. puts(buf);
  30. nRows++;
  31. }
  32. nTotalRows += nRows;
  33. printf("%d rows consumed.\n", nRows);
  34. }
  35. int main() {
  36. TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  37. if (taos == NULL) {
  38. printf("failed to connect to server\n");
  39. exit(EXIT_FAILURE);
  40. }
  41. int restart = 1; // if the topic already exists, where to subscribe from the begin.
  42. const char* topic = "topic-meter-current-bg-10";
  43. const char* sql = "select * from power.meters where current > 10";
  44. void* param = NULL; // additional parameter.
  45. int interval = 2000; // consumption interval in microseconds.
  46. TAOS_SUB* tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, interval);
  47. // wait for insert from others process. you can open TDengine CLI to insert some records for test.
  48. getchar(); // press Enter to stop
  49. printf("total rows consumed: %d\n", nTotalRows);
  50. int keep = 0; // whether to keep subscribe process
  51. taos_unsubscribe(tsub, keep);
  52. taos_close(taos);
  53. taos_cleanup();
  54. }

查看源码

运行示例程序

示例程序会先消费符合查询条件的所有历史数据:

  1. ts: 1597464000000 current: 12.0 voltage: 220 phase: 1 location: California.SanFrancisco groupid : 2
  2. ts: 1597464600000 current: 12.3 voltage: 220 phase: 2 location: California.SanFrancisco groupid : 2
  3. ts: 1597465200000 current: 12.2 voltage: 220 phase: 1 location: California.SanFrancisco groupid : 2
  4. ts: 1597464600000 current: 10.3 voltage: 220 phase: 1 location: California.LosAngeles groupid : 2
  5. ts: 1597465200000 current: 11.2 voltage: 220 phase: 1 location: California.LosAngeles groupid : 2

接着,使用 TDengine CLI 向表中新增一条数据:

  1. # taos
  2. taos> use power;
  3. taos> insert into d1001 values(now, 12.4, 220, 1);

因为这条数据的电流大于 10A,示例程序会将其消费:

  1. ts: 1651146662805 current: 12.4 voltage: 220 phase: 1 location: California.SanFrancisco groupid: 2