Build a Simple CRUD App with TiDB and Python

This document describes how to use TiDB and Python to build a simple CRUD application.


Note

It is recommended to use Python 3.10 or later to write applications for TiDB.

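Step 1: Launch your TiDB cluster

This section describes how to launch a TiDB cluster. Choose one of the following options:

  • TiDB Cloud: Create a Serverless Tier cluster.
  • Local cluster: You can deploy a local test cluster or a production TiDB cluster. For detailed steps, refer to the corresponding deployment documentation.
  • Gitpod: Use the preconfigured Git-based development environment ("Try it now"). This environment automatically clones the code and deploys a test cluster through TiUP.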

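Step 2: Get the code

    git clone https://github.com/pingcap-inc/tidb-example-python.git

The repository contains one example for each of the following libraries:

  • Using SQLAlchemy (recommended)
  • Using peewee (recommended)
  • Using mysqlclient
  • Using PyMySQL
  • Using mysql-connector-python

SQLAlchemy is one of the popular open-source ORMs for Python. The following example uses SQLAlchemy 1.4.44.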

    import uuid
    from typing import List

    from sqlalchemy import create_engine, String, Column, Integer, select, func
    from sqlalchemy.orm import declarative_base, sessionmaker

    engine = create_engine('mysql://root:@127.0.0.1:4000/test')
    Base = declarative_base()
    # No model class is defined on Base at this point, so this call creates no tables.
    # The player table is created by player_init.sql (see Step 3).
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)


    class Player(Base):
        __tablename__ = "player"

        id = Column(String(36), primary_key=True)
        coins = Column(Integer)
        goods = Column(Integer)

        def __repr__(self):
            return f'Player(id={self.id!r}, coins={self.coins!r}, goods={self.goods!r})'


    def random_player(amount: int) -> List[Player]:
        players = []
        for _ in range(amount):
            # convert the UUID to str so that it matches the String(36) column
            players.append(Player(id=str(uuid.uuid4()), coins=10000, goods=10000))

        return players


    def simple_example() -> None:
        with Session() as session:
            # create a player who has 1 coin and 1 goods.
            session.add(Player(id="test", coins=1, goods=1))

            # get this player, and print it.
            get_test_stmt = select(Player).where(Player.id == "test")
            for player in session.scalars(get_test_stmt):
                print(player)

            # create players with bulk inserts.
            # insert 1919 players in total, 114 players per batch.
            # each player has a random UUID
            player_list = random_player(1919)
            for idx in range(0, len(player_list), 114):
                session.bulk_save_objects(player_list[idx:idx + 114])

            # print the number of players
            count = session.query(func.count(Player.id)).scalar()
            print(f'number of players: {count}')

            # print 3 players.
            three_players = session.query(Player).limit(3).all()
            for player in three_players:
                print(player)

            session.commit()


    def trade_check(session: Session, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        # sell player goods check
        sell_player = session.query(Player.goods).filter(Player.id == sell_id).with_for_update().one()
        if sell_player.goods < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        buy_player = session.query(Player.coins).filter(Player.id == buy_id).with_for_update().one()
        if buy_player.coins < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        # both checks passed
        return True


    def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
        with Session() as session:
            if trade_check(session, sell_id, buy_id, amount, price) is False:
                return

            # deduct the goods from the seller, and add the coins
            session.query(Player).filter(Player.id == sell_id). \
                update({'goods': Player.goods - amount, 'coins': Player.coins + price})
            # deduct the coins from the buyer, and add the goods
            session.query(Player).filter(Player.id == buy_id). \
                update({'goods': Player.goods + amount, 'coins': Player.coins - price})

            session.commit()
            print("trade success")


    def trade_example() -> None:
        with Session() as session:
            # create two players
            # player 1: id is "1", has only 100 coins.
            # player 2: id is "2", has 114514 coins, and 20 goods.
            session.add(Player(id="1", coins=100, goods=0))
            session.add(Player(id="2", coins=114514, goods=20))
            session.commit()

        # player 1 wants to buy 10 goods from player 2.
        # it will cost 500 coins, but player 1 cannot afford it.
        # so this trade will fail, and nobody will lose their coins or goods
        trade(sell_id="2", buy_id="1", amount=10, price=500)

        # then player 1 has to reduce the purchase quantity to 2.
        # this trade will be successful
        trade(sell_id="2", buy_id="1", amount=2, price=100)

        with Session() as session:
            traders = session.query(Player).filter(Player.id.in_(("1", "2"))).all()
            for player in traders:
                print(player)
            session.commit()


    simple_example()
    trade_example()

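Compared with using a driver directly, SQLAlchemy hides the differences between databases when a connection is created. It also wraps many operations, such as session management and CRUD on basic objects, which greatly reduces the amount of code.

The Player class is the in-program mapping of the database table. Each attribute of Player corresponds to a column of the player table. To give SQLAlchemy more information, each attribute is declared with a type definition such as id = Column(String(36), primary_key=True), which specifies the column type and its additional properties. In this case, id = Column(String(36), primary_key=True) means that the id column is of the String type, which corresponds to the VARCHAR database type with a length of 36, and that it is the primary key.

For more information about how to use SQLAlchemy, refer to the SQLAlchemy official website.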
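The bulk-insert step in the SQLAlchemy example above slices the list of 1919 players into batches of 114 before saving each batch. The following standalone sketch reproduces only that slicing, so it can be run without a database. The helper name batches is hypothetical, and plain integers stand in for the Player objects.

```python
from typing import Iterator, List, Sequence


def batches(items: Sequence[int], size: int) -> Iterator[List[int]]:
    """Yield consecutive slices of at most `size` items (hypothetical helper)."""
    for idx in range(0, len(items), size):
        yield list(items[idx:idx + size])


# Integers stand in for the Player objects used in the example.
chunks = list(batches(range(1919), 114))

print(len(chunks))      # number of batches sent to the database
print(len(chunks[-1]))  # size of the final, partial batch
```

Because 1919 is not a multiple of 114, the last batch is smaller than the others. The slice expression handles this without any special case.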

peewee is one of the popular open-source ORMs for Python. The following example uses peewee 3.15.4.

    import uuid
    from typing import List

    from peewee import CharField, IntegerField, Model
    from playhouse.db_url import connect

    db = connect('mysql://root:@127.0.0.1:4000/test')


    class Player(Model):
        id = CharField(max_length=36, primary_key=True)
        coins = IntegerField()
        goods = IntegerField()

        class Meta:
            database = db
            table_name = "player"


    def random_player(amount: int) -> List[Player]:
        players = []
        for _ in range(amount):
            players.append(Player(id=str(uuid.uuid4()), coins=10000, goods=10000))

        return players


    def simple_example() -> None:
        # create a player who has 1 coin and 1 goods.
        Player.create(id="test", coins=1, goods=1)

        # get this player, and print it.
        test_player = Player.select().where(Player.id == "test").get()
        print(f'id:{test_player.id}, coins:{test_player.coins}, goods:{test_player.goods}')

        # create players with bulk inserts.
        # insert 1919 players in total, 114 players per batch.
        # each player has a random UUID
        player_list = random_player(1919)
        Player.bulk_create(player_list, 114)

        # print the number of players
        count = Player.select().count()
        print(f'number of players: {count}')

        # print 3 players.
        three_players = Player.select().limit(3)
        for player in three_players:
            print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')


    def trade_check(sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        # sell player goods check
        sell_goods = Player.select(Player.goods).where(Player.id == sell_id).get().goods
        if sell_goods < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        buy_coins = Player.select(Player.coins).where(Player.id == buy_id).get().coins
        if buy_coins < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        return True


    def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
        with db.atomic() as txn:
            try:
                if trade_check(sell_id, buy_id, amount, price) is False:
                    txn.rollback()
                    return

                # deduct the goods from the seller, and add the coins
                Player.update(goods=Player.goods - amount, coins=Player.coins + price).where(Player.id == sell_id).execute()
                # deduct the coins from the buyer, and add the goods
                Player.update(goods=Player.goods + amount, coins=Player.coins - price).where(Player.id == buy_id).execute()

            except Exception as err:
                txn.rollback()
                print(f'something went wrong: {err}')
            else:
                txn.commit()
                print("trade success")


    def trade_example() -> None:
        # create two players
        # player 1: id is "1", has only 100 coins.
        # player 2: id is "2", has 114514 coins, and 20 goods.
        Player.create(id="1", coins=100, goods=0)
        Player.create(id="2", coins=114514, goods=20)

        # player 1 wants to buy 10 goods from player 2.
        # it will cost 500 coins, but player 1 cannot afford it.
        # so this trade will fail, and nobody will lose their coins or goods
        trade(sell_id="2", buy_id="1", amount=10, price=500)

        # then player 1 has to reduce the purchase quantity to 2.
        # this trade will be successful
        trade(sell_id="2", buy_id="1", amount=2, price=100)

        # check the current state of player 1 and player 2
        after_trade_players = Player.select().where(Player.id.in_(["1", "2"]))
        for player in after_trade_players:
            print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')


    db.connect()

    # recreate the player table
    db.drop_tables([Player])
    db.create_tables([Player])

    simple_example()
    trade_example()

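Compared with using a driver directly, peewee hides the differences between databases when a connection is created. It also wraps many operations, such as session management and CRUD on basic objects, which greatly reduces the amount of code.

The Player class is the in-program mapping of the database table. Each attribute of Player corresponds to a column of the player table. To give peewee more information, each attribute is declared with a type definition such as id = CharField(max_length=36, primary_key=True), which specifies the column type and its additional properties. In this case, id = CharField(max_length=36, primary_key=True) means that the id column is of the CharField type, which corresponds to the VARCHAR database type with a length of 36, and that it is the primary key.

For more information about how to use peewee, refer to the peewee official website.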

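mysqlclient is one of the popular open-source drivers for Python. The following example uses mysqlclient 2.1.1. Python drivers are more convenient to use than drivers in many other languages. However, a driver cannot hide the underlying implementation and requires you to manage transactions manually. Unless you have many scenarios that must use SQL directly, it is still recommended to write your program with an ORM, which reduces the coupling of the program.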

    import uuid
    from typing import List

    import MySQLdb
    from MySQLdb import Connection
    from MySQLdb.cursors import Cursor


    def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
        return MySQLdb.connect(
            host="127.0.0.1",
            port=4000,
            user="root",
            password="",
            database="test",
            autocommit=autocommit
        )


    def create_player(cursor: Cursor, player: tuple) -> None:
        cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)


    def get_player(cursor: Cursor, player_id: str) -> tuple:
        cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
        return cursor.fetchone()


    def get_players_with_limit(cursor: Cursor, limit: int) -> List[tuple]:
        cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
        return cursor.fetchall()


    def random_player(amount: int) -> List[tuple]:
        players = []
        for _ in range(amount):
            players.append((str(uuid.uuid4()), 10000, 10000))

        return players


    def bulk_create_player(cursor: Cursor, players: List[tuple]) -> None:
        cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)


    def get_count(cursor: Cursor) -> int:
        cursor.execute("SELECT count(*) FROM player")
        return cursor.fetchone()[0]


    def trade_check(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

        # sell player goods check
        cursor.execute(get_player_with_lock_sql, (sell_id,))
        _, sell_goods = cursor.fetchone()
        if sell_goods < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        cursor.execute(get_player_with_lock_sql, (buy_id,))
        buy_coins, _ = cursor.fetchone()
        if buy_coins < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        # both checks passed
        return True


    def trade_update(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"

        # deduct the goods from the seller, and add the coins
        cursor.execute(update_player_sql, (-amount, price, sell_id))
        # deduct the coins from the buyer, and add the goods
        cursor.execute(update_player_sql, (amount, -price, buy_id))


    def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        with connection.cursor() as cursor:
            if trade_check(cursor, sell_id, buy_id, amount, price) is False:
                connection.rollback()
                return

            try:
                trade_update(cursor, sell_id, buy_id, amount, price)
            except Exception as err:
                connection.rollback()
                print(f'something went wrong: {err}')
            else:
                connection.commit()
                print("trade success")


    def simple_example() -> None:
        with get_connection(autocommit=True) as conn:
            with conn.cursor() as cur:
                # create a player who has 1 coin and 1 goods.
                create_player(cur, ("test", 1, 1))

                # get this player, and print it.
                test_player = get_player(cur, "test")
                print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')

                # create players with bulk inserts.
                # insert 1919 players in total, 114 players per batch.
                # each player has a random UUID
                player_list = random_player(1919)
                for idx in range(0, len(player_list), 114):
                    bulk_create_player(cur, player_list[idx:idx + 114])

                # print the number of players
                count = get_count(cur)
                print(f'number of players: {count}')

                # print 3 players.
                three_players = get_players_with_limit(cur, 3)
                for player in three_players:
                    print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')


    def trade_example() -> None:
        with get_connection(autocommit=False) as conn:
            with conn.cursor() as cur:
                # create two players
                # player 1: id is "1", has only 100 coins.
                # player 2: id is "2", has 114514 coins, and 20 goods.
                create_player(cur, ("1", 100, 0))
                create_player(cur, ("2", 114514, 20))
                conn.commit()

            # player 1 wants to buy 10 goods from player 2.
            # it will cost 500 coins, but player 1 cannot afford it.
            # so this trade will fail, and nobody will lose their coins or goods
            trade(conn, sell_id="2", buy_id="1", amount=10, price=500)

            # then player 1 has to reduce the purchase quantity to 2.
            # this trade will be successful
            trade(conn, sell_id="2", buy_id="1", amount=2, price=100)

            # check the current state of player 1 and player 2
            with conn.cursor() as cur:
                _, player1_coin, player1_goods = get_player(cur, "1")
                print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
                _, player2_coin, player2_goods = get_player(cur, "2")
                print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')


    simple_example()
    trade_example()

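A driver has a lower level of encapsulation than an ORM, so the program contains many SQL statements. Unlike with an ORM, there is no data object, so a Player queried in the program is represented as a tuple.

For more information about how to use mysqlclient, refer to the mysqlclient official documentation.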
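The two driver-level traits mentioned above, tuple rows and manual transaction control, can be tried without a TiDB cluster. The following sketch uses Python's built-in sqlite3 module purely as a stand-in for a MySQL driver. That substitution is an assumption for illustration: sqlite3 uses ? placeholders instead of %s, and an in-memory SQLite database does not behave like TiDB in other respects.

```python
import sqlite3

# In-memory SQLite database as a stand-in for TiDB (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE player (id TEXT PRIMARY KEY, coins INTEGER, goods INTEGER)")

cur = conn.cursor()
insert_sql = "INSERT INTO player (id, coins, goods) VALUES (?, ?, ?)"

# An insert that is not committed is discarded by an explicit rollback.
cur.execute(insert_sql, ("draft", 1, 1))
conn.rollback()
cur.execute("SELECT count(*) FROM player")
count_after_rollback = cur.fetchone()[0]

# The same kind of insert followed by an explicit commit is kept.
cur.execute(insert_sql, ("test", 1, 1))
conn.commit()
cur.execute("SELECT id, coins, goods FROM player WHERE id = ?", ("test",))
row = cur.fetchone()  # a plain tuple, with no Player object involved

player_id, coins, goods = row  # fields are accessed by position
print(count_after_rollback, row)
```

Accessing fields by position is why the driver examples unpack rows or index them with [0], [1], and [2].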

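PyMySQL is one of the popular open-source drivers for Python. The following example uses PyMySQL 1.0.2. Python drivers are more convenient to use than drivers in many other languages. However, a driver cannot hide the underlying implementation and requires you to manage transactions manually. Unless you have many scenarios that must use SQL directly, it is still recommended to write your program with an ORM, which reduces the coupling of the program.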

    import uuid
    from typing import List

    import pymysql.cursors
    from pymysql import Connection
    from pymysql.cursors import DictCursor


    def get_connection(autocommit: bool = False) -> Connection:
        return pymysql.connect(host='127.0.0.1',
                               port=4000,
                               user='root',
                               password='',
                               database='test',
                               cursorclass=DictCursor,
                               autocommit=autocommit)


    def create_player(cursor: DictCursor, player: tuple) -> None:
        cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)


    def get_player(cursor: DictCursor, player_id: str) -> dict:
        cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
        return cursor.fetchone()


    def get_players_with_limit(cursor: DictCursor, limit: int) -> List[dict]:
        cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
        return cursor.fetchall()


    def random_player(amount: int) -> List[tuple]:
        players = []
        for _ in range(amount):
            players.append((str(uuid.uuid4()), 10000, 10000))

        return players


    def bulk_create_player(cursor: DictCursor, players: List[tuple]) -> None:
        cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)


    def get_count(cursor: DictCursor) -> int:
        cursor.execute("SELECT count(*) as count FROM player")
        return cursor.fetchone()['count']


    def trade_check(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

        # sell player goods check
        cursor.execute(get_player_with_lock_sql, (sell_id,))
        seller = cursor.fetchone()
        if seller['goods'] < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        cursor.execute(get_player_with_lock_sql, (buy_id,))
        buyer = cursor.fetchone()
        if buyer['coins'] < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        # both checks passed
        return True


    def trade_update(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"

        # deduct the goods from the seller, and add the coins
        cursor.execute(update_player_sql, (-amount, price, sell_id))
        # deduct the coins from the buyer, and add the goods
        cursor.execute(update_player_sql, (amount, -price, buy_id))


    def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        with connection.cursor() as cursor:
            if trade_check(cursor, sell_id, buy_id, amount, price) is False:
                connection.rollback()
                return

            try:
                trade_update(cursor, sell_id, buy_id, amount, price)
            except Exception as err:
                connection.rollback()
                print(f'something went wrong: {err}')
            else:
                connection.commit()
                print("trade success")


    def simple_example() -> None:
        with get_connection(autocommit=True) as connection:
            with connection.cursor() as cur:
                # create a player who has 1 coin and 1 goods.
                create_player(cur, ("test", 1, 1))

                # get this player, and print it.
                test_player = get_player(cur, "test")
                print(test_player)

                # create players with bulk inserts.
                # insert 1919 players in total, 114 players per batch.
                # each player has a random UUID
                player_list = random_player(1919)
                for idx in range(0, len(player_list), 114):
                    bulk_create_player(cur, player_list[idx:idx + 114])

                # print the number of players
                count = get_count(cur)
                print(f'number of players: {count}')

                # print 3 players.
                three_players = get_players_with_limit(cur, 3)
                for player in three_players:
                    print(player)


    def trade_example() -> None:
        with get_connection(autocommit=False) as connection:
            with connection.cursor() as cur:
                # create two players
                # player 1: id is "1", has only 100 coins.
                # player 2: id is "2", has 114514 coins, and 20 goods.
                create_player(cur, ("1", 100, 0))
                create_player(cur, ("2", 114514, 20))
                connection.commit()

            # player 1 wants to buy 10 goods from player 2.
            # it will cost 500 coins, but player 1 cannot afford it.
            # so this trade will fail, and nobody will lose their coins or goods
            trade(connection, sell_id="2", buy_id="1", amount=10, price=500)

            # then player 1 has to reduce the purchase quantity to 2.
            # this trade will be successful
            trade(connection, sell_id="2", buy_id="1", amount=2, price=100)

            # check the current state of player 1 and player 2
            with connection.cursor() as cur:
                print(get_player(cur, "1"))
                print(get_player(cur, "2"))


    simple_example()
    trade_example()

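A driver has a lower level of encapsulation than an ORM, so the program contains many SQL statements. Unlike with an ORM, there is no data object, so a Player queried in the program is represented as a dict.

For more information about how to use PyMySQL, refer to the PyMySQL official documentation.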
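To see what dict-shaped rows look like without a running cluster, the following sketch imitates the effect of DictCursor with the built-in sqlite3 module. The dict_factory function is a hypothetical helper written for this illustration, and SQLite is only a stand-in, so the placeholder syntax (? instead of %s) differs from the PyMySQL example.

```python
import sqlite3
from typing import Any, Dict, Tuple


def dict_factory(cursor: sqlite3.Cursor, row: Tuple[Any, ...]) -> Dict[str, Any]:
    """Build a dict keyed by column name for each fetched row (hypothetical helper)."""
    fields = [column[0] for column in cursor.description]
    return dict(zip(fields, row))


# In-memory SQLite database as a stand-in for TiDB (illustration only).
conn = sqlite3.connect(":memory:")
conn.row_factory = dict_factory
conn.execute("CREATE TABLE player (id TEXT PRIMARY KEY, coins INTEGER, goods INTEGER)")
conn.execute("INSERT INTO player (id, coins, goods) VALUES (?, ?, ?)", ("test", 1, 1))

player = conn.execute(
    "SELECT id, coins, goods FROM player WHERE id = ?", ("test",)
).fetchone()

# A column alias becomes the dict key, as with 'count' in the example above.
count = conn.execute("SELECT count(*) AS count FROM player").fetchone()["count"]

print(player, count)
```

With dict rows, fields are read by column name, which is why the PyMySQL example uses seller['goods'] and buyer['coins'] instead of tuple unpacking.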

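mysql-connector-python is one of the popular open-source drivers for Python. The following example uses mysql-connector-python 8.0.31. Python drivers are more convenient to use than drivers in many other languages. However, a driver cannot hide the underlying implementation and requires you to manage transactions manually. Unless you have many scenarios that must use SQL directly, it is still recommended to write your program with an ORM, which reduces the coupling of the program.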

    import uuid
    from typing import List

    from mysql.connector import connect, MySQLConnection
    from mysql.connector.cursor import MySQLCursor


    def get_connection(autocommit: bool = True) -> MySQLConnection:
        connection = connect(host='127.0.0.1',
                             port=4000,
                             user='root',
                             password='',
                             database='test')
        connection.autocommit = autocommit
        return connection


    def create_player(cursor: MySQLCursor, player: tuple) -> None:
        cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)


    def get_player(cursor: MySQLCursor, player_id: str) -> tuple:
        cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
        return cursor.fetchone()


    def get_players_with_limit(cursor: MySQLCursor, limit: int) -> List[tuple]:
        cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
        return cursor.fetchall()


    def random_player(amount: int) -> List[tuple]:
        players = []
        for _ in range(amount):
            players.append((str(uuid.uuid4()), 10000, 10000))

        return players


    def bulk_create_player(cursor: MySQLCursor, players: List[tuple]) -> None:
        cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)


    def get_count(cursor: MySQLCursor) -> int:
        cursor.execute("SELECT count(*) FROM player")
        return cursor.fetchone()[0]


    def trade_check(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

        # sell player goods check
        cursor.execute(get_player_with_lock_sql, (sell_id,))
        _, sell_goods = cursor.fetchone()
        if sell_goods < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        cursor.execute(get_player_with_lock_sql, (buy_id,))
        buy_coins, _ = cursor.fetchone()
        if buy_coins < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        # both checks passed
        return True


    def trade_update(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"

        # deduct the goods from the seller, and add the coins
        cursor.execute(update_player_sql, (-amount, price, sell_id))
        # deduct the coins from the buyer, and add the goods
        cursor.execute(update_player_sql, (amount, -price, buy_id))


    def trade(connection: MySQLConnection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        with connection.cursor() as cursor:
            if trade_check(cursor, sell_id, buy_id, amount, price) is False:
                connection.rollback()
                return

            try:
                trade_update(cursor, sell_id, buy_id, amount, price)
            except Exception as err:
                connection.rollback()
                print(f'something went wrong: {err}')
            else:
                connection.commit()
                print("trade success")


    def simple_example() -> None:
        with get_connection(autocommit=True) as connection:
            with connection.cursor() as cur:
                # create a player who has 1 coin and 1 goods.
                create_player(cur, ("test", 1, 1))

                # get this player, and print it.
                test_player = get_player(cur, "test")
                print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')

                # create players with bulk inserts.
                # insert 1919 players in total, 114 players per batch.
                # each player has a random UUID
                player_list = random_player(1919)
                for idx in range(0, len(player_list), 114):
                    bulk_create_player(cur, player_list[idx:idx + 114])

                # print the number of players
                count = get_count(cur)
                print(f'number of players: {count}')

                # print 3 players.
                three_players = get_players_with_limit(cur, 3)
                for player in three_players:
                    print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')


    def trade_example() -> None:
        with get_connection(autocommit=False) as conn:
            with conn.cursor() as cur:
                # create two players
                # player 1: id is "1", has only 100 coins.
                # player 2: id is "2", has 114514 coins, and 20 goods.
                create_player(cur, ("1", 100, 0))
                create_player(cur, ("2", 114514, 20))
                conn.commit()

            # player 1 wants to buy 10 goods from player 2.
            # it will cost 500 coins, but player 1 cannot afford it.
            # so this trade will fail, and nobody will lose their coins or goods
            trade(conn, sell_id="2", buy_id="1", amount=10, price=500)

            # then player 1 has to reduce the purchase quantity to 2.
            # this trade will be successful
            trade(conn, sell_id="2", buy_id="1", amount=2, price=100)

            # check the current state of player 1 and player 2
            with conn.cursor() as cur:
                _, player1_coin, player1_goods = get_player(cur, "1")
                print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
                _, player2_coin, player2_goods = get_player(cur, "2")
                print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')


    simple_example()
    trade_example()

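A driver has a lower level of encapsulation than an ORM, so the program contains many SQL statements. Unlike with an ORM, there is no data object, so a Player queried in the program is represented as a tuple.

For more information about how to use mysql-connector-python, refer to the mysql-connector-python official documentation.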

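Step 3: Run the code

This section describes how to run the code step by step.

Step 3.1: Initialize the table

Tip

To try connecting Python to TiDB in the Gitpod Playground, use the "Try it now" link.

This example requires you to initialize the table manually. If you are using a local cluster, you can run one of the following commands directly:

  • MySQL CLI:

        mysql --host 127.0.0.1 --port 4000 -u root < player_init.sql

  • MyCLI:

        mycli --host 127.0.0.1 --port 4000 -u root --no-warn < player_init.sql

If you are not using a local cluster, or have not installed a command-line client, log in to your cluster with your preferred tool (for example, a GUI tool such as Navicat or DBeaver) and run the SQL statements in the player_init.sql file.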

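Step 3.2: Modify parameters for TiDB Cloud

If you are using a TiDB Cloud Serverless Tier cluster, you need to use the CA certificate provided by your local system. The certificate path is referred to as <ca_path> in the rest of this document. The path depends on your operating system:

  • macOS / Alpine: /etc/ssl/cert.pem
  • Debian / Ubuntu / Arch: /etc/ssl/certs/ca-certificates.crt
  • RedHat / Fedora / CentOS / Mageia: /etc/pki/tls/certs/ca-bundle.crt
  • OpenSUSE: /etc/ssl/ca-bundle.pem

If you still get certificate errors after setting the path, refer to the TiDB Cloud Serverless Tier secure connection documentation.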
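If you are not sure which of the paths listed above applies to your machine, a short check can tell you which file is present. The following is a minimal sketch using only the standard library. The function name find_ca_path and the constant CA_CANDIDATES are hypothetical names chosen for this illustration.

```python
import os
from typing import Optional, Sequence

# Candidate CA bundle locations, copied from the list of system paths above.
CA_CANDIDATES = (
    "/etc/ssl/cert.pem",                   # macOS / Alpine
    "/etc/ssl/certs/ca-certificates.crt",  # Debian / Ubuntu / Arch
    "/etc/pki/tls/certs/ca-bundle.crt",    # RedHat / Fedora / CentOS / Mageia
    "/etc/ssl/ca-bundle.pem",              # OpenSUSE
)


def find_ca_path(candidates: Sequence[str] = CA_CANDIDATES) -> Optional[str]:
    """Return the first candidate that exists as a file, or None if none does."""
    for path in candidates:
        if os.path.isfile(path):
            return path
    return None


ca_path = find_ca_path()
print(ca_path if ca_path else "no CA bundle found in the listed locations")
```

The returned value can then be used wherever <ca_path> appears. The sketch only checks that a file exists. It does not verify that the file is a valid certificate bundle.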

The change depends on the library you use:

  • Using SQLAlchemy (recommended)
  • Using peewee (recommended)
  • Using mysqlclient
  • Using PyMySQL
  • Using mysql-connector-python

If you are using a TiDB Cloud Serverless Tier cluster with SQLAlchemy, change the arguments of the create_engine function in sqlalchemy_example.py:

    engine = create_engine('mysql://root:@127.0.0.1:4000/test')

Suppose that the password you set is 123456, and the connection information you get from the TiDB Cloud Serverless Tier cluster panel is as follows:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

In this case, change create_engine as follows:

    engine = create_engine('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', connect_args={
        "ssl_mode": "VERIFY_IDENTITY",
        "ssl": {
            "ca": "<ca_path>"
        }
    })
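The connection URL combines the user, password, endpoint, port, and database name into one string. As a sketch of how that string is assembled, the following standalone snippet builds it from the sample values above. The helper name build_url is hypothetical. Percent-encoding the user and password with quote_plus is an addition that matters only when they contain characters such as @ or /, which the sample values do not.

```python
from urllib.parse import quote_plus


def build_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a mysql:// URL, percent-encoding the credentials (hypothetical helper)."""
    return f"mysql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"


# Sample values from the connection information above.
url = build_url("2aEp24QWEDLqRFs.root", "123456", "xxx.tidbcloud.com", 4000, "test")
print(url)

# A password with reserved characters is encoded so that it cannot be
# mistaken for the '@' or '/' separators of the URL.
tricky = build_url("2aEp24QWEDLqRFs.root", "p@ss/word", "xxx.tidbcloud.com", 4000, "test")
print(tricky)
```

The resulting string is what would be passed as the first argument of create_engine, together with the connect_args shown above.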

If you are using a TiDB Cloud Serverless Tier cluster with peewee, change the arguments of the connect function in peewee_example.py:

    db = connect('mysql://root:@127.0.0.1:4000/test')

Suppose that the password you set is 123456, and the connection information you get from the TiDB Cloud Serverless Tier cluster panel is as follows:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

In this case, change connect as follows:

  • When peewee uses PyMySQL as the driver:

        db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test',
            ssl_verify_cert=True, ssl_ca="<ca_path>")

  • When peewee uses mysqlclient as the driver:

        db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test',
            ssl_mode="VERIFY_IDENTITY", ssl={"ca": "<ca_path>"})

Because peewee passes these arguments through to the driver, make sure that the arguments match the driver that peewee is actually using.
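Because the TLS arguments differ by driver, one way to keep them in one place is a small lookup function. The sketch below only builds the keyword arguments shown above and does not open a connection. The function name ssl_kwargs and the driver labels "pymysql" and "mysqlclient" are hypothetical choices for this illustration.

```python
from typing import Any, Dict


def ssl_kwargs(driver: str, ca_path: str) -> Dict[str, Any]:
    """Return the TLS keyword arguments for the given driver (hypothetical helper)."""
    if driver == "pymysql":
        return {"ssl_verify_cert": True, "ssl_ca": ca_path}
    if driver == "mysqlclient":
        return {"ssl_mode": "VERIFY_IDENTITY", "ssl": {"ca": ca_path}}
    raise ValueError(f"unsupported driver: {driver}")


# "<ca_path>" is the placeholder used throughout this document.
pymysql_args = ssl_kwargs("pymysql", "<ca_path>")
mysqlclient_args = ssl_kwargs("mysqlclient", "<ca_path>")
print(pymysql_args)
print(mysqlclient_args)
```

The returned dict could then be unpacked into the connect call with the ** operator, after the connection URL.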

If you are using a TiDB Cloud Serverless Tier cluster with mysqlclient, change the get_connection function in mysqlclient_example.py:

    def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
        return MySQLdb.connect(
            host="127.0.0.1",
            port=4000,
            user="root",
            password="",
            database="test",
            autocommit=autocommit
        )

Suppose that the password you set is 123456, and the connection information you get from the TiDB Cloud Serverless Tier cluster panel is as follows:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

In this case, change get_connection as follows:

    def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
        return MySQLdb.connect(
            host="xxx.tidbcloud.com",
            port=4000,
            user="2aEp24QWEDLqRFs.root",
            password="123456",
            database="test",
            autocommit=autocommit,
            ssl_mode="VERIFY_IDENTITY",
            ssl={
                "ca": "<ca_path>"
            }
        )

If you are using a TiDB Cloud Serverless Tier cluster with PyMySQL, change the get_connection function in pymysql_example.py:

    def get_connection(autocommit: bool = False) -> Connection:
        return pymysql.connect(host='127.0.0.1',
                               port=4000,
                               user='root',
                               password='',
                               database='test',
                               cursorclass=DictCursor,
                               autocommit=autocommit)

Suppose that the password you set is 123456, and the connection information you get from the TiDB Cloud Serverless Tier cluster panel is as follows:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

In this case, change get_connection as follows:

    def get_connection(autocommit: bool = False) -> Connection:
        return pymysql.connect(host='xxx.tidbcloud.com',
                               port=4000,
                               user='2aEp24QWEDLqRFs.root',
                               password='123456',
                               database='test',
                               cursorclass=DictCursor,
                               autocommit=autocommit,
                               ssl_ca='<ca_path>',
                               ssl_verify_cert=True,
                               ssl_verify_identity=True)

If you are using a TiDB Cloud Serverless Tier cluster with mysql-connector-python, change the get_connection function in mysql_connector_python_example.py:

    def get_connection(autocommit: bool = True) -> MySQLConnection:
        connection = connect(host='127.0.0.1',
                             port=4000,
                             user='root',
                             password='',
                             database='test')
        connection.autocommit = autocommit
        return connection

Suppose that the password you set is 123456, and the connection information you get from the TiDB Cloud Serverless Tier cluster panel is as follows:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

In this case, change get_connection as follows:

    def get_connection(autocommit: bool = True) -> MySQLConnection:
        connection = connect(
            host="xxx.tidbcloud.com",
            port=4000,
            user="2aEp24QWEDLqRFs.root",
            password="123456",
            database="test",
            autocommit=autocommit,
            ssl_ca='<ca_path>',
            ssl_verify_identity=True
        )
        connection.autocommit = autocommit
        return connection

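Step 3.3: Run

Before running the code, install the dependencies:

    pip3 install -r requirement.txt

If you need to run the script multiple times, initialize the table again before each run, as described in the table initialization section.

Then run the script for the library you use:

  • Using SQLAlchemy (recommended):

        python3 sqlalchemy_example.py

  • Using peewee (recommended):

        python3 peewee_example.py

  • Using mysqlclient:

        python3 mysqlclient_example.py

  • Using PyMySQL:

        python3 pymysql_example.py

  • Using mysql-connector-python:

        python3 mysql_connector_python_example.py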

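Step 4: Expected output

  • Using SQLAlchemy (recommended): SQLAlchemy expected output
  • Using peewee (recommended): peewee expected output
  • Using mysqlclient: mysqlclient expected output
  • Using PyMySQL: PyMySQL expected output
  • Using mysql-connector-python: mysql-connector-python expected output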
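The expected output files are not reproduced here. As a rough cross-check of the trade example, the final balances of players 1 and 2 can be worked out in plain Python, assuming the same checks and updates as the example code. The players dictionary and the trade function below are an illustrative in-memory model and are not part of the example repository.

```python
from typing import Dict

# Starting state used by trade_example in all five scripts.
players: Dict[str, Dict[str, int]] = {
    "1": {"coins": 100, "goods": 0},
    "2": {"coins": 114514, "goods": 20},
}


def trade(sell_id: str, buy_id: str, amount: int, price: int) -> bool:
    """Apply a trade in memory if both checks pass; otherwise change nothing."""
    seller, buyer = players[sell_id], players[buy_id]
    if seller["goods"] < amount:
        return False  # seller does not have enough goods
    if buyer["coins"] < price:
        return False  # buyer cannot afford the price
    seller["goods"] -= amount
    seller["coins"] += price
    buyer["goods"] += amount
    buyer["coins"] -= price
    return True


first = trade(sell_id="2", buy_id="1", amount=10, price=500)   # buyer has only 100 coins
second = trade(sell_id="2", buy_id="1", amount=2, price=100)   # affordable
print(first, second)
print(players)
```

Under these assumptions, the first trade is rejected and the second succeeds, so player 1 ends with 0 coins and 2 goods, and player 2 ends with 114614 coins and 18 goods.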