TDengine Python Connector

taospy 是 TDengine 的官方 Python 连接器。taospy 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。taospy 对 TDengine 的原生接口REST 接口都进行了封装, 分别对应 taospy 包的 taos 模块 和 taosrest 模块。 除了对原生接口和 REST 接口的封装,taospy 还提供了符合 Python 数据访问规范(PEP 249) 的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemypandas

使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。

Python 连接器的源码托管在 GitHub

支持的平台

  • 原生连接支持的平台和 TDengine 客户端支持的平台一致。
  • REST 连接支持所有能运行 Python 的平台。

版本选择

无论使用什么版本的 TDengine 都建议使用最新版本的 taospy

支持的功能

  • 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
  • REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。

安装

准备

  1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 Python BeginnersGuide 安装。
  2. 安装 pip。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 pip documentation 安装。
  3. 如果使用原生连接,还需安装客户端驱动。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。

使用 pip 安装

卸载旧版本

如果以前安装过旧版本的 Python 连接器, 请提前卸载。

  1. pip3 uninstall taos taospy
Python - 图1note

较早的 TDengine 客户端软件包含了 Python 连接器。如果从客户端软件的安装目录安装了 Python 连接器,那么对应的 Python 包名是 taos。 所以上述卸载命令包含了 taos, 不存在也没关系。

安装 taospy

  • 从 PyPI 安装
  • 从 GitHub 安装

安装最新版本

  1. pip3 install taospy

也可以指定某个特定版本安装。

  1. pip3 install taospy==2.3.0
  1. pip3 install git+https://github.com/taosdata/taos-connector-python.git

安装 taos-ws-py(可选)

taos-ws-py 包提供了通过 WebSocket 连接 TDengine 的能力,可选安装 taos-ws-py 以获得 WebSocket 连接 TDengine 的能力。

和 taospy 同时安装
  1. pip3 install taospy[ws]
单独安装
  1. pip3 install taos-ws-py

安装验证

  • 原生连接
  • REST 连接

对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 taos 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入:

  1. import taos

对于 REST 连接,只需验证是否能成功导入 taosrest 模块。可在 Python 交互式 Shell 中输入:

  1. import taosrest
Python - 图2tip

如果系统上有多个版本的 Python,则可能有多个 pip 命令。要确保使用的 pip 命令路径是正确的。上面我们用 pip3 命令安装,排除了使用 Python 2.x 版本对应的 pip 的可能性。但是如果系统上有多个 Python 3.x 版本,仍需检查安装路径是否正确。最简单的验证方式是,在命令再次输入 pip3 install taospy, 就会打印出 taospy 的具体安装位置,比如在 Windows 上:

  1. C:\> pip3 install taospy
  2. Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
  3. Requirement already satisfied: taospy in c:\users\username\appdata\local\programs\python\python310\lib\site-packages (2.3.0)

建立连接

连通性测试

在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。

  • 原生连接
  • REST 连接

请确保 TDengine 集群已经启动, 且集群中机器的 FQDN (如果启动的是单机版,FQDN 默认为 hostname)在本机能够解析, 可用 ping 命令进行测试:

  1. ping <FQDN>

然后测试用 TDengine CLI 能否正常连接集群:

  1. taos -h <FQDN> -p <PORT>

上面的 FQDN 可以为集群中任意一个 dnode 的 FQDN, PORT 为这个 dnode 对应的 serverPort。

对于 REST 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:

  1. curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"

上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。 如果测试成功,会输出服务器版本信息,比如:

  1. {
  2. "code": 0,
  3. "column_meta": [
  4. [
  5. "server_version()",
  6. "VARCHAR",
  7. 7
  8. ]
  9. ],
  10. "data": [
  11. [
  12. "3.0.0.0"
  13. ]
  14. ],
  15. "rows": 1
  16. }

使用连接器建立连接

以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。

  • 原生连接
  • REST 连接
  • WebSocket 连接
  1. import taos
  2. conn: taos.TaosConnection = taos.connect(host="localhost",
  3. user="root",
  4. password="taosdata",
  5. database="test",
  6. port=6030,
  7. config="/etc/taos", # for windows the default value is C:\TDengine\cfg
  8. timezone="Asia/Shanghai") # default your host's timezone
  9. server_version = conn.server_info
  10. print("server_version", server_version)
  11. client_version = conn.client_info
  12. print("client_version", client_version) # 3.0.0.0
  13. conn.close()
  14. # possible output:
  15. # 3.0.0.0
  16. # 3.0.0.0

查看源码

connect 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:

  • host : 要连接的节点的 FQDN。 没有默认值。如果不同提供此参数,则会连接客户端配置文件中的 firstEP。
  • user :TDengine 用户名。 默认值是 root。
  • password : TDengine 用户密码。 默认值是 taosdata。
  • port : 要连接的数据节点的起始端口,即 serverPort 配置。默认值是 6030。只有在提供了 host 参数的时候,这个参数才生效。
  • config : 客户端配置文件路径。 在 Windows 系统上默认是 C:\TDengine\cfg。 在 Linux/macOS 系统上默认是 /etc/taos/
  • timezone : 查询结果中 TIMESTAMP 类型的数据,转换为 python 的 datetime 对象时使用的时区。默认为本地时区。
Python - 图3warning

configtimezone 都是进程级别的配置。建议一个进程建立的所有连接都使用相同的参数值。否则可能产生无法预知的错误。

Python - 图4tip

connect 函数返回 taos.TaosConnection 实例。 在客户端多线程的场景下,推荐每个线程申请一个独立的连接实例,而不建议多线程共享一个连接。

  1. from taosrest import connect, TaosRestConnection, TaosRestCursor
  2. conn = connect(url="http://localhost:6041",
  3. user="root",
  4. password="taosdata",
  5. timeout=30)

查看源码

connect() 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:

  • url: taosAdapter REST 服务的 URL。默认是 http://localhost:6041
  • user: TDengine 用户名。默认是 root。
  • password: TDengine 用户密码。默认是 taosdata。
  • timeout: HTTP 请求超时时间。单位为秒。默认为 socket._GLOBAL_DEFAULT_TIMEOUT。 一般无需配置。
  1. import taosws
  2. conn = taosws.connect("taosws://root:taosdata@localhost:6041")

查看源码

connect() 函数参数为连接 url,协议为 taoswsws

示例程序

基本使用

  • 原生连接
  • REST 连接
  • WebSocket 连接
TaosConnection 类的使用

TaosConnection 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: executequeryschemaless_insertsubscribe 方法。

execute 方法

  1. conn = taos.connect()
  2. # Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
  3. conn.execute("DROP DATABASE IF EXISTS test")
  4. conn.execute("CREATE DATABASE test")
  5. # change database. same as execute "USE db"
  6. conn.select_db("test")
  7. conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
  8. affected_row = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
  9. print("affected_row", affected_row)
  10. # output:
  11. # affected_row 3

查看源码

query 方法

  1. # Execute a sql and get its result set. It's useful for SELECT statement
  2. result = conn.query("SELECT * from weather")
  3. # Get fields from result
  4. fields = result.fields
  5. for field in fields:
  6. print(field) # {name: ts, type: 9, bytes: 8}
  7. # output:
  8. # {name: ts, type: 9, bytes: 8}
  9. # {name: temperature, type: 6, bytes: 4}
  10. # {name: location, type: 4, bytes: 4}
  11. # Get data from result as list of tuple
  12. data = result.fetch_all()
  13. print(data)
  14. # output:
  15. # [(datetime.datetime(2022, 4, 27, 9, 4, 25, 367000), 23.5, 1), (datetime.datetime(2022, 4, 27, 9, 5, 25, 367000), 23.5, 1), (datetime.datetime(2022, 4, 27, 9, 6, 25, 367000), 24.399999618530273, 1)]
  16. # Or get data from result as a list of dict
  17. # map_data = result.fetch_all_into_dict()
  18. # print(map_data)
  19. # output:
  20. # [{'ts': datetime.datetime(2022, 4, 27, 9, 1, 15, 343000), 'temperature': 23.5, 'location': 1}, {'ts': datetime.datetime(2022, 4, 27, 9, 2, 15, 343000), 'temperature': 23.5, 'location': 1}, {'ts': datetime.datetime(2022, 4, 27, 9, 3, 15, 343000), 'temperature': 24.399999618530273, 'location': 1}]

查看源码

Python - 图5tip

查询结果只能获取一次。比如上面的示例中 fetch_all()fetch_all_into_dict() 只能用一个。重复获取得到的结果为空列表。

TaosResult 类的使用

上面 TaosConnection 类的使用示例中,我们已经展示了两种获取查询结果的方法: fetch_all()fetch_all_into_dict()。除此之外 TaosResult 还提供了按行迭代(rows_iter)或按数据块迭代(blocks_iter)结果集的方法。在查询数据量较大的场景,使用这两个方法会更高效。

blocks_iter 方法

  1. import taos
  2. conn = taos.connect()
  3. conn.execute("DROP DATABASE IF EXISTS test")
  4. conn.execute("CREATE DATABASE test")
  5. conn.select_db("test")
  6. conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
  7. # prepare data
  8. for i in range(2000):
  9. location = str(i % 10)
  10. tb = "t" + location
  11. conn.execute(f"INSERT INTO {tb} USING weather TAGS({location}) VALUES (now+{i}a, 23.5) (now+{i + 1}a, 23.5)")
  12. result: taos.TaosResult = conn.query("SELECT * FROM weather")
  13. block_index = 0
  14. blocks: taos.TaosBlocks = result.blocks_iter()
  15. for rows, length in blocks:
  16. print("block ", block_index, " length", length)
  17. print("first row in this block:", rows[0])
  18. block_index += 1
  19. conn.close()
  20. # possible output:
  21. # block 0 length 1200
  22. # first row in this block: (datetime.datetime(2022, 4, 27, 15, 14, 52, 46000), 23.5, 0)
  23. # block 1 length 1200
  24. # first row in this block: (datetime.datetime(2022, 4, 27, 15, 14, 52, 76000), 23.5, 3)
  25. # block 2 length 1200
  26. # first row in this block: (datetime.datetime(2022, 4, 27, 15, 14, 52, 99000), 23.5, 6)
  27. # block 3 length 400
  28. # first row in this block: (datetime.datetime(2022, 4, 27, 15, 14, 52, 122000), 23.5, 9)

查看源码

TaosCursor 类的使用

TaosConnection 类和 TaosResult 类已经实现了原生接口的所有功能。如果你对 PEP249 规范中的接口比较熟悉也可以使用 TaosCursor 类提供的方法。

TaosCursor 的使用

  1. import taos
  2. conn = taos.connect()
  3. cursor = conn.cursor()
  4. cursor.execute("DROP DATABASE IF EXISTS test")
  5. cursor.execute("CREATE DATABASE test")
  6. cursor.execute("USE test")
  7. cursor.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
  8. for i in range(1000):
  9. location = str(i % 10)
  10. tb = "t" + location
  11. cursor.execute(f"INSERT INTO {tb} USING weather TAGS({location}) VALUES (now+{i}a, 23.5) (now+{i + 1}a, 23.5)")
  12. cursor.execute("SELECT count(*) FROM weather")
  13. data = cursor.fetchall()
  14. print("count:", data[0][0])
  15. cursor.execute("SELECT tbname, * FROM weather LIMIT 2")
  16. col_names = [meta[0] for meta in cursor.description]
  17. print(col_names)
  18. rows = cursor.fetchall()
  19. print(rows)
  20. cursor.close()
  21. conn.close()
  22. # output:
  23. # count: 2000
  24. # ['tbname', 'ts', 'temperature', 'location']
  25. # row_count: -1
  26. # [('t0', datetime.datetime(2022, 4, 27, 14, 54, 24, 392000), 23.5, 0), ('t0', datetime.datetime(2022, 4, 27, 14, 54, 24, 393000), 23.5, 0)]

查看源码

Python - 图6note

TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。

TaosRestCursor 类的使用

TaosRestCursor 类是对 PEP249 Cursor 接口的实现。

TaosRestCursor 的使用

  1. # create STable
  2. cursor = conn.cursor()
  3. cursor.execute("DROP DATABASE IF EXISTS power")
  4. cursor.execute("CREATE DATABASE power")
  5. cursor.execute(
  6. "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
  7. # insert data
  8. cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
  9. power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
  10. power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
  11. power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
  12. print("inserted row count:", cursor.rowcount)
  13. # query data
  14. cursor.execute("SELECT * FROM power.meters LIMIT 3")
  15. # get total rows
  16. print("queried row count:", cursor.rowcount)
  17. # get column names from cursor
  18. column_names = [meta[0] for meta in cursor.description]
  19. # get rows
  20. data = cursor.fetchall()
  21. print(column_names)
  22. for row in data:
  23. print(row)
  24. # output:
  25. # inserted row count: 8
  26. # queried row count: 3
  27. # ['ts', 'current', 'voltage', 'phase', 'location', 'groupid']
  28. # [datetime.datetime(2018, 10, 3, 14, 38, 5, 500000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 11.8, 221, 0.28, 'california.losangeles', 2]
  29. # [datetime.datetime(2018, 10, 3, 14, 38, 16, 600000, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 13.4, 223, 0.29, 'california.losangeles', 2]
  30. # [datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.8, 223, 0.29, 'california.losangeles', 3]

查看源码

  • cursor.execute : 用来执行任意 SQL 语句。
  • cursor.rowcount: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
  • cursor.description : 返回字段的描述信息。关于描述信息的具体格式请参考TaosRestCursor
RestClient 类的使用

RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。

RestClient 的使用

  1. from taosrest import RestClient
  2. client = RestClient("http://localhost:6041", user="root", password="taosdata")
  3. res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1")
  4. print(res)
  5. # output:
  6. # {'status': 'succ', 'head': ['ts', 'current'], 'column_meta': [['ts', 9, 8], ['current', 6, 4]], 'data': [[datetime.datetime(2018, 10, 3, 14, 38, 5, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800), '+08:00')), 10.3]], 'rows': 1}

查看源码

对于 sql() 方法更详细的介绍, 请参考 RestClient

  1. conn.execute("drop database if exists connwspy")
  2. conn.execute("create database if not exists connwspy")
  3. conn.execute("use connwspy")
  4. conn.execute("create table if not exists stb (ts timestamp, c1 int) tags (t1 int)")
  5. conn.execute("create table if not exists tb1 using stb tags (1)")
  6. conn.execute("insert into tb1 values (now, 1)")
  7. conn.execute("insert into tb1 values (now, 2)")
  8. conn.execute("insert into tb1 values (now, 3)")
  9. r = conn.execute("select * from stb")
  10. result = conn.query("select * from stb")
  11. num_of_fields = result.field_count
  12. print(num_of_fields)
  13. for row in result:
  14. print(row)
  15. # output:
  16. # 3
  17. # ('2023-02-28 15:56:13.329 +08:00', 1, 1)
  18. # ('2023-02-28 15:56:13.333 +08:00', 2, 1)
  19. # ('2023-02-28 15:56:13.337 +08:00', 3, 1)

查看源码

  • conn.execute: 用来执行任意 SQL 语句,返回影响的行数
  • conn.query: 用来执行查询 SQL 语句,返回查询结果

与 pandas 一起使用

  • 原生连接
  • REST 连接
  • WebSocket 连接
  1. import pandas
  2. from sqlalchemy import create_engine, text
  3. engine = create_engine("taos://root:taosdata@localhost:6030/power")
  4. conn = engine.connect()
  5. df = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
  6. conn.close()
  7. # print index
  8. print(df.index)
  9. # print data type of element in ts column
  10. print(type(df.ts[0]))
  11. print(df.head(3))
  12. # output:
  13. # RangeIndex(start=0, stop=8, step=1)
  14. # <class 'pandas._libs.tslibs.timestamps.Timestamp'>
  15. # ts current ... location groupid
  16. # 0 2018-10-03 14:38:05.500 11.8 ... california.losangeles 2
  17. # 1 2018-10-03 14:38:16.600 13.4 ... california.losangeles 2
  18. # 2 2018-10-03 14:38:05.000 10.8 ... california.losangeles 3

查看源码

  1. import pandas
  2. from sqlalchemy import create_engine, text
  3. engine = create_engine("taosrest://root:taosdata@localhost:6041")
  4. conn = engine.connect()
  5. df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
  6. conn.close()
  7. # print index
  8. print(df.index)
  9. # print data type of element in ts column
  10. print(type(df.ts[0]))
  11. print(df.head(3))
  12. # output:
  13. # RangeIndex(start=0, stop=8, step=1)
  14. # <class 'pandas._libs.tslibs.timestamps.Timestamp'>
  15. # ts current ... location groupid
  16. # 0 2018-10-03 06:38:05.500000+00:00 11.8 ... california.losangeles 2
  17. # 1 2018-10-03 06:38:16.600000+00:00 13.4 ... california.losangeles 2
  18. # 2 2018-10-03 06:38:05+00:00 10.8 ... california.losangeles 3

查看源码

  1. import pandas
  2. from sqlalchemy import create_engine, text
  3. import taos
  4. taos_conn = taos.connect()
  5. taos_conn.execute('drop database if exists power')
  6. taos_conn.execute('create database if not exists power')
  7. taos_conn.execute("use power")
  8. taos_conn.execute(
  9. "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
  10. # insert data
  11. taos_conn.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
  12. VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000)
  13. ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
  14. power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
  15. VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
  16. power.d1003 USING power.meters TAGS('California.LosAngeles', 2)
  17. VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
  18. power.d1004 USING power.meters TAGS('California.LosAngeles', 3)
  19. VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
  20. engine = create_engine("taosws://root:taosdata@localhost:6041")
  21. conn = engine.connect()
  22. df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
  23. conn.close()
  24. # print index
  25. print(df.index)
  26. # print data type of element in ts column
  27. print(type(df.ts[0]))
  28. print(df.head(3))
  29. # output:
  30. # RangeIndex(start=0, stop=8, step=1)
  31. # <class 'pandas._libs.tslibs.timestamps.Timestamp'>
  32. # ts current ... location groupid
  33. # 0 2018-10-03 14:38:05.000 10.3 ... California.SanFrancisco 2
  34. # 1 2018-10-03 14:38:15.000 12.6 ... California.SanFrancisco 2
  35. # 2 2018-10-03 14:38:16.800 12.3 ... California.SanFrancisco 2

查看源码

数据订阅

连接器支持数据订阅功能,数据订阅功能请参考 数据订阅

  • 原生连接
  • WebSocket 连接

Consumer 提供了 Python 连接器订阅 TMQ 数据的 API,相关 API 定义请参考 数据订阅文档

  1. from taos.tmq import Consumer
  2. import taos
  3. def init_tmq_env(db, topic):
  4. conn = taos.connect()
  5. conn.execute("drop topic if exists {}".format(topic))
  6. conn.execute("drop database if exists {}".format(db))
  7. conn.execute("create database if not exists {}".format(db))
  8. conn.select_db(db)
  9. conn.execute(
  10. "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
  11. conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
  12. conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
  13. conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
  14. conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
  15. conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
  16. conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
  17. conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
  18. def cleanup(db, topic):
  19. conn = taos.connect()
  20. conn.execute("drop topic if exists {}".format(topic))
  21. conn.execute("drop database if exists {}".format(db))
  22. if __name__ == '__main__':
  23. init_tmq_env("tmq_test", "tmq_test_topic") # init env
  24. consumer = Consumer(
  25. {
  26. "group.id": "tg2",
  27. "td.connect.user": "root",
  28. "td.connect.pass": "taosdata",
  29. "enable.auto.commit": "true",
  30. }
  31. )
  32. consumer.subscribe(["tmq_test_topic"])
  33. try:
  34. while True:
  35. res = consumer.poll(1)
  36. if not res:
  37. break
  38. err = res.error()
  39. if err is not None:
  40. raise err
  41. val = res.value()
  42. for block in val:
  43. print(block.fetchall())
  44. finally:
  45. consumer.unsubscribe()
  46. consumer.close()
  47. cleanup("tmq_test", "tmq_test_topic")

查看源码

除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据。

  1. #!/usr/bin/python3
  2. from taosws import Consumer
  3. conf = {
  4. "td.connect.websocket.scheme": "ws",
  5. "group.id": "0",
  6. }
  7. consumer = Consumer(conf)
  8. consumer.subscribe(["test"])
  9. while True:
  10. message = consumer.poll(timeout=1.0)
  11. if message:
  12. id = message.vgroup()
  13. topic = message.topic()
  14. database = message.database()
  15. for block in message:
  16. nrows = block.nrows()
  17. ncols = block.ncols()
  18. for row in block:
  19. print(row)
  20. values = block.fetchall()
  21. print(nrows, ncols)
  22. # consumer.commit(message)
  23. else:
  24. break
  25. consumer.close()

查看源码

其它示例程序

示例程序链接示例程序内容
bind_multi.py参数绑定, 一次绑定多行
bind_row.py参数绑定,一次绑定一行
insert_lines.pyInfluxDB 行协议写入
json_tag.py使用 JSON 类型的标签
tmq_consumer.pytmq 订阅

其它说明

异常处理

所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:

  1. import taos
  2. try:
  3. conn = taos.connect()
  4. conn.execute("CREATE TABLE 123") # wrong sql
  5. except taos.Error as e:
  6. print(e)
  7. print("exception class: ", e.__class__.__name__)
  8. print("error number:", e.errno)
  9. print("error message:", e.msg)
  10. except BaseException as other:
  11. print("exception occur")
  12. print(other)
  13. # output:
  14. # [0x0216]: syntax error near 'Incomplete SQL statement'
  15. # exception class: ProgrammingError
  16. # error number: -2147483114
  17. # error message: syntax error near 'Incomplete SQL statement'

查看源码 ``

关于纳秒 (nanosecond)

由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。

  1. https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
  2. https://www.python.org/dev/peps/pep-0564/

重要更新

Release Notes

API 参考

常见问题

欢迎提问或报告问题