查询数据

主要查询功能

TDengine 采用 SQL 作为查询语言。应用程序可以通过 REST API 或连接器发送 SQL 语句,用户还可以通过 TDengine 命令行工具 taos 手动执行 SQL 即席查询(Ad-Hoc Query)。TDengine 支持如下查询功能:

  • 单列、多列数据查询
  • 标签和数值的多种过滤条件:>, <, =, <>, like 等
  • 聚合结果的分组(Group by)、排序(Order by)、约束输出(Limit/Offset)
  • 时间窗口(Interval)、会话窗口(Session)和状态窗口(State_window)等窗口切分聚合查询
  • 数值列及聚合结果的四则运算
  • 时间戳对齐的连接查询(Join Query: 隐式连接)操作
  • 多种聚合/计算函数: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff 等

例如:在命令行工具 taos 中,从表 d1001 中查询出 voltage > 215 的记录,按时间降序排列,仅仅输出 2 条。

  1. taos> select * from d1001 where voltage > 215 order by ts desc limit 2;
  2. ts | current | voltage | phase |
  3. ======================================================================================
  4. 2018-10-03 14:38:16.800 | 12.30000 | 221 | 0.31000 |
  5. 2018-10-03 14:38:15.000 | 12.60000 | 218 | 0.33000 |
  6. Query OK, 2 row(s) in set (0.001100s)

为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。

具体的查询语法请看 TDengine SQL 的数据查询 章节。

多表聚合查询

物联网场景中,往往同一个类型的数据采集点有多个。TDengine 采用超级表(STable)的概念来描述某一个类型的数据采集点,一张普通的表来描述一个具体的数据采集点。同时 TDengine 使用标签来描述数据采集点的静态属性,一个具体的数据采集点有具体的标签值。通过指定标签的过滤条件,TDengine 提供了一高效的方法将超级表(某一类型的数据采集点)所属的子表进行聚合查询。对普通表的聚合函数以及绝大部分操作都适用于超级表,语法完全一样。

示例一

在 TDengine CLI,查找加利福尼亚州所有智能电表采集的电压平均值,并按照 location 分组。

  1. taos> SELECT AVG(voltage), location FROM meters GROUP BY location;
  2. avg(voltage) | location |
  3. ===============================================================================================
  4. 219.200000000 | California.SanFrancisco |
  5. 221.666666667 | California.LosAngeles |
  6. Query OK, 2 rows in database (0.005995s)

示例二

在 TDengine CLI, 查找 groupId 为 2 的所有智能电表的记录条数,电流的最大值。

  1. taos> SELECT count(*), max(current) FROM meters where groupId = 2;
  2. cunt(*) | max(current) |
  3. ==================================
  4. 5 | 13.4 |
  5. Query OK, 1 row(s) in set (0.002136s)

TDengine SQL 的数据查询 一章,查询类操作都会注明是否支持超级表。

降采样查询、插值

物联网场景里,经常需要通过降采样(down sampling)将采集的数据按时间段进行聚合。TDengine 提供了一个简便的关键词 interval 让按照时间窗口的查询操作变得极为简单。比如,将智能电表 d1001 采集的电流值每 10 秒钟求和

  1. taos> SELECT _wstart, sum(current) FROM d1001 INTERVAL(10s);
  2. _wstart | sum(current) |
  3. ======================================================
  4. 2018-10-03 14:38:00.000 | 10.300000191 |
  5. 2018-10-03 14:38:10.000 | 24.900000572 |
  6. Query OK, 2 rows in database (0.003139s)

降采样操作也适用于超级表,比如:将加利福尼亚州所有智能电表采集的电流值每秒钟求和

  1. taos> SELECT _wstart, SUM(current) FROM meters where location like "California%" INTERVAL(1s);
  2. _wstart | sum(current) |
  3. ======================================================
  4. 2018-10-03 14:38:04.000 | 10.199999809 |
  5. 2018-10-03 14:38:05.000 | 23.699999809 |
  6. 2018-10-03 14:38:06.000 | 11.500000000 |
  7. 2018-10-03 14:38:15.000 | 12.600000381 |
  8. 2018-10-03 14:38:16.000 | 34.400000572 |
  9. Query OK, 5 rows in database (0.007413s)

降采样操作也支持时间偏移,比如:将所有智能电表采集的电流值每秒钟求和,但要求每个时间窗口从 500 毫秒开始

  1. taos> SELECT _wstart, SUM(current) FROM meters INTERVAL(1s, 500a);
  2. _wstart | sum(current) |
  3. ======================================================
  4. 2018-10-03 14:38:03.500 | 10.199999809 |
  5. 2018-10-03 14:38:04.500 | 10.300000191 |
  6. 2018-10-03 14:38:05.500 | 13.399999619 |
  7. 2018-10-03 14:38:06.500 | 11.500000000 |
  8. 2018-10-03 14:38:14.500 | 12.600000381 |
  9. 2018-10-03 14:38:16.500 | 34.400000572 |
  10. Query OK, 6 rows in database (0.005515s)

物联网场景里,每个数据采集点采集数据的时间是难同步的,但很多分析算法(比如 FFT)需要把采集的数据严格按照时间等间隔的对齐,在很多系统里,需要应用自己写程序来处理,但使用 TDengine 的降采样操作就轻松解决。

如果一个时间间隔里,没有采集的数据,TDengine 还提供插值计算的功能。

语法规则细节请见 TDengine SQL 的按时间窗口切分聚合 章节。

示例代码

查询数据

SQL 写入 一章,我们创建了 power 数据库,并向 meters 表写入了一些数据,以下示例代码展示如何查询这个表的数据。

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  • PHP
  1. package com.taos.example;
  2. import java.sql.*;
  3. public class RestQueryExample {
  4. private static Connection getConnection() throws SQLException {
  5. String jdbcUrl = "jdbc:TAOS-RS://localhost:6041/power?user=root&password=taosdata";
  6. return DriverManager.getConnection(jdbcUrl);
  7. }
  8. private static void printRow(ResultSet rs) throws SQLException {
  9. ResultSetMetaData meta = rs.getMetaData();
  10. for (int i = 1; i <= meta.getColumnCount(); i++) {
  11. String value = rs.getString(i);
  12. System.out.print(value);
  13. System.out.print("\t");
  14. }
  15. System.out.println();
  16. }
  17. private static void printColName(ResultSet rs) throws SQLException {
  18. ResultSetMetaData meta = rs.getMetaData();
  19. for (int i = 1; i <= meta.getColumnCount(); i++) {
  20. String colLabel = meta.getColumnLabel(i);
  21. System.out.print(colLabel);
  22. System.out.print("\t");
  23. }
  24. System.out.println();
  25. }
  26. private static void processResult(ResultSet rs) throws SQLException {
  27. printColName(rs);
  28. while (rs.next()) {
  29. printRow(rs);
  30. }
  31. }
  32. private static void queryData() throws SQLException {
  33. try (Connection conn = getConnection()) {
  34. try (Statement stmt = conn.createStatement()) {
  35. ResultSet rs = stmt.executeQuery("SELECT AVG(voltage) FROM meters GROUP BY location");
  36. processResult(rs);
  37. }
  38. }
  39. }
  40. public static void main(String[] args) throws SQLException {
  41. queryData();
  42. }
  43. }
  44. // possible output:
  45. // avg(voltage) location
  46. // 222.0 California.LosAngeles
  47. // 219.0 California.SanFrancisco

查看源码

通过迭代逐行获取查询结果。

  1. def query_api_demo(conn: taos.TaosConnection):
  2. result: taos.TaosResult = conn.query("SELECT tbname, * FROM meters LIMIT 2")
  3. print("field count:", result.field_count)
  4. print("meta of fields[1]:", result.fields[1])
  5. print("======================Iterate on result=========================")
  6. for row in result:
  7. print(row)
  8. # field count: 7
  9. # meta of fields[1]: {name: ts, type: 9, bytes: 8}
  10. # ======================Iterate on result=========================
  11. # ('d1003', datetime.datetime(2018, 10, 3, 14, 38, 5, 500000), 11.800000190734863, 221, 0.2800000011920929, 'california.losangeles', 2)
  12. # ('d1003', datetime.datetime(2018, 10, 3, 14, 38, 16, 600000), 13.399999618530273, 223, 0.28999999165534973, 'california.losangeles', 2)

查看源码

一次获取所有查询结果,并把每一行转化为一个字典返回。

  1. def fetch_all_demo(conn: taos.TaosConnection):
  2. result: taos.TaosResult = conn.query("SELECT ts, current FROM meters LIMIT 2")
  3. rows = result.fetch_all_into_dict()
  4. print("row count:", result.row_count)
  5. print("===============all data===================")
  6. print(rows)
  7. # row count: 2
  8. # ===============all data===================
  9. # [{'ts': datetime.datetime(2018, 10, 3, 14, 38, 5, 500000), 'current': 11.800000190734863},
  10. # {'ts': datetime.datetime(2018, 10, 3, 14, 38, 16, 600000), 'current': 13.399999618530273}]

查看源码

  1. package main
  2. import (
  3. "database/sql"
  4. "log"
  5. "time"
  6. _ "github.com/taosdata/driver-go/v3/taosRestful"
  7. )
  8. func main() {
  9. var taosDSN = "root:taosdata@http(localhost:6041)/power"
  10. taos, err := sql.Open("taosRestful", taosDSN)
  11. if err != nil {
  12. log.Fatalln("failed to connect TDengine, err:", err)
  13. }
  14. defer taos.Close()
  15. rows, err := taos.Query("SELECT ts, current FROM meters LIMIT 2")
  16. if err != nil {
  17. log.Fatalln("failed to select from table, err:", err)
  18. }
  19. defer rows.Close()
  20. for rows.Next() {
  21. var r struct {
  22. ts time.Time
  23. current float32
  24. }
  25. err := rows.Scan(&r.ts, &r.current)
  26. if err != nil {
  27. log.Fatalln("scan error:\n", err)
  28. return
  29. }
  30. log.Println(r.ts, r.current)
  31. }
  32. }

查看源码

  1. use taos::sync::*;
  2. fn main() -> anyhow::Result<()> {
  3. let taos = TaosBuilder::from_dsn("ws:///power")?.build()?;
  4. let mut result = taos.query("SELECT ts, current FROM meters LIMIT 2")?;
  5. // print column names
  6. let meta = result.fields();
  7. println!("{}", meta.iter().map(|field| field.name()).join("\t"));
  8. // print rows
  9. let rows = result.rows();
  10. for row in rows {
  11. let row = row?;
  12. for (_name, value) in row {
  13. print!("{}\t", value);
  14. }
  15. println!();
  16. }
  17. Ok(())
  18. }
  19. // output(suppose you are in +8 timezone):
  20. // ts current
  21. // 2018-10-03T14:38:05+08:00 10.3
  22. // 2018-10-03T14:38:15+08:00 12.6

查看源码

  1. const taos = require("@tdengine/client");
  2. const conn = taos.connect({ host: "localhost", database: "power" });
  3. const cursor = conn.cursor();
  4. const query = cursor.query("SELECT ts, current FROM meters LIMIT 2");
  5. query.execute().then(function (result) {
  6. result.pretty();
  7. });
  8. // output:
  9. // Successfully connected to TDengine
  10. // ts | current |
  11. // =======================================================
  12. // 2018-10-03 14:38:05.000 | 10.3 |
  13. // 2018-10-03 14:38:15.000 | 12.6 |

查看源码

  1. using TDengineDriver;
  2. using TDengineDriver.Impl;
  3. using System.Runtime.InteropServices;
  4. namespace TDengineExample
  5. {
  6. internal class QueryExample
  7. {
  8. static void Main()
  9. {
  10. IntPtr conn = GetConnection();
  11. try
  12. {
  13. // run query
  14. IntPtr res = TDengine.Query(conn, "SELECT * FROM meters LIMIT 2");
  15. if (TDengine.ErrorNo(res) != 0)
  16. {
  17. throw new Exception("Failed to query since: " + TDengine.Error(res));
  18. }
  19. // get filed count
  20. int fieldCount = TDengine.FieldCount(res);
  21. Console.WriteLine("fieldCount=" + fieldCount);
  22. // print column names
  23. List<TDengineMeta> metas = LibTaos.GetMeta(res);
  24. for (int i = 0; i < metas.Count; i++)
  25. {
  26. Console.Write(metas[i].name + "\t");
  27. }
  28. Console.WriteLine();
  29. // print values
  30. List<Object> resData = LibTaos.GetData(res);
  31. for (int i = 0; i < resData.Count; i++)
  32. {
  33. Console.Write($"|{resData[i].ToString()} \t");
  34. if (((i + 1) % metas.Count == 0))
  35. {
  36. Console.WriteLine("");
  37. }
  38. }
  39. Console.WriteLine();
  40. // Free result after use
  41. TDengine.FreeResult(res);
  42. }
  43. finally
  44. {
  45. TDengine.Close(conn);
  46. }
  47. }
  48. static IntPtr GetConnection()
  49. {
  50. string host = "localhost";
  51. short port = 6030;
  52. string username = "root";
  53. string password = "taosdata";
  54. string dbname = "power";
  55. var conn = TDengine.Connect(host, username, password, dbname, port);
  56. if (conn == IntPtr.Zero)
  57. {
  58. throw new Exception("Connect to TDengine failed");
  59. }
  60. else
  61. {
  62. Console.WriteLine("Connect to TDengine success");
  63. }
  64. return conn;
  65. }
  66. }
  67. }
  68. // output:
  69. // Connect to TDengine success
  70. // fieldCount=6
  71. // ts current voltage phase location groupid
  72. // 1648432611249 10.3 219 0.31 California.SanFrancisco 2
  73. // 1648432611749 12.6 218 0.33 California.SanFrancisco 2

查看源码

  1. // compile with:
  2. // gcc -o query_example query_example.c -ltaos
  3. #include <inttypes.h>
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <taos.h>
  8. typedef int16_t VarDataLenT;
  9. #define TSDB_NCHAR_SIZE sizeof(int32_t)
  10. #define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
  11. #define GET_FLOAT_VAL(x) (*(float *)(x))
  12. #define GET_DOUBLE_VAL(x) (*(double *)(x))
  13. #define varDataLen(v) ((VarDataLenT *)(v))[0]
  14. int printRow(char *str, TAOS_ROW row, TAOS_FIELD *fields, int numFields) {
  15. int len = 0;
  16. char split = ' ';
  17. for (int i = 0; i < numFields; ++i) {
  18. if (i > 0) {
  19. str[len++] = split;
  20. }
  21. if (row[i] == NULL) {
  22. len += sprintf(str + len, "%s", "NULL");
  23. continue;
  24. }
  25. switch (fields[i].type) {
  26. case TSDB_DATA_TYPE_TINYINT:
  27. len += sprintf(str + len, "%d", *((int8_t *)row[i]));
  28. break;
  29. case TSDB_DATA_TYPE_UTINYINT:
  30. len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
  31. break;
  32. case TSDB_DATA_TYPE_SMALLINT:
  33. len += sprintf(str + len, "%d", *((int16_t *)row[i]));
  34. break;
  35. case TSDB_DATA_TYPE_USMALLINT:
  36. len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
  37. break;
  38. case TSDB_DATA_TYPE_INT:
  39. len += sprintf(str + len, "%d", *((int32_t *)row[i]));
  40. break;
  41. case TSDB_DATA_TYPE_UINT:
  42. len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
  43. break;
  44. case TSDB_DATA_TYPE_BIGINT:
  45. len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
  46. break;
  47. case TSDB_DATA_TYPE_UBIGINT:
  48. len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
  49. break;
  50. case TSDB_DATA_TYPE_FLOAT: {
  51. float fv = 0;
  52. fv = GET_FLOAT_VAL(row[i]);
  53. len += sprintf(str + len, "%f", fv);
  54. } break;
  55. case TSDB_DATA_TYPE_DOUBLE: {
  56. double dv = 0;
  57. dv = GET_DOUBLE_VAL(row[i]);
  58. len += sprintf(str + len, "%lf", dv);
  59. } break;
  60. case TSDB_DATA_TYPE_BINARY:
  61. case TSDB_DATA_TYPE_NCHAR: {
  62. int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
  63. memcpy(str + len, row[i], charLen);
  64. len += charLen;
  65. } break;
  66. case TSDB_DATA_TYPE_TIMESTAMP:
  67. len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
  68. break;
  69. case TSDB_DATA_TYPE_BOOL:
  70. len += sprintf(str + len, "%d", *((int8_t *)row[i]));
  71. default:
  72. break;
  73. }
  74. }
  75. return len;
  76. }
  77. /**
  78. * @brief print column name and values of each row
  79. *
  80. * @param res
  81. * @return int
  82. */
  83. static int printResult(TAOS_RES *res) {
  84. int numFields = taos_num_fields(res);
  85. TAOS_FIELD *fields = taos_fetch_fields(res);
  86. char header[256] = {0};
  87. int len = 0;
  88. for (int i = 0; i < numFields; ++i) {
  89. len += sprintf(header + len, "%s ", fields[i].name);
  90. }
  91. puts(header);
  92. TAOS_ROW row = NULL;
  93. while ((row = taos_fetch_row(res))) {
  94. char temp[256] = {0};
  95. printRow(temp, row, fields, numFields);
  96. puts(temp);
  97. }
  98. }
  99. int main() {
  100. TAOS *taos = taos_connect("localhost", "root", "taosdata", "power", 6030);
  101. if (taos == NULL) {
  102. puts("failed to connect to server");
  103. exit(EXIT_FAILURE);
  104. }
  105. TAOS_RES *res = taos_query(taos, "SELECT * FROM meters LIMIT 2");
  106. if (taos_errno(res) != 0) {
  107. printf("failed to execute taos_query. error: %s\n", taos_errstr(res));
  108. exit(EXIT_FAILURE);
  109. }
  110. printResult(res);
  111. taos_free_result(res);
  112. taos_close(taos);
  113. taos_cleanup();
  114. }
  115. // output:
  116. // ts current voltage phase location groupid
  117. // 1648432611249 10.300000 219 0.310000 California.SanFrancisco 2
  118. // 1648432611749 12.600000 218 0.330000 California.SanFrancisco 2

查看源码

  1. <?php
  2. use TDengine\Connection;
  3. use TDengine\Exception\TDengineException;
  4. try {
  5. // instantiate
  6. $host = 'localhost';
  7. $port = 6030;
  8. $username = 'root';
  9. $password = 'taosdata';
  10. $dbname = 'power';
  11. $connection = new Connection($host, $port, $username, $password, $dbname);
  12. // connect
  13. $connection->connect();
  14. $resource = $connection->query('SELECT ts, current FROM meters LIMIT 2');
  15. var_dump($resource->fetch());
  16. } catch (TDengineException $e) {
  17. // throw exception
  18. throw $e;
  19. }

查看源码

查询数据 - 图1note
  1. 无论是使用 REST 连接还是原生连接的连接器,以上示例代码都能正常工作。
  2. 唯一需要注意的是:由于 REST 接口无状态, 不能使用 use db 语句来切换数据库。

异步查询

除同步查询 API 之外,TDengine 还提供性能更高的异步调用 API 处理数据插入、查询操作。在软硬件环境相同的情况下,异步 API 处理数据插入的速度比同步 API 快 2-4 倍。异步 API 采用非阻塞式的调用方式,在系统真正完成某个具体数据库操作前,立即返回。调用的线程可以去处理其他工作,从而可以提升整个应用的性能。异步 API 在网络延迟严重的情况下,优点尤为突出。

需要注意的是,只有使用原生连接的连接器,才能使用异步查询功能。

  • Python
  • C#
  • C
  1. import time
  2. from ctypes import *
  3. from taos import *
  4. def fetch_callback(p_param, p_result, num_of_rows):
  5. print("fetched ", num_of_rows, "rows")
  6. p = cast(p_param, POINTER(Counter))
  7. result = TaosResult(p_result)
  8. if num_of_rows == 0:
  9. print("fetching completed")
  10. p.contents.done = True
  11. result.close()
  12. return
  13. if num_of_rows < 0:
  14. p.contents.done = True
  15. result.check_error(num_of_rows)
  16. result.close()
  17. return None
  18. for row in result.rows_iter(num_of_rows):
  19. print(row)
  20. p.contents.count += result.row_count
  21. result.fetch_rows_a(fetch_callback, p_param)
  22. def query_callback(p_param, p_result, code):
  23. if p_result is None:
  24. return
  25. result = TaosResult(p_result)
  26. if code == 0:
  27. result.fetch_rows_a(fetch_callback, p_param)
  28. result.check_error(code)
  29. class Counter(Structure):
  30. _fields_ = [("count", c_int), ("done", c_bool)]
  31. def __str__(self):
  32. return "{ count: %d, done: %s }" % (self.count, self.done)
  33. def test_query(conn):
  34. counter = Counter(count=0)
  35. conn.query_a("select ts, current, voltage from power.meters", query_callback, byref(counter))
  36. while not counter.done:
  37. print(counter)
  38. time.sleep(1)
  39. print(counter)
  40. conn.close()
  41. if __name__ == "__main__":
  42. test_query(connect())
  43. # possible output:
  44. # { count: 0, done: False }
  45. # fetched 8 rows
  46. # 1538548685000 10.300000 219
  47. # 1538548695000 12.600000 218
  48. # 1538548696800 12.300000 221
  49. # 1538548696650 10.300000 218
  50. # 1538548685500 11.800000 221
  51. # 1538548696600 13.400000 223
  52. # 1538548685500 10.800000 223
  53. # 1538548686500 11.500000 221
  54. # fetched 0 rows
  55. # fetching completed
  56. # { count: 8, done: True }

查看源码

查询数据 - 图2note

这个示例程序,目前在 Windows 系统上还无法运行

  1. using System;
  2. using System.Collections.Generic;
  3. using TDengineDriver;
  4. using TDengineDriver.Impl;
  5. using System.Runtime.InteropServices;
  6. namespace TDengineExample
  7. {
  8. public class AsyncQueryExample
  9. {
  10. static void Main()
  11. {
  12. IntPtr conn = GetConnection();
  13. try
  14. {
  15. QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  16. TDengine.QueryAsync(conn, "select * from meters", queryAsyncCallback, IntPtr.Zero);
  17. Thread.Sleep(2000);
  18. }
  19. finally
  20. {
  21. TDengine.Close(conn);
  22. }
  23. }
  24. static void QueryCallback(IntPtr param, IntPtr taosRes, int code)
  25. {
  26. if (code == 0 && taosRes != IntPtr.Zero)
  27. {
  28. FetchRawBlockAsyncCallback fetchRowAsyncCallback = new FetchRawBlockAsyncCallback(FetchRawBlockCallback);
  29. TDengine.FetchRawBlockAsync(taosRes, fetchRowAsyncCallback, param);
  30. }
  31. else
  32. {
  33. throw new Exception($"async query data failed,code:{code},reason:{TDengine.Error(taosRes)}");
  34. }
  35. }
  36. // Iteratively call this interface until "numOfRows" is no greater than 0.
  37. static void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
  38. {
  39. if (numOfRows > 0)
  40. {
  41. Console.WriteLine($"{numOfRows} rows async retrieved");
  42. IntPtr pdata = TDengine.GetRawBlock(taosRes);
  43. List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
  44. List<object> dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);
  45. for (int i = 0; i < dataList.Count; i++)
  46. {
  47. if (i != 0 && (i + 1) % metaList.Count == 0)
  48. {
  49. Console.WriteLine("{0}\t|", dataList[i]);
  50. }
  51. else
  52. {
  53. Console.Write("{0}\t|", dataList[i]);
  54. }
  55. }
  56. Console.WriteLine("");
  57. TDengine.FetchRawBlockAsync(taosRes, FetchRawBlockCallback, param);
  58. }
  59. else
  60. {
  61. if (numOfRows == 0)
  62. {
  63. Console.WriteLine("async retrieve complete.");
  64. }
  65. else
  66. {
  67. throw new Exception($"FetchRawBlockCallback callback error, error code {numOfRows}");
  68. }
  69. TDengine.FreeResult(taosRes);
  70. }
  71. }
  72. static IntPtr GetConnection()
  73. {
  74. string host = "localhost";
  75. short port = 6030;
  76. string username = "root";
  77. string password = "taosdata";
  78. string dbname = "power";
  79. var conn = TDengine.Connect(host, username, password, dbname, port);
  80. if (conn == IntPtr.Zero)
  81. {
  82. throw new Exception("Connect to TDengine failed");
  83. }
  84. else
  85. {
  86. Console.WriteLine("Connect to TDengine success");
  87. }
  88. return conn;
  89. }
  90. }
  91. }
  92. // //output:
  93. // // Connect to TDengine success
  94. // // 8 rows async retrieved
  95. // // 1538548685500 | 11.8 | 221 | 0.28 | california.losangeles | 2 |
  96. // // 1538548696600 | 13.4 | 223 | 0.29 | california.losangeles | 2 |
  97. // // 1538548685000 | 10.8 | 223 | 0.29 | california.losangeles | 3 |
  98. // // 1538548686500 | 11.5 | 221 | 0.35 | california.losangeles | 3 |
  99. // // 1538548685000 | 10.3 | 219 | 0.31 | california.sanfrancisco | 2 |
  100. // // 1538548695000 | 12.6 | 218 | 0.33 | california.sanfrancisco | 2 |
  101. // // 1538548696800 | 12.3 | 221 | 0.31 | california.sanfrancisco | 2 |
  102. // // 1538548696650 | 10.3 | 218 | 0.25 | california.sanfrancisco | 3 |
  103. // // async retrieve complete.

查看源码

  1. /**
  2. * @brief call back function of taos_fetch_row_a
  3. *
  4. * @param param : the third parameter you passed to taos_fetch_row_a
  5. * @param res : pointer of TAOS_RES
  6. * @param numOfRow : number of rows fetched in this batch. will be 0 if there is no more data.
  7. * @return void*
  8. */
  9. void *fetch_row_callback(void *param, TAOS_RES *res, int numOfRow) {
  10. printf("numOfRow = %d \n", numOfRow);
  11. int numFields = taos_num_fields(res);
  12. TAOS_FIELD *fields = taos_fetch_fields(res);
  13. TAOS *_taos = (TAOS *)param;
  14. if (numOfRow > 0) {
  15. for (int i = 0; i < numOfRow; ++i) {
  16. TAOS_ROW row = taos_fetch_row(res);
  17. char temp[256] = {0};
  18. printRow(temp, row, fields, numFields);
  19. puts(temp);
  20. }
  21. taos_fetch_rows_a(res, fetch_row_callback, _taos);
  22. } else {
  23. printf("no more data, close the connection.\n");
  24. taos_free_result(res);
  25. taos_close(_taos);
  26. taos_cleanup();
  27. }
  28. }
  29. /**
  30. * @brief callback function of taos_query_a
  31. *
  32. * @param param: the fourth parameter you passed to taos_query_a
  33. * @param res : the result set
  34. * @param code : status code
  35. * @return void*
  36. */
  37. void *select_callback(void *param, TAOS_RES *res, int code) {
  38. printf("query callback ...\n");
  39. TAOS *_taos = (TAOS *)param;
  40. if (code == 0 && res) {
  41. printHeader(res);
  42. taos_fetch_rows_a(res, fetch_row_callback, _taos);
  43. } else {
  44. printf("failed to execute taos_query. error: %s\n", taos_errstr(res));
  45. taos_free_result(res);
  46. taos_close(_taos);
  47. taos_cleanup();
  48. exit(EXIT_FAILURE);
  49. }
  50. }
  51. int main() {
  52. TAOS *taos = taos_connect("localhost", "root", "taosdata", "power", 6030);
  53. if (taos == NULL) {
  54. puts("failed to connect to server");
  55. exit(EXIT_FAILURE);
  56. }
  57. // param one is the connection returned by taos_connect.
  58. // param two is the SQL to execute.
  59. // param three is the callback function.
  60. // param four can be any pointer. It will be passed to your callback function as the first parameter. we use taos
  61. // here, because we want to close it after getting data.
  62. taos_query_a(taos, "SELECT * FROM meters", select_callback, taos);
  63. sleep(1);
  64. }
  65. // output:
  66. // query callback ...
  67. // ts current voltage phase location groupid
  68. // numOfRow = 8
  69. // 1538548685500 11.800000 221 0.280000 california.losangeles 2
  70. // 1538548696600 13.400000 223 0.290000 california.losangeles 2
  71. // 1538548685000 10.800000 223 0.290000 california.losangeles 3
  72. // 1538548686500 11.500000 221 0.350000 california.losangeles 3
  73. // 1538548685000 10.300000 219 0.310000 california.sanfrancisco 2
  74. // 1538548695000 12.600000 218 0.330000 california.sanfrancisco 2
  75. // 1538548696800 12.300000 221 0.310000 california.sanfrancisco 2
  76. // 1538548696650 10.300000 218 0.250000 california.sanfrancisco 3
  77. // numOfRow = 0
  78. // no more data, close the connection.

查看源码