Write from Kafka

About Kafka

Apache Kafka is an open-source distributed event streaming platform, used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. For the key concepts of Kafka, please refer to the Kafka documentation.

Kafka Topic

Messages in Kafka are organized into topics, and a topic may have one or more partitions. Kafka topics can be managed with the kafka-topics.sh tool.

Create a topic named kafka-events:

  bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092

Alter the kafka-events topic to set the number of partitions to 3:

  bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092

Show all topics and partitions in Kafka:

  bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe

Insert into TDengine

We can write data into TDengine via SQL or schemaless insertion. For more information, please refer to Insert Using SQL, High Performance Writing, or Schemaless Writing.
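As a quick illustration of the SQL path, the following is a minimal sketch of inserting one row with the taospy connector. The connection parameters and the meters schema mirror the full example later in this section; the table name and values are made up for illustration.

  import taos

  conn = taos.connect(host='localhost', user='root', password='taosdata', port=6030)
  conn.execute('create database if not exists power')
  conn.execute('use power')
  # super table matching the schema used by the full example below
  conn.execute('create stable if not exists meters (ts timestamp, current float, voltage int, phase float) '
               'tags (location binary(64), groupId int)')
  # insert one row into a sub-table created on the fly from the super table
  conn.execute("insert into d0 using meters tags ('California.SanFrancisco', 2) "
               "values (now, 10.3, 219, 0.31)")
  conn.close()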

Examples

  • Python

Python Kafka Client

For Python Kafka clients, please refer to the Kafka client list. In this document, we use kafka-python.
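Before consuming, the topic needs some messages in it. A minimal producer sketch using kafka-python is shown below; the topic name matches the consume examples that follow, and the payload is purely illustrative.

  import json
  from kafka import KafkaProducer

  producer = KafkaProducer(
      bootstrap_servers='localhost:9092',
      value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # serialize dicts to JSON bytes
  )
  producer.send('my_favorite_topic', {'hello': 'world'})  # illustrative payload
  producer.flush()
  producer.close()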

Consume from Kafka

The simplest way to consume messages from Kafka is to read them one by one. The demo is as follows:

  from kafka import KafkaConsumer

  consumer = KafkaConsumer('my_favorite_topic')
  for msg in consumer:
      print(msg)

For higher performance, we can consume messages from Kafka in batches. The demo is as follows:

  from kafka import KafkaConsumer

  consumer = KafkaConsumer('my_favorite_topic')
  while True:
      msgs = consumer.poll(timeout_ms=500, max_records=1000)
      if msgs:
          print(msgs)
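In kafka-python, poll() returns a dict that maps each TopicPartition to a list of ConsumerRecord objects. A minimal sketch of unpacking the result (the topic name is the same illustrative one used above):

  from kafka import KafkaConsumer

  consumer = KafkaConsumer('my_favorite_topic')
  msgs = consumer.poll(timeout_ms=500, max_records=1000)
  for tp, records in msgs.items():   # tp is a TopicPartition
      for record in records:         # record is a ConsumerRecord
          print(record.value)        # raw message value (bytes by default)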

Multi-threading

For even higher performance, we can process data from Kafka in multiple threads, for example with Python's ThreadPoolExecutor. The demo is as follows, and a fuller sketch that combines it with batch polling appears after it:

  from concurrent.futures import ThreadPoolExecutor, Future

  pool = ThreadPoolExecutor(max_workers=10)
  pool.submit(...)
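A minimal sketch that combines batch polling with a thread pool. Here write_to_tdengine is a hypothetical handler standing in for the actual insert logic shown in the full example below:

  from concurrent.futures import ThreadPoolExecutor
  from kafka import KafkaConsumer

  def write_to_tdengine(records):
      # hypothetical handler: build and execute an INSERT statement from the records
      ...

  consumer = KafkaConsumer('my_favorite_topic')
  pool = ThreadPoolExecutor(max_workers=10)
  while True:
      msgs = consumer.poll(timeout_ms=500, max_records=1000)
      for records in msgs.values():
          pool.submit(write_to_tdengine, records)  # hand each partition's batch to a worker thread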

Multi-process

For even higher performance, we can also use multiprocessing. In this case, the number of Kafka consumers should not be greater than the number of partitions of the Kafka topic. The demo is as follows:

  from multiprocessing import Process

  ps = []
  for i in range(5):
      p = Process(target=Consumer().consume)
      p.start()
      ps.append(p)

  for p in ps:
      p.join()

In addition to Python's built-in multithreading and multiprocessing libraries, we can also use the third-party library gunicorn.

Examples

  #! encoding = utf-8
  import json
  import time
  from json import JSONDecodeError
  from typing import Callable
  import logging
  from concurrent.futures import ThreadPoolExecutor, Future

  import taos
  from kafka import KafkaConsumer
  from kafka.consumer.fetcher import ConsumerRecord


  class Consumer(object):
      DEFAULT_CONFIGS = {
          'kafka_brokers': 'localhost:9092',
          'kafka_topic': 'python_kafka',
          'kafka_group_id': 'taos',
          'taos_host': 'localhost',
          'taos_user': 'root',
          'taos_password': 'taosdata',
          'taos_database': 'power',
          'taos_port': 6030,
          'timezone': None,
          'clean_after_testing': False,
          'bath_consume': True,
          'batch_size': 1000,
          'async_model': True,
          'workers': 10
      }

      LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
                   'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
                   'California.SantaClara', 'California.Cupertino']

      CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1'
      USE_DATABASE_SQL = 'use {}'
      DROP_TABLE_SQL = 'drop table if exists meters'
      DROP_DATABASE_SQL = 'drop database if exists {}'
      CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \
                          'tags (location binary(64), groupId int)'
      CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'
      INSERT_SQL_HEADER = "insert into "
      INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})'

      def __init__(self, **configs):
          self.config: dict = dict(self.DEFAULT_CONFIGS)
          self.config.update(configs)
          # Kafka consumer subscribed to the configured topic
          self.consumer = KafkaConsumer(
              self.config.get('kafka_topic'),  # topic
              bootstrap_servers=self.config.get('kafka_brokers'),
              group_id=self.config.get('kafka_group_id'),
          )
          # TDengine connection
          self.taos = taos.connect(
              host=self.config.get('taos_host'),
              user=self.config.get('taos_user'),
              password=self.config.get('taos_password'),
              port=self.config.get('taos_port'),
              timezone=self.config.get('timezone'),
          )
          if self.config.get('async_model'):
              self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
              self.tasks: list[Future] = []
          # tags and table mapping. key: {location}_{groupId}, value: table name
          self.tag_table_mapping = {}
          i = 0
          for location in self.LOCATIONS:
              for j in range(1, 11):
                  table_name = 'd{}'.format(i)
                  self._cache_table(location=location, group_id=j, table_name=table_name)
                  i += 1

      def init_env(self):
          # create database and tables
          self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
          self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database')))
          self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database')))
          self.taos.execute(self.DROP_TABLE_SQL)
          self.taos.execute(self.CREATE_STABLE_SQL)
          for tags, table_name in self.tag_table_mapping.items():
              location, group_id = _get_location_and_group(tags)
              self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id))

      def consume(self):
          logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic'))
          try:
              if self.config.get('bath_consume'):
                  self._run_batch(self._to_taos_batch)
              else:
                  self._run(self._to_taos)
          except KeyboardInterrupt:
              logging.warning("## caught keyboard interrupt, stopping")
          finally:
              self.stop()

      def stop(self):
          # close consumer
          if self.consumer is not None:
              self.consumer.commit()
              self.consumer.close()
          # wait for submitted tasks and shut down the thread pool
          if self.config.get('async_model'):
              for task in self.tasks:
                  while not task.done():
                      pass
              if self.pool is not None:
                  self.pool.shutdown()
          # clean data
          if self.config.get('clean_after_testing'):
              self.taos.execute(self.DROP_TABLE_SQL)
              self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
          # close taos connection
          if self.taos is not None:
              self.taos.close()

      def _run(self, f: Callable[[ConsumerRecord], bool]):
          # consume messages one by one
          for message in self.consumer:
              if self.config.get('async_model'):
                  self.tasks.append(self.pool.submit(f, message))
              else:
                  f(message)

      def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
          # consume messages in batches
          while True:
              messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
              if messages:
                  if self.config.get('async_model'):
                      self.tasks.append(self.pool.submit(f, list(messages.values())))
                  else:
                      f(list(messages.values()))
              if not messages:
                  time.sleep(0.1)

      def _to_taos(self, message: ConsumerRecord) -> bool:
          # write a single message to TDengine
          sql_part = self._build_sql(message.value)
          if len(sql_part) == 0:  # decode error, skip
              return True
          sql = self.INSERT_SQL_HEADER + sql_part
          logging.info('## insert sql %s', sql)
          return self.taos.execute(sql=sql) == 1

      def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
          # write a batch of messages to TDengine in a single INSERT statement
          sql = self._build_sql_batch(messages=messages)
          if len(sql) == 0:  # decode error, skip
              return
          self.taos.execute(sql=sql)

      def _build_sql(self, msg_value: str) -> str:
          # build the per-row part of the INSERT statement from one JSON message
          try:
              data = json.loads(msg_value)
          except JSONDecodeError as e:
              logging.error('## decode message [%s] error: %s', msg_value, e)
              return ''
          location = data.get('location')
          group_id = data.get('groupId')
          ts = data.get('ts')
          current = data.get('current')
          voltage = data.get('voltage')
          phase = data.get('phase')
          table_name = self._get_table_name(location=location, group_id=group_id)
          return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)

      def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
          # concatenate the per-row parts of all messages into one INSERT statement
          sql_list = []
          for partition_messages in messages:
              for message in partition_messages:
                  sql = self._build_sql(message.value)
                  if sql:
                      sql_list.append(sql)
          if not sql_list:
              return ''
          return self.INSERT_SQL_HEADER + ' '.join(sql_list)

      def _cache_table(self, location: str, group_id: int, table_name: str):
          self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name

      def _get_table_name(self, location: str, group_id: int) -> str:
          return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id))


  def _tag_table_mapping_key(location: str, group_id: int):
      return '{}_{}'.format(location, group_id)


  def _get_location_and_group(key: str) -> (str, int):
      fields = key.split('_')
      return fields[0], int(fields[1])


  if __name__ == '__main__':
      consumer = Consumer(async_model=True)
      consumer.init_env()
      consumer.consume()

view source code
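To exercise this example, the python_kafka topic needs JSON messages containing the fields the consumer reads: ts, current, voltage, phase, location, and groupId. Below is a minimal, hypothetical producer sketch using kafka-python; the generated values are made up for testing, and the locations are a subset of the LOCATIONS list used by the consumer.

  import json
  import random
  import time
  from datetime import datetime
  from kafka import KafkaProducer

  LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego']

  producer = KafkaProducer(
      bootstrap_servers='localhost:9092',
      value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # serialize dicts to JSON bytes
  )
  for _ in range(100):
      producer.send('python_kafka', {
          'ts': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],  # millisecond timestamp
          'current': round(random.uniform(0, 15), 2),
          'voltage': random.randint(100, 300),
          'phase': round(random.uniform(0, 1), 2),
          'location': random.choice(LOCATIONS),
          'groupId': random.randint(1, 10),
      })
      time.sleep(0.01)
  producer.flush()
  producer.close()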