InfluxDB Line Protocol

Introduction

In the InfluxDB Line protocol format, a single line of text is used to represent one row of data. Each line contains 4 parts as shown below.

  1. measurement,tag_set field_set timestamp
  • measurement will be used as the name of the STable
  • tag_set will be used as tags, with format like <tag_key>=<tag_value>,<tag_key>=<tag_value>
  • field_setwill be used as data columns, with format like <field_key>=<field_value>,<field_key>=<field_value>
  • timestamp is the primary key timestamp corresponding to this row of data

For example:

  1. meters,location=California.LoSangeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611249500
InfluxDB Line Protocol - 图1note
  • All the data in tag_set will be converted to nchar type automatically .
  • Each data in field_set must be self-descriptive for its data type. For example 1.2f32 means a value 1.2 of float type. Without the “f” type suffix, it will be treated as type double.
  • Multiple kinds of precision can be used for the timestamp field. Time precision can be from nanosecond (ns) to hour (h).

For more details please refer to InfluxDB Line Protocol and TDengine Schemaless

Examples

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  1. package com.taos.example;
  2. import com.taosdata.jdbc.SchemalessWriter;
  3. import com.taosdata.jdbc.enums.SchemalessProtocolType;
  4. import com.taosdata.jdbc.enums.SchemalessTimestampType;
  5. import java.sql.Connection;
  6. import java.sql.DriverManager;
  7. import java.sql.SQLException;
  8. import java.sql.Statement;
  9. public class LineProtocolExample {
  10. // format: measurement,tag_set field_set timestamp
  11. private static String[] lines = {
  12. "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000", // micro
  13. // seconds
  14. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611249500",
  15. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249300",
  16. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611249800",
  17. };
  18. private static Connection getConnection() throws SQLException {
  19. String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
  20. return DriverManager.getConnection(jdbcUrl);
  21. }
  22. private static void createDatabase(Connection conn) throws SQLException {
  23. try (Statement stmt = conn.createStatement()) {
  24. // the default precision is ms (millisecond), but we use us(microsecond) here.
  25. stmt.execute("CREATE DATABASE IF NOT EXISTS test PRECISION 'us'");
  26. stmt.execute("USE test");
  27. }
  28. }
  29. public static void main(String[] args) throws SQLException {
  30. try (Connection conn = getConnection()) {
  31. createDatabase(conn);
  32. SchemalessWriter writer = new SchemalessWriter(conn);
  33. writer.write(lines, SchemalessProtocolType.LINE, SchemalessTimestampType.MICRO_SECONDS);
  34. }
  35. }
  36. }

view source code

  1. import taos
  2. from taos import SmlProtocol, SmlPrecision
  3. lines = ["meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000",
  4. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611249500",
  5. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249300",
  6. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611249800",
  7. ]
  8. def get_connection():
  9. # create connection use firstEP in taos.cfg.
  10. return taos.connect()
  11. def create_database(conn):
  12. # the default precision is ms (microsecond), but we use us(microsecond) here.
  13. conn.execute("CREATE DATABASE test precision 'us'")
  14. conn.execute("USE test")
  15. def insert_lines(conn):
  16. affected_rows = conn.schemaless_insert(
  17. lines, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS)
  18. print(affected_rows) # 8
  19. if __name__ == '__main__':
  20. connection = get_connection()
  21. try:
  22. create_database(connection)
  23. insert_lines(connection)
  24. finally:
  25. connection.close()

view source code

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/taosdata/driver-go/v2/af"
  5. )
  6. func prepareDatabase(conn *af.Connector) {
  7. _, err := conn.Exec("CREATE DATABASE test")
  8. if err != nil {
  9. panic(err)
  10. }
  11. _, err = conn.Exec("USE test")
  12. if err != nil {
  13. panic(err)
  14. }
  15. }
  16. func main() {
  17. conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
  18. if err != nil {
  19. fmt.Println("fail to connect, err:", err)
  20. }
  21. defer conn.Close()
  22. prepareDatabase(conn)
  23. var lines = []string{
  24. "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  25. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  26. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  27. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250",
  28. }
  29. err = conn.InfluxDBInsertLines(lines, "ms")
  30. if err != nil {
  31. fmt.Println("insert error:", err)
  32. }
  33. }

view source code

  1. use libtaos::schemaless::*;
  2. use libtaos::*;
  3. fn main() {
  4. let taos = TaosCfg::default().connect().expect("fail to connect");
  5. taos.raw_query("CREATE DATABASE test").unwrap();
  6. taos.raw_query("USE test").unwrap();
  7. let lines = ["meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  8. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  9. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  10. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"];
  11. let affected_rows = taos
  12. .schemaless_insert(
  13. &lines,
  14. TSDB_SML_LINE_PROTOCOL,
  15. TSDB_SML_TIMESTAMP_MILLISECONDS,
  16. )
  17. .unwrap();
  18. println!("affected_rows={}", affected_rows);
  19. }
  20. // run with: cargo run --example influxdb_line_example

view source code

  1. const taos = require("td2.0-connector");
  2. const conn = taos.connect({
  3. host: "localhost",
  4. });
  5. const cursor = conn.cursor();
  6. function createDatabase() {
  7. cursor.execute("CREATE DATABASE test");
  8. cursor.execute("USE test");
  9. }
  10. function insertData() {
  11. const lines = [
  12. "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  13. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  14. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  15. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250",
  16. ];
  17. cursor.schemalessInsert(
  18. lines,
  19. taos.SCHEMALESS_PROTOCOL.TSDB_SML_LINE_PROTOCOL,
  20. taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_MILLI_SECONDS
  21. );
  22. }
  23. try {
  24. createDatabase();
  25. insertData();
  26. } finally {
  27. cursor.close();
  28. conn.close();
  29. }

view source code

  1. using TDengineDriver;
  2. namespace TDengineExample
  3. {
  4. internal class InfluxDBLineExample
  5. {
  6. static void Main()
  7. {
  8. IntPtr conn = GetConnection();
  9. PrepareDatabase(conn);
  10. string[] lines = {
  11. "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  12. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  13. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  14. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"
  15. };
  16. IntPtr res = TDengine.SchemalessInsert(conn, lines, lines.Length, (int)TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL, (int)TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS);
  17. if (TDengine.ErrorNo(res) != 0)
  18. {
  19. Console.WriteLine("SchemalessInsert failed since " + TDengine.Error(res));
  20. ExitProgram(conn, 1);
  21. }
  22. else
  23. {
  24. int affectedRows = TDengine.AffectRows(res);
  25. Console.WriteLine($"SchemalessInsert success, affected {affectedRows} rows");
  26. }
  27. TDengine.FreeResult(res);
  28. ExitProgram(conn, 0);
  29. }
  30. static IntPtr GetConnection()
  31. {
  32. string host = "localhost";
  33. short port = 6030;
  34. string username = "root";
  35. string password = "taosdata";
  36. string dbname = "";
  37. var conn = TDengine.Connect(host, username, password, dbname, port);
  38. if (conn == IntPtr.Zero)
  39. {
  40. Console.WriteLine("Connect to TDengine failed");
  41. TDengine.Cleanup();
  42. Environment.Exit(1);
  43. }
  44. else
  45. {
  46. Console.WriteLine("Connect to TDengine success");
  47. }
  48. return conn;
  49. }
  50. static void PrepareDatabase(IntPtr conn)
  51. {
  52. IntPtr res = TDengine.Query(conn, "CREATE DATABASE test");
  53. if (TDengine.ErrorNo(res) != 0)
  54. {
  55. Console.WriteLine("failed to create database, reason: " + TDengine.Error(res));
  56. ExitProgram(conn, 1);
  57. }
  58. res = TDengine.Query(conn, "USE test");
  59. if (TDengine.ErrorNo(res) != 0)
  60. {
  61. Console.WriteLine("failed to change database, reason: " + TDengine.Error(res));
  62. ExitProgram(conn, 1);
  63. }
  64. }
  65. static void ExitProgram(IntPtr conn, int exitCode)
  66. {
  67. TDengine.Close(conn);
  68. TDengine.Cleanup();
  69. Environment.Exit(exitCode);
  70. }
  71. }
  72. }

view source code

  1. int main() {
  2. TAOS *taos = taos_connect("localhost", "root", "taosdata", "", 0);
  3. if (taos == NULL) {
  4. printf("failed to connect to server\n");
  5. exit(EXIT_FAILURE);
  6. }
  7. executeSQL(taos, "DROP DATABASE IF EXISTS test");
  8. executeSQL(taos, "CREATE DATABASE test");
  9. executeSQL(taos, "USE test");
  10. char *lines[] = {"meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  11. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  12. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  13. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"};
  14. TAOS_RES *res = taos_schemaless_insert(taos, lines, 4, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
  15. if (taos_errno(res) != 0) {
  16. printf("failed to insert schema-less data, reason: %s\n", taos_errstr(res));
  17. } else {
  18. int affectedRows = taos_affected_rows(res);
  19. printf("successfully inserted %d rows\n", affectedRows);
  20. }
  21. taos_free_result(res);
  22. taos_close(taos);
  23. taos_cleanup();
  24. }
  25. // output:
  26. // successfully inserted 4 rows

view source code