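Writing Data with SQL

Introduction to SQL Writes

Applications insert data by executing INSERT statements through a connector. Users can also type INSERT statements manually in the TDengine CLI.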

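Writing One Record at a Time

The following INSERT statement writes one record into table d1001:

  INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31);

Here ts1 is a Unix timestamp. The oldest timestamp that can be inserted is the current server time minus the configured KEEP value. For the detailed timestamp rules, see the timestamp section of the TDengine SQL data writing reference.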
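The placeholder ts1 stands for a concrete timestamp value. The sketch below shows one way to produce it in Python, assuming the database uses millisecond precision. The helper names `to_epoch_ms` and `build_insert` are illustrative only and are not part of any TDengine connector.

```python
from datetime import datetime, timezone


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to a Unix timestamp in milliseconds."""
    return round(dt.timestamp() * 1000)


def build_insert(table: str, ts_ms: int, current: float, voltage: int, phase: float) -> str:
    """Format a single-row INSERT statement for the four-column schema used on this page."""
    return f"INSERT INTO {table} VALUES ({ts_ms}, {current}, {voltage}, {phase});"


# Assumption: the sample time is given in UTC.
ts1 = to_epoch_ms(datetime(2018, 10, 3, 14, 38, 5, tzinfo=timezone.utc))
print(build_insert("d1001", ts1, 10.3, 219, 0.31))
```

The resulting string can be passed to any connector's execute call or pasted into the TDengine CLI.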

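Writing Multiple Records at a Time

TDengine supports writing multiple records in one statement. For example, the following command writes two records into table d1001:

  INSERT INTO d1001 VALUES (ts1, 10.2, 220, 0.23) (ts2, 10.3, 218, 0.25);

Here ts1 and ts2 are Unix timestamps. The oldest timestamp that can be inserted is the current server time minus the configured KEEP value. For the detailed timestamp rules, see the timestamp section of the TDengine SQL data writing reference.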

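Writing to Multiple Tables at a Time

TDengine also supports writing to multiple tables in one statement. For example, the following command writes two records into d1001 and one record into d1002:

  INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31) (ts2, 12.6, 218, 0.33) d1002 VALUES (ts3, 12.3, 221, 0.31);

Here ts1, ts2, and ts3 are Unix timestamps. The oldest timestamp that can be inserted is the current server time minus the configured KEEP value. For the detailed timestamp rules, see the timestamp section of the TDengine SQL data writing reference.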
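A multi-table statement like the one above is usually assembled by grouping rows by table name. The following is a minimal sketch in plain Python. The function `build_multi_table_insert` is a hypothetical helper, and it assumes the values are numeric and need no quoting.

```python
from collections import defaultdict


def build_multi_table_insert(rows):
    """Build one INSERT from (table, ts, current, voltage, phase) tuples.

    Rows for the same table are grouped under a single VALUES clause.
    """
    grouped = defaultdict(list)
    for table, *values in rows:
        grouped[table].append(values)

    parts = []
    for table, value_rows in grouped.items():
        tuples = " ".join(
            "(" + ", ".join(str(v) for v in vals) + ")" for vals in value_rows
        )
        parts.append(f"{table} VALUES {tuples}")
    return "INSERT INTO " + " ".join(parts) + ";"


rows = [
    ("d1001", 1538548685000, 10.3, 219, 0.31),
    ("d1001", 1538548695000, 12.6, 218, 0.33),
    ("d1002", 1538548696800, 12.3, 221, 0.31),
]
print(build_multi_table_insert(rows))
```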

For the detailed SQL INSERT syntax rules, see the data writing section of the TDengine SQL reference.

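Info
  • To improve write efficiency, write in batches. In general, the more records in a batch, the higher the insert efficiency. However, a single record cannot exceed 48 KB, and the total length of one SQL statement cannot exceed 1 MB.
  • TDengine supports concurrent writes from multiple threads. To raise write throughput further, a client should open multiple threads that write at the same time. Beyond a certain number of threads, however, throughput stops improving and may even drop, because frequent thread switching adds overhead. The right number of threads depends on many factors, including the server's processing capacity and configuration, the database parameters, the data schema, and the write batch size. In general, the more powerful the server and client, the more concurrent writer threads they can support; the larger the database's vgroups parameter (while staying within the server's processing capacity), the more concurrent writes are supported; and the simpler the schema, the more concurrent writes are supported.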
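The batching advice above can be sketched as a small generator that packs as many rows as possible into each statement while staying under a size limit. This is an illustration only: `batch_statements` is a hypothetical helper, the 1 MB figure is the limit quoted above, and rows are assumed to be already formatted as `(…)` value tuples.

```python
MAX_SQL_BYTES = 1024 * 1024  # statement size limit quoted in the text above


def batch_statements(table, value_tuples, max_bytes=MAX_SQL_BYTES):
    """Yield INSERT statements, each holding as many rows as fit in max_bytes.

    A single row larger than the limit is still emitted on its own;
    rejecting it is left to the server.
    """
    prefix = f"INSERT INTO {table} VALUES"
    prefix_size = len(prefix.encode("utf-8"))
    parts, size = [], prefix_size
    for vt in value_tuples:
        piece_size = len(vt.encode("utf-8")) + 1  # +1 for the separating space
        if parts and size + piece_size > max_bytes:
            yield prefix + " " + " ".join(parts)
            parts, size = [], prefix_size
        parts.append(vt)
        size += piece_size
    if parts:
        yield prefix + " " + " ".join(parts)


# Tiny limit so the splitting is visible.
demo = list(batch_statements("d1001", ["(1, 1.0, 1, 1.0)"] * 5, max_bytes=60))
for stmt in demo:
    print(len(stmt), stmt)
```

Each yielded statement could then be handed to a separate writer thread, which is one way to combine batching with the concurrent writes described above.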
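Warning
  • For the same table, if the timestamp of a newly inserted record already exists, columns for which a new value is specified are overwritten with the new value, while columns without a new value are left unchanged.
  • The timestamp of written data must be later than the current time minus the database's KEEP parameter. If KEEP is set to 3650 days, data older than 3650 days cannot be written. The timestamp also cannot be later than the current time plus the DURATION parameter. If DURATION is 2, data more than 2 days in the future cannot be written.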
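The KEEP and DURATION rule above amounts to a window around the current time. A client-side pre-check might look like the sketch below. It assumes millisecond timestamps and that both parameters are expressed in days; `is_writable` is a hypothetical helper, and the server's own check remains authoritative.

```python
DAY_MS = 24 * 60 * 60 * 1000


def is_writable(ts_ms: int, now_ms: int, keep_days: int, duration_days: int) -> bool:
    """Return True if ts_ms lies inside (now - KEEP, now + DURATION]."""
    oldest = now_ms - keep_days * DAY_MS      # must be strictly later than this
    newest = now_ms + duration_days * DAY_MS  # must not be later than this
    return oldest < ts_ms <= newest


now = 1_700_000_000_000  # arbitrary "current time" for the demonstration
print(is_writable(now, now, keep_days=3650, duration_days=2))
print(is_writable(now - 3651 * DAY_MS, now, keep_days=3650, duration_days=2))
print(is_writable(now + 3 * DAY_MS, now, keep_days=3650, duration_days=2))
```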

Example Programs

Writing with Plain SQL

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  • PHP
package com.taos.example;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

public class RestInsertExample {
    private static Connection getConnection() throws SQLException {
        String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata";
        return DriverManager.getConnection(jdbcUrl);
    }

    private static List<String> getRawData() {
        return Arrays.asList(
                "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,'California.SanFrancisco',2",
                "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,'California.SanFrancisco',2",
                "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,'California.SanFrancisco',2",
                "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,'California.SanFrancisco',3",
                "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,'California.LosAngeles',2",
                "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,'California.LosAngeles',2",
                "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,'California.LosAngeles',3",
                "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,'California.LosAngeles',3"
        );
    }

    /**
     * The generated SQL is (the location tag keeps the quotes from the raw data):
     * INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000)
     * power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000)
     * power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000)
     * power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000)
     * power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000)
     * power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000)
     * power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000)
     * power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000)
     */
    private static String getSQL() {
        StringBuilder sb = new StringBuilder("INSERT INTO ");
        for (String line : getRawData()) {
            String[] ps = line.split(",");
            sb.append("power." + ps[0]).append(" USING power.meters TAGS(")
                    .append(ps[5]).append(", ") // tag: location
                    .append(ps[6]) // tag: groupId
                    .append(") VALUES(")
                    .append('\'').append(ps[1]).append('\'').append(",") // ts
                    .append(ps[2]).append(",") // current
                    .append(ps[3]).append(",") // voltage
                    .append(ps[4]).append(") "); // phase
        }
        return sb.toString();
    }

    public static void insertData() throws SQLException {
        try (Connection conn = getConnection()) {
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE DATABASE power KEEP 3650");
                stmt.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
                        "TAGS (location BINARY(64), groupId INT)");
                String sql = getSQL();
                int rowCount = stmt.executeUpdate(sql);
                System.out.println("rowCount=" + rowCount); // rowCount=8
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        insertData();
    }
}


import taos

lines = ["d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,'California.SanFrancisco',2",
         "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,'California.LosAngeles',3",
         "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,'California.LosAngeles',2",
         "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,'California.LosAngeles',3",
         "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,'California.SanFrancisco',3",
         "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,'California.SanFrancisco',2",
         "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,'California.SanFrancisco',2",
         "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,'California.LosAngeles',2"]


def get_connection() -> taos.TaosConnection:
    """
    create connection use firstEp in taos.cfg and use default user and password.
    """
    return taos.connect()


def create_stable(conn: taos.TaosConnection):
    conn.execute("CREATE DATABASE power")
    conn.execute("USE power")
    conn.execute("CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
                 "TAGS (location BINARY(64), groupId INT)")


# The generated SQL is:
# INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
# d1002 USING meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
# d1003 USING meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
# d1004 USING meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)
def get_sql():
    # Split each line into fields and sort by table name.
    # A local variable is used so the module-level list is not mutated.
    rows = sorted((line.split(',') for line in lines), key=lambda ls: ls[0])
    sql = "INSERT INTO "
    tb_name = None
    for ps in rows:
        tmp_tb_name = ps[0]
        if tb_name != tmp_tb_name:
            # first row of a new table: emit the table name and its tags once
            tb_name = tmp_tb_name
            sql += f"{tb_name} USING meters TAGS({ps[5]}, {ps[6]}) VALUES "
        sql += f"('{ps[1]}', {ps[2]}, {ps[3]}, {ps[4]}) "
    return sql


def insert_data(conn: taos.TaosConnection):
    sql = get_sql()
    affected_rows = conn.execute(sql)
    print("affected_rows", affected_rows)  # 8


if __name__ == '__main__':
    connection = get_connection()
    try:
        create_stable(connection)
        insert_data(connection)
    finally:
        connection.close()


package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/taosdata/driver-go/v3/taosRestful"
)

func createStable(taos *sql.DB) {
    _, err := taos.Exec("CREATE DATABASE power")
    if err != nil {
        log.Fatalln("failed to create database, err:", err)
    }
    _, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
    if err != nil {
        log.Fatalln("failed to create stable, err:", err)
    }
}

func insertData(taos *sql.DB) {
    sql := `INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
    power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
    power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
    power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
    result, err := taos.Exec(sql)
    if err != nil {
        log.Fatalln("failed to insert, err:", err)
    }
    rowsAffected, err := result.RowsAffected()
    if err != nil {
        log.Fatalln("failed to get affected rows, err:", err)
    }
    fmt.Println("RowsAffected", rowsAffected)
}

func main() {
    var taosDSN = "root:taosdata@http(localhost:6041)/"
    taos, err := sql.Open("taosRestful", taosDSN)
    if err != nil {
        log.Fatalln("failed to connect TDengine, err:", err)
    }
    defer taos.Close()
    createStable(taos)
    insertData(taos)
}


use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dsn = "ws://";
    let taos = TaosBuilder::from_dsn(dsn)?.build()?;

    taos.exec_many([
        "DROP DATABASE IF EXISTS power",
        "CREATE DATABASE power",
        "USE power",
        "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
    ]).await?;

    let inserted = taos.exec("INSERT INTO
    power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
        VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000)
            ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
    power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
        VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
    power.d1003 USING power.meters TAGS('California.LosAngeles', 2)
        VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
    power.d1004 USING power.meters TAGS('California.LosAngeles', 3)
        VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)").await?;

    assert_eq!(inserted, 8);
    Ok(())
}


const taos = require("@tdengine/client");

const conn = taos.connect({
  host: "localhost",
});
const cursor = conn.cursor();

try {
  cursor.execute("CREATE DATABASE power");
  cursor.execute("USE power");
  cursor.execute(
    "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
  );
  var sql = `INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
  power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
  power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
  power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`;
  cursor.execute(sql, { quiet: false });
} finally {
  cursor.close();
  conn.close();
}

// run with: node insert_example.js
// output:
// Successfully connected to TDengine
// Query OK, 0 row(s) affected (0.00509570s)
// Query OK, 0 row(s) affected (0.00130880s)
// Query OK, 0 row(s) affected (0.00467900s)
// Query OK, 8 row(s) affected (0.04043550s)
// Connection is closed


using System; // IntPtr, Console, Exception
using TDengineDriver;

namespace TDengineExample
{
    internal class SQLInsertExample
    {
        static void Main()
        {
            IntPtr conn = GetConnection();
            try
            {
                IntPtr res = TDengine.Query(conn, "CREATE DATABASE power");
                CheckRes(conn, res, "failed to create database");
                res = TDengine.Query(conn, "USE power");
                CheckRes(conn, res, "failed to change database");
                res = TDengine.Query(conn, "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
                CheckRes(conn, res, "failed to create stable");
                var sql = "INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
                    "d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
                    "d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000)('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
                    "d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000)('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
                res = TDengine.Query(conn, sql);
                CheckRes(conn, res, "failed to insert data");
                int affectedRows = TDengine.AffectRows(res);
                Console.WriteLine("affectedRows " + affectedRows);
                TDengine.FreeResult(res);
            }
            finally
            {
                TDengine.Close(conn);
            }
        }

        static IntPtr GetConnection()
        {
            string host = "localhost";
            short port = 6030;
            string username = "root";
            string password = "taosdata";
            string dbname = "";
            var conn = TDengine.Connect(host, username, password, dbname, port);
            if (conn == IntPtr.Zero)
            {
                throw new Exception("Connect to TDengine failed");
            }
            else
            {
                Console.WriteLine("Connect to TDengine success");
            }
            return conn;
        }

        static void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
        {
            if (TDengine.ErrorNo(res) != 0)
            {
                throw new Exception($"{errorMsg} since: {TDengine.Error(res)}");
            }
        }
    }
}

// output:
// Connect to TDengine success
// affectedRows 8


// compile with
// gcc -o insert_example insert_example.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include "taos.h"

/**
 * @brief execute sql and print affected rows.
 *
 * @param taos
 * @param sql
 */
void executeSQL(TAOS *taos, const char *sql) {
  TAOS_RES *res = taos_query(taos, sql);
  int code = taos_errno(res);
  if (code != 0) {
    printf("Error code: %d; Message: %s\n", code, taos_errstr(res));
    taos_free_result(res);
    taos_close(taos);
    exit(EXIT_FAILURE);
  }
  int affectedRows = taos_affected_rows(res);
  printf("affected rows %d\n", affectedRows);
  taos_free_result(res);
}

int main() {
  TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (taos == NULL) {
    printf("failed to connect to server\n");
    exit(EXIT_FAILURE);
  }
  executeSQL(taos, "CREATE DATABASE power");
  executeSQL(taos, "USE power");
  executeSQL(taos, "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
  // Adjacent string literals are concatenated by the compiler, so each
  // continuation starts with a space to keep the table clauses separated.
  executeSQL(taos, "INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)"
                   " d1002 USING meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)"
                   " d1003 USING meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)"
                   " d1004 USING meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)");
  taos_close(taos);
  taos_cleanup();
}

// output:
// affected rows 0
// affected rows 0
// affected rows 0
// affected rows 8


<?php

use TDengine\Connection;
use TDengine\Exception\TDengineException;

try {
    // instantiate
    $host = 'localhost';
    $port = 6030;
    $username = 'root';
    $password = 'taosdata';
    $dbname = 'power';
    $connection = new Connection($host, $port, $username, $password, $dbname);

    // connect
    $connection->connect();

    // insert
    $connection->query('CREATE DATABASE if not exists power');
    $connection->query('CREATE STABLE if not exists meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)');
    // the location tag is a BINARY value, so it is quoted as a string literal
    $resource = $connection->query(<<<'SQL'
        INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
        power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
        power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
        power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)
        SQL);

    // get affected rows
    var_dump($resource->affectedRows());
} catch (TDengineException $e) {
    // throw exception
    throw $e;
}


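Note
  1. The example code above works whether the connection is established over RESTful or through the native driver.
  2. The one thing to watch for is that the RESTful interface is stateless, so the USE db; statement cannot be used to switch databases. For that reason, the examples above specify table names in the form dbName.tbName.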
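When one code path has to serve both connection types, qualifying every table name up front avoids any dependence on USE. A minimal sketch follows; `qualify` is a hypothetical helper, not a connector function.

```python
def qualify(db: str, table: str) -> str:
    """Return db.table, leaving names that already carry a database prefix untouched."""
    return table if "." in table else f"{db}.{table}"


# Build a statement that does not depend on a prior "USE power".
sql = (
    f"INSERT INTO {qualify('power', 'd1001')} "
    f"USING {qualify('power', 'meters')} TAGS('California.SanFrancisco', 2) "
    "VALUES ('2018-10-03 14:38:05.000', 10.3, 219, 0.31)"
)
print(sql)
```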

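Writing with Parameter Binding

TDengine also provides a Prepare API that supports parameter binding. As in MySQL, these APIs currently support only the question mark ? as the placeholder for a parameter to be bound. Writing data through the parameter binding interface avoids the cost of parsing SQL syntax, which significantly improves write performance in most cases.

Note that parameter binding is available only with connectors that use a native connection.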

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  • PHP
package com.taos.example;

import com.taosdata.jdbc.TSDBPreparedStatement;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StmtInsertExample {
    private static ArrayList<Long> tsToLongArray(String ts) {
        ArrayList<Long> result = new ArrayList<>();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        LocalDateTime localDateTime = LocalDateTime.parse(ts, formatter);
        // the sample timestamps are interpreted as UTC+8
        result.add(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
        return result;
    }

    private static <T> ArrayList<T> toArray(T v) {
        ArrayList<T> result = new ArrayList<>();
        result.add(v);
        return result;
    }

    private static List<String> getRawData() {
        return Arrays.asList(
                "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,California.SanFrancisco,2",
                "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,California.SanFrancisco,2",
                "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,California.SanFrancisco,2",
                "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,California.SanFrancisco,3",
                "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,California.LosAngeles,2",
                "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,California.LosAngeles,2",
                "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,California.LosAngeles,3",
                "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,California.LosAngeles,3"
        );
    }

    private static Connection getConnection() throws SQLException {
        String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
        return DriverManager.getConnection(jdbcUrl);
    }

    private static void createTable(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE power KEEP 3650");
            stmt.executeUpdate("USE power");
            stmt.execute("CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
                    "TAGS (location BINARY(64), groupId INT)");
        }
    }

    private static void insertData() throws SQLException {
        try (Connection conn = getConnection()) {
            createTable(conn);
            String psql = "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)";
            try (TSDBPreparedStatement pst = (TSDBPreparedStatement) conn.prepareStatement(psql)) {
                for (String line : getRawData()) {
                    String[] ps = line.split(",");
                    // bind table name and tags
                    pst.setTableName(ps[0]);
                    pst.setTagString(0, ps[5]);
                    pst.setTagInt(1, Integer.valueOf(ps[6]));
                    // bind values
                    pst.setTimestamp(0, tsToLongArray(ps[1])); // ps[1] looks like: 2018-10-03 14:38:05.000
                    pst.setFloat(1, toArray(Float.valueOf(ps[2])));
                    pst.setInt(2, toArray(Integer.valueOf(ps[3])));
                    pst.setFloat(3, toArray(Float.valueOf(ps[4])));
                    pst.columnDataAddBatch();
                }
                pst.columnDataExecuteBatch();
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        insertData();
    }
}


Binding One Row at a Time

import taos
from datetime import datetime

# note: lines have already been sorted by table name
lines = [('d1001', '2018-10-03 14:38:05.000', 10.30000, 219, 0.31000, 'California.SanFrancisco', 2),
         ('d1001', '2018-10-03 14:38:15.000', 12.60000, 218, 0.33000, 'California.SanFrancisco', 2),
         ('d1001', '2018-10-03 14:38:16.800', 12.30000, 221, 0.31000, 'California.SanFrancisco', 2),
         ('d1002', '2018-10-03 14:38:16.650', 10.30000, 218, 0.25000, 'California.SanFrancisco', 3),
         ('d1003', '2018-10-03 14:38:05.500', 11.80000, 221, 0.28000, 'California.LosAngeles', 2),
         ('d1003', '2018-10-03 14:38:16.600', 13.40000, 223, 0.29000, 'California.LosAngeles', 2),
         ('d1004', '2018-10-03 14:38:05.000', 10.80000, 223, 0.29000, 'California.LosAngeles', 3),
         ('d1004', '2018-10-03 14:38:06.500', 11.50000, 221, 0.35000, 'California.LosAngeles', 3)]


def get_ts(ts: str):
    # parse as local time and convert to a millisecond Unix timestamp
    dt = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S.%f')
    return int(dt.timestamp() * 1000)


def create_stable():
    conn = taos.connect()
    try:
        conn.execute("CREATE DATABASE power")
        conn.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
                     "TAGS (location BINARY(64), groupId INT)")
    finally:
        conn.close()


def bind_row_by_row(stmt: taos.TaosStmt):
    tb_name = None
    for row in lines:
        if tb_name != row[0]:
            tb_name = row[0]
            tags: taos.TaosBind = taos.new_bind_params(2)  # 2 is count of tags
            tags[0].binary(row[5])  # location
            tags[1].int(row[6])  # groupId
            stmt.set_tbname_tags(tb_name, tags)
        values: taos.TaosBind = taos.new_bind_params(4)  # 4 is count of columns
        values[0].timestamp(get_ts(row[1]))
        values[1].float(row[2])
        values[2].int(row[3])
        values[3].float(row[4])
        stmt.bind_param(values)


def insert_data():
    conn = taos.connect(database="power")
    try:
        stmt = conn.statement("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")
        bind_row_by_row(stmt)
        stmt.execute()
        stmt.close()
    finally:
        conn.close()


if __name__ == '__main__':
    create_stable()
    insert_data()


Binding Multiple Rows at a Time

table_tags = {
    "d1001": ('California.SanFrancisco', 2),
    "d1002": ('California.SanFrancisco', 3),
    "d1003": ('California.LosAngeles', 2),
    "d1004": ('California.LosAngeles', 3)
}

# values are stored column by column: [timestamps, current, voltage, phase]
table_values = {
    "d1001": [
        ['2018-10-03 14:38:05.000', '2018-10-03 14:38:15.000', '2018-10-03 14:38:16.800'],
        [10.3, 12.6, 12.3],
        [219, 218, 221],
        [0.31, 0.33, 0.31]
    ],
    "d1002": [
        ['2018-10-03 14:38:16.650'], [10.3], [218], [0.25]
    ],
    "d1003": [
        ['2018-10-03 14:38:05.500', '2018-10-03 14:38:16.600'],
        [11.8, 13.4],
        [221, 223],
        [0.28, 0.29]
    ],
    "d1004": [
        ['2018-10-03 14:38:05.000', '2018-10-03 14:38:06.500'],
        [10.8, 11.5],
        [223, 221],
        [0.29, 0.35]
    ]
}


def bind_multi_rows(stmt: taos.TaosStmt):
    """
    batch bind example
    """
    for tb_name in table_values.keys():
        tags = table_tags[tb_name]
        tag_params = taos.new_bind_params(2)
        tag_params[0].binary(tags[0])
        tag_params[1].int(tags[1])
        stmt.set_tbname_tags(tb_name, tag_params)

        values = table_values[tb_name]
        value_params = taos.new_multi_binds(4)
        # get_ts is the helper defined in the one-row-at-a-time example
        value_params[0].timestamp([get_ts(t) for t in values[0]])
        value_params[1].float(values[1])
        value_params[2].int(values[2])
        value_params[3].float(values[3])
        stmt.bind_param_batch(value_params)


def insert_data():
    conn = taos.connect(database="power")
    try:
        stmt = conn.statement("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")
        bind_multi_rows(stmt)
        stmt.execute()
        stmt.close()
    finally:
        conn.close()


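Info

Binding one row at a time is less efficient than binding multiple rows at a time, but it supports non-INSERT statements. Binding multiple rows at a time is more efficient, but it supports only INSERT statements.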
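Multi-row binding takes its values column by column, while raw data usually arrives row by row. The conversion can be sketched in plain Python as below. `to_columns` is a hypothetical helper, not part of the connector, and the row layout (table, ts, current, voltage, phase) is assumed from the examples on this page.

```python
from collections import defaultdict


def to_columns(rows):
    """Group (table, ts, current, voltage, phase) rows by table, then transpose
    each group into per-column lists suitable for a batch bind."""
    grouped = defaultdict(list)
    for table, *values in rows:
        grouped[table].append(values)
    return {
        table: [list(col) for col in zip(*value_rows)]
        for table, value_rows in grouped.items()
    }


rows = [
    ("d1001", "2018-10-03 14:38:05.000", 10.3, 219, 0.31),
    ("d1001", "2018-10-03 14:38:15.000", 12.6, 218, 0.33),
    ("d1002", "2018-10-03 14:38:16.650", 10.3, 218, 0.25),
]
columns = to_columns(rows)
print(columns["d1001"])
```

Each inner list can then be passed to the corresponding column of a multi-bind, one table at a time.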

package main

import (
    "fmt"
    "time"

    "github.com/taosdata/driver-go/v3/af"
    "github.com/taosdata/driver-go/v3/common"
    "github.com/taosdata/driver-go/v3/common/param"
)

func checkErr(err error, prompt string) {
    if err != nil {
        fmt.Printf("%s\n", prompt)
        panic(err)
    }
}

func prepareStable(conn *af.Connector) {
    _, err := conn.Exec("CREATE DATABASE power")
    checkErr(err, "failed to create database")
    _, err = conn.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
    checkErr(err, "failed to create stable")
    _, err = conn.Exec("USE power")
    checkErr(err, "failed to change database")
}

func main() {
    conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
    checkErr(err, "fail to connect")
    defer conn.Close()
    prepareStable(conn)
    // create stmt
    stmt := conn.InsertStmt()
    defer stmt.Close()
    err = stmt.Prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")
    checkErr(err, "failed to create prepare statement")

    // bind table name and tags
    tagParams := param.NewParam(2).AddBinary([]byte("California.SanFrancisco")).AddInt(2)
    err = stmt.SetTableNameWithTags("d1001", tagParams)
    checkErr(err, "failed to execute SetTableNameWithTags")

    // specify ColumnType
    var bindType *param.ColumnType = param.NewColumnType(4).AddTimestamp().AddFloat().AddInt().AddFloat()

    // bind values. note: can only bind one row each time.
    valueParams := []*param.Param{
        param.NewParam(1).AddTimestamp(time.Unix(1648432611, 249300000), common.PrecisionMilliSecond),
        param.NewParam(1).AddFloat(10.3),
        param.NewParam(1).AddInt(219),
        param.NewParam(1).AddFloat(0.31),
    }
    err = stmt.BindParam(valueParams, bindType)
    checkErr(err, "BindParam error")
    err = stmt.AddBatch()
    checkErr(err, "AddBatch error")

    // bind one more row
    valueParams = []*param.Param{
        param.NewParam(1).AddTimestamp(time.Unix(1648432611, 749300000), common.PrecisionMilliSecond),
        param.NewParam(1).AddFloat(12.6),
        param.NewParam(1).AddInt(218),
        param.NewParam(1).AddFloat(0.33),
    }
    err = stmt.BindParam(valueParams, bindType)
    checkErr(err, "BindParam error")
    err = stmt.AddBatch()
    checkErr(err, "AddBatch error")

    // execute
    err = stmt.Execute()
    checkErr(err, "Execute batch error")
}


Tip

The driver-go module github.com/taosdata/driver-go/v3/wrapper is a low-level wrapper around the C interface. It can also be used to write data with parameter binding.

use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let taos = TaosBuilder::from_dsn("taos://")?.build()?;
    taos.create_database("power").await?;
    taos.use_database("power").await?;
    taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;

    let mut stmt = Stmt::init(&taos)?;
    stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
    // bind table name and tags
    stmt.set_tbname_tags(
        "d1001",
        &[
            Value::VarChar("California.SanFrancisco".into()),
            Value::Int(2),
        ],
    )?;
    // bind values.
    let values = vec![
        ColumnView::from_millis_timestamp(vec![1648432611249]),
        ColumnView::from_floats(vec![10.3]),
        ColumnView::from_ints(vec![219]),
        ColumnView::from_floats(vec![0.31]),
    ];
    stmt.bind(&values)?;
    // bind one more row
    let values2 = vec![
        ColumnView::from_millis_timestamp(vec![1648432611749]),
        ColumnView::from_floats(vec![12.6]),
        ColumnView::from_ints(vec![218]),
        ColumnView::from_floats(vec![0.33]),
    ];
    stmt.bind(&values2)?;

    stmt.add_batch()?;

    // execute.
    let rows = stmt.execute()?;
    assert_eq!(rows, 2);
    Ok(())
}


Binding One Row at a Time

const taos = require("@tdengine/client");

const conn = taos.connect({
  host: "localhost",
});
const cursor = conn.cursor();

function prepareSTable() {
  cursor.execute("CREATE DATABASE power");
  cursor.execute("USE power");
  cursor.execute(
    "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
  );
}

function insertData() {
  // init
  cursor.stmtInit();
  // prepare
  cursor.stmtPrepare(
    "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)"
  );

  // bind table name and tags
  let tagBind = new taos.TaosMultiBindArr(2);
  tagBind.multiBindBinary(["California.SanFrancisco"]);
  tagBind.multiBindInt([2]);
  cursor.stmtSetTbnameTags("d1001", tagBind.getMultiBindArr());

  // bind values
  let rows = [[1648432611249, 1648432611749], [10.3, 12.6], [219, 218], [0.31, 0.33]];
  let valueBind = new taos.TaosMultiBindArr(4);
  valueBind.multiBindTimestamp(rows[0]);
  valueBind.multiBindFloat(rows[1]);
  valueBind.multiBindInt(rows[2]);
  valueBind.multiBindFloat(rows[3]);
  cursor.stmtBindParamBatch(valueBind.getMultiBindArr());
  cursor.stmtAddBatch();

  // execute
  cursor.stmtExecute();
  cursor.stmtClose();
}

try {
  prepareSTable();
  insertData();
} finally {
  cursor.close();
  conn.close();
}

Binding multiple rows at a time

    function insertData() {
      // init
      cursor.stmtInit();
      // prepare
      cursor.stmtPrepare(
        "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)"
      );
      // bind table name and tags
      let tagBind = new taos.TaosMultiBindArr(2);
      tagBind.multiBindBinary(["California.SanFrancisco"]);
      tagBind.multiBindInt([2]);
      cursor.stmtSetTbnameTags("d1001", tagBind.getMultiBindArr());
      // bind values
      let valueBind = new taos.TaosMultiBindArr(4);
      valueBind.multiBindTimestamp([1648432611249, 1648432611749]);
      valueBind.multiBindFloat([10.3, 12.6]);
      valueBind.multiBindInt([219, 218]);
      valueBind.multiBindFloat([0.31, 0.33]);
      cursor.stmtBindParamBatch(valueBind.getMultiBindArr());
      cursor.stmtAddBatch();
      // execute
      cursor.stmtExecute();
      cursor.stmtClose();
    }

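info

Binding one row at a time is less efficient than binding multiple rows at once, but it also supports non-INSERT statements. Binding multiple rows at once is more efficient, but it supports only INSERT statements.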
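The multi-row examples above pass one array per column rather than one tuple per row. As a minimal sketch of that layout (plain Python with no TDengine client involved; `rows_to_columns` is a hypothetical helper name, not part of any connector), row-oriented records can be transposed into per-column arrays like this:

```python
from typing import List, Sequence, Tuple

# One record per row: (ts in ms, current, voltage, phase), matching the meters schema.
Row = Tuple[int, float, int, float]


def rows_to_columns(rows: Sequence[Row]) -> List[list]:
    """Transpose row-oriented records into one list per column.

    Hypothetical helper for illustration; it does not call any TDengine API.
    """
    if not rows:
        return []
    width = len(rows[0])
    if any(len(row) != width for row in rows):
        raise ValueError("all rows must have the same number of columns")
    # zip(*rows) groups the i-th field of every row together.
    return [list(column) for column in zip(*rows)]


rows = [
    (1648432611249, 10.3, 219, 0.31),
    (1648432611749, 12.6, 218, 0.33),
]
columns = rows_to_columns(rows)
print(columns)
```

Each resulting list would then be handed to the matching typed bind call (timestamp, float, int, float) of whichever connector is in use.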

    using System;
    using TDengineDriver;

    namespace TDengineExample
    {
        internal class StmtInsertExample
        {
            private static IntPtr conn;
            private static IntPtr stmt;

            static void Main()
            {
                conn = GetConnection();
                try
                {
                    PrepareSTable();
                    // 1. init and prepare
                    stmt = TDengine.StmtInit(conn);
                    if (stmt == IntPtr.Zero)
                    {
                        throw new Exception("failed to init stmt.");
                    }
                    int res = TDengine.StmtPrepare(stmt, "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)");
                    CheckStmtRes(res, "failed to prepare stmt");

                    // 2. bind table name and tags
                    TAOS_MULTI_BIND[] tags = new TAOS_MULTI_BIND[2] {
                        TaosMultiBind.MultiBindBinary(new string[] { "California.SanFrancisco" }),
                        TaosMultiBind.MultiBindInt(new int?[] { 2 })
                    };
                    res = TDengine.StmtSetTbnameTags(stmt, "d1001", tags);
                    CheckStmtRes(res, "failed to bind table name and tags");

                    // 3. bind values
                    TAOS_MULTI_BIND[] values = new TAOS_MULTI_BIND[4] {
                        TaosMultiBind.MultiBindTimestamp(new long[2] { 1648432611249, 1648432611749 }),
                        TaosMultiBind.MultiBindFloat(new float?[2] { 10.3f, 12.6f }),
                        TaosMultiBind.MultiBindInt(new int?[2] { 219, 218 }),
                        TaosMultiBind.MultiBindFloat(new float?[2] { 0.31f, 0.33f })
                    };
                    res = TDengine.StmtBindParamBatch(stmt, values);
                    CheckStmtRes(res, "failed to bind params");

                    // 4. add batch
                    res = TDengine.StmtAddBatch(stmt);
                    CheckStmtRes(res, "failed to add batch");

                    // 5. execute
                    res = TDengine.StmtExecute(stmt);
                    CheckStmtRes(res, "failed to execute");

                    // 6. free
                    TaosMultiBind.FreeTaosBind(tags);
                    TaosMultiBind.FreeTaosBind(values);

                    // 7. close the statement once it is no longer needed
                    TDengine.StmtClose(stmt);
                }
                finally
                {
                    TDengine.Close(conn);
                }
            }

            static IntPtr GetConnection()
            {
                string host = "localhost";
                short port = 6030;
                string username = "root";
                string password = "taosdata";
                string dbname = "";
                var conn = TDengine.Connect(host, username, password, dbname, port);
                if (conn == IntPtr.Zero)
                {
                    throw new Exception("Connect to TDengine failed");
                }
                else
                {
                    Console.WriteLine("Connect to TDengine success");
                }
                return conn;
            }

            static void PrepareSTable()
            {
                IntPtr res = TDengine.Query(conn, "CREATE DATABASE power");
                CheckResPtr(res, "failed to create database");
                res = TDengine.Query(conn, "USE power");
                CheckResPtr(res, "failed to change database");
                res = TDengine.Query(conn, "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
                CheckResPtr(res, "failed to create stable");
            }

            static void CheckStmtRes(int res, string errorMsg)
            {
                if (res != 0)
                {
                    // Read the error text before closing; the handle must not be used afterwards.
                    string reason = TDengine.StmtErrorStr(stmt);
                    int code = TDengine.StmtClose(stmt);
                    if (code != 0)
                    {
                        Console.WriteLine($"failed to close stmt, code: {code}");
                    }
                    // Stop here instead of continuing with a closed statement.
                    throw new Exception(errorMsg + ", " + reason);
                }
            }

            static void CheckResPtr(IntPtr res, string errorMsg)
            {
                if (TDengine.ErrorNo(res) != 0)
                {
                    throw new Exception(errorMsg + " since:" + TDengine.Error(res));
                }
            }
        }
    }

Binding one row at a time

    // compile with
    // gcc -o stmt_example stmt_example.c -ltaos
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "taos.h"

    /**
     * @brief execute sql only.
     *
     * @param taos
     * @param sql
     */
    void executeSQL(TAOS *taos, const char *sql) {
      TAOS_RES *res = taos_query(taos, sql);
      int code = taos_errno(res);
      if (code != 0) {
        printf("%s\n", taos_errstr(res));
        taos_free_result(res);
        taos_close(taos);
        exit(EXIT_FAILURE);
      }
      taos_free_result(res);
    }

    /**
     * @brief check return status and exit program when error occur.
     *
     * @param stmt
     * @param code
     * @param msg
     */
    void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
      if (code != 0) {
        printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
        taos_stmt_close(stmt);
        exit(EXIT_FAILURE);
      }
    }

    typedef struct {
      int64_t ts;
      float current;
      int voltage;
      float phase;
    } Row;

    /**
     * @brief insert data using stmt API
     *
     * @param taos
     */
    void insertData(TAOS *taos) {
      // init
      TAOS_STMT *stmt = taos_stmt_init(taos);
      if (stmt == NULL) {
        printf("failed to execute taos_stmt_init\n");
        taos_close(taos);
        exit(EXIT_FAILURE);
      }
      // prepare
      const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)";
      int code = taos_stmt_prepare(stmt, sql, 0);
      checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
      // bind table name and tags
      TAOS_BIND tags[2];
      char *location = "California.SanFrancisco";
      int groupId = 2;
      tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
      tags[0].buffer_length = strlen(location);
      tags[0].length = &tags[0].buffer_length;
      tags[0].buffer = location;
      tags[0].is_null = NULL;

      tags[1].buffer_type = TSDB_DATA_TYPE_INT;
      tags[1].buffer_length = sizeof(int);
      tags[1].length = &tags[1].buffer_length;
      tags[1].buffer = &groupId;
      tags[1].is_null = NULL;

      code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
      checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");

      // insert two rows
      Row rows[2] = {
          {1648432611249, 10.3, 219, 0.31},
          {1648432611749, 12.6, 218, 0.33},
      };

      // describe the four columns once; only the buffers change per row
      TAOS_BIND values[4];
      values[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
      values[0].buffer_length = sizeof(int64_t);
      values[0].length = &values[0].buffer_length;
      values[0].is_null = NULL;

      values[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
      values[1].buffer_length = sizeof(float);
      values[1].length = &values[1].buffer_length;
      values[1].is_null = NULL;

      values[2].buffer_type = TSDB_DATA_TYPE_INT;
      values[2].buffer_length = sizeof(int);
      values[2].length = &values[2].buffer_length;
      values[2].is_null = NULL;

      values[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
      values[3].buffer_length = sizeof(float);
      values[3].length = &values[3].buffer_length;
      values[3].is_null = NULL;

      for (int i = 0; i < 2; ++i) {
        values[0].buffer = &rows[i].ts;
        values[1].buffer = &rows[i].current;
        values[2].buffer = &rows[i].voltage;
        values[3].buffer = &rows[i].phase;
        code = taos_stmt_bind_param(stmt, values);  // bind param
        checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param");
        code = taos_stmt_add_batch(stmt);  // add batch
        checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");
      }
      // execute
      code = taos_stmt_execute(stmt);
      checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");
      int affectedRows = taos_stmt_affected_rows(stmt);
      printf("successfully inserted %d rows\n", affectedRows);
      // close
      taos_stmt_close(stmt);
    }

    int main() {
      TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
      if (taos == NULL) {
        printf("failed to connect to server\n");
        exit(EXIT_FAILURE);
      }
      executeSQL(taos, "CREATE DATABASE power");
      executeSQL(taos, "USE power");
      executeSQL(taos, "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
      insertData(taos);
      taos_close(taos);
      taos_cleanup();
      return 0;
    }

    // output:
    // successfully inserted 2 rows

Binding multiple rows at a time

    // compile with
    // gcc -o multi_bind_example multi_bind_example.c -ltaos
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "taos.h"

    /**
     * @brief execute sql only and ignore result set
     *
     * @param taos
     * @param sql
     */
    void executeSQL(TAOS *taos, const char *sql) {
      TAOS_RES *res = taos_query(taos, sql);
      int code = taos_errno(res);
      if (code != 0) {
        printf("%s\n", taos_errstr(res));
        taos_free_result(res);
        taos_close(taos);
        exit(EXIT_FAILURE);
      }
      taos_free_result(res);
    }

    /**
     * @brief exit program when error occur.
     *
     * @param stmt
     * @param code
     * @param msg
     */
    void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
      if (code != 0) {
        printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
        taos_stmt_close(stmt);
        exit(EXIT_FAILURE);
      }
    }

    /**
     * @brief insert data using stmt API
     *
     * @param taos
     */
    void insertData(TAOS *taos) {
      // init
      TAOS_STMT *stmt = taos_stmt_init(taos);
      if (stmt == NULL) {
        printf("failed to execute taos_stmt_init\n");
        taos_close(taos);
        exit(EXIT_FAILURE);
      }
      // prepare
      const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)";
      int code = taos_stmt_prepare(stmt, sql, 0);
      checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
      // bind table name and tags
      TAOS_BIND tags[2];
      char *location = "California.SanFrancisco";
      int groupId = 2;
      tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
      tags[0].buffer_length = strlen(location);
      tags[0].length = &tags[0].buffer_length;
      tags[0].buffer = location;
      tags[0].is_null = NULL;

      tags[1].buffer_type = TSDB_DATA_TYPE_INT;
      tags[1].buffer_length = sizeof(int);
      tags[1].length = &tags[1].buffer_length;
      tags[1].buffer = &groupId;
      tags[1].is_null = NULL;

      code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
      checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");

      // insert two rows with multi binds
      TAOS_MULTI_BIND params[4];
      // values to bind, one array per column
      int64_t ts[] = {1648432611249, 1648432611749};
      float current[] = {10.3, 12.6};
      int voltage[] = {219, 218};
      float phase[] = {0.31, 0.33};
      // is_null array: both rows are non-null
      char is_null[2] = {0};
      // length arrays: one entry per row, so every element is set explicitly
      int32_t int64Len[2] = {sizeof(int64_t), sizeof(int64_t)};
      int32_t floatLen[2] = {sizeof(float), sizeof(float)};
      int32_t intLen[2] = {sizeof(int), sizeof(int)};

      params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
      params[0].buffer_length = sizeof(int64_t);
      params[0].buffer = ts;
      params[0].length = int64Len;
      params[0].is_null = is_null;
      params[0].num = 2;

      params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
      params[1].buffer_length = sizeof(float);
      params[1].buffer = current;
      params[1].length = floatLen;
      params[1].is_null = is_null;
      params[1].num = 2;

      params[2].buffer_type = TSDB_DATA_TYPE_INT;
      params[2].buffer_length = sizeof(int);
      params[2].buffer = voltage;
      params[2].length = intLen;
      params[2].is_null = is_null;
      params[2].num = 2;

      params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
      params[3].buffer_length = sizeof(float);
      params[3].buffer = phase;
      params[3].length = floatLen;
      params[3].is_null = is_null;
      params[3].num = 2;

      code = taos_stmt_bind_param_batch(stmt, params);  // bind batch
      checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch");
      code = taos_stmt_add_batch(stmt);  // add batch
      checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");
      // execute
      code = taos_stmt_execute(stmt);
      checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");
      int affectedRows = taos_stmt_affected_rows(stmt);
      printf("successfully inserted %d rows\n", affectedRows);
      // close
      taos_stmt_close(stmt);
    }

    int main() {
      TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
      if (taos == NULL) {
        printf("failed to connect to server\n");
        exit(EXIT_FAILURE);
      }
      executeSQL(taos, "DROP DATABASE IF EXISTS power");
      executeSQL(taos, "CREATE DATABASE power");
      executeSQL(taos, "USE power");
      executeSQL(taos,
                 "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), "
                 "groupId INT)");
      insertData(taos);
      taos_close(taos);
      taos_cleanup();
      return 0;
    }

    // output:
    // successfully inserted 2 rows

    <?php

    use TDengine\Connection;
    use TDengine\Exception\TDengineException;

    try {
        // instantiate
        $host = 'localhost';
        $port = 6030;
        $username = 'root';
        $password = 'taosdata';
        $dbname = 'power';
        $connection = new Connection($host, $port, $username, $password, $dbname);

        // connect
        $connection->connect();

        // insert
        $connection->query('CREATE DATABASE if not exists power');
        $connection->query('CREATE STABLE if not exists meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)');
        $stmt = $connection->prepare('INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)');

        // set table name and tags
        $stmt->setTableNameTags('d1001', [
            // same format as parameter binding
            [TDengine\TSDB_DATA_TYPE_BINARY, 'California.SanFrancisco'],
            [TDengine\TSDB_DATA_TYPE_INT, 2],
        ]);

        $stmt->bindParams([
            [TDengine\TSDB_DATA_TYPE_TIMESTAMP, 1648432611249],
            [TDengine\TSDB_DATA_TYPE_FLOAT, 10.3],
            [TDengine\TSDB_DATA_TYPE_INT, 219],
            [TDengine\TSDB_DATA_TYPE_FLOAT, 0.31],
        ]);
        $stmt->bindParams([
            [TDengine\TSDB_DATA_TYPE_TIMESTAMP, 1648432611749],
            [TDengine\TSDB_DATA_TYPE_FLOAT, 12.6],
            [TDengine\TSDB_DATA_TYPE_INT, 218],
            [TDengine\TSDB_DATA_TYPE_FLOAT, 0.33],
        ]);
        $resource = $stmt->execute();

        // get affected rows
        var_dump($resource->affectedRows());
    } catch (TDengineException $e) {
        // throw exception
        throw $e;
    }
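
To make the placeholders in `INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)` easier to read, the sketch below renders the literal multi-row INSERT statement that the bound table name, tags, and rows in these examples correspond to. This is only a reading aid under stated assumptions: parameter binding sends typed values rather than SQL text, and `quote` and `render_insert` are hypothetical helpers that belong to no TDengine connector.

```python
from typing import Sequence


def quote(value) -> str:
    """Render a value as a SQL literal (illustration only, not an escaping routine)."""
    if isinstance(value, str):
        if "'" in value:
            raise ValueError("this sketch does not handle embedded quotes")
        return "'" + value + "'"
    return repr(value)


def render_insert(table: str, tags: Sequence, rows: Sequence[Sequence]) -> str:
    """Build the literal statement matching the bound table name, tags, and rows."""
    tag_sql = ", ".join(quote(tag) for tag in tags)
    # Multiple rows are written as consecutive parenthesised groups after VALUES.
    values_sql = " ".join(
        "(" + ", ".join(quote(value) for value in row) + ")" for row in rows
    )
    return f"INSERT INTO {table} USING meters TAGS({tag_sql}) VALUES {values_sql}"


sql = render_insert(
    "d1001",
    ["California.SanFrancisco", 2],
    [
        (1648432611249, 10.3, 219, 0.31),
        (1648432611749, 12.6, 218, 0.33),
    ],
)
print(sql)
```

In real code the prepared-statement form is preferable, since the values never pass through string formatting.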
