Querying Data

Main Query Features

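TDengine uses SQL as its query language. Applications can send SQL statements through the REST API or a connector, and users can run ad-hoc SQL queries manually with taos, the TDengine command-line tool. TDengine supports the following query features: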

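  • Single-column and multi-column queries
  • Multiple filter conditions on tags and values: >, <, =, <>, like, etc.
  • Grouping (Group by), sorting (Order by), and output limiting (Limit/Offset) of aggregate results
  • Arithmetic (addition, subtraction, multiplication, division) on numeric columns and aggregate results
  • Timestamp-aligned join queries (Join Query: implicit join)
  • Multiple aggregate/calculation functions: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff, etc.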

For example, in the taos command-line tool, query table d1001 for records with voltage > 215, sort them by time in descending order, and output only 2 rows.

  taos> select * from d1001 where voltage > 215 order by ts desc limit 2;
             ts            | current  | voltage |  phase  |
  ======================================================================================
   2018-10-03 14:38:16.800 | 12.30000 |     221 | 0.31000 |
   2018-10-03 14:38:15.000 | 12.60000 |     218 | 0.33000 |
  Query OK, 2 row(s) in set (0.001100s)

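To meet the needs of IoT scenarios, TDengine supports several special functions, such as twa (time-weighted average), spread (the difference between the maximum and minimum values), and last_row (the last record). More IoT-related functions will be added over time. TDengine also supports continuous queries.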
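To make the meaning of these functions concrete, here is a minimal Python sketch of what twa, spread, and last_row compute over a small in-memory series. It is an illustration only, not TDengine's implementation: the trapezoidal definition of the time-weighted average is an assumption, and the function names and the `voltage` list (readings of d1001 as they appear in the outputs on this page) are chosen for the example.

```python
from typing import List, Tuple

# (timestamp in milliseconds, value); samples must be sorted by timestamp.
Sample = Tuple[int, float]


def spread(samples: List[Sample]) -> float:
    """Difference between the largest and the smallest value."""
    values = [value for _, value in samples]
    return max(values) - min(values)


def twa(samples: List[Sample]) -> float:
    """Time-weighted average via trapezoidal integration (assumed definition)."""
    if len(samples) == 1:
        return samples[0][1]
    area = 0.0
    for (t0, v0), (t1, v1) in zip(samples, samples[1:]):
        # Area under the straight line joining two neighbouring samples.
        area += (v0 + v1) / 2 * (t1 - t0)
    return area / (samples[-1][0] - samples[0][0])


def last_row(samples: List[Sample]) -> Sample:
    """Most recent sample of a time-sorted series."""
    return samples[-1]


# Voltage readings of d1001, timestamps as milliseconds after 14:38:00.
voltage = [(5000, 219.0), (15000, 218.0), (16800, 221.0)]

print("spread  :", spread(voltage))
print("twa     :", round(twa(voltage), 3))
print("last_row:", last_row(voltage))
```

Unlike a plain average, the time-weighted average gives more weight to readings that stay in effect for longer, which matters when sampling intervals are irregular.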

For the detailed query syntax, see the Data Query chapter of TAOS SQL.

Multi-Table Aggregation Queries

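In IoT scenarios, there are often many data collection points of the same type. TDengine uses the concept of a super table (STable) to describe a type of data collection point, and an ordinary table to describe one specific data collection point. TDengine also uses tags to describe the static attributes of a data collection point, and each specific collection point has its own tag values. By specifying filter conditions on tags, TDengine provides an efficient way to run aggregation queries over the subtables that belong to a super table (that is, over one type of data collection point). Aggregate functions and most other operations that work on ordinary tables also work on super tables, with exactly the same syntax.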

Example 1

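In the TAOS shell, compute the average voltage collected by all smart meters in California, grouped by location. (The query needs no location filter because every meter in the sample output is located in California.)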

  taos> SELECT AVG(voltage) FROM meters GROUP BY location;
     avg(voltage)    |         location         |
  =============================================================
       222.000000000 | California.LosAngeles    |
       219.200000000 | California.SanFrancisco  |
  Query OK, 2 row(s) in set (0.002136s)

Example 2

In the TAOS shell, find the number of records and the maximum current for all smart meters with groupId 2 over the past 24 hours.

  taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now - 24h;
   count(*) | max(current) |
  ==================================
          5 |         13.4 |
  Query OK, 1 row(s) in set (0.002136s)

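TDengine only allows aggregation queries across tables that belong to the same super table; aggregation across different super tables is not supported. In the Data Query chapter of TAOS SQL, each query operation is marked to indicate whether it supports super tables.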
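The relationship between a super table, its subtables, and their tags can be sketched in plain Python as follows. This is a conceptual model only, not how TDengine stores or executes anything: the `SubTable` class and both helper functions are hypothetical, the rows are copied from the asynchronous query output shown later on this page, and the subtable names d1002 and d1004 are assumed. The time filter `ts > now - 24h` from Example 2 is left out for brevity.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Row = Tuple[int, float, int]  # (ts in epoch ms, current, voltage)


@dataclass
class SubTable:
    """One data collection point: static tags plus its time-series rows."""
    name: str
    tags: Dict[str, object]
    rows: List[Row] = field(default_factory=list)


def avg_voltage_by_location(tables: List[SubTable]) -> Dict[str, float]:
    """Rough equivalent of: SELECT AVG(voltage) FROM meters GROUP BY location."""
    totals = defaultdict(lambda: [0, 0])  # location -> [sum, count]
    for table in tables:
        acc = totals[table.tags["location"]]
        for _, _, voltage in table.rows:
            acc[0] += voltage
            acc[1] += 1
    return {loc: total / count for loc, (total, count) in totals.items() if count}


def count_and_max_current(tables: List[SubTable],
                          group_id: int) -> Tuple[int, Optional[float]]:
    """Rough equivalent of: SELECT count(*), max(current) FROM meters WHERE groupId = ..."""
    currents = [current
                for table in tables if table.tags["groupId"] == group_id
                for _, current, _ in table.rows]
    return len(currents), max(currents, default=None)


# The "super table" is simply the collection of subtables sharing one schema.
meters = [
    SubTable("d1001", {"location": "California.SanFrancisco", "groupId": 2},
             [(1538548685000, 10.3, 219), (1538548695000, 12.6, 218),
              (1538548696800, 12.3, 221)]),
    SubTable("d1002", {"location": "California.SanFrancisco", "groupId": 3},
             [(1538548696650, 10.3, 218)]),
    SubTable("d1003", {"location": "California.LosAngeles", "groupId": 2},
             [(1538548685500, 11.8, 221), (1538548696600, 13.4, 223)]),
    SubTable("d1004", {"location": "California.LosAngeles", "groupId": 3},
             [(1538548685000, 10.8, 223), (1538548686500, 11.5, 221)]),
]

print(avg_voltage_by_location(meters))
print(count_and_max_current(meters, group_id=2))
```

Because tags live on the subtable rather than on each row, a tag filter can discard whole subtables before any row is read, which is the intuition behind the efficiency claim above.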

Downsampling Queries and Interpolation

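In IoT scenarios, collected data often needs to be aggregated by time period through downsampling. TDengine provides a convenient keyword, interval, which makes time-window queries very simple. For example, to sum the current values collected by smart meter d1001 every 10 seconds: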

  taos> SELECT sum(current) FROM d1001 INTERVAL(10s);
             ts            | sum(current)  |
  ======================================================
   2018-10-03 14:38:00.000 |  10.300000191 |
   2018-10-03 14:38:10.000 |  24.900000572 |
  Query OK, 2 row(s) in set (0.000883s)

Downsampling also works on super tables. For example, to sum the current values collected by all smart meters in California every second:

  taos> SELECT SUM(current) FROM meters where location like "California%" INTERVAL(1s);
             ts            | sum(current)  |
  ======================================================
   2018-10-03 14:38:04.000 |  10.199999809 |
   2018-10-03 14:38:05.000 |  32.900000572 |
   2018-10-03 14:38:06.000 |  11.500000000 |
   2018-10-03 14:38:15.000 |  12.600000381 |
   2018-10-03 14:38:16.000 |  36.000000000 |
  Query OK, 5 row(s) in set (0.001538s)

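Downsampling also supports a time offset. For example, to sum the current values collected by all smart meters every second, with each time window starting at the 500 millisecond mark: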

  taos> SELECT SUM(current) FROM meters INTERVAL(1s, 500a);
             ts            | sum(current)  |
  ======================================================
   2018-10-03 14:38:04.500 |  11.189999809 |
   2018-10-03 14:38:05.500 |  31.900000572 |
   2018-10-03 14:38:06.500 |  11.600000000 |
   2018-10-03 14:38:15.500 |  12.300000381 |
   2018-10-03 14:38:16.500 |  35.000000000 |
  Query OK, 5 row(s) in set (0.001521s)
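The window assignment behind these queries can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not TDengine's implementation: `downsample_sum` is a hypothetical helper, windows are assumed to be aligned to multiples of the interval shifted by the offset, and the sample rows are the current readings of d1001 with timestamps expressed as milliseconds after 14:38:00.

```python
from typing import Dict, List, Tuple


def downsample_sum(rows: List[Tuple[int, float]],
                   interval_ms: int,
                   offset_ms: int = 0) -> List[Tuple[int, float]]:
    """Sum values per time window; returns (window_start_ms, sum) sorted by time."""
    buckets: Dict[int, float] = {}
    for ts, value in rows:
        # Shift by the offset, round down to a window boundary, shift back.
        start = (ts - offset_ms) // interval_ms * interval_ms + offset_ms
        buckets[start] = buckets.get(start, 0.0) + value
    return sorted(buckets.items())


# Current readings of d1001, timestamps as milliseconds after 14:38:00.
current = [(5000, 10.3), (15000, 12.6), (16800, 12.3)]

# Comparable to INTERVAL(10s)
for start, total in downsample_sum(current, interval_ms=10_000):
    print(start, round(total, 3))

# Comparable to INTERVAL(1s, 500a): windows begin at the 500 ms mark
for start, total in downsample_sum(current, interval_ms=1_000, offset_ms=500):
    print(start, round(total, 3))
```

With a 10-second interval the three readings collapse into the two windows shown in the d1001 output above; windows that contain no rows simply do not appear in the result.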

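In IoT scenarios, the times at which different data collection points sample data are hard to synchronize, yet many analysis algorithms (such as FFT) require the collected data to be aligned strictly at equal time intervals. In many systems the application has to implement this alignment itself, whereas TDengine's downsampling operation handles it directly.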

If no data was collected within a time interval, TDengine can also compute an interpolated value for it.
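As a rough illustration of what filling an empty window means, the sketch below takes downsampled windows with gaps and produces one value per window. It is not TDengine's algorithm or syntax: `fill_windows` is a hypothetical helper, the mode names "prev" and "linear" are labels chosen for this example, and window starts are assumed to be aligned to the interval.

```python
from typing import List, Tuple


def fill_windows(windows: List[Tuple[int, float]],
                 interval_ms: int,
                 mode: str = "prev") -> List[Tuple[int, float]]:
    """Return one (window_start_ms, value) per window between the first and last."""
    if not windows:
        return []
    known = dict(windows)
    starts = sorted(known)
    filled = []
    for start in range(starts[0], starts[-1] + interval_ms, interval_ms):
        if start in known:
            filled.append((start, known[start]))
            continue
        # Nearest windows that do have data, on each side of the gap.
        left = max(s for s in starts if s < start)
        right = min(s for s in starts if s > start)
        if mode == "prev":
            value = known[left]  # carry the previous value forward
        elif mode == "linear":
            frac = (start - left) / (right - left)
            value = known[left] + frac * (known[right] - known[left])
        else:
            raise ValueError(f"unknown fill mode: {mode}")
        filled.append((start, value))
    return filled


# Two windows with data, two empty windows in between.
sparse = [(0, 10.0), (3000, 16.0)]
print(fill_windows(sparse, interval_ms=1000, mode="prev"))
print(fill_windows(sparse, interval_ms=1000, mode="linear"))
```

Carrying the previous value forward suits readings that hold until the next sample, while linear interpolation suits quantities that change smoothly between samples.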

For details of the syntax rules, see the chapter on aggregation by time window in TAOS SQL.

Sample Code

Querying Data

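In the SQL Writing chapter, we created the power database and wrote some data to the meters table. The following sample code shows how to query data from this table.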

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  • PHP

  package com.taos.example;

  import java.sql.*;

  public class RestQueryExample {

      private static Connection getConnection() throws SQLException {
          String jdbcUrl = "jdbc:TAOS-RS://localhost:6041/power?user=root&password=taosdata";
          return DriverManager.getConnection(jdbcUrl);
      }

      // Print every column of the current row, separated by tabs.
      private static void printRow(ResultSet rs) throws SQLException {
          ResultSetMetaData meta = rs.getMetaData();
          for (int i = 1; i <= meta.getColumnCount(); i++) {
              String value = rs.getString(i);
              System.out.print(value);
              System.out.print("\t");
          }
          System.out.println();
      }

      // Print the column labels, separated by tabs.
      private static void printColName(ResultSet rs) throws SQLException {
          ResultSetMetaData meta = rs.getMetaData();
          for (int i = 1; i <= meta.getColumnCount(); i++) {
              String colLabel = meta.getColumnLabel(i);
              System.out.print(colLabel);
              System.out.print("\t");
          }
          System.out.println();
      }

      private static void processResult(ResultSet rs) throws SQLException {
          printColName(rs);
          while (rs.next()) {
              printRow(rs);
          }
      }

      private static void queryData() throws SQLException {
          // try-with-resources closes the result set, statement, and connection in reverse order.
          try (Connection conn = getConnection();
               Statement stmt = conn.createStatement();
               ResultSet rs = stmt.executeQuery("SELECT AVG(voltage) FROM meters GROUP BY location")) {
              processResult(rs);
          }
      }

      public static void main(String[] args) throws SQLException {
          queryData();
      }
  }

  // possible output:
  // avg(voltage) location
  // 222.0 California.LosAngeles
  // 219.0 California.SanFrancisco

View source

Iterate over the result to fetch rows one at a time.

  import taos


  def query_api_demo(conn: taos.TaosConnection):
      result: taos.TaosResult = conn.query("SELECT tbname, * FROM meters LIMIT 2")
      print("field count:", result.field_count)
      print("meta of fields[1]:", result.fields[1])
      print("======================Iterate on result=========================")
      for row in result:
          print(row)

  # field count: 7
  # meta of fields[1]: {name: ts, type: 9, bytes: 8}
  # ======================Iterate on result=========================
  # ('d1003', datetime.datetime(2018, 10, 3, 14, 38, 5, 500000), 11.800000190734863, 221, 0.2800000011920929, 'california.losangeles', 2)
  # ('d1003', datetime.datetime(2018, 10, 3, 14, 38, 16, 600000), 13.399999618530273, 223, 0.28999999165534973, 'california.losangeles', 2)

View source

Fetch all rows at once and return each row as a dictionary.

  import taos


  def fetch_all_demo(conn: taos.TaosConnection):
      result: taos.TaosResult = conn.query("SELECT ts, current FROM meters LIMIT 2")
      rows = result.fetch_all_into_dict()
      print("row count:", result.row_count)
      print("===============all data===================")
      print(rows)

  # row count: 2
  # ===============all data===================
  # [{'ts': datetime.datetime(2018, 10, 3, 14, 38, 5, 500000), 'current': 11.800000190734863},
  # {'ts': datetime.datetime(2018, 10, 3, 14, 38, 16, 600000), 'current': 13.399999618530273}]

View source

  package main

  import (
      "database/sql"
      "fmt"
      "time"

      _ "github.com/taosdata/driver-go/v2/taosRestful"
  )

  func main() {
      var taosDSN = "root:taosdata@http(localhost:6041)/power"
      taos, err := sql.Open("taosRestful", taosDSN)
      if err != nil {
          fmt.Println("failed to connect TDengine, err:", err)
          return
      }
      defer taos.Close()

      rows, err := taos.Query("SELECT ts, current FROM meters LIMIT 2")
      if err != nil {
          fmt.Println("failed to select from table, err:", err)
          return
      }
      defer rows.Close()

      // Scan each row into a local struct and print it.
      for rows.Next() {
          var r struct {
              ts      time.Time
              current float32
          }
          err := rows.Scan(&r.ts, &r.current)
          if err != nil {
              fmt.Println("scan error:\n", err)
              return
          }
          fmt.Println(r.ts, r.current)
      }
  }

View source

  use libtaos::*;

  fn taos_connect() -> Result<Taos, Error> {
      TaosCfgBuilder::default()
          .ip("localhost")
          .user("root")
          .pass("taosdata")
          .db("power")
          .port(6030u16)
          .build()
          .expect("TaosCfg builder error")
          .connect()
  }

  #[tokio::main]
  async fn main() -> Result<(), Error> {
      let taos = taos_connect().expect("connect error");
      let result = taos.query("SELECT ts, current FROM meters LIMIT 2").await?;
      // print column names
      let meta: Vec<ColumnMeta> = result.column_meta;
      for column in meta {
          print!("{}\t", column.name)
      }
      println!();
      // print rows
      let rows: Vec<Vec<Field>> = result.rows;
      for row in rows {
          for field in row {
              print!("{}\t", field);
          }
          println!();
      }
      Ok(())
  }

  // output:
  // ts current
  // 2022-03-28 09:56:51.249 10.3
  // 2022-03-28 09:56:51.749 12.6

View source

  const taos = require("td2.0-connector");

  const conn = taos.connect({ host: "localhost", database: "power" });
  const cursor = conn.cursor();
  const query = cursor.query("SELECT ts, current FROM meters LIMIT 2");
  query.execute().then(function (result) {
    result.pretty();
  });

  // output:
  // Successfully connected to TDengine
  // Query OK, 2 row(s) in set (0.00317767s)
  // ts | current |
  // =======================================================
  // 2018-10-03 14:38:05.000 | 10.3 |
  // 2018-10-03 14:38:15.000 | 12.6 |

View source

  using TDengineDriver;
  using System.Runtime.InteropServices;

  namespace TDengineExample
  {
      internal class QueryExample
      {
          static void Main()
          {
              IntPtr conn = GetConnection();
              // run query (the connection already selects the power database)
              IntPtr res = TDengine.Query(conn, "SELECT * FROM meters LIMIT 2");
              if (TDengine.ErrorNo(res) != 0)
              {
                  Console.WriteLine("Failed to query since: " + TDengine.Error(res));
                  TDengine.Close(conn);
                  TDengine.Cleanup();
                  return;
              }

              // get field count
              int fieldCount = TDengine.FieldCount(res);
              Console.WriteLine("fieldCount=" + fieldCount);

              // print column names
              List<TDengineMeta> metas = TDengine.FetchFields(res);
              for (int i = 0; i < metas.Count; i++)
              {
                  Console.Write(metas[i].name + "\t");
              }
              Console.WriteLine();

              // print values
              IntPtr row;
              while ((row = TDengine.FetchRows(res)) != IntPtr.Zero)
              {
                  List<TDengineMeta> metaList = TDengine.FetchFields(res);
                  int numOfFiled = TDengine.FieldCount(res);
                  List<String> dataRaw = new List<string>();
                  IntPtr colLengthPrt = TDengine.FetchLengths(res);
                  int[] colLengthArr = new int[numOfFiled];
                  Marshal.Copy(colLengthPrt, colLengthArr, 0, numOfFiled);

                  for (int i = 0; i < numOfFiled; i++)
                  {
                      TDengineMeta meta = metaList[i];
                      IntPtr data = Marshal.ReadIntPtr(row, IntPtr.Size * i);
                      if (data == IntPtr.Zero)
                      {
                          Console.Write("NULL\t");
                          continue;
                      }

                      // decode the raw column pointer according to the column type
                      switch ((TDengineDataType)meta.type)
                      {
                          case TDengineDataType.TSDB_DATA_TYPE_BOOL:
                              bool v1 = Marshal.ReadByte(data) == 0 ? false : true;
                              Console.Write(v1.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
                              sbyte v2 = (sbyte)Marshal.ReadByte(data);
                              Console.Write(v2.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
                              short v3 = Marshal.ReadInt16(data);
                              Console.Write(v3.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_INT:
                              int v4 = Marshal.ReadInt32(data);
                              Console.Write(v4.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
                              long v5 = Marshal.ReadInt64(data);
                              Console.Write(v5.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
                              float v6 = (float)Marshal.PtrToStructure(data, typeof(float));
                              Console.Write(v6.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
                              double v7 = (double)Marshal.PtrToStructure(data, typeof(double));
                              Console.Write(v7.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_BINARY:
                              string v8 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                              Console.Write(v8 + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
                              long v9 = Marshal.ReadInt64(data);
                              Console.Write(v9.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
                              string v10 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                              Console.Write(v10 + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_UTINYINT:
                              byte v12 = Marshal.ReadByte(data);
                              Console.Write(v12.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_USMALLINT:
                              ushort v13 = (ushort)Marshal.ReadInt16(data);
                              Console.Write(v13.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_UINT:
                              uint v14 = (uint)Marshal.ReadInt32(data);
                              Console.Write(v14.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_UBIGINT:
                              ulong v15 = (ulong)Marshal.ReadInt64(data);
                              Console.Write(v15.ToString() + "\t");
                              break;
                          case TDengineDataType.TSDB_DATA_TYPE_JSONTAG:
                              string v16 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                              Console.Write(v16 + "\t");
                              break;
                          default:
                              Console.Write("nonsupport data type value");
                              break;
                      }
                  }
                  Console.WriteLine();
              }

              if (TDengine.ErrorNo(res) != 0)
              {
                  Console.WriteLine($"Query is not complete, Error {TDengine.ErrorNo(res)} {TDengine.Error(res)}");
              }

              // exit
              TDengine.FreeResult(res);
              TDengine.Close(conn);
              TDengine.Cleanup();
          }

          static IntPtr GetConnection()
          {
              string host = "localhost";
              short port = 6030;
              string username = "root";
              string password = "taosdata";
              string dbname = "power";
              var conn = TDengine.Connect(host, username, password, dbname, port);
              if (conn == IntPtr.Zero)
              {
                  Console.WriteLine("Connect to TDengine failed");
                  // non-zero exit code signals the failure to the caller
                  System.Environment.Exit(1);
              }
              else
              {
                  Console.WriteLine("Connect to TDengine success");
              }
              return conn;
          }
      }
  }

  // output:
  // Connect to TDengine success
  // fieldCount=6
  // ts current voltage phase location groupid
  // 1648432611249 10.3 219 0.31 California.SanFrancisco 2
  // 1648432611749 12.6 218 0.33 California.SanFrancisco 2

View source

  // compile with:
  // gcc -o query_example query_example.c -ltaos
  #include <inttypes.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <taos.h>

  typedef int16_t VarDataLenT;

  #define TSDB_NCHAR_SIZE sizeof(int32_t)
  #define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
  #define GET_FLOAT_VAL(x) (*(float *)(x))
  #define GET_DOUBLE_VAL(x) (*(double *)(x))
  #define varDataLen(v) ((VarDataLenT *)(v))[0]

  // Format one row into str, separating columns with a space; returns the length written.
  int printRow(char *str, TAOS_ROW row, TAOS_FIELD *fields, int numFields) {
    int  len = 0;
    char split = ' ';

    for (int i = 0; i < numFields; ++i) {
      if (i > 0) {
        str[len++] = split;
      }

      if (row[i] == NULL) {
        len += sprintf(str + len, "%s", "NULL");
        continue;
      }

      switch (fields[i].type) {
        case TSDB_DATA_TYPE_TINYINT:
          len += sprintf(str + len, "%d", *((int8_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_UTINYINT:
          len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_SMALLINT:
          len += sprintf(str + len, "%d", *((int16_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_USMALLINT:
          len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_INT:
          len += sprintf(str + len, "%d", *((int32_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_UINT:
          len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_BIGINT:
          len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_UBIGINT:
          len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_FLOAT: {
          float fv = 0;
          fv = GET_FLOAT_VAL(row[i]);
          len += sprintf(str + len, "%f", fv);
        } break;
        case TSDB_DATA_TYPE_DOUBLE: {
          double dv = 0;
          dv = GET_DOUBLE_VAL(row[i]);
          len += sprintf(str + len, "%lf", dv);
        } break;
        case TSDB_DATA_TYPE_BINARY:
        case TSDB_DATA_TYPE_NCHAR: {
          // variable-length data: the length is stored just before the payload
          int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
          memcpy(str + len, row[i], charLen);
          len += charLen;
        } break;
        case TSDB_DATA_TYPE_TIMESTAMP:
          len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
          break;
        case TSDB_DATA_TYPE_BOOL:
          len += sprintf(str + len, "%d", *((int8_t *)row[i]));
          break;
        default:
          break;
      }
    }

    return len;
  }

  /**
   * @brief print column name and values of each row
   *
   * @param res
   * @return int
   */
  static int printResult(TAOS_RES *res) {
    int         numFields = taos_num_fields(res);
    TAOS_FIELD *fields = taos_fetch_fields(res);
    char        header[256] = {0};
    int         len = 0;
    for (int i = 0; i < numFields; ++i) {
      len += sprintf(header + len, "%s ", fields[i].name);
    }
    puts(header);

    TAOS_ROW row = NULL;
    while ((row = taos_fetch_row(res))) {
      char temp[256] = {0};
      printRow(temp, row, fields, numFields);
      puts(temp);
    }
    return 0;
  }

  int main() {
    TAOS *taos = taos_connect("localhost", "root", "taosdata", "power", 6030);
    if (taos == NULL) {
      puts("failed to connect to server");
      exit(EXIT_FAILURE);
    }
    TAOS_RES *res = taos_query(taos, "SELECT * FROM meters LIMIT 2");
    if (taos_errno(res) != 0) {
      printf("failed to execute taos_query. error: %s\n", taos_errstr(res));
      exit(EXIT_FAILURE);
    }
    printResult(res);
    taos_free_result(res);
    taos_close(taos);
    taos_cleanup();
  }

  // output:
  // ts current voltage phase location groupid
  // 1648432611249 10.300000 219 0.310000 California.SanFrancisco 2
  // 1648432611749 12.600000 218 0.330000 California.SanFrancisco 2

View source

  <?php

  use TDengine\Connection;
  use TDengine\Exception\TDengineException;

  try {
      // instantiate
      $host = 'localhost';
      $port = 6030;
      $username = 'root';
      $password = 'taosdata';
      $dbname = 'power';
      $connection = new Connection($host, $port, $username, $password, $dbname);

      // connect
      $connection->connect();

      $resource = $connection->query('SELECT ts, current FROM meters LIMIT 2');
      var_dump($resource->fetch());
  } catch (TDengineException $e) {
      // throw exception
      throw $e;
  }

View source

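Note
  1. The sample code above works with both REST connections and native connections.
  2. The only caveat is that, because the REST interface is stateless, the use db statement cannot be used to switch databases.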
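Since every REST call is independent, one way to cope with the missing use db statement is to qualify table names with the database name in the SQL itself, as in `power.meters`. The sketch below builds such a request with Python's standard library only. It assumes the `/rest/sql` endpoint on port 6041 with HTTP Basic authentication and the default root/taosdata credentials used elsewhere on this page; `build_rest_request` and `run_query` are hypothetical helpers, and the shape of the response should be checked against the REST API documentation for your TDengine version.

```python
import base64
import json
import urllib.request


def build_rest_request(sql: str,
                       host: str = "localhost",
                       port: int = 6041,
                       user: str = "root",
                       password: str = "taosdata") -> urllib.request.Request:
    """Build (but do not send) a POST request carrying one SQL statement."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"http://{host}:{port}/rest/sql",  # assumed endpoint
        data=sql.encode("utf-8"),
        headers={"Authorization": f"Basic {token}"},
        method="POST",
    )


def run_query(sql: str) -> dict:
    """Send the statement and decode the JSON reply (needs a running server)."""
    with urllib.request.urlopen(build_rest_request(sql)) as response:
        return json.load(response)


# The database name travels inside the statement, so no session state is needed.
request = build_rest_request("SELECT ts, current FROM power.meters LIMIT 2")
print(request.get_method(), request.full_url)
print(request.data.decode())
```

Calling `run_query(...)` against a live server would send the statement; the example stops at building the request so that it can be inspected without a running TDengine instance.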

Asynchronous Queries

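In addition to the synchronous query API, TDengine provides a higher-performance asynchronous API for data insertion and query operations. With the same hardware and software environment, the asynchronous API inserts data 2 to 4 times faster than the synchronous API. The asynchronous API is non-blocking: a call returns immediately, before the system has actually completed the database operation. The calling thread can then go on to handle other work, which can improve the performance of the whole application. The asynchronous API is especially advantageous when network latency is high.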

Note that asynchronous queries are available only with connectors that use a native connection.
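The non-blocking call pattern itself can be illustrated without a database. The sketch below uses Python's `concurrent.futures` as a generic stand-in: it is not the TDengine asynchronous API, `slow_query` is a hypothetical function that merely sleeps to imitate a network round trip, and the rows it returns are made up for the example.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Tuple

Row = Tuple[str, float]


def slow_query(sql: str) -> List[Row]:
    """Hypothetical stand-in for a database round trip."""
    time.sleep(0.1)
    return [("2018-10-03 14:38:05.000", 10.3), ("2018-10-03 14:38:15.000", 12.6)]


results: List[Row] = []


def on_done(future: Future) -> None:
    """Callback invoked once the query has finished."""
    results.extend(future.result())


with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() returns immediately; the query runs on a worker thread.
    future = pool.submit(slow_query, "SELECT ts, current FROM meters LIMIT 2")
    future.add_done_callback(on_done)

    # The calling thread is free to do other work in the meantime.
    other_work = sum(range(1000))
# Leaving the with-block waits for the worker thread to finish.

print("other work result:", other_work)
print("rows received    :", len(results))
```

The connector examples that follow use the same idea with callbacks registered on the native client: the call returns at once and the callback receives the result later.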

  • Python
  • C#
  • C

  import time
  from ctypes import *

  from taos import *


  def fetch_callback(p_param, p_result, num_of_rows):
      print("fetched ", num_of_rows, "rows")
      p = cast(p_param, POINTER(Counter))
      result = TaosResult(p_result)

      if num_of_rows == 0:
          print("fetching completed")
          p.contents.done = True
          result.close()
          return
      if num_of_rows < 0:
          p.contents.done = True
          result.check_error(num_of_rows)
          result.close()
          return None

      for row in result.rows_iter(num_of_rows):
          print(row)
      p.contents.count += result.row_count
      # request the next batch; this callback runs again when it arrives
      result.fetch_rows_a(fetch_callback, p_param)


  def query_callback(p_param, p_result, code):
      if p_result is None:
          return
      result = TaosResult(p_result)
      if code == 0:
          result.fetch_rows_a(fetch_callback, p_param)
      result.check_error(code)


  class Counter(Structure):
      _fields_ = [("count", c_int), ("done", c_bool)]

      def __str__(self):
          return "{ count: %d, done: %s }" % (self.count, self.done)


  def test_query(conn):
      counter = Counter(count=0)
      conn.query_a("select ts, current, voltage from power.meters", query_callback, byref(counter))

      # poll until the callbacks mark the query as finished
      while not counter.done:
          print(counter)
          time.sleep(1)
      print(counter)
      conn.close()


  if __name__ == "__main__":
      test_query(connect())

  # possible output:
  # { count: 0, done: False }
  # fetched 8 rows
  # 1538548685000 10.300000 219
  # 1538548695000 12.600000 218
  # 1538548696800 12.300000 221
  # 1538548696650 10.300000 218
  # 1538548685500 11.800000 221
  # 1538548696600 13.400000 223
  # 1538548685500 10.800000 223
  # 1538548686500 11.500000 221
  # fetched 0 rows
  # fetching completed
  # { count: 8, done: True }

View source

Note

This example program does not currently run on Windows.

  using TDengineDriver;
  using System.Runtime.InteropServices;

  namespace TDengineExample
  {
      public class AsyncQueryExample
      {
          static void Main()
          {
              IntPtr conn = GetConnection();
              QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
              TDengine.QueryAsync(conn, "select * from meters", queryAsyncCallback, IntPtr.Zero);
              // give the callbacks time to run before closing the connection
              Thread.Sleep(2000);
              TDengine.Close(conn);
              TDengine.Cleanup();
          }

          static void QueryCallback(IntPtr param, IntPtr taosRes, int code)
          {
              if (code == 0 && taosRes != IntPtr.Zero)
              {
                  FetchRowAsyncCallback fetchRowAsyncCallback = new FetchRowAsyncCallback(FetchRowCallback);
                  TDengine.FetchRowAsync(taosRes, fetchRowAsyncCallback, param);
              }
              else
              {
                  Console.WriteLine($"async query data failed, failed code {code}");
              }
          }

          static void FetchRowCallback(IntPtr param, IntPtr taosRes, int numOfRows)
          {
              if (numOfRows > 0)
              {
                  Console.WriteLine($"{numOfRows} rows async retrieved");
                  DisplayRes(taosRes);
                  TDengine.FetchRowAsync(taosRes, FetchRowCallback, param);
              }
              else
              {
                  if (numOfRows == 0)
                  {
                      Console.WriteLine("async retrieve complete.");
                  }
                  else
                  {
                      Console.WriteLine($"FetchRowAsync callback error, error code {numOfRows}");
                  }
                  TDengine.FreeResult(taosRes);
              }
          }

          public static void DisplayRes(IntPtr res)
          {
              if (!IsValidResult(res))
              {
                  TDengine.Cleanup();
                  System.Environment.Exit(1);
              }

              List<TDengineMeta> metaList = TDengine.FetchFields(res);
              int fieldCount = metaList.Count;
              // metaList.ForEach((item) => { Console.Write("{0} ({1}) \t|\t", item.name, item.size); });

              List<object> dataList = QueryRes(res, metaList);
              for (int index = 0; index < dataList.Count; index++)
              {
                  if (index % fieldCount == 0 && index != 0)
                  {
                      Console.WriteLine("");
                  }
                  Console.Write("{0} \t|\t", dataList[index].ToString());
              }
              Console.WriteLine("");
          }

          public static bool IsValidResult(IntPtr res)
          {
              if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
              {
                  if (res != IntPtr.Zero)
                  {
                      Console.Write("reason: " + TDengine.Error(res));
                      return false;
                  }
                  Console.WriteLine("");
                  return false;
              }
              return true;
          }

          private static List<object> QueryRes(IntPtr res, List<TDengineMeta> meta)
          {
              IntPtr taosRow;
              List<object> dataRaw = new();
              while ((taosRow = TDengine.FetchRows(res)) != IntPtr.Zero)
              {
                  dataRaw.AddRange(FetchRow(taosRow, res));
              }
              if (TDengine.ErrorNo(res) != 0)
              {
                  Console.Write("Query is not complete, Error {0} {1}", TDengine.ErrorNo(res), TDengine.Error(res));
              }
              // The result set is not freed here: FetchRowCallback fetches from it again
              // and releases it once no more rows are returned.
              Console.WriteLine("");
              return dataRaw;
          }

          public static List<object> FetchRow(IntPtr taosRow, IntPtr taosRes)//, List<TDengineMeta> metaList, int numOfFiled
          {
              List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
              int numOfFiled = TDengine.FieldCount(taosRes);
              List<object> dataRaw = new();

              IntPtr colLengthPrt = TDengine.FetchLengths(taosRes);
              int[] colLengthArr = new int[numOfFiled];
              Marshal.Copy(colLengthPrt, colLengthArr, 0, numOfFiled);

              for (int i = 0; i < numOfFiled; i++)
              {
                  TDengineMeta meta = metaList[i];
                  IntPtr data = Marshal.ReadIntPtr(taosRow, IntPtr.Size * i);

                  if (data == IntPtr.Zero)
                  {
                      dataRaw.Add("NULL");
                      continue;
                  }

                  // decode the raw column pointer according to the column type
                  switch ((TDengineDataType)meta.type)
                  {
                      case TDengineDataType.TSDB_DATA_TYPE_BOOL:
                          bool v1 = Marshal.ReadByte(data) != 0;
                          dataRaw.Add(v1);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
                          sbyte v2 = (sbyte)Marshal.ReadByte(data);
                          dataRaw.Add(v2);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
                          short v3 = Marshal.ReadInt16(data);
                          dataRaw.Add(v3);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_INT:
                          int v4 = Marshal.ReadInt32(data);
                          dataRaw.Add(v4);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
                          long v5 = Marshal.ReadInt64(data);
                          dataRaw.Add(v5);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
                          float v6 = (float)Marshal.PtrToStructure(data, typeof(float));
                          dataRaw.Add(v6);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
                          double v7 = (double)Marshal.PtrToStructure(data, typeof(double));
                          dataRaw.Add(v7);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_BINARY:
                          string v8 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                          dataRaw.Add(v8);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
                          long v9 = Marshal.ReadInt64(data);
                          dataRaw.Add(v9);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
                          string v10 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                          dataRaw.Add(v10);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_UTINYINT:
                          byte v12 = Marshal.ReadByte(data);
                          dataRaw.Add(v12.ToString());
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_USMALLINT:
                          ushort v13 = (ushort)Marshal.ReadInt16(data);
                          dataRaw.Add(v13);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_UINT:
                          uint v14 = (uint)Marshal.ReadInt32(data);
                          dataRaw.Add(v14);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_UBIGINT:
                          ulong v15 = (ulong)Marshal.ReadInt64(data);
                          dataRaw.Add(v15);
                          break;
                      case TDengineDataType.TSDB_DATA_TYPE_JSONTAG:
                          string v16 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                          dataRaw.Add(v16);
                          break;
                      default:
                          dataRaw.Add("nonsupport data type");
                          break;
                  }
              }
              return dataRaw;
          }

          static IntPtr GetConnection()
          {
              string host = "localhost";
              short port = 6030;
              string username = "root";
              string password = "taosdata";
              string dbname = "power";
              var conn = TDengine.Connect(host, username, password, dbname, port);
              if (conn == IntPtr.Zero)
              {
                  Console.WriteLine("Connect to TDengine failed");
                  // non-zero exit code signals the failure to the caller
                  Environment.Exit(1);
              }
              else
              {
                  Console.WriteLine("Connect to TDengine success");
              }
              return conn;
          }
      }
  }

  // output:
  // Connect to TDengine success
  // 8 rows async retrieved
  // 1538548685500 | 11.8 | 221 | 0.28 | california.losangeles | 2 |
  // 1538548696600 | 13.4 | 223 | 0.29 | california.losangeles | 2 |
  // 1538548685000 | 10.8 | 223 | 0.29 | california.losangeles | 3 |
  // 1538548686500 | 11.5 | 221 | 0.35 | california.losangeles | 3 |
  // 1538548685000 | 10.3 | 219 | 0.31 | california.sanfrancisco | 2 |
  // 1538548695000 | 12.6 | 218 | 0.33 | california.sanfrancisco | 2 |
  // 1538548696800 | 12.3 | 221 | 0.31 | california.sanfrancisco | 2 |
  // 1538548696650 | 10.3 | 218 | 0.25 | california.sanfrancisco | 3 |
  // async retrieve complete.

View source

  // Excerpt: printRow() is the helper from the synchronous C example above;
  // printHeader() is not shown in this excerpt.
  #include <stdio.h>
  #include <stdlib.h>
  #include <unistd.h>
  #include <taos.h>

  /**
   * @brief callback function of taos_fetch_rows_a
   *
   * @param param : the third parameter you passed to taos_fetch_rows_a
   * @param res : pointer of TAOS_RES
   * @param numOfRow : number of rows fetched in this batch. will be 0 if there is no more data.
   * @return void*
   */
  void *fetch_row_callback(void *param, TAOS_RES *res, int numOfRow) {
    printf("numOfRow = %d \n", numOfRow);
    int         numFields = taos_num_fields(res);
    TAOS_FIELD *fields = taos_fetch_fields(res);
    TAOS       *_taos = (TAOS *)param;
    if (numOfRow > 0) {
      for (int i = 0; i < numOfRow; ++i) {
        TAOS_ROW row = taos_fetch_row(res);
        char     temp[256] = {0};
        printRow(temp, row, fields, numFields);
        puts(temp);
      }
      // request the next batch; this callback runs again when it arrives
      taos_fetch_rows_a(res, fetch_row_callback, _taos);
    } else {
      printf("no more data, close the connection.\n");
      taos_free_result(res);
      taos_close(_taos);
      taos_cleanup();
    }
    return NULL;
  }

  /**
   * @brief callback function of taos_query_a
   *
   * @param param: the fourth parameter you passed to taos_query_a
   * @param res : the result set
   * @param code : status code
   * @return void*
   */
  void *select_callback(void *param, TAOS_RES *res, int code) {
    printf("query callback ...\n");
    TAOS *_taos = (TAOS *)param;
    if (code == 0 && res) {
      printHeader(res);
      taos_fetch_rows_a(res, fetch_row_callback, _taos);
    } else {
      printf("failed to execute taos_query. error: %s\n", taos_errstr(res));
      taos_free_result(res);
      taos_close(_taos);
      taos_cleanup();
      exit(EXIT_FAILURE);
    }
    return NULL;
  }

  int main() {
    TAOS *taos = taos_connect("localhost", "root", "taosdata", "power", 6030);
    if (taos == NULL) {
      puts("failed to connect to server");
      exit(EXIT_FAILURE);
    }
    // param one is the connection returned by taos_connect.
    // param two is the SQL to execute.
    // param three is the callback function.
    // param four can be any pointer. It will be passed to your callback function as the first parameter. we use taos
    // here, because we want to close it after getting data.
    taos_query_a(taos, "SELECT * FROM meters", select_callback, taos);
    // keep the process alive long enough for the callbacks to run
    sleep(1);
  }

  // output:
  // query callback ...
  // ts current voltage phase location groupid
  // numOfRow = 8
  // 1538548685500 11.800000 221 0.280000 california.losangeles 2
  // 1538548696600 13.400000 223 0.290000 california.losangeles 2
  // 1538548685000 10.800000 223 0.290000 california.losangeles 3
  // 1538548686500 11.500000 221 0.350000 california.losangeles 3
  // 1538548685000 10.300000 219 0.310000 california.sanfrancisco 2
  // 1538548695000 12.600000 218 0.330000 california.sanfrancisco 2
  // 1538548696800 12.300000 221 0.310000 california.sanfrancisco 2
  // 1538548696650 10.300000 218 0.250000 california.sanfrancisco 3
  // numOfRow = 0
  // no more data, close the connection.

View source