titledescription
Insert data
This page demonstrates how to insert time series data into QuestDB from NodeJS, Java, Go and cURL. The examples show how to use the REST API as well as the InfluxDB integration.

This page shows how to insert data into QuestDB using different programming languages and tools. To ingest data to a running instance, there are three main methods for inserting data:

  • InfluxDB line protocol (ILP) which provides flexibility, ease of use, and high ingestion rates
  • Postgres wire protocol for compatibility with a range of clients or fallback over ILP
  • Rest API which can be used for importing datasets from CSV

:::tip

InfluxDB Line Protocol is the recommended primary ingestion method in QuestDB. To query ingested data, users should utilize the Web Console and REST API on port 9000 or use a Postgres wire client. Methods for querying data are described on the query data documentation page.

:::

Prerequisites

This page assumes that QuestDB is running and accessible. QuestDB can be run using either Docker, the binaries or Homebrew for macOS users.

InfluxDB line protocol

QuestDB implements InfluxDB line protocol which is accessible by default on TCP port 9009. This allows using QuestDB as a drop-in replacement for InfluxDB and other systems implementing the protocol.

This interface is the preferred ingestion method as it provides the following benefits:

  • high-throughput ingestion
  • robust ingestion from multiple sources into tables with dedicated systems for reducing congestion
  • configurable commit-lag for out-of-order data via server configuration settings

For additional details on the message format, see the InfluxDB line protocol guide. Details on ports and authentication can be found on the InfluxDB API reference page, and a guide on the Telegraf agent for collecting and sending metrics to QuestDB via this protocol can be found on the Telegraf guide.

:::info

  • Each line protocol message must be delimited with newline \n characters.

  • The timestamp element of InfluxDB line protocol messages is optional and when omitted, the server will automatically assign the server’s system time as the row’s timestamp value.

:::

  1. const net = require("net")
  2. const client = new net.Socket()
  3. const HOST = "localhost"
  4. const PORT = 9009
  5. function run() {
  6. client.connect(PORT, HOST, () => {
  7. const rows = [
  8. `trades,name=test_ilp1 value=12.4 ${Date.now() * 1e6}`,
  9. `trades,name=test_ilp2 value=11.4 ${Date.now() * 1e6}`,
  10. ]
  11. rows.forEach((row) => {
  12. client.write(`${row}\n`)
  13. })
  14. client.destroy()
  15. })
  16. client.on("data", function (data) {
  17. console.log("Received: " + data)
  18. })
  19. client.on("close", function () {
  20. console.log("Connection closed")
  21. })
  22. }
  23. run()
  1. package main
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net"
  6. "time"
  7. )
  8. func main() {
  9. host := "127.0.0.1:9009"
  10. tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
  11. checkErr(err)
  12. rows := [2]string{
  13. fmt.Sprintf("trades,name=test_ilp1 value=12.4 %d", time.Now().UnixNano()),
  14. fmt.Sprintf("trades,name=test_ilp2 value=11.4 %d", time.Now().UnixNano()),
  15. }
  16. conn, err := net.DialTCP("tcp", nil, tcpAddr)
  17. checkErr(err)
  18. defer conn.Close()
  19. for _, s := range rows {
  20. _, err = conn.Write([]byte(fmt.Sprintf("%s\n", s)))
  21. checkErr(err)
  22. }
  23. result, err := ioutil.ReadAll(conn)
  24. checkErr(err)
  25. fmt.Println(string(result))
  26. }
  27. func checkErr(err error) {
  28. if err != nil {
  29. panic(err)
  30. }
  31. }
  1. import io.questdb.cutlass.line.LineProtoSender;
  2. import io.questdb.cutlass.line.tcp.LineTCPProtoSender;
  3. import io.questdb.network.Net;
  4. import io.questdb.std.Os;
  5. public class LineTCPSenderMain {
  6. /*
  7. Maven:
  8. <dependency>
  9. <groupId>org.questdb</groupId>
  10. <artifactId>questdb</artifactId>
  11. <version>{@version@}</version>
  12. </dependency>
  13. Gradle:
  14. compile group: 'org.questdb', name: 'questdb', version: '{@version@}'
  15. */
  16. public static void main(String[] args) {
  17. String hostIPv4 = "127.0.0.1";
  18. int port = 9009;
  19. int bufferCapacity = 256 * 1024;
  20. try (LineProtoSender sender = new LineTCPProtoSender(Net.parseIPv4(hostIPv4), port, bufferCapacity)) {
  21. sender
  22. .metric("trades")
  23. .tag("name", "test_ilp1")
  24. .field("value", 12.4)
  25. .$(Os.currentTimeNanos());
  26. sender
  27. .metric("trades")
  28. .tag("name", "test_ilp2")
  29. .field("value", 11.4)
  30. .$(Os.currentTimeNanos());
  31. sender.flush();
  32. }
  33. }
  34. }
  1. import time
  2. import socket
  3. HOST = 'localhost'
  4. PORT = 9009
  5. # For UDP, change socket.SOCK_STREAM to socket.SOCK_DGRAM
  6. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  7. try:
  8. sock.connect((HOST, PORT))
  9. # Single record insert
  10. sock.sendall(('trades,name=client_timestamp value=12.4 %d\n' %(time.time_ns())).encode())
  11. # Omitting the timestamp allows the server to assign one
  12. sock.sendall(('trades,name=server_timestamp value=12.4\n').encode())
  13. # Streams of readings must be newline-delimited
  14. sock.sendall(('trades,name=ilp_stream_1 value=12.4\ntrades,name=ilp_stream_2 value=11.4\n').encode())
  15. except socket.error as e:
  16. print("Got error: %s" % (e))
  17. sock.close()

Postgres compatibility

You can query data using the Postgres endpoint that QuestDB exposes. This is accessible via port 8812 by default. More information on the Postgres wire protocol implementation with details on supported features can be found on the Postgres API reference page.

This example uses the pg package which allows for quickly building queries using Postgres wire protocol. Details on the use of this package can be found on the node-postgres documentation.

This example uses naive Date.now() * 1000 inserts for Timestamp types in microsecond resolution. For accurate microsecond timestamps, the node-microtime package can be used which makes system calls to tv_usec from C++.

  1. const { Client } = require("pg")
  2. const start = async () => {
  3. try {
  4. const client = new Client({
  5. database: "qdb",
  6. host: "127.0.0.1",
  7. password: "quest",
  8. port: 8812,
  9. user: "admin",
  10. })
  11. await client.connect()
  12. const createTable = await client.query(
  13. "CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);",
  14. )
  15. console.log(createTable)
  16. const insertData = await client.query(
  17. "INSERT INTO trades VALUES($1, $2, $3, $4);",
  18. [Date.now() * 1000, Date.now(), "node pg example", 123],
  19. )
  20. await client.query("COMMIT")
  21. console.log(insertData)
  22. for (let rows = 0; rows < 10; rows++) {
  23. // Providing a 'name' field allows for prepared statements / bind variables
  24. const query = {
  25. name: "insert-values",
  26. text: "INSERT INTO trades VALUES($1, $2, $3, $4);",
  27. values: [Date.now() * 1000, Date.now(), "node pg prep statement", rows],
  28. }
  29. const preparedStatement = await client.query(query)
  30. }
  31. await client.query("COMMIT")
  32. const readAll = await client.query("SELECT * FROM trades")
  33. console.log(readAll.rows)
  34. await client.end()
  35. } catch (e) {
  36. console.log(e)
  37. }
  38. }
  39. start()

This example uses the pgx driver and toolkit for postgres in Go. More details on the use of this toolkit can be found on the GitHub repository for pgx.

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/jackc/pgx/v4"
  8. )
  9. var conn *pgx.Conn
  10. var err error
  11. func main() {
  12. ctx := context.Background()
  13. conn, _ = pgx.Connect(ctx, "postgresql://admin:quest@localhost:8812/qdb")
  14. defer conn.Close(ctx)
  15. // text-based query
  16. _, err := conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")
  17. if err != nil {
  18. log.Fatalln(err)
  19. }
  20. // Prepared statement given the name 'ps1'
  21. _, err = conn.Prepare(ctx, "ps1", "INSERT INTO trades VALUES($1,$2,$3,$4)")
  22. if err != nil {
  23. log.Fatalln(err)
  24. }
  25. for i := 0; i < 10; i++ {
  26. // Execute 'ps1' statement with a string and the loop iterator value
  27. _, err = conn.Exec(ctx, "ps1", time.Now(), time.Now().Round(time.Millisecond), "go prepared statement", i+1)
  28. if err != nil {
  29. log.Fatalln(err)
  30. }
  31. }
  32. // Read all rows from table
  33. rows, err := conn.Query(ctx, "SELECT * FROM trades")
  34. fmt.Println("Reading from trades table:")
  35. for rows.Next() {
  36. var name string
  37. var value int64
  38. var ts time.Time
  39. var date time.Time
  40. err = rows.Scan(&ts, &date, &name, &value)
  41. fmt.Println(ts, date, name, value)
  42. }
  43. err = conn.Close(ctx)
  44. }

The following example shows how to use parameterized queries and prepared statements using the rust-postgres client.

  1. use postgres::{Client, NoTls, Error};
  2. use chrono::{Utc};
  3. use std::time::SystemTime;
  4. fn main() -> Result<(), Error> {
  5. let mut client = Client::connect("postgresql://admin:quest@localhost:8812/qdb", NoTls)?;
  6. // Basic query
  7. client.batch_execute("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")?;
  8. // Parameterized query
  9. let name: &str = "rust example";
  10. let val: i32 = 123;
  11. let utc = Utc::now();
  12. let sys_time = SystemTime::now();
  13. client.execute(
  14. "INSERT INTO trades VALUES($1,$2,$3,$4)",
  15. &[&utc.naive_local(), &sys_time, &name, &val],
  16. )?;
  17. // Prepared statement
  18. let mut txn = client.transaction()?;
  19. let statement = txn.prepare("insert into trades values ($1,$2,$3,$4)")?;
  20. for value in 0..10 {
  21. let utc = Utc::now();
  22. let sys_time = SystemTime::now();
  23. txn.execute(&statement, &[&utc.naive_local(), &sys_time, &name, &value])?;
  24. }
  25. txn.commit()?;
  26. println!("import finished");
  27. Ok(())
  28. }
  1. package com.myco;
  2. import java.sql.*;
  3. import java.util.Properties;
  4. class App {
  5. public static void main(String[] args) throws SQLException {
  6. Properties properties = new Properties();
  7. properties.setProperty("user", "admin");
  8. properties.setProperty("password", "quest");
  9. properties.setProperty("sslmode", "disable");
  10. final Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:8812/qdb", properties);
  11. connection.setAutoCommit(false);
  12. final PreparedStatement statement = connection.prepareStatement("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);");
  13. statement.execute();
  14. try (PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO TRADES VALUES (?, ?, ?, ?)")) {
  15. preparedStatement.setTimestamp(1, new Timestamp(io.questdb.std.Os.currentTimeMicros()));
  16. preparedStatement.setDate(2, new Date(System.currentTimeMillis()));
  17. preparedStatement.setString(3, "abc");
  18. preparedStatement.setInt(4, 123);
  19. preparedStatement.execute();
  20. }
  21. System.out.println("Done");
  22. connection.close();
  23. }
  24. }

This example uses the psychopg2 database adapter which does not support prepared statements (bind variables). This functionality is on the roadmap for the antecedent psychopg3 adapter.

  1. import psycopg2 as pg
  2. import datetime as dt
  3. try:
  4. connection = pg.connect(user="admin",
  5. password="quest",
  6. host="127.0.0.1",
  7. port="8812",
  8. database="qdb")
  9. cursor = connection.cursor()
  10. # text-only query
  11. cursor.execute("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")
  12. # insert 10 records
  13. for x in range(10):
  14. now = dt.datetime.utcnow()
  15. date = dt.datetime.now().date()
  16. cursor.execute("""
  17. INSERT INTO trades
  18. VALUES (%s, %s, %s, %s);
  19. """, (now, date, "python example", x))
  20. # commit records
  21. connection.commit()
  22. cursor.execute("SELECT * FROM trades;")
  23. records = cursor.fetchall()
  24. for row in records:
  25. print(row)
  26. finally:
  27. if (connection):
  28. cursor.close()
  29. connection.close()
  30. print("Postgres connection is closed")

REST API

QuestDB exposes a REST API for compatibility with a wide range of libraries and tools. The REST API is accessible on port 9000 and has the following entrypoints:

  • /imp - import data
  • /exec - execute an SQL statement

More details on the use of these entrypoints can be found on the REST API reference page.

/imp endpoint

The /imp endpoint allows for importing a CSV file directly.

This example imports a CSV file with automatic schema detection.

  1. curl -F data=@data.csv http://localhost:9000/imp

This example overwrites an existing table, specifies a timestamp format and a designated timestamp column. For more information on the optional parameters for specifying timestamp formats, partitioning and renaming tables, see the REST API documentation.

  1. curl \
  2. -F schema='[{"name":"ts", "type": "TIMESTAMP", "pattern": "yyyy-MM-dd - HH:mm:ss"}]' \
  3. -F data=@weather.csv 'http://localhost:9000/imp?overwrite=true&timestamp=ts'
  1. const fetch = require("node-fetch")
  2. const FormData = require("form-data")
  3. const fs = require("fs")
  4. const qs = require("querystring")
  5. const HOST = "http://localhost:9000"
  6. async function run() {
  7. const form = new FormData()
  8. form.append("data", fs.readFileSync(__dirname + "/data.csv"), {
  9. filename: fileMetadata.name,
  10. contentType: "application/octet-stream",
  11. })
  12. try {
  13. const r = await fetch(`${HOST}/imp`, {
  14. method: "POST",
  15. body: form,
  16. headers: form.getHeaders(),
  17. })
  18. console.log(r)
  19. } catch (e) {
  20. console.error(e)
  21. }
  22. }
  23. run()
  1. import requests
  2. csv = {'data': ('my_table', open('./data.csv', 'r'))}
  3. host = 'http://localhost:9000'
  4. try:
  5. response = requests.post(host + '/imp', files=csv)
  6. print(response.text)
  7. except requests.exceptions.RequestException as e:
  8. print("Error: %s" % (e))
  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "mime/multipart"
  9. "net/http"
  10. "net/url"
  11. "os"
  12. )
  13. func main() {
  14. u, err := url.Parse("http://localhost:9000")
  15. checkErr(err)
  16. u.Path += "imp"
  17. url := fmt.Sprintf("%v", u)
  18. fileName := "/path/to/data.csv"
  19. file, err := os.Open(fileName)
  20. checkErr(err)
  21. defer file.Close()
  22. buf := new(bytes.Buffer)
  23. writer := multipart.NewWriter(buf)
  24. uploadFile, _ := writer.CreateFormFile("data", "data.csv")
  25. _, err = io.Copy(uploadFile, file)
  26. checkErr(err)
  27. writer.Close()
  28. req, err := http.NewRequest(http.MethodPut, url, buf)
  29. checkErr(err)
  30. req.Header.Add("Content-Type", writer.FormDataContentType())
  31. client := &http.Client{}
  32. res, err := client.Do(req)
  33. checkErr(err)
  34. defer res.Body.Close()
  35. body, err := ioutil.ReadAll(res.Body)
  36. checkErr(err)
  37. log.Println(string(body))
  38. }
  39. func checkErr(err error) {
  40. if err != nil {
  41. panic(err)
  42. }
  43. }

/exec endpoint

Alternatively, the /exec endpoint can be used to create a table and the INSERT statement can be used to populate it with values:

  1. # Create Table
  2. curl -G \
  3. --data-urlencode "query=CREATE TABLE IF NOT EXISTS trades(name STRING, value INT)" \
  4. http://localhost:9000/exec
  5. # Insert a row
  6. curl -G \
  7. --data-urlencode "query=INSERT INTO trades VALUES('abc', 123456);" \
  8. http://localhost:9000/exec

Note that these two queries can be combined into a single curl request:

  1. curl -G \
  2. --data-urlencode "query=CREATE TABLE IF NOT EXISTS trades(name STRING, value INT);\
  3. INSERT INTO trades VALUES('abc', 123456);" \
  4. http://localhost:9000/exec

The node-fetch package can be installed using npm i node-fetch.

  1. const fetch = require("node-fetch")
  2. const qs = require("querystring")
  3. const HOST = "http://localhost:9000"
  4. async function createTable() {
  5. try {
  6. const queryData = {
  7. query: "CREATE TABLE IF NOT EXISTS trades (name STRING, value INT);",
  8. }
  9. const response = await fetch(`${HOST}/exec?${qs.encode(queryData)}`)
  10. const json = await response.json()
  11. console.log(json)
  12. } catch (error) {
  13. console.log(error)
  14. }
  15. }
  16. async function insertData() {
  17. try {
  18. const queryData = {
  19. query: "INSERT INTO trades VALUES('abc', 123456);",
  20. }
  21. const response = await fetch(`${HOST}/exec?${qs.encode(queryData)}`)
  22. const json = await response.json()
  23. console.log(json)
  24. } catch (error) {
  25. console.log(error)
  26. }
  27. }
  28. createTable()
  29. insertData()
  1. import requests
  2. import json
  3. host = 'http://localhost:9000'
  4. def run_query(sql_query):
  5. query_params = {'query': sql_query, 'fmt' : 'json'}
  6. try:
  7. response = requests.post(host + '/exec', params=query_params)
  8. json_response = json.loads(response.text)
  9. print(json_response)
  10. except requests.exceptions.RequestException as e:
  11. print("Error: %s" % (e))
  12. # create table
  13. run_query("CREATE TABLE IF NOT EXISTS trades (name STRING, value INT);")
  14. # insert row
  15. run_query("INSERT INTO trades VALUES('abc', 123456);")
  1. package main
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "log"
  6. "net/http"
  7. "net/url"
  8. )
  9. func main() {
  10. u, err := url.Parse("http://localhost:9000")
  11. checkErr(err)
  12. u.Path += "exec"
  13. params := url.Values{}
  14. params.Add("query", `
  15. CREATE TABLE IF NOT EXISTS
  16. trades (name STRING, value INT);
  17. INSERT INTO
  18. trades
  19. VALUES(
  20. "abc",
  21. 123456
  22. );
  23. `)
  24. u.RawQuery = params.Encode()
  25. url := fmt.Sprintf("%v", u)
  26. res, err := http.Get(url)
  27. checkErr(err)
  28. defer res.Body.Close()
  29. body, err := ioutil.ReadAll(res.Body)
  30. checkErr(err)
  31. log.Println(string(body))
  32. }
  33. func checkErr(err error) {
  34. if err != nil {
  35. panic(err)
  36. }
  37. }

Web Console

By default, QuestDB has an embedded Web Console running at http://\[server-address\]:9000. When running locally, this is accessible at http://localhost:9000. The Web Console can be used to explore table schemas, visualizing query results as tables or graphs, and importing datasets from CSV files. For details on these components, refer to the Web Console reference page.