This page shows how to insert data into QuestDB using different programming languages and tools. To ingest data to a running instance, there are three main methods that can be used:

  • InfluxDB line protocol which provides flexibility, ease of use, and high ingestion rates
  • Postgres wire protocol for compatibility with a range of clients
  • REST API, which can be used for importing datasets from CSV

Prerequisites

This page assumes that QuestDB is running and accessible. QuestDB can be run using either Docker, the binaries or Homebrew for macOS users.

InfluxDB line protocol

QuestDB implements InfluxDB line protocol which is accessible by default on TCP port 9009. This allows using QuestDB as a drop-in replacement for InfluxDB and others implementing the protocol. Configuration settings for ingestion using this protocol can be set for Influx line over TCP and Influx line over UDP.

More information on the InfluxDB line protocol implementation with details on message format, ports, authentication and examples can be found on the InfluxDB API reference page. Additionally, a guide on the Telegraf agent for collecting and sending metrics to QuestDB via this protocol can be found on the Telegraf guide.

<Tabs defaultValue="nodejs" values={[ { label: "NodeJS", value: "nodejs" }, { label: "Go", value: "go" }, { label: "Java", value: "java" } ]}>

  1. const net = require("net")
  2. const client = new net.Socket()
  3. const HOST = "localhost"
  4. const PORT = 9009
  5. function run() {
  6. client.connect(PORT, HOST, () => {
  7. const rows = [
  8. `trades,name=test_ilp1 value=12.4 ${Date.now() * 1e6}`,
  9. `trades,name=test_ilp2 value=11.4 ${Date.now() * 1e6}`,
  10. ]
  11. rows.forEach((row) => {
  12. client.write(`${row}\n`)
  13. })
  14. client.destroy()
  15. })
  16. client.on("data", function (data) {
  17. console.log("Received: " + data)
  18. })
  19. client.on("close", function () {
  20. console.log("Connection closed")
  21. })
  22. }
  23. run()
  1. package main
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net"
  6. "time"
  7. )
  8. func main() {
  9. host := "127.0.0.1:9009"
  10. tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
  11. checkErr(err)
  12. rows := [2]string{
  13. fmt.Sprintf("trades,name=test_ilp1 value=12.4 %d", time.Now().UnixNano()),
  14. fmt.Sprintf("trades,name=test_ilp2 value=11.4 %d", time.Now().UnixNano()),
  15. }
  16. conn, err := net.DialTCP("tcp", nil, tcpAddr)
  17. checkErr(err)
  18. defer conn.Close()
  19. for _, s := range rows {
  20. _, err = conn.Write([]byte(fmt.Sprintf("%s\n", s)))
  21. checkErr(err)
  22. }
  23. result, err := ioutil.ReadAll(conn)
  24. checkErr(err)
  25. fmt.Println(string(result))
  26. }
  27. func checkErr(err error) {
  28. if err != nil {
  29. panic(err)
  30. }
  31. }
  1. import io.questdb.cutlass.line.LineProtoSender;
  2. import io.questdb.cutlass.line.tcp.LineTCPProtoSender;
  3. import io.questdb.network.Net;
  4. import io.questdb.std.Os;
  5. public class LineTCPSenderMain {
  6. /*
  7. Maven:
  8. <dependency>
  9. <groupId>org.questdb</groupId>
  10. <artifactId>questdb</artifactId>
  11. <version>{@version@}</version>
  12. </dependency>
  13. Gradle:
  14. compile group: 'org.questdb', name: 'questdb', version: '{@version@}'
  15. */
  16. public static void main(String[] args) {
  17. String hostIPv4 = "127.0.0.1";
  18. int port = 9009;
  19. int bufferCapacity = 256 * 1024;
  20. try (LineProtoSender sender = new LineTCPProtoSender(Net.parseIPv4(hostIPv4), port, bufferCapacity)) {
  21. sender
  22. .metric("trades")
  23. .tag("name", "test_ilp1")
  24. .field("value", 12.4)
  25. .$(Os.currentTimeNanos());
  26. sender
  27. .metric("trades")
  28. .tag("name", "test_ilp2")
  29. .field("value", 11.4)
  30. .$(Os.currentTimeNanos());
  31. sender.flush();
  32. }
  33. }
  34. }

Postgres compatibility

You can insert data using the Postgres endpoint that QuestDB exposes. This is accessible via port 8812 by default. More information on the Postgres wire protocol implementation with details on supported features can be found on the Postgres API reference page.

<Tabs defaultValue="nodejs" values={[ { label: "NodeJS", value: "nodejs" }, { label: "Go", value: "go" }, { label: "Rust", value: "rust" }, { label: "Java", value: "java" }, { label: "Python", value: "python" }, ]}>

This example uses the pg package which allows for quickly building queries using PostgreSQL wire protocol. Details on the use of this package can be found on the node-postgres documentation.

This example uses naive Date.now() * 1000 inserts for Timestamp types in microsecond resolution. For accurate microsecond timestamps, the node-microtime package can be used which makes system calls to tv_usec from C++.

  1. const { Client } = require("pg")
  2. const start = async () => {
  3. try {
  4. const client = new Client({
  5. database: "qdb",
  6. host: "127.0.0.1",
  7. password: "quest",
  8. port: 8812,
  9. user: "admin",
  10. })
  11. await client.connect()
  12. const createTable = await client.query(
  13. "CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);",
  14. )
  15. console.log(createTable)
  16. const insertData = await client.query(
  17. "INSERT INTO trades VALUES($1, $2, $3, $4);",
  18. [Date.now() * 1000, Date.now(), "node pg example", 123],
  19. )
  20. await client.query("COMMIT")
  21. console.log(insertData)
  22. for (let rows = 0; rows < 10; rows++) {
  23. // Providing a 'name' field allows for prepared statements / bind variables
  24. const query = {
  25. name: "insert-values",
  26. text: "INSERT INTO trades VALUES($1, $2, $3, $4);",
  27. values: [Date.now() * 1000, Date.now(), "node pg prep statement", rows],
  28. }
  29. const preparedStatement = await client.query(query)
  30. }
  31. await client.query("COMMIT")
  32. const readAll = await client.query("SELECT * FROM trades")
  33. console.log(readAll.rows)
  34. await client.end()
  35. } catch (e) {
  36. console.log(e)
  37. }
  38. }
  39. start()

This example uses the pgx driver and toolkit for postgres in Go. More details on the use of this toolkit can be found on the GitHub repository for pgx.

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/jackc/pgx/v4"
  8. )
  9. var conn *pgx.Conn
  10. var err error
  11. func main() {
  12. ctx := context.Background()
  13. conn, _ = pgx.Connect(ctx, "postgresql://admin:quest@localhost:8812/qdb")
  14. defer conn.Close(ctx)
  15. // text-based query
  16. _, err := conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")
  17. if err != nil {
  18. log.Fatalln(err)
  19. }
  20. // Prepared statement given the name 'ps1'
  21. _, err = conn.Prepare(ctx, "ps1", "INSERT INTO trades VALUES($1,$2,$3,$4)")
  22. if err != nil {
  23. log.Fatalln(err)
  24. }
  25. for i := 0; i < 10; i++ {
  26. // Execute 'ps1' statement with a string and the loop iterator value
  27. _, err = conn.Exec(ctx, "ps1", time.Now(), time.Now().Round(time.Millisecond), "go prepared statement", i+1)
  28. if err != nil {
  29. log.Fatalln(err)
  30. }
  31. }
  32. // Read all rows from table
  33. rows, err := conn.Query(ctx, "SELECT * FROM trades")
  34. fmt.Println("Reading from trades table:")
  35. for rows.Next() {
  36. var name string
  37. var value int64
  38. var ts time.Time
  39. var date time.Time
  40. err = rows.Scan(&ts, &date, &name, &value)
  41. fmt.Println(ts, date, name, value)
  42. }
  43. err = conn.Close(ctx)
  44. }

The following example shows how to use parameterized queries and prepared statements using the rust-postgres client.

  1. use postgres::{Client, NoTls, Error};
  2. use chrono::{Utc};
  3. use std::time::SystemTime;
  4. fn main() -> Result<(), Error> {
  5. let mut client = Client::connect("postgresql://admin:quest@localhost:8812/qdb", NoTls)?;
  6. // Basic query
  7. client.batch_execute("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")?;
  8. // Parameterized query
  9. let name: &str = "rust example";
  10. let val: i32 = 123;
  11. let utc = Utc::now();
  12. let sys_time = SystemTime::now();
  13. client.execute(
  14. "INSERT INTO trades VALUES($1,$2,$3,$4)",
  15. &[&utc.naive_local(), &sys_time, &name, &val],
  16. )?;
  17. // Prepared statement
  18. let mut txn = client.transaction()?;
  19. let statement = txn.prepare("insert into trades values ($1,$2,$3,$4)")?;
  20. for value in 0..10 {
  21. let utc = Utc::now();
  22. let sys_time = SystemTime::now();
  23. txn.execute(&statement, &[&utc.naive_local(), &sys_time, &name, &value])?;
  24. }
  25. txn.commit()?;
  26. println!("import finished");
  27. Ok(())
  28. }
  1. package com.myco;
  2. import java.sql.*;
  3. import java.util.Properties;
  4. class App {
  5. public static void main(String[] args) throws SQLException {
  6. Properties properties = new Properties();
  7. properties.setProperty("user", "admin");
  8. properties.setProperty("password", "quest");
  9. properties.setProperty("sslmode", "disable");
  10. final Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:8812/qdb", properties);
  11. connection.setAutoCommit(false);
  12. final PreparedStatement statement = connection.prepareStatement("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);");
  13. statement.execute();
  14. try (PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO TRADES VALUES (?, ?, ?, ?)")) {
  15. preparedStatement.setTimestamp(1, new Timestamp(io.questdb.std.Os.currentTimeMicros()));
  16. preparedStatement.setDate(2, new Date(System.currentTimeMillis()));
  17. preparedStatement.setString(3, "abc");
  18. preparedStatement.setInt(4, 123);
  19. preparedStatement.execute();
  20. }
  21. System.out.println("Done");
  22. connection.close();
  23. }
  24. }

This example uses the psycopg2 database adapter which does not support prepared statements (bind variables). This functionality is on the roadmap for the successor psycopg3 adapter.

  1. import psycopg2 as pg
  2. import datetime as dt
  3. try:
  4. connection = pg.connect(user="admin",
  5. password="quest",
  6. host="127.0.0.1",
  7. port="8812",
  8. database="qdb")
  9. cursor = connection.cursor()
  10. # text-only query
  11. cursor.execute("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")
  12. # insert 10 records
  13. for x in range(10):
  14. now = dt.datetime.utcnow()
  15. date = dt.datetime.now().date()
  16. cursor.execute("""
  17. INSERT INTO trades
  18. VALUES (%s, %s, %s, %s);
  19. """, (now, date, "python example", x))
  20. # commit records
  21. connection.commit()
  22. cursor.execute("SELECT * FROM trades;")
  23. records = cursor.fetchall()
  24. for row in records:
  25. print(row)
  26. finally:
  27. if (connection):
  28. cursor.close()
  29. connection.close()
  30. print("PostgreSQL connection is closed")

REST API

QuestDB exposes a REST API for compatibility with a wide range of libraries and tools. The REST API is accessible on port 9000 and has the following entrypoints:

  • /imp - import data
  • /exec - execute an SQL statement

More details on the use of these entrypoints can be found on the REST API reference page.

/imp endpoint

The /imp endpoint allows for importing a CSV file directly.

import Tabs from "@theme/Tabs" import TabItem from "@theme/TabItem"

<Tabs defaultValue="curl" values={[ { label: "cURL", value: "curl" }, { label: "NodeJS", value: "nodejs" }, { label: "Go", value: "go" }, ]}>

This example imports a CSV file with automatic schema detection.

  1. curl -F data=@data.csv http://localhost:9000/imp

This example overwrites an existing table, specifies a timestamp format and a designated timestamp column. For more information on the optional parameters for specifying timestamp formats, partitioning and renaming tables, see the REST API documentation.

  1. curl \
  2. -F schema='[{"name":"ts", "type": "TIMESTAMP", "pattern": "yyyy-MM-dd - HH:mm:ss"}]' \
  3. -F data=@weather.csv 'http://localhost:9000/imp?overwrite=true&timestamp=ts'
  1. const fetch = require("node-fetch")
  2. const FormData = require("form-data")
  3. const fs = require("fs")
  4. const qs = require("querystring")
  5. const HOST = "http://localhost:9000"
  6. async function run() {
  7. const form = new FormData()
  8. form.append("data", fs.readFileSync(__dirname + "/data.csv"), {
  9. filename: fileMetadata.name,
  10. contentType: "application/octet-stream",
  11. })
  12. try {
  13. const r = await fetch(`${HOST}/imp`, {
  14. method: "POST",
  15. body: form,
  16. headers: form.getHeaders(),
  17. })
  18. console.log(r)
  19. } catch (e) {
  20. console.error(e)
  21. }
  22. }
  23. run()
  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "mime/multipart"
  9. "net/http"
  10. "net/url"
  11. "os"
  12. )
  13. func main() {
  14. u, err := url.Parse("http://localhost:9000")
  15. checkErr(err)
  16. u.Path += "imp"
  17. url := fmt.Sprintf("%v", u)
  18. fileName := "/path/to/data.csv"
  19. file, err := os.Open(fileName)
  20. checkErr(err)
  21. defer file.Close()
  22. buf := new(bytes.Buffer)
  23. writer := multipart.NewWriter(buf)
  24. uploadFile, _ := writer.CreateFormFile("data", "data.csv")
  25. _, err = io.Copy(uploadFile, file)
  26. checkErr(err)
  27. writer.Close()
  28. req, err := http.NewRequest(http.MethodPut, url, buf)
  29. checkErr(err)
  30. req.Header.Add("Content-Type", writer.FormDataContentType())
  31. client := &http.Client{}
  32. res, err := client.Do(req)
  33. checkErr(err)
  34. defer res.Body.Close()
  35. body, err := ioutil.ReadAll(res.Body)
  36. checkErr(err)
  37. log.Println(string(body))
  38. }
  39. func checkErr(err error) {
  40. if err != nil {
  41. panic(err)
  42. }
  43. }

/exec endpoint

Alternatively, the /exec endpoint can be used to create a table and the INSERT statement can be used to populate it with values:

<Tabs defaultValue="curl" values={[ { label: "cURL", value: "curl" }, { label: "NodeJS", value: "nodejs" }, { label: "Go", value: "go" }, ]}>

  1. # Create Table
  2. curl -G \
  3. --data-urlencode "query=CREATE TABLE IF NOT EXISTS trades(name STRING, value INT)" \
  4. http://localhost:9000/exec
  5. # Insert a row
  6. curl -G \
  7. --data-urlencode "query=INSERT INTO trades VALUES('abc', 123456);" \
  8. http://localhost:9000/exec

Note that these two queries can be combined into a single curl request:

  1. curl -G \
  2. --data-urlencode "query=CREATE TABLE IF NOT EXISTS trades(name STRING, value INT);\
  3. INSERT INTO trades VALUES('abc', 123456);" \
  4. http://localhost:9000/exec

The node-fetch package can be installed using npm i node-fetch.

  1. const fetch = require("node-fetch")
  2. const qs = require("querystring")
  3. const HOST = "http://localhost:9000"
  4. async function createTable() {
  5. try {
  6. const queryData = {
  7. query: "CREATE TABLE IF NOT EXISTS trades (name STRING, value INT);",
  8. }
  9. const response = await fetch(`${HOST}/exec?${qs.encode(queryData)}`)
  10. const json = await response.json()
  11. console.log(json)
  12. } catch (error) {
  13. console.log(error)
  14. }
  15. }
  16. async function insertData() {
  17. try {
  18. const queryData = {
  19. query: "INSERT INTO trades VALUES('abc', 123456);",
  20. }
  21. const response = await fetch(`${HOST}/exec?${qs.encode(queryData)}`)
  22. const json = await response.json()
  23. console.log(json)
  24. } catch (error) {
  25. console.log(error)
  26. }
  27. }
  28. createTable()
  29. insertData()
  1. package main
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "log"
  6. "net/http"
  7. "net/url"
  8. )
  9. func main() {
  10. u, err := url.Parse("http://localhost:9000")
  11. checkErr(err)
  12. u.Path += "exec"
  13. params := url.Values{}
  14. params.Add("query", `
  15. CREATE TABLE IF NOT EXISTS
  16. trades (name STRING, value INT);
  17. INSERT INTO
  18. trades
  19. VALUES(
  20. "abc",
  21. 123456
  22. );
  23. `)
  24. u.RawQuery = params.Encode()
  25. url := fmt.Sprintf("%v", u)
  26. res, err := http.Get(url)
  27. checkErr(err)
  28. defer res.Body.Close()
  29. body, err := ioutil.ReadAll(res.Body)
  30. checkErr(err)
  31. log.Println(string(body))
  32. }
  33. func checkErr(err error) {
  34. if err != nil {
  35. panic(err)
  36. }
  37. }

Web Console

By default, QuestDB has an embedded Web Console running at http://\[server-address\]:9000. When running locally, this is accessible at http://localhost:9000. The Web Console can be used to explore table schemas, visualize query results as tables or graphs, and import datasets from CSV files. For details on these components, refer to the Web Console reference page.