使用 MPP 模式

本文档介绍 TiFlash 的 MPP 模式及其使用方法。

TiFlash 支持 MPP 模式的查询执行,即在计算中引入跨节点的数据交换(data shuffle 过程)。TiDB 默认由优化器自动选择是否使用 MPP 模式,你可以通过修改变量 tidb_allow_mpptidb_enforce_mpp 的值来更改选择策略。

控制是否选择 MPP 模式

变量 tidb_allow_mpp 控制 TiDB 能否选择 MPP 模式执行查询。变量 tidb_enforce_mpp 控制是否忽略优化器代价估算,强制使用 TiFlash 的 MPP 模式执行查询。

这两个变量所有取值对应的结果如下:

tidb_allow_mpp=off tidb_allow_mpp=on(默认)
tidb_enforce_mpp=off(默认) 不使用 MPP 模式。 优化器根据代价估算选择。(默认)
tidb_enforce_mpp=on 不使用 MPP 模式。 TiDB 无视代价估算,选择 MPP 模式。

例如,如果你不想使用 MPP 模式,可以通过以下语句来设置:

  1. set @@session.tidb_allow_mpp=0;

如果想要通过优化器代价估算来智能选择是否使用 MPP(默认情况),可以通过如下语句来设置:

  1. set @@session.tidb_allow_mpp=1;
  2. set @@session.tidb_enforce_mpp=0;

如果想要 TiDB 忽略优化器的代价估算,强制使用 MPP,可以通过如下语句来设置:

  1. set @@session.tidb_allow_mpp=1;
  2. set @@session.tidb_enforce_mpp=1;

Session 变量 tidb_enforce_mpp 的初始值等于这台 tidb-server 实例的 enforce-mpp 配置项值(默认为 false)。在一个 TiDB 集群中,如果有若干台 tidb-server 实例只执行分析型查询,要确保它们能够选中 MPP 模式,你可以将它们的 enforce-mpp 配置值修改为 true.

注意:

tidb_enforce_mpp=1 在生效时,TiDB 优化器会忽略代价估算选择 MPP 模式。但如果存在其它不支持 MPP 的因素,例如没有 TiFlash 副本、TiFlash 副本同步未完成、语句中含有 MPP 模式不支持的算子或函数等,那么 TiDB 仍然不会选择 MPP 模式。

如果由于代价估算之外的原因导致 TiDB 优化器无法选择 MPP,在你使用 EXPLAIN 语句查看执行计划时,会返回警告说明原因,例如:

  1. set @@session.tidb_enforce_mpp=1;
  2. create table t(a int);
  3. explain select count(*) from t;
  4. show warnings;
  1. +---------+------+-----------------------------------------------------------------------------+
  2. | Level | Code | Message |
  3. +---------+------+-----------------------------------------------------------------------------+
  4. | Warning | 1105 | MPP mode may be blocked because there aren't tiflash replicas of table `t`. |
  5. +---------+------+-----------------------------------------------------------------------------+

MPP 模式的算法支持

MPP 模式目前支持的物理算法有:Broadcast Hash Join、Shuffled Hash Join、 Shuffled Hash Aggregation、Union All、 TopN 和 Limit。算法的选择由优化器自动判断。通过 EXPLAIN 语句可以查看具体的查询执行计划。如果 EXPLAIN 语句的结果中出现 ExchangeSender 和 ExchangeReceiver 算子,表明 MPP 已生效。

以 TPC-H 测试集中的表结构为例:

  1. mysql> explain select count(*) from customer c join nation n on c.c_nationkey=n.n_nationkey;
  2. +------------------------------------------+------------+-------------------+---------------+----------------------------------------------------------------------------+
  3. | id | estRows | task | access object | operator info |
  4. +------------------------------------------+------------+-------------------+---------------+----------------------------------------------------------------------------+
  5. | HashAgg_23 | 1.00 | root | | funcs:count(Column#16)->Column#15 |
  6. | └─TableReader_25 | 1.00 | root | | data:ExchangeSender_24 |
  7. | └─ExchangeSender_24 | 1.00 | batchCop[tiflash] | | ExchangeType: PassThrough |
  8. | └─HashAgg_12 | 1.00 | batchCop[tiflash] | | funcs:count(1)->Column#16 |
  9. | └─HashJoin_17 | 3000000.00 | batchCop[tiflash] | | inner join, equal:[eq(tpch.nation.n_nationkey, tpch.customer.c_nationkey)] |
  10. | ├─ExchangeReceiver_21(Build) | 25.00 | batchCop[tiflash] | | |
  11. | └─ExchangeSender_20 | 25.00 | batchCop[tiflash] | | ExchangeType: Broadcast |
  12. | └─TableFullScan_18 | 25.00 | batchCop[tiflash] | table:n | keep order:false |
  13. | └─TableFullScan_22(Probe) | 3000000.00 | batchCop[tiflash] | table:c | keep order:false |
  14. +------------------------------------------+------------+-------------------+---------------+----------------------------------------------------------------------------+
  15. 9 rows in set (0.00 sec)

在执行计划中,出现了 ExchangeReceiverExchangeSender 算子。该执行计划表示 nation 表读取完毕后,经过 ExchangeSender 算子广播到各个节点中,与 customer 表先后进行 HashJoinHashAgg 操作,再将结果返回至 TiDB 中。

TiFlash 提供了两个全局/会话变量决定是否选择 Broadcast Hash Join,分别为:

  • tidb_broadcast_join_threshold_size,单位为 bytes。如果表大小(字节数)小于该值,则选择 Broadcast Hash Join 算法。否则选择 Shuffled Hash Join 算法。
  • tidb_broadcast_join_threshold_count,单位为行数。如果 join 的对象为子查询,优化器无法估计子查询结果集大小,在这种情况下通过结果集行数判断。如果子查询的行数估计值小于该变量,则选择 Broadcast Hash Join 算法。否则选择 Shuffled Hash Join 算法。

MPP 模式访问分区表

如果希望使用 MPP 模式访问分区表,需要先开启动态裁剪模式

示例如下:

  1. mysql> DROP TABLE if exists test.employees;
  2. Query OK, 0 rows affected, 1 warning (0.00 sec)
  3. mysql> CREATE TABLE test.employees
  4. (id int(11) NOT NULL,
  5. fname varchar(30) DEFAULT NULL,
  6. lname varchar(30) DEFAULT NULL,
  7. hired date NOT NULL DEFAULT '1970-01-01',
  8. separated date DEFAULT '9999-12-31',
  9. job_code int DEFAULT NULL,
  10. store_id int NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
  11. PARTITION BY RANGE (store_id)
  12. (PARTITION p0 VALUES LESS THAN (6),
  13. PARTITION p1 VALUES LESS THAN (11),
  14. PARTITION p2 VALUES LESS THAN (16),
  15. PARTITION p3 VALUES LESS THAN (MAXVALUE));
  16. Query OK, 0 rows affected (0.10 sec)
  17. mysql> ALTER table test.employees SET tiflash replica 1;
  18. Query OK, 0 rows affected (0.09 sec)
  19. mysql> SET tidb_partition_prune_mode=static;
  20. Query OK, 0 rows affected (0.00 sec)
  21. mysql> explain SELECT count(*) FROM test.employees;
  22. +----------------------------------+----------+-------------------+-------------------------------+-----------------------------------+
  23. | id | estRows | task | access object | operator info |
  24. +----------------------------------+----------+-------------------+-------------------------------+-----------------------------------+
  25. | HashAgg_18 | 1.00 | root | | funcs:count(Column#10)->Column#9 |
  26. | └─PartitionUnion_20 | 4.00 | root | | |
  27. | ├─StreamAgg_35 | 1.00 | root | | funcs:count(Column#12)->Column#10 |
  28. | └─TableReader_36 | 1.00 | root | | data:StreamAgg_26 |
  29. | └─StreamAgg_26 | 1.00 | batchCop[tiflash] | | funcs:count(1)->Column#12 |
  30. | └─TableFullScan_34 | 10000.00 | batchCop[tiflash] | table:employees, partition:p0 | keep order:false, stats:pseudo |
  31. | ├─StreamAgg_52 | 1.00 | root | | funcs:count(Column#14)->Column#10 |
  32. | └─TableReader_53 | 1.00 | root | | data:StreamAgg_43 |
  33. | └─StreamAgg_43 | 1.00 | batchCop[tiflash] | | funcs:count(1)->Column#14 |
  34. | └─TableFullScan_51 | 10000.00 | batchCop[tiflash] | table:employees, partition:p1 | keep order:false, stats:pseudo |
  35. | ├─StreamAgg_69 | 1.00 | root | | funcs:count(Column#16)->Column#10 |
  36. | └─TableReader_70 | 1.00 | root | | data:StreamAgg_60 |
  37. | └─StreamAgg_60 | 1.00 | batchCop[tiflash] | | funcs:count(1)->Column#16 |
  38. | └─TableFullScan_68 | 10000.00 | batchCop[tiflash] | table:employees, partition:p2 | keep order:false, stats:pseudo |
  39. | └─StreamAgg_86 | 1.00 | root | | funcs:count(Column#18)->Column#10 |
  40. | └─TableReader_87 | 1.00 | root | | data:StreamAgg_77 |
  41. | └─StreamAgg_77 | 1.00 | batchCop[tiflash] | | funcs:count(1)->Column#18 |
  42. | └─TableFullScan_85 | 10000.00 | batchCop[tiflash] | table:employees, partition:p3 | keep order:false, stats:pseudo |
  43. +----------------------------------+----------+-------------------+-------------------------------+-----------------------------------+
  44. 18 rows in set (0,00 sec)
  45. mysql> SET tidb_partition_prune_mode=dynamic;
  46. Query OK, 0 rows affected (0.00 sec)
  47. mysql> explain SELECT count(*) FROM test.employees;
  48. +------------------------------+----------+--------------+-----------------+---------------------------------------------------------+
  49. | id | estRows | task | access object | operator info |
  50. +------------------------------+----------+--------------+-----------------+---------------------------------------------------------+
  51. | HashAgg_17 | 1.00 | root | | funcs:count(Column#11)->Column#9 |
  52. | └─TableReader_19 | 1.00 | root | partition:all | data:ExchangeSender_18 |
  53. | └─ExchangeSender_18 | 1.00 | mpp[tiflash] | | ExchangeType: PassThrough |
  54. | └─HashAgg_8 | 1.00 | mpp[tiflash] | | funcs:count(1)->Column#11 |
  55. | └─TableFullScan_16 | 10000.00 | mpp[tiflash] | table:employees | keep order:false, stats:pseudo, PartitionTableScan:true |
  56. +------------------------------+----------+--------------+-----------------+---------------------------------------------------------+
  57. 5 rows in set (0,00 sec)