Data Subscription

TDengine provides data subscription and consumption interfaces similar to those of message queue products. These interfaces make it easier for applications to obtain data written to TDengine in real time and to process data in the order in which events occurred. This simplifies your time-series data processing systems and reduces your costs because it is no longer necessary to deploy a message queue product such as Kafka.

To use TDengine data subscription, you define topics as you would in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable; in other words, it is a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then apply a scalar function or user-defined function to the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. The granularity of data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications.
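The idea that a topic is a filter plus per-row (scalar) expressions evaluated before data leaves the server can be illustrated with a small model. This is only a sketch in plain Python, not TDengine code: `TopicModel`, its fields, and the sample rows are hypothetical names introduced for illustration, and the SQL in the comment is a rough analogue rather than a tested statement.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Iterator

Row = Dict[str, object]


@dataclass
class TopicModel:
    """Toy stand-in for a topic: a row filter plus per-row (scalar) projections."""

    predicate: Callable[[Row], bool]
    projections: Dict[str, Callable[[Row], object]]

    def apply(self, rows: Iterable[Row]) -> Iterator[Row]:
        # Filtering and projection happen here, before anything is "sent".
        for row in rows:
            if self.predicate(row):
                yield {name: fn(row) for name, fn in self.projections.items()}


# Rough analogue of: SELECT ts, c1, ABS(c2) AS abs_c2 FROM stb WHERE c1 > 1
topic = TopicModel(
    predicate=lambda r: r["c1"] > 1,
    projections={
        "ts": lambda r: r["ts"],
        "c1": lambda r: r["c1"],
        "abs_c2": lambda r: abs(r["c2"]),
    },
)

written = [
    {"ts": 1, "c1": 0, "c2": -0.5, "c3": "a0"},
    {"ts": 2, "c1": 11, "c2": -11.0, "c3": "a11"},
    {"ts": 3, "c1": 2, "c2": 2.0, "c3": "a2"},
]
delivered = list(topic.apply(written))
print(delivered)
```

Only two of the three written rows reach the consumer, and the unselected column `c3` is never transmitted, which is the reduction in transferred data that the paragraph above describes.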

By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups speed up consumption by spreading the work across multiple threads or machines. Note that consumers in different groups that are subscribed to the same topic do not share the work: each group consumes the topic independently. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, consumer groups can consume it much more efficiently than single consumers. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.
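A minimal sketch of why a group helps when data is sharded: the shards (vgroups) are divided among the members of one group, while a second group gets its own full assignment. The round-robin policy and the function name `assign_vgroups` are assumptions made for illustration; the text above does not say how TDengine actually balances vgroups across consumers.

```python
from typing import Dict, List


def assign_vgroups(vgroups: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Spread vgroups over the consumers of ONE group (round-robin, an assumed policy)."""
    if not consumers:
        raise ValueError("a consumer group needs at least one consumer")
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, vgroup in enumerate(sorted(vgroups)):
        assignment[consumers[i % len(consumers)]].append(vgroup)
    return assignment


vgroups = [2, 3, 4, 5]

# Two consumers in the same group split the vgroups between them.
group_a = assign_vgroups(vgroups, ["a1", "a2"])

# A different group subscribed to the same topic is assigned every vgroup again.
group_b = assign_vgroups(vgroups, ["b1"])

print(group_a)
print(group_b)
```

Within `group_a` each vgroup is read by exactly one consumer, so the two members work in parallel; `group_b` does not take work away from `group_a`.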

To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For more information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.
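The combination of an ordered log, offset-based reads, and retention by age and size can be modeled in a few lines. The following is a conceptual sketch only: `RetainedLog` and its parameters are hypothetical, and the trimming rule (drop the oldest entries while either limit is exceeded) is an assumption, not a description of TDengine's WAL implementation.

```python
from collections import deque
from typing import Deque, List, Tuple


class RetainedLog:
    """Append-only log that drops the oldest entries past an age or size limit."""

    def __init__(self, retention_seconds: float, retention_bytes: int) -> None:
        self.retention_seconds = retention_seconds
        self.retention_bytes = retention_bytes
        # Each entry is (offset, written_at, payload); offsets only ever grow.
        self._entries: Deque[Tuple[int, float, bytes]] = deque()
        self._next_offset = 0
        self._size = 0

    def append(self, payload: bytes, now: float) -> int:
        offset = self._next_offset
        self._entries.append((offset, now, payload))
        self._next_offset += 1
        self._size += len(payload)
        self._trim(now)
        return offset

    def _trim(self, now: float) -> None:
        while self._entries:
            _, written_at, payload = self._entries[0]
            too_old = now - written_at > self.retention_seconds
            too_big = self._size > self.retention_bytes
            if not (too_old or too_big):
                break
            self._entries.popleft()
            self._size -= len(payload)

    def read_from(self, offset: int) -> List[Tuple[int, bytes]]:
        """Return retained entries at or after `offset`, in write order."""
        return [(o, p) for o, _, p in self._entries if o >= offset]


log = RetainedLog(retention_seconds=10, retention_bytes=8)
log.append(b"aaaa", now=0)
log.append(b"bbbb", now=1)
log.append(b"cccc", now=2)       # exceeds 8 bytes, so offset 0 is dropped
after_size_trim = log.read_from(0)
log.append(b"d", now=20)         # offsets 1 and 2 are now older than 10 s
after_age_trim = log.read_from(0)
print(after_size_trim, after_age_trim)
```

A consumer that falls behind further than the retention limits can no longer read the entries it missed, which is why the retention period and size matter for subscriptions.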

Data Schema and API

The related schemas and APIs in various languages are described as follows:

  • C
  • Java
  • Python
  • Go
  • Rust
  • Node.JS
  • C#

C:

    typedef struct tmq_t      tmq_t;
    typedef struct tmq_conf_t tmq_conf_t;
    typedef struct tmq_list_t tmq_list_t;

    typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));

    DLL_EXPORT tmq_list_t *tmq_list_new();
    DLL_EXPORT int32_t     tmq_list_append(tmq_list_t *, const char *);
    DLL_EXPORT void        tmq_list_destroy(tmq_list_t *);
    DLL_EXPORT tmq_t      *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
    DLL_EXPORT const char *tmq_err2str(int32_t code);

    DLL_EXPORT int32_t   tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
    DLL_EXPORT int32_t   tmq_unsubscribe(tmq_t *tmq);
    DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
    DLL_EXPORT int32_t   tmq_consumer_close(tmq_t *tmq);
    DLL_EXPORT int32_t   tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
    DLL_EXPORT void      tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);

    enum tmq_conf_res_t {
      TMQ_CONF_UNKNOWN = -2,
      TMQ_CONF_INVALID = -1,
      TMQ_CONF_OK = 0,
    };
    typedef enum tmq_conf_res_t tmq_conf_res_t;

    DLL_EXPORT tmq_conf_t    *tmq_conf_new();
    DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
    DLL_EXPORT void           tmq_conf_destroy(tmq_conf_t *conf);
    DLL_EXPORT void           tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);

For more information, see C/C++ Connector.

The following example is based on the smart meter table described in Data Models. For complete sample code, see the C language section below.

Java:

    void subscribe(Collection<String> topics) throws SQLException;
    void unsubscribe() throws SQLException;
    Set<String> subscription() throws SQLException;
    ConsumerRecords<V> poll(Duration timeout) throws SQLException;
    void commitAsync();
    void commitAsync(OffsetCommitCallback callback);
    void commitSync() throws SQLException;
    void close() throws SQLException;

Python:

    class TaosConsumer():
        def __init__(self, *topics, **configs)
        def __iter__(self)
        def __next__(self)
        def sync_next(self)
        def subscription(self)
        def unsubscribe(self)
        def close(self)
        def __del__(self)

Go:

    func NewConsumer(conf *Config) (*Consumer, error)
    func (c *Consumer) Close() error
    func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
    func (c *Consumer) FreeMessage(message unsafe.Pointer)
    func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
    func (c *Consumer) Subscribe(topics []string) error
    func (c *Consumer) Unsubscribe() error

Rust:

    impl TBuilder for TmqBuilder
      fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
      fn build(&self) -> Result<Self::Target, Self::Error>

    impl AsAsyncConsumer for Consumer
      async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
        &mut self,
        topics: I,
      ) -> Result<(), Self::Error>;
      fn stream(
        &self,
      ) -> Pin<
        Box<
          dyn '_
            + Send
            + futures::Stream<
              Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
            >,
        >,
      >;
      async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
      async fn unsubscribe(self);

For more information, see Crate taos.

Node.js:

    function TMQConsumer(config)
    function subscribe(topic)
    function consume(timeout)
    function subscription()
    function unsubscribe()
    function commit(msg)
    function close()

C#:

    ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
    virtual IConsumer Build()
    Consumer(ConsumerBuilder builder)
    void Subscribe(IEnumerable<string> topics)
    void Subscribe(string topic)
    ConsumeResult Consume(int millisecondsTimeout)
    List<string> Subscription()
    void Unsubscribe()
    void Commit(ConsumeResult consumerResult)
    void Close()

Insert Data into TDengine

A database including one supertable and two subtables is created as follows:

    DROP DATABASE IF EXISTS tmqdb;
    CREATE DATABASE tmqdb;
    CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
    CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
    CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
    INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
    INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');

Create a Topic

The following SQL statement creates a topic in TDengine:

    CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;

Multiple subscription types are supported.

Subscribe to a Column

Syntax:

    CREATE TOPIC topic_name AS subquery

You can subscribe to a topic through a SELECT statement. Statements that specify columns, such as SELECT * and SELECT ts, c1, are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note:

  • The schema of topics created in this manner is determined by the subscribed data.
  • You cannot modify (ALTER <table> MODIFY) or delete (ALTER <table> DROP) columns or tags that are used in a subscription or calculation.
  • Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.

Subscribe to a Supertable

Syntax:

    CREATE TOPIC topic_name AS STABLE stb_name

Creating a topic in this manner differs from creating one with a SELECT * FROM stb_name statement as follows:

  • The table schema can be modified.
  • Unstructured data is returned. The format of the data returned changes based on the supertable schema.
  • A different table schema may exist for every data block to be processed.
  • The data returned does not include tags.
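Because each data block from a supertable subscription may carry its own schema, consumer code is safer when it reads values by column name rather than by position. The sketch below assumes a hypothetical in-memory block layout, a pair of field names and rows; it is not the layout returned by any TDengine connector, and `rows_as_dicts` is an illustrative helper.

```python
from typing import Dict, Iterable, List, Sequence, Tuple

# Hypothetical block shape for illustration: (field names, rows).
Block = Tuple[Sequence[str], List[Sequence[object]]]


def rows_as_dicts(blocks: Iterable[Block]) -> List[Dict[str, object]]:
    """Pair every row with the field names of ITS OWN block."""
    records: List[Dict[str, object]] = []
    for fields, rows in blocks:
        for row in rows:
            records.append(dict(zip(fields, row)))
    return records


blocks = [
    (["ts", "c1", "c2"], [(1, 10, 0.5)]),
    (["ts", "c1", "c2", "c4"], [(2, 11, 0.6, "new")]),  # schema changed: column added
]
records = rows_as_dicts(blocks)

# Name-based access tolerates blocks that lack a column.
c4_values = [record.get("c4") for record in records]
print(c4_values)
```

Positional access such as `row[3]` would fail on the first block, whereas `record.get("c4")` simply yields `None` for rows written before the column existed.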

Subscribe to a Database

Syntax:

    CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;

This SQL statement creates a subscription to all tables in the database. You can add the WITH META parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka.
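The three statement forms shown above (subquery, supertable, and database) differ only in what follows the topic name. As a small sketch, an application could generate them with a helper like the one below. `create_topic_sql` and its keyword arguments are hypothetical names, and the identifier check is a deliberately strict assumption rather than TDengine's actual naming rules.

```python
import re
from typing import Optional

# Assumed, deliberately strict pattern: name or db.name made of letters, digits, "_".
_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?")


def _name(value: str) -> str:
    if not _NAME.fullmatch(value):
        raise ValueError(f"unexpected identifier: {value!r}")
    return value


def create_topic_sql(
    topic: str,
    *,
    subquery: Optional[str] = None,
    stable: Optional[str] = None,
    database: Optional[str] = None,
    with_meta: bool = False,
) -> str:
    """Build one of the three CREATE TOPIC forms described above."""
    sources = [s for s in (subquery, stable, database) if s is not None]
    if len(sources) != 1:
        raise ValueError("give exactly one of subquery, stable, or database")
    if with_meta and database is None:
        raise ValueError("WITH META appears only in the database form")
    if subquery is not None:
        return f"CREATE TOPIC {_name(topic)} AS {subquery.rstrip(';')};"
    if stable is not None:
        return f"CREATE TOPIC {_name(topic)} AS STABLE {_name(stable)};"
    meta = " WITH META" if with_meta else ""
    return f"CREATE TOPIC {_name(topic)}{meta} AS DATABASE {_name(database)};"


print(create_topic_sql("topic_name", subquery="SELECT ts, c1 FROM tmqdb.stb WHERE c1 > 1"))
print(create_topic_sql("topic_stb", stable="tmqdb.stb"))
print(create_topic_sql("topic_db", database="tmqdb", with_meta=True))
```

The helper only builds strings; executing them still requires a connection made with one of the connectors.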

Create a Consumer

You configure the following parameters when creating a consumer:

Parameter | Type | Description | Remarks
td.connect.ip | string | Used in establishing a connection; same as taos_connect | -
td.connect.user | string | Used in establishing a connection; same as taos_connect | -
td.connect.pass | string | Used in establishing a connection; same as taos_connect | -
td.connect.port | string | Used in establishing a connection; same as taos_connect | -
group.id | string | Consumer group ID; consumers with the same ID are in the same group | Required. Maximum length: 192.
client.id | string | Client ID | Maximum length: 192.
auto.offset.reset | enum | Initial offset for the consumer group | Specify earliest, latest, or none (default)
enable.auto.commit | boolean | Commit automatically | Specify true or false.
auto.commit.interval.ms | integer | Interval for automatic commits, in milliseconds | -
enable.heartbeat.background | boolean | Background heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | -
experimental.snapshot.enable | boolean | Specify whether to consume messages from the WAL or from TSDB | -
msg.with.table.name | boolean | Specify whether to deserialize table names from messages | -
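The table lists the allowed values of auto.offset.reset but does not spell out how they are applied. The sketch below shows one plausible reading, modeled on the Kafka setting of the same name: a committed offset always wins, and the reset policy is used only when the group has none. Both that precedence rule and the behavior of none (refusing to start) are assumptions, and `starting_offset` is a hypothetical function, not a TDengine API.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int], earliest: int, latest: int, reset: str = "none"
) -> int:
    """Pick where a consumer group starts reading (assumed, Kafka-style semantics)."""
    if committed is not None:
        return committed        # resume where the group left off
    if reset == "earliest":
        return earliest         # oldest data still retained
    if reset == "latest":
        return latest           # only data written from now on
    if reset == "none":
        raise LookupError("no committed offset and auto.offset.reset is none")
    raise ValueError(f"unknown auto.offset.reset value: {reset!r}")


print(starting_offset(None, earliest=5, latest=42, reset="earliest"))
print(starting_offset(None, earliest=5, latest=42, reset="latest"))
print(starting_offset(17, earliest=5, latest=42, reset="latest"))
```

Under this reading, a brand-new group that must see historical data would set earliest, while a group that only cares about new writes would set latest.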

The method of specifying these parameters depends on the language used:

  • C
  • Java
  • Go
  • Rust
  • Python
  • Node.JS
  • C#

C:

    /* Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
       an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) */
    tmq_conf_t* conf = tmq_conf_new();
    tmq_conf_set(conf, "enable.auto.commit", "true");
    tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
    tmq_conf_set(conf, "group.id", "cgrpName");
    tmq_conf_set(conf, "td.connect.user", "root");
    tmq_conf_set(conf, "td.connect.pass", "taosdata");
    tmq_conf_set(conf, "auto.offset.reset", "earliest");
    tmq_conf_set(conf, "experimental.snapshot.enable", "true");
    tmq_conf_set(conf, "msg.with.table.name", "true");
    tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);

    tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
    tmq_conf_destroy(conf);

Java programs use the following parameters:

Parameter | Type | Description
bootstrap.servers | string | Connection address, such as localhost:6030
value.deserializer | string | Value deserializer; to use this method, implement the com.taosdata.jdbc.tmq.Deserializer interface or inherit the com.taosdata.jdbc.tmq.ReferenceDeserializer type
value.deserializer.encoding | string | Specify the encoding for string deserialization

Note: The bootstrap.servers parameter is used instead of td.connect.ip and td.connect.port to provide an interface that is consistent with Kafka.
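To make the relationship between the two styles concrete, the sketch below splits a bootstrap.servers value into the td.connect.ip and td.connect.port pair used by the other connectors. It assumes a single host:port address as in the example localhost:6030; whether several addresses may be listed is not stated here, and `split_bootstrap_server` is a hypothetical helper.

```python
from typing import Dict


def split_bootstrap_server(address: str) -> Dict[str, str]:
    """Turn "host:port" into the equivalent td.connect.* parameters."""
    host, sep, port = address.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {address!r}")
    return {"td.connect.ip": host, "td.connect.port": port}


print(split_bootstrap_server("localhost:6030"))
```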

Java:

    Properties properties = new Properties();
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("group.id", "cgrpName");
    properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
    properties.setProperty("td.connect.user", "root");
    properties.setProperty("td.connect.pass", "taosdata");
    properties.setProperty("auto.offset.reset", "earliest");
    properties.setProperty("msg.with.table.name", "true");
    properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");

    TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);

    /* value deserializer definition. */
    import com.taosdata.jdbc.tmq.ReferenceDeserializer;

    public class MetersDeserializer extends ReferenceDeserializer<Meters> {
    }

Go:

    config := tmq.NewConfig()
    defer config.Destroy()
    err = config.SetGroupID("test")
    if err != nil {
        panic(err)
    }
    err = config.SetAutoOffsetReset("earliest")
    if err != nil {
        panic(err)
    }
    err = config.SetConnectIP("127.0.0.1")
    if err != nil {
        panic(err)
    }
    err = config.SetConnectUser("root")
    if err != nil {
        panic(err)
    }
    err = config.SetConnectPass("taosdata")
    if err != nil {
        panic(err)
    }
    err = config.SetConnectPort("6030")
    if err != nil {
        panic(err)
    }
    err = config.SetMsgWithTableName(true)
    if err != nil {
        panic(err)
    }
    err = config.EnableHeartBeat()
    if err != nil {
        panic(err)
    }
    err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
        if result.ErrCode != 0 {
            errStr := wrapper.TMQErr2Str(result.ErrCode)
            err := errors.NewError(int(result.ErrCode), errStr)
            panic(err)
        }
    })
    if err != nil {
        panic(err)
    }

Rust:

    let mut dsn: Dsn = "taos://".parse()?;
    dsn.set("group.id", "group1");
    dsn.set("client.id", "test");
    dsn.set("auto.offset.reset", "earliest");

    let tmq = TmqBuilder::from_dsn(dsn)?;

    let mut consumer = tmq.build()?;

Python programs use the following parameters:

Parameter | Type | Description | Remarks
td_connect_ip | string | Used in establishing a connection; same as taos_connect | -
td_connect_user | string | Used in establishing a connection; same as taos_connect | -
td_connect_pass | string | Used in establishing a connection; same as taos_connect | -
td_connect_port | string | Used in establishing a connection; same as taos_connect | -
group_id | string | Consumer group ID; consumers with the same ID are in the same group | Required. Maximum length: 192.
client_id | string | Client ID | Maximum length: 192.
auto_offset_reset | string | Initial offset for the consumer group | Specify earliest, latest, or none (default)
enable_auto_commit | string | Commit automatically | Specify true or false.
auto_commit_interval_ms | string | Interval for automatic commits, in milliseconds | -
enable_heartbeat_background | string | Background heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify true or false.
experimental_snapshot_enable | string | Specify whether to consume messages from the WAL or from TSDB | Specify true or false.
msg_with_table_name | string | Specify whether to deserialize table names from messages | Specify true or false.
timeout | int | Consumer pull timeout | -
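Comparing this table with the general parameter table shows that the Python names are the dotted names with each dot replaced by an underscore. The helper below sketches that mapping so that one configuration dictionary can be shared across languages. `to_python_kwargs` is a hypothetical name, and the commented-out `TaosConsumer` call is shown only to indicate where the result would go; it is not executed here.

```python
from typing import Dict


def to_python_kwargs(params: Dict[str, str]) -> Dict[str, str]:
    """Rename dotted parameter keys (group.id) to the Python style (group_id)."""
    return {key.replace(".", "_"): value for key, value in params.items()}


shared = {
    "group.id": "vg2",
    "td.connect.ip": "127.0.0.1",
    "td.connect.port": "6030",
    "auto.offset.reset": "earliest",
}
kwargs = to_python_kwargs(shared)
print(kwargs)

# Assumed usage with the connector, following the signature
# TaosConsumer(*topics, **configs) listed earlier:
# consumer = TaosConsumer("topic_ctb_column", **kwargs)
```

The timeout parameter has no dotted counterpart in the general table, so it would be passed separately.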

Node.js:

    // Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
    // an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass)
    let consumer = taos.consumer({
      'enable.auto.commit': 'true',
      'auto.commit.interval.ms': '1000',
      'group.id': 'tg2',
      'td.connect.user': 'root',
      'td.connect.pass': 'taosdata',
      'auto.offset.reset': 'earliest',
      'msg.with.table.name': 'true',
      'td.connect.ip': '127.0.0.1',
      'td.connect.port': '6030'
    });

C#:

    using TDengineTMQ;

    // Create consumer groups on demand (GourpId) and enable automatic commits (EnableAutoCommit),
    // an automatic commit interval (AutoCommitIntervalMs), and a username (TDConnectUser) and password (TDConnectPasswd)
    var cfg = new ConsumerConfig
    {
        EnableAutoCommit = "true",
        AutoCommitIntervalMs = "1000",
        GourpId = "TDengine-TMQ-C#",
        TDConnectUser = "root",
        TDConnectPasswd = "taosdata",
        AutoOffsetReset = "earliest",
        MsgWithTableName = "true",
        TDConnectIp = "127.0.0.1",
        TDConnectPort = "6030"
    };

    var consumer = new ConsumerBuilder(cfg).Build();

A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.

Subscribe to a Topic

A single consumer can subscribe to multiple topics.

  • C
  • Java
  • Go
  • Rust
  • Python
  • Node.JS
  • C#

C:

    // Create a list of subscribed topics
    tmq_list_t* topicList = tmq_list_new();
    tmq_list_append(topicList, "topicName");
    // Enable subscription
    tmq_subscribe(tmq, topicList);
    tmq_list_destroy(topicList);

Java:

    List<String> topics = new ArrayList<>();
    topics.add("tmq_topic");
    consumer.subscribe(topics);

Go:

    consumer, err := tmq.NewConsumer(config)
    if err != nil {
        panic(err)
    }
    err = consumer.Subscribe([]string{"example_tmq_topic"})
    if err != nil {
        panic(err)
    }

Rust:

    consumer.subscribe(["tmq_meters"]).await?;

Python:

    consumer = TaosConsumer('topic_ctb_column', group_id='vg2')

Node.js:

    // Create a list of subscribed topics
    let topics = ['topic_test'];
    // Enable subscription
    consumer.subscribe(topics);

C#:

    // Create a list of subscribed topics
    List<string> topics = new List<string>();
    topics.Add("tmq_topic");
    // Enable subscription
    consumer.Subscribe(topics);

Consume Messages

The following code demonstrates how to consume the messages in a queue.

  • C
  • Java
  • Go
  • Rust
  • Python
  • Node.JS
  • C#

C:

    /* Consume data */
    while (running) {
      TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
      if (msg != NULL) {
        msg_process(msg);
        taos_free_result(msg);  /* release the message once it has been processed */
      }
    }

The while loop obtains a message each time it calls tmq_consumer_poll(). This message is exactly the same as the result returned by a query, and the same deserialization API can be used on it.

Java:

    while (running) {
        ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
        for (Meters meter : meters) {
            processMsg(meter);
        }
    }

Go:

    for {
        result, err := consumer.Poll(time.Second)
        if err != nil {
            panic(err)
        }
        fmt.Println(result)
        consumer.Commit(context.Background(), result.Message)
        consumer.FreeMessage(result.Message)
    }

Rust:

    {
        let mut stream = consumer.stream();

        while let Some((offset, message)) = stream.try_next().await? {
            // get information from offset

            // the topic
            let topic = offset.topic();
            // the vgroup id, like partition id in kafka.
            let vgroup_id = offset.vgroup_id();
            println!("* in vgroup id {vgroup_id} of topic {topic}\n");

            if let Some(data) = message.into_data() {
                while let Some(block) = data.fetch_raw_block().await? {
                    // one block for one table, get table name if needed
                    let name = block.table_name();
                    let records: Vec<Record> = block.deserialize().try_collect()?;
                    println!(
                        "** table: {}, got {} records: {:#?}\n",
                        name.unwrap(),
                        records.len(),
                        records
                    );
                }
            }
            consumer.commit(offset).await?;
        }
    }

Python:

    for msg in consumer:
        for row in msg:
            print(row)

Node.js:

    while (true) {
      let msg = consumer.consume(200);
      // process message(consumeResult)
      console.log(msg.topicPartition);
      console.log(msg.block);
      console.log(msg.fields);
    }

C#:

    // Consume data
    while (true)
    {
        var consumerRes = consumer.Consume(100);
        // process ConsumeResult
        ProcessMsg(consumerRes);
        consumer.Commit(consumerRes);
    }
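Several of the loops above commit only after a message has been processed. That ordering is what turns an acknowledgement mechanism into at-least-once delivery: if a consumer stops between processing and committing, the message is delivered again after a restart. The model below is a plain Python sketch of that behavior. `Broker`, `run_consumer`, and the `crash_before_commit_at` switch are hypothetical constructs and do not correspond to any connector API.

```python
from typing import Callable, List, Optional, Tuple


class Broker:
    """Keeps messages and the committed position of a single consumer group."""

    def __init__(self, messages: List[str]) -> None:
        self.messages = list(messages)
        self.committed = 0  # offset of the next message to deliver

    def poll(self) -> Optional[Tuple[int, str]]:
        if self.committed < len(self.messages):
            return self.committed, self.messages[self.committed]
        return None

    def commit(self, offset: int) -> None:
        self.committed = offset + 1


def run_consumer(
    broker: Broker,
    handle: Callable[[str], None],
    crash_before_commit_at: Optional[int] = None,
) -> None:
    while True:
        polled = broker.poll()
        if polled is None:
            return
        offset, message = polled
        handle(message)                      # 1. process
        if offset == crash_before_commit_at:
            return                           # simulated crash: never acknowledged
        broker.commit(offset)                # 2. acknowledge


seen: List[str] = []
broker = Broker(["m0", "m1", "m2"])
run_consumer(broker, seen.append, crash_before_commit_at=1)  # stops after handling m1
run_consumer(broker, seen.append)                            # restart resumes at m1
print(seen)
```

No message is lost, but `m1` is handled twice, so message handlers should tolerate duplicates, for example by being idempotent.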

Close the Consumer

After you finish consuming messages, unsubscribe the consumer and then close it.

  • C
  • Java
  • Go
  • Rust
  • Python
  • Node.JS
  • C#

C:

    /* Unsubscribe */
    tmq_unsubscribe(tmq);

    /* Close consumer object */
    tmq_consumer_close(tmq);

Java:

    /* Unsubscribe */
    consumer.unsubscribe();

    /* Close consumer */
    consumer.close();

Go:

    consumer.Close()

Rust:

    consumer.unsubscribe().await;

Python:

    # Unsubscribe
    consumer.unsubscribe()
    # Close consumer
    consumer.close()

Node.js:

    consumer.unsubscribe();
    consumer.close();

C#:

    // Unsubscribe
    consumer.Unsubscribe();

    // Close consumer
    consumer.Close();

Delete a Topic

You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it.

    /* Delete topic */
    DROP TOPIC topic_name;

Check Status

  1. Query all existing topics.

         SHOW TOPICS;

  2. Query the status and subscribed topics of all consumers.

         SHOW CONSUMERS;

  3. Query the relationships between consumers and vgroups.

         SHOW SUBSCRIPTIONS;

Examples

The following section shows sample code in various languages.

  • C
  • Java
  • Go
  • Rust
  • Python
  • Node.JS
  • C#

C:

    /*
     * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
     *
     * This program is free software: you can use, redistribute, and/or modify
     * it under the terms of the GNU Affero General Public License, version 3
     * or later ("AGPL"), as published by the Free Software Foundation.
     *
     * This program is distributed in the hope that it will be useful, but WITHOUT
     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     * FITNESS FOR A PARTICULAR PURPOSE.
     *
     * You should have received a copy of the GNU Affero General Public License
     * along with this program. If not, see <http://www.gnu.org/licenses/>.
     */

    #include <assert.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <time.h>
    #include "taos.h"

    static int  running = 1;
    static char dbName[64] = "tmqdb";
    static char stbName[64] = "stb";
    static char topicName[64] = "topicname";

    /* Print every row of one polled message and return the number of rows. */
    static int32_t msg_process(TAOS_RES* msg) {
      char    buf[1024];
      int32_t rows = 0;

      const char* topicName = tmq_get_topic_name(msg);
      const char* dbName = tmq_get_db_name(msg);
      int32_t     vgroupId = tmq_get_vgroup_id(msg);

      printf("topic: %s\n", topicName);
      printf("db: %s\n", dbName);
      printf("vgroup id: %d\n", vgroupId);

      while (1) {
        TAOS_ROW row = taos_fetch_row(msg);
        if (row == NULL) break;

        TAOS_FIELD* fields = taos_fetch_fields(msg);
        int32_t     numOfFields = taos_field_count(msg);
        int32_t*    length = taos_fetch_lengths(msg);
        int32_t     precision = taos_result_precision(msg);
        rows++;
        taos_print_row(buf, row, fields, numOfFields);
        printf("row content: %s\n", buf);
      }

      return rows;
    }

    /* Create the database, one supertable, four subtables, and sample rows. */
    static int32_t init_env() {
      TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
      if (pConn == NULL) {
        return -1;
      }

      TAOS_RES* pRes;
      // drop database if exists
      printf("create database\n");
      pRes = taos_query(pConn, "drop database if exists tmqdb");
      if (taos_errno(pRes) != 0) {
        printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      // create database
      pRes = taos_query(pConn, "create database tmqdb");
      if (taos_errno(pRes) != 0) {
        printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      // create super table
      printf("create super table\n");
      pRes = taos_query(
          pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
      if (taos_errno(pRes) != 0) {
        printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      // create sub tables
      printf("create sub tables\n");
      pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
      if (taos_errno(pRes) != 0) {
        printf("failed to create sub table ctb0, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
      if (taos_errno(pRes) != 0) {
        printf("failed to create sub table ctb1, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
      if (taos_errno(pRes) != 0) {
        printf("failed to create sub table ctb2, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
      if (taos_errno(pRes) != 0) {
        printf("failed to create sub table ctb3, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      // insert data
      printf("insert data into sub tables\n");
      pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
      if (taos_errno(pRes) != 0) {
        printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
      if (taos_errno(pRes) != 0) {
        printf("failed to insert into ctb1, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
      if (taos_errno(pRes) != 0) {
        printf("failed to insert into ctb2, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
      if (taos_errno(pRes) != 0) {
        printf("failed to insert into ctb3, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      taos_close(pConn);
      return 0;
    }

    /* Create a topic from a SELECT statement on the supertable. */
    int32_t create_topic() {
      printf("create topic\n");
      TAOS_RES* pRes;
      TAOS*     pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
      if (pConn == NULL) {
        return -1;
      }

      pRes = taos_query(pConn, "use tmqdb");
      if (taos_errno(pRes) != 0) {
        printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
      if (taos_errno(pRes) != 0) {
        printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
        return -1;
      }
      taos_free_result(pRes);

      taos_close(pConn);
      return 0;
    }

    /* Callback invoked after each automatic commit. */
    void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
      printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
    }

    /* Configure and create the consumer; returns NULL if any setting is rejected. */
    tmq_t* build_consumer() {
      tmq_conf_res_t code;
      tmq_conf_t*    conf = tmq_conf_new();
      code = tmq_conf_set(conf, "enable.auto.commit", "true");
      if (TMQ_CONF_OK != code) {
        tmq_conf_destroy(conf);
        return NULL;
      }
      code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
      if (TMQ_CONF_OK != code) {
        tmq_conf_destroy(conf);
        return NULL;
      }
      code = tmq_conf_set(conf, "group.id", "cgrpName");
      if (TMQ_CONF_OK != code) {
        tmq_conf_destroy(conf);
        return NULL;
      }
      code = tmq_conf_set(conf, "client.id", "user defined name");
      if (TMQ_CONF_OK != code) {
        tmq_conf_destroy(conf);
        return NULL;
      }
      code = tmq_conf_set(conf, "td.connect.user", "root");
      if (TMQ_CONF_OK != code) {
        tmq_conf_destroy(conf);
        return NULL;
      }
      code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
      if (TMQ_CONF_OK != code) {
        tmq_conf_destroy(conf);
        return NULL;
      }
      code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
      if (TMQ_CONF_OK != code) {
        tmq_conf_destroy(conf);
        return NULL;
      }
      code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
      if (TMQ_CONF_OK != code) {
        tmq_conf_destroy(conf);
        return NULL;
      }

      tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);

      tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
      tmq_conf_destroy(conf);
      return tmq;
    }

    tmq_list_t* build_topic_list() {
      tmq_list_t* topicList = tmq_list_new();
      int32_t     code = tmq_list_append(topicList, "topicname");
      if (code) {
        tmq_list_destroy(topicList);  /* do not leak the list on failure */
        return NULL;
      }
      return topicList;
    }

    /* Poll until no message arrives within the timeout, then report totals. */
    void basic_consume_loop(tmq_t* tmq) {
      int32_t totalRows = 0;
      int32_t msgCnt = 0;
      int32_t timeout = 5000;
      while (running) {
        TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
        if (tmqmsg) {
          msgCnt++;
          totalRows += msg_process(tmqmsg);
          taos_free_result(tmqmsg);
        } else {
          break;
        }
      }

      fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
    }

    int main(int argc, char* argv[]) {
      int32_t code;

      if (init_env() < 0) {
        return -1;
      }

      if (create_topic() < 0) {
        return -1;
      }

      tmq_t* tmq = build_consumer();
      if (NULL == tmq) {
        fprintf(stderr, "%% build_consumer() fail!\n");
        return -1;
      }

      tmq_list_t* topic_list = build_topic_list();
      if (NULL == topic_list) {
        return -1;
      }

      if ((code = tmq_subscribe(tmq, topic_list))) {
        fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
      }
      tmq_list_destroy(topic_list);

      basic_consume_loop(tmq);

      code = tmq_consumer_close(tmq);
      if (code) {
        fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
      } else {
        fprintf(stderr, "%% Consumer closed\n");
      }

      return 0;
    }


package com.taos.example;

import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class SubscribeDemo {
    private static final String TOPIC = "tmq_topic";
    private static final String DB_NAME = "meters";
    private static final AtomicBoolean shutdown = new AtomicBoolean(false);

    public static void main(String[] args) {
        // stop the poll loop after 3 seconds
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            public void run() {
                shutdown.set(true);
            }
        }, 3_000);
        try {
            // prepare
            Class.forName("com.taosdata.jdbc.TSDBDriver");
            String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
            // close both the connection and the statement once setup is done
            try (Connection connection = DriverManager.getConnection(jdbcUrl);
                 Statement statement = connection.createStatement()) {
                statement.executeUpdate("drop topic if exists " + TOPIC);
                statement.executeUpdate("drop database if exists " + DB_NAME);
                statement.executeUpdate("create database " + DB_NAME);
                statement.executeUpdate("use " + DB_NAME);
                statement.executeUpdate(
                        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
                statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngeles')");
                statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
                statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
                statement.executeUpdate(
                        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
                statement.executeUpdate(
                        "INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
                // create topic
                statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
            }

            // create consumer
            Properties properties = new Properties();
            properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
            properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
            properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
            properties.setProperty(TMQConstants.GROUP_ID, "test");
            properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
                    "com.taos.example.MetersDeserializer");

            // poll data
            try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
                consumer.subscribe(Collections.singletonList(TOPIC));
                while (!shutdown.get()) {
                    ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
                    for (Meters meter : meters) {
                        System.out.println(meter);
                    }
                }
                consumer.unsubscribe();
            }
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        timer.cancel();
    }
}
[view source code](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java)


package com.taos.example;

import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}


package com.taos.example;

import java.sql.Timestamp;

public class Meters {
    private Timestamp ts;
    private float current;
    private int voltage;
    private int groupid;
    private String location;

    public Timestamp getTs() {
        return ts;
    }

    public void setTs(Timestamp ts) {
        this.ts = ts;
    }

    public float getCurrent() {
        return current;
    }

    public void setCurrent(float current) {
        this.current = current;
    }

    public int getVoltage() {
        return voltage;
    }

    public void setVoltage(int voltage) {
        this.voltage = voltage;
    }

    public int getGroupid() {
        return groupid;
    }

    public void setGroupid(int groupid) {
        this.groupid = groupid;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    @Override
    public String toString() {
        return "Meters{" +
                "ts=" + ts +
                ", current=" + current +
                ", voltage=" + voltage +
                ", groupid=" + groupid +
                ", location='" + location + '\'' +
                '}';
    }
}


package main

import (
    "context"
    "encoding/json"
    "fmt"
    "strconv"
    "time"

    "github.com/taosdata/driver-go/v3/af"
    "github.com/taosdata/driver-go/v3/af/tmq"
    "github.com/taosdata/driver-go/v3/common"
    "github.com/taosdata/driver-go/v3/errors"
    "github.com/taosdata/driver-go/v3/wrapper"
)

func main() {
    // connect and prepare the database and topic
    db, err := af.Open("", "root", "taosdata", "", 0)
    if err != nil {
        panic(err)
    }
    defer db.Close()
    _, err = db.Exec("create database if not exists example_tmq")
    if err != nil {
        panic(err)
    }
    _, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
    if err != nil {
        panic(err)
    }

    // configure the consumer
    config := tmq.NewConfig()
    defer config.Destroy()
    err = config.SetGroupID("test")
    if err != nil {
        panic(err)
    }
    err = config.SetAutoOffsetReset("earliest")
    if err != nil {
        panic(err)
    }
    err = config.SetConnectIP("127.0.0.1")
    if err != nil {
        panic(err)
    }
    err = config.SetConnectUser("root")
    if err != nil {
        panic(err)
    }
    err = config.SetConnectPass("taosdata")
    if err != nil {
        panic(err)
    }
    err = config.SetConnectPort("6030")
    if err != nil {
        panic(err)
    }
    err = config.SetMsgWithTableName(true)
    if err != nil {
        panic(err)
    }
    err = config.EnableHeartBeat()
    if err != nil {
        panic(err)
    }
    err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
        if result.ErrCode != 0 {
            errStr := wrapper.TMQErr2Str(result.ErrCode)
            err := errors.NewError(int(result.ErrCode), errStr)
            panic(err)
        }
    })
    if err != nil {
        panic(err)
    }

    // create the consumer and subscribe
    consumer, err := tmq.NewConsumer(config)
    if err != nil {
        panic(err)
    }
    err = consumer.Subscribe([]string{"example_tmq_topic"})
    if err != nil {
        panic(err)
    }

    // write one row so that there is something to consume
    _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
    if err != nil {
        panic(err)
    }
    _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
    if err != nil {
        panic(err)
    }

    // poll a single message, print it, commit it, then stop
    for {
        result, err := consumer.Poll(time.Second)
        if err != nil {
            panic(err)
        }
        if result.Type != common.TMQ_RES_DATA {
            panic("want message type 1 got " + strconv.Itoa(int(result.Type)))
        }
        data, _ := json.Marshal(result.Data)
        fmt.Println(string(data))
        consumer.Commit(context.Background(), result.Message)
        consumer.FreeMessage(result.Message)
        break
    }
    consumer.Close()
}
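
The Go example above calls `consumer.Commit` only after the polled message has been printed. Committing after processing is what produces at-least-once behaviour: if a consumer stops between handling a message and committing it, that message is delivered again after the restart. The following Python sketch illustrates the idea with a toy in-memory topic. `FakeTopic`, `run_consumer`, and the `crash_at` parameter are hypothetical names invented for this illustration; they are not part of any TDengine connector and do not describe how TDengine implements acknowledgement internally.

```python
class FakeTopic:
    """Toy in-memory topic with one committed offset per consumer group (hypothetical)."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = {}  # group id -> index of the next message to deliver

    def poll(self, group):
        """Return (offset, message) at the group's committed position, or None."""
        offset = self.committed.get(group, 0)
        if offset >= len(self.messages):
            return None
        return offset, self.messages[offset]

    def commit(self, group, offset):
        """Acknowledge everything up to and including `offset`."""
        self.committed[group] = offset + 1


def run_consumer(topic, group, processed, crash_at=None):
    """Handle messages one by one, committing only after each one is handled."""
    while (item := topic.poll(group)) is not None:
        offset, message = item
        processed.append(message)        # the side effect happens first
        if offset == crash_at:
            raise RuntimeError("simulated crash before commit")
        topic.commit(group, offset)      # acknowledge only after processing


topic = FakeTopic(["m0", "m1", "m2"])
processed = []
try:
    run_consumer(topic, "g1", processed, crash_at=1)
except RuntimeError:
    pass                                 # the consumer is restarted below
run_consumer(topic, "g1", processed)
print(processed)
```

In this run `m1` is handled twice, because the simulated crash happened before its offset was committed. Nothing is lost, but handlers that commit after processing generally need to tolerate such repeats.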


use std::time::Duration;

use chrono::{DateTime, Local};
use taos::*;

// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    phase: Option<f32>,
}

async fn prepare(taos: Taos) -> anyhow::Result<()> {
    let inserted = taos.exec_many([
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngeles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;
    assert_eq!(inserted, 6);
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dsn = "taos://localhost:6030";
    let builder = TaosBuilder::from_dsn(dsn)?;
    let taos = builder.build()?;
    let db = "tmq";

    // prepare database
    taos.exec_many([
        format!("DROP TOPIC IF EXISTS tmq_meters"),
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
        // create super table
        format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
        // create topic for subscription
        format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
    ])
    .await?;

    let task = tokio::spawn(prepare(taos));
    tokio::time::sleep(Duration::from_secs(1)).await;

    // subscribe
    let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
    let mut consumer = tmq.build()?;
    consumer.subscribe(["tmq_meters"]).await?;

    consumer
        .stream()
        .try_for_each(|(offset, message)| async {
            let topic = offset.topic();
            // the vgroup id, like partition id in kafka.
            let vgroup_id = offset.vgroup_id();
            println!("* in vgroup id {vgroup_id} of topic {topic}\n");

            if let Some(data) = message.into_data() {
                while let Some(block) = data.fetch_raw_block().await? {
                    let records: Vec<Record> = block.deserialize().try_collect()?;
                    println!("** read {} records: {:#?}\n", records.len(), records);
                }
            }
            consumer.commit(offset).await?;
            Ok(())
        })
        .await?;

    consumer.unsubscribe().await;
    task.await??;
    Ok(())
}


import taos
from taos.tmq import *

conn = taos.connect()

print("init")
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
conn.select_db("py_tmq")
conn.execute(
    "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
)
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")

print("create topic")
conn.execute(
    "create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
)

print("build consumer")
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")


def tmq_commit_cb_print(tmq, resp, offset, param=None):
    print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")


conf.set_auto_commit_cb(tmq_commit_cb_print, None)
tmq = conf.new_consumer()

print("build topic list")
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")

print("basic consume loop")
tmq.subscribe(topic_list)
sub_list = tmq.subscription()
print("subscribed topics: ", sub_list)

while True:
    res = tmq.poll(1000)
    if res:
        topic = res.get_topic_name()
        vg = res.get_vgroup_id()
        db = res.get_db_name()
        print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
        for row in res:
            print(row)
view source code

const taos = require("@tdengine/client");

const conn = taos.connect({ host: "localhost", database: "power" });
var cursor = conn.cursor();

function runConsumer() {
  // create topic
  cursor.execute("create topic topic_name_example as select * from meters");

  let consumer = taos.consumer({
    'group.id': 'tg2',
    'td.connect.user': 'root',
    'td.connect.pass': 'taosdata',
    'msg.with.table.name': 'true',
    'enable.auto.commit': 'true'
  });

  // subscribe the topic just created.
  consumer.subscribe("topic_name_example");

  // get subscribe topic list
  let topicList = consumer.subscription();
  console.log(topicList);

  for (let i = 0; i < 5; i++) {
    let msg = consumer.consume(100);
    console.log(msg.topicPartition);
    console.log(msg.block);
    console.log(msg.fields);
    consumer.commit(msg);
    console.log(`=======consumer ${i} done`);
  }

  consumer.unsubscribe();
  consumer.close();

  // drop topic
  cursor.execute("drop topic topic_name_example");
}

try {
  runConsumer();
} finally {
  setTimeout(() => {
    cursor.close();
    conn.close();
  }, 2000);
}


using System;
using System.Collections.Generic;
using TDengineTMQ;
using TDengineDriver;
using System.Runtime.InteropServices;

namespace TMQExample
{
    internal class SubscribeDemo
    {
        static void Main(string[] args)
        {
            IntPtr conn = GetConnection();
            string topic = "topic_example";
            // create topic
            IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from meters");
            if (TDengine.ErrorNo(res) != 0)
            {
                throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
            }

            var cfg = new ConsumerConfig
            {
                GourpId = "group_1",
                TDConnectUser = "root",
                TDConnectPasswd = "taosdata",
                MsgWithTableName = "true",
                TDConnectIp = "127.0.0.1",
            };

            // create consumer
            var consumer = new ConsumerBuilder(cfg)
                .Build();

            // subscribe
            consumer.Subscribe(topic);

            // consume
            for (int i = 0; i < 5; i++)
            {
                var consumeRes = consumer.Consume(300);
                // print consumeResult
                foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
                {
                    Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());

                    kv.Value.Metas.ForEach(meta =>
                    {
                        Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
                    });
                    Console.WriteLine("");
                    kv.Value.Datas.ForEach(data =>
                    {
                        Console.WriteLine(data.ToString());
                    });
                }
                consumer.Commit(consumeRes);
                Console.WriteLine("\n================ {0} done ", i);
            }

            // retrieve topic list
            List<string> topics = consumer.Subscription();
            topics.ForEach(t => Console.WriteLine("topic name:{0}", t));

            // unsubscribe
            consumer.Unsubscribe();

            // close the consumer after use; otherwise memory will leak.
            consumer.Close();
            TDengine.Close(conn);
        }

        static IntPtr GetConnection()
        {
            string host = "localhost";
            short port = 6030;
            string username = "root";
            string password = "taosdata";
            string dbname = "power";
            var conn = TDengine.Connect(host, username, password, dbname, port);
            if (conn == IntPtr.Zero)
            {
                throw new Exception("Connect to TDengine failed");
            }
            else
            {
                Console.WriteLine("Connect to TDengine success");
            }
            return conn;
        }
    }
}
