InfluxDB 行协议

协议介绍

InfluxDB Line 协议采用一行字符串来表示一行数据。分为四部分:

  1. measurement,tag_set field_set timestamp
  • measurement 将作为超级表名。它与 tag_set 之间使用一个英文逗号来分隔。
  • tag_set 将作为标签数据,其格式形如 <tag_key>=<tag_value>,<tag_key>=<tag_value>,也即可以使用英文逗号来分隔多个标签数据。它与 field_set 之间使用一个半角空格来分隔。
  • field_set 将作为普通列数据,其格式形如 <field_key>=<field_value>,<field_key>=<field_value>,同样是使用英文逗号来分隔多个普通列的数据。它与 timestamp 之间使用一个半角空格来分隔。
  • timestamp 即本行数据对应的主键时间戳。

例如:

  1. meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611249500
注意:
  • tag_set 中的所有的数据自动转化为 nchar 数据类型;
  • field_set 中的每个数据项都需要对自身的数据类型进行描述, 比如 1.2f32 代表 float 类型的数值 1.2, 如果不带类型后缀会被当作 double 处理;
  • timestamp 支持多种时间精度。写入数据的时候需要用参数指定时间精度,支持从小时到纳秒的 6 种时间精度。

要了解更多可参考:InfluxDB Line 协议官方文档 和 TDengine 无模式写入参考指南。

示例代码

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  1. package com.taos.example;
  2. import com.taosdata.jdbc.SchemalessWriter;
  3. import com.taosdata.jdbc.enums.SchemalessProtocolType;
  4. import com.taosdata.jdbc.enums.SchemalessTimestampType;
  5. import java.sql.Connection;
  6. import java.sql.DriverManager;
  7. import java.sql.SQLException;
  8. import java.sql.Statement;
  9. public class LineProtocolExample {
  10. // format: measurement,tag_set field_set timestamp
  11. private static String[] lines = {
  12. "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000", // micro
  13. // seconds
  14. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611249500",
  15. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249300",
  16. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611249800",
  17. };
  18. private static Connection getConnection() throws SQLException {
  19. String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
  20. return DriverManager.getConnection(jdbcUrl);
  21. }
  22. private static void createDatabase(Connection conn) throws SQLException {
  23. try (Statement stmt = conn.createStatement()) {
  24. // the default precision is ms (millisecond), but we use us(microsecond) here.
  25. stmt.execute("CREATE DATABASE IF NOT EXISTS test PRECISION 'us'");
  26. stmt.execute("USE test");
  27. }
  28. }
  29. public static void main(String[] args) throws SQLException {
  30. try (Connection conn = getConnection()) {
  31. createDatabase(conn);
  32. SchemalessWriter writer = new SchemalessWriter(conn);
  33. writer.write(lines, SchemalessProtocolType.LINE, SchemalessTimestampType.MICRO_SECONDS);
  34. }
  35. }
  36. }

查看源码

  1. import taos
  2. from taos import SmlProtocol, SmlPrecision
  3. lines = ["meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000",
  4. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611249500",
  5. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249300",
  6. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611249800",
  7. ]
  8. def get_connection():
  9. # create connection use firstEP in taos.cfg.
  10. return taos.connect()
  11. def create_database(conn):
  12. # the default precision is ms (millisecond), but we use us(microsecond) here.
  13. conn.execute("CREATE DATABASE test precision 'us'")
  14. conn.execute("USE test")
  15. def insert_lines(conn):
  16. affected_rows = conn.schemaless_insert(
  17. lines, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS)
  18. print(affected_rows) # 8
  19. if __name__ == '__main__':
  20. connection = get_connection()
  21. try:
  22. create_database(connection)
  23. insert_lines(connection)
  24. finally:
  25. connection.close()

查看源码

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/taosdata/driver-go/v2/af"
  5. )
  6. func prepareDatabase(conn *af.Connector) {
  7. _, err := conn.Exec("CREATE DATABASE test")
  8. if err != nil {
  9. panic(err)
  10. }
  11. _, err = conn.Exec("USE test")
  12. if err != nil {
  13. panic(err)
  14. }
  15. }
  16. func main() {
  17. conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
  18. if err != nil {
  19. fmt.Println("fail to connect, err:", err)
  20. }
  21. defer conn.Close()
  22. prepareDatabase(conn)
  23. var lines = []string{
  24. "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  25. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  26. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  27. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250",
  28. }
  29. err = conn.InfluxDBInsertLines(lines, "ms")
  30. if err != nil {
  31. fmt.Println("insert error:", err)
  32. }
  33. }

查看源码

  1. use libtaos::schemaless::*;
  2. use libtaos::*;
  3. fn main() {
  4. let taos = TaosCfg::default().connect().expect("fail to connect");
  5. taos.raw_query("CREATE DATABASE test").unwrap();
  6. taos.raw_query("USE test").unwrap();
  7. let lines = ["meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  8. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  9. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  10. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"];
  11. let affected_rows = taos
  12. .schemaless_insert(
  13. &lines,
  14. TSDB_SML_LINE_PROTOCOL,
  15. TSDB_SML_TIMESTAMP_MILLISECONDS,
  16. )
  17. .unwrap();
  18. println!("affected_rows={}", affected_rows);
  19. }
  20. // run with: cargo run --example influxdb_line_example

查看源码

  1. const taos = require("td2.0-connector");
  2. const conn = taos.connect({
  3. host: "localhost",
  4. });
  5. const cursor = conn.cursor();
  6. function createDatabase() {
  7. cursor.execute("CREATE DATABASE test");
  8. cursor.execute("USE test");
  9. }
  10. function insertData() {
  11. const lines = [
  12. "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  13. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  14. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  15. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250",
  16. ];
  17. cursor.schemalessInsert(
  18. lines,
  19. taos.SCHEMALESS_PROTOCOL.TSDB_SML_LINE_PROTOCOL,
  20. taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_MILLI_SECONDS
  21. );
  22. }
  23. try {
  24. createDatabase();
  25. insertData();
  26. } finally {
  27. cursor.close();
  28. conn.close();
  29. }

查看源码

  1. using TDengineDriver;
  2. namespace TDengineExample
  3. {
  4. internal class InfluxDBLineExample
  5. {
  6. static void Main()
  7. {
  8. IntPtr conn = GetConnection();
  9. PrepareDatabase(conn);
  10. string[] lines = {
  11. "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  12. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  13. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  14. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"
  15. };
  16. IntPtr res = TDengine.SchemalessInsert(conn, lines, lines.Length, (int)TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL, (int)TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS);
  17. if (TDengine.ErrorNo(res) != 0)
  18. {
  19. Console.WriteLine("SchemalessInsert failed since " + TDengine.Error(res));
  20. ExitProgram(conn, 1);
  21. }
  22. else
  23. {
  24. int affectedRows = TDengine.AffectRows(res);
  25. Console.WriteLine($"SchemalessInsert success, affected {affectedRows} rows");
  26. }
  27. TDengine.FreeResult(res);
  28. ExitProgram(conn, 0);
  29. }
  30. static IntPtr GetConnection()
  31. {
  32. string host = "localhost";
  33. short port = 6030;
  34. string username = "root";
  35. string password = "taosdata";
  36. string dbname = "";
  37. var conn = TDengine.Connect(host, username, password, dbname, port);
  38. if (conn == IntPtr.Zero)
  39. {
  40. Console.WriteLine("Connect to TDengine failed");
  41. TDengine.Cleanup();
  42. Environment.Exit(1);
  43. }
  44. else
  45. {
  46. Console.WriteLine("Connect to TDengine success");
  47. }
  48. return conn;
  49. }
  50. static void PrepareDatabase(IntPtr conn)
  51. {
  52. IntPtr res = TDengine.Query(conn, "CREATE DATABASE test");
  53. if (TDengine.ErrorNo(res) != 0)
  54. {
  55. Console.WriteLine("failed to create database, reason: " + TDengine.Error(res));
  56. ExitProgram(conn, 1);
  57. }
  58. res = TDengine.Query(conn, "USE test");
  59. if (TDengine.ErrorNo(res) != 0)
  60. {
  61. Console.WriteLine("failed to change database, reason: " + TDengine.Error(res));
  62. ExitProgram(conn, 1);
  63. }
  64. }
  65. static void ExitProgram(IntPtr conn, int exitCode)
  66. {
  67. TDengine.Close(conn);
  68. TDengine.Cleanup();
  69. Environment.Exit(exitCode);
  70. }
  71. }
  72. }

查看源码

  1. int main() {
  2. TAOS *taos = taos_connect("localhost", "root", "taosdata", "", 0);
  3. if (taos == NULL) {
  4. printf("failed to connect to server\n");
  5. exit(EXIT_FAILURE);
  6. }
  7. executeSQL(taos, "DROP DATABASE IF EXISTS test");
  8. executeSQL(taos, "CREATE DATABASE test");
  9. executeSQL(taos, "USE test");
  10. char *lines[] = {"meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  11. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  12. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  13. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"};
  14. TAOS_RES *res = taos_schemaless_insert(taos, lines, 4, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
  15. if (taos_errno(res) != 0) {
  16. printf("failed to insert schema-less data, reason: %s\n", taos_errstr(res));
  17. } else {
  18. int affectedRows = taos_affected_rows(res);
  19. printf("successfully inserted %d rows\n", affectedRows);
  20. }
  21. taos_free_result(res);
  22. taos_close(taos);
  23. taos_cleanup();
  24. }
  25. // output:
  26. // successfully inserted 4 rows

查看源码