OpenTSDB JSON 格式协议

协议介绍

OpenTSDB JSON 格式协议采用一个 JSON 字符串表示一行或多行数据。例如:

  1. [
  2. {
  3. "metric": "sys.cpu.nice",
  4. "timestamp": 1346846400,
  5. "value": 18,
  6. "tags": {
  7. "host": "web01",
  8. "dc": "lga"
  9. }
  10. },
  11. {
  12. "metric": "sys.cpu.nice",
  13. "timestamp": 1346846400,
  14. "value": 9,
  15. "tags": {
  16. "host": "web02",
  17. "dc": "lga"
  18. }
  19. }
  20. ]

与 OpenTSDB 行协议类似, metric 将作为超级表名, timestamp 表示时间戳,value 表示度量值, tags 表示标签集。

参考OpenTSDB HTTP API 文档

OpenTSDB JSON 格式协议 - 图1note
  • 对于 JSON 格式协议,TDengine 并不会自动把所有标签转成 nchar 类型, 字符串将将转为 nchar 类型, 数值将同样转换为 double 类型。
  • TDengine 只接收 JSON 数组格式的字符串,即使一行数据也需要转换成数组形式。

示例代码

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  1. package com.taos.example;
  2. import com.taosdata.jdbc.SchemalessWriter;
  3. import com.taosdata.jdbc.enums.SchemalessProtocolType;
  4. import com.taosdata.jdbc.enums.SchemalessTimestampType;
  5. import java.sql.Connection;
  6. import java.sql.DriverManager;
  7. import java.sql.SQLException;
  8. import java.sql.Statement;
  9. public class JSONProtocolExample {
  10. private static Connection getConnection() throws SQLException {
  11. String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
  12. return DriverManager.getConnection(jdbcUrl);
  13. }
  14. private static void createDatabase(Connection conn) throws SQLException {
  15. try (Statement stmt = conn.createStatement()) {
  16. stmt.execute("CREATE DATABASE IF NOT EXISTS test");
  17. stmt.execute("USE test");
  18. }
  19. }
  20. private static String getJSONData() {
  21. return "[{\"metric\": \"meters.current\", \"timestamp\": 1648432611249, \"value\": 10.3, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": 2}}," +
  22. " {\"metric\": \"meters.voltage\", \"timestamp\": 1648432611249, \"value\": 219, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}}, " +
  23. "{\"metric\": \"meters.current\", \"timestamp\": 1648432611250, \"value\": 12.6, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": 2}}," +
  24. " {\"metric\": \"meters.voltage\", \"timestamp\": 1648432611250, \"value\": 221, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}}]";
  25. }
  26. public static void main(String[] args) throws SQLException {
  27. try (Connection conn = getConnection()) {
  28. createDatabase(conn);
  29. SchemalessWriter writer = new SchemalessWriter(conn);
  30. String jsonData = getJSONData();
  31. writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
  32. }
  33. }
  34. }

查看源码

  1. import json
  2. import taos
  3. from taos import SmlProtocol, SmlPrecision
  4. lines = [{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
  5. {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219,
  6. "tags": {"location": "California.LosAngeles", "groupid": 1}},
  7. {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6,
  8. "tags": {"location": "California.SanFrancisco", "groupid": 2}},
  9. {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]
  10. def get_connection():
  11. return taos.connect()
  12. def create_database(conn):
  13. conn.execute("CREATE DATABASE test")
  14. conn.execute("USE test")
  15. def insert_lines(conn):
  16. global lines
  17. lines = json.dumps(lines)
  18. # note: the first parameter must be a list with only one element.
  19. affected_rows = conn.schemaless_insert(
  20. [lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
  21. print(affected_rows) # 4
  22. if __name__ == '__main__':
  23. connection = get_connection()
  24. try:
  25. create_database(connection)
  26. insert_lines(connection)
  27. finally:
  28. connection.close()

查看源码

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/taosdata/driver-go/v2/af"
  5. )
  6. func prepareDatabase(conn *af.Connector) {
  7. _, err := conn.Exec("CREATE DATABASE test")
  8. if err != nil {
  9. panic(err)
  10. }
  11. _, err = conn.Exec("USE test")
  12. if err != nil {
  13. panic(err)
  14. }
  15. }
  16. func main() {
  17. conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
  18. if err != nil {
  19. fmt.Println("fail to connect, err:", err)
  20. }
  21. defer conn.Close()
  22. prepareDatabase(conn)
  23. payload := `[{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
  24. {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}},
  25. {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
  26. {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]`
  27. err = conn.OpenTSDBInsertJsonPayload(payload)
  28. if err != nil {
  29. fmt.Println("insert error:", err)
  30. }
  31. }

查看源码

  1. use libtaos::schemaless::*;
  2. use libtaos::*;
  3. fn main() {
  4. let taos = TaosCfg::default().connect().expect("fail to connect");
  5. taos.raw_query("CREATE DATABASE test").unwrap();
  6. taos.raw_query("USE test").unwrap();
  7. let lines = [
  8. r#"[{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
  9. {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}},
  10. {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
  11. {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#,
  12. ];
  13. let affected_rows = taos
  14. .schemaless_insert(
  15. &lines,
  16. TSDB_SML_JSON_PROTOCOL,
  17. TSDB_SML_TIMESTAMP_NOT_CONFIGURED,
  18. )
  19. .unwrap();
  20. println!("affected_rows={}", affected_rows); // affected_rows=4
  21. }
  22. // run with: cargo run --example opentsdb_json_example

查看源码

  1. const taos = require("td2.0-connector");
  2. const conn = taos.connect({
  3. host: "localhost",
  4. });
  5. const cursor = conn.cursor();
  6. function createDatabase() {
  7. cursor.execute("CREATE DATABASE test");
  8. cursor.execute("USE test");
  9. }
  10. function insertData() {
  11. const lines = [
  12. {
  13. metric: "meters.current",
  14. timestamp: 1648432611249,
  15. value: 10.3,
  16. tags: { location: "California.SanFrancisco", groupid: 2 },
  17. },
  18. {
  19. metric: "meters.voltage",
  20. timestamp: 1648432611249,
  21. value: 219,
  22. tags: { location: "California.LosAngeles", groupid: 1 },
  23. },
  24. {
  25. metric: "meters.current",
  26. timestamp: 1648432611250,
  27. value: 12.6,
  28. tags: { location: "California.SanFrancisco", groupid: 2 },
  29. },
  30. {
  31. metric: "meters.voltage",
  32. timestamp: 1648432611250,
  33. value: 221,
  34. tags: { location: "California.LosAngeles", groupid: 1 },
  35. },
  36. ];
  37. cursor.schemalessInsert(
  38. [JSON.stringify(lines)],
  39. taos.SCHEMALESS_PROTOCOL.TSDB_SML_JSON_PROTOCOL,
  40. taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_NOT_CONFIGURED
  41. );
  42. }
  43. try {
  44. createDatabase();
  45. insertData();
  46. } finally {
  47. cursor.close();
  48. conn.close();
  49. }

查看源码

  1. using TDengineDriver;
  2. namespace TDengineExample
  3. {
  4. internal class OptsJsonExample
  5. {
  6. static void Main()
  7. {
  8. IntPtr conn = GetConnection();
  9. PrepareDatabase(conn);
  10. string[] lines = { "[{\"metric\": \"meters.current\", \"timestamp\": 1648432611249, \"value\": 10.3, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": 2}}," +
  11. " {\"metric\": \"meters.voltage\", \"timestamp\": 1648432611249, \"value\": 219, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}}, " +
  12. "{\"metric\": \"meters.current\", \"timestamp\": 1648432611250, \"value\": 12.6, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": 2}}," +
  13. " {\"metric\": \"meters.voltage\", \"timestamp\": 1648432611250, \"value\": 221, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}}]"
  14. };
  15. IntPtr res = TDengine.SchemalessInsert(conn, lines, 1, (int)TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL, (int)TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
  16. if (TDengine.ErrorNo(res) != 0)
  17. {
  18. Console.WriteLine("SchemalessInsert failed since " + TDengine.Error(res));
  19. ExitProgram(conn, 1);
  20. }
  21. else
  22. {
  23. int affectedRows = TDengine.AffectRows(res);
  24. Console.WriteLine($"SchemalessInsert success, affected {affectedRows} rows");
  25. }
  26. TDengine.FreeResult(res);
  27. ExitProgram(conn, 0);
  28. }
  29. static IntPtr GetConnection()
  30. {
  31. string host = "localhost";
  32. short port = 6030;
  33. string username = "root";
  34. string password = "taosdata";
  35. string dbname = "";
  36. var conn = TDengine.Connect(host, username, password, dbname, port);
  37. if (conn == IntPtr.Zero)
  38. {
  39. Console.WriteLine("Connect to TDengine failed");
  40. TDengine.Cleanup();
  41. Environment.Exit(1);
  42. }
  43. else
  44. {
  45. Console.WriteLine("Connect to TDengine success");
  46. }
  47. return conn;
  48. }
  49. static void PrepareDatabase(IntPtr conn)
  50. {
  51. IntPtr res = TDengine.Query(conn, "CREATE DATABASE test");
  52. if (TDengine.ErrorNo(res) != 0)
  53. {
  54. Console.WriteLine("failed to create database, reason: " + TDengine.Error(res));
  55. ExitProgram(conn, 1);
  56. }
  57. res = TDengine.Query(conn, "USE test");
  58. if (TDengine.ErrorNo(res) != 0)
  59. {
  60. Console.WriteLine("failed to change database, reason: " + TDengine.Error(res));
  61. ExitProgram(conn, 1);
  62. }
  63. }
  64. static void ExitProgram(IntPtr conn, int exitCode)
  65. {
  66. TDengine.Close(conn);
  67. TDengine.Cleanup();
  68. Environment.Exit(exitCode);
  69. }
  70. }
  71. }

查看源码

  1. int main() {
  2. TAOS *taos = taos_connect("localhost", "root", "taosdata", "", 6030);
  3. if (taos == NULL) {
  4. printf("failed to connect to server\n");
  5. exit(EXIT_FAILURE);
  6. }
  7. executeSQL(taos, "DROP DATABASE IF EXISTS test");
  8. executeSQL(taos, "CREATE DATABASE test");
  9. executeSQL(taos, "USE test");
  10. char *line =
  11. "[{\"metric\": \"meters.current\", \"timestamp\": 1648432611249, \"value\": 10.3, \"tags\": {\"location\": "
  12. "\"California.SanFrancisco\", \"groupid\": 2}},{\"metric\": \"meters.voltage\", \"timestamp\": 1648432611249, "
  13. "\"value\": 219, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}},{\"metric\": \"meters.current\", "
  14. "\"timestamp\": 1648432611250, \"value\": 12.6, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": "
  15. "2}},{\"metric\": \"meters.voltage\", \"timestamp\": 1648432611250, \"value\": 221, \"tags\": {\"location\": "
  16. "\"California.LosAngeles\", \"groupid\": 1}}]";
  17. char *lines[] = {line};
  18. TAOS_RES *res = taos_schemaless_insert(taos, lines, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
  19. if (taos_errno(res) != 0) {
  20. printf("failed to insert schema-less data, reason: %s\n", taos_errstr(res));
  21. } else {
  22. int affectedRow = taos_affected_rows(res);
  23. printf("successfully inserted %d rows\n", affectedRow);
  24. }
  25. taos_free_result(res);
  26. taos_close(taos);
  27. taos_cleanup();
  28. }
  29. // output:
  30. // successfully inserted 4 rows

查看源码

以上示例代码会自动创建 2 个超级表, 每个超级表有 2 条数据。

  1. taos> use test;
  2. Database changed.
  3. taos> show stables;
  4. name | created_time | columns | tags | tables |
  5. ============================================================================================
  6. meters.current | 2022-03-29 16:05:25.193 | 2 | 2 | 1 |
  7. meters.voltage | 2022-03-29 16:05:25.200 | 2 | 2 | 1 |
  8. Query OK, 2 row(s) in set (0.001954s)
  9. taos> select * from `meters.current`;
  10. ts | value | groupid | location |
  11. ===================================================================================================================
  12. 2022-03-28 09:56:51.249 | 10.300000000 | 2.000000000 | California.SanFrancisco |
  13. 2022-03-28 09:56:51.250 | 12.600000000 | 2.000000000 | California.SanFrancisco |
  14. Query OK, 2 row(s) in set (0.004076s)