TiFlash 支持的计算下推

本文档介绍 TiFlash 支持的计算下推。

支持下推的算子

TiFlash 支持部分算子的下推,支持的算子如下:

  • TableScan:该算子从表中读取数据
  • Selection:该算子对数据进行过滤
  • HashAgg:该算子基于 Hash Aggregation 算法对数据进行聚合运算
  • StreamAgg:该算子基于 Stream Aggregation 算法对数据进行聚合运算。StreamAgg 仅支持不带 GROUP BY 条件的列。
  • TopN:该算子对数据求 TopN 运算
  • Limit:该算子对数据进行 limit 运算
  • Project:该算子对数据进行投影运算
  • HashJoin:该算子基于 Hash Join 算法对数据进行连接运算:
    • 只有在 MPP 模式下才能被下推
    • 支持的 Join 类型包括 Inner Join、Left Join、Semi Join、Anti Semi Join、Left Semi Join、Anti Left Semi Join
    • 对于上述类型,既支持带等值条件的连接,也支持不带等值条件的连接(即 Cartesian Join);在计算 Cartesian Join 时,只会使用 Broadcast 算法,而不会使用 Shuffle Hash Join 算法
  • Window:当前支持下推的窗口函数包括 ROW_NUMBER()RANK()DENSE_RANK()LEAD()LAG()

在 TiDB 中,算子之间会呈现树型组织结构。一个算子能下推到 TiFlash 的前提条件,是该算子的所有子算子都能下推到 TiFlash。因为大部分算子都包含有表达式计算,当且仅当一个算子所包含的所有表达式均支持下推到 TiFlash 时,该算子才有可能下推给 TiFlash。

支持下推的表达式

表达式类型运算
数学函数+, -, /, *, %, >=, <=, =, !=, <, >, ROUND(), ABS(), FLOOR(int), CEIL(int), CEILING(int), SQRT(), LOG(), LOG2(), LOG10(), LN(), EXP(), POW(), SIGN(), RADIANS(), DEGREES(), CONV(), CRC32(), GREATEST(int/real), LEAST(int/real)
逻辑函数算子AND, OR, NOT, CASE WHEN, IF(), IFNULL(), ISNULL(), IN, LIKE, COALESCE, IS
位运算& (bitand), | (bitor), ~ (bitneg), ^ (bitxor)
字符串函数SUBSTR(), CHAR_LENGTH(), REPLACE(), CONCAT(), CONCAT_WS(), LEFT(), RIGHT(), ASCII(), LENGTH(), TRIM(), LTRIM(), RTRIM(), POSITION(), FORMAT(), LOWER(), UCASE(), UPPER(), SUBSTRING_INDEX(), LPAD(), RPAD(), STRCMP()
正则函数和算子REGEXP, REGEXP_LIKE(), REGEXP_INSTR(), REGEXP_SUBSTR()
日期函数DATE_FORMAT(), TIMESTAMPDIFF(), FROM_UNIXTIME(), UNIX_TIMESTAMP(int), UNIX_TIMESTAMP(decimal), STR_TO_DATE(date), STR_TO_DATE(datetime), DATEDIFF(), YEAR(), MONTH(), DAY(), EXTRACT(datetime), DATE(), HOUR(), MICROSECOND(), MINUTE(), SECOND(), SYSDATE(), DATE_ADD/ADDDATE(datetime, int), DATE_ADD/ADDDATE(string, int/real), DATE_SUB/SUBDATE(datetime, int), DATE_SUB/SUBDATE(string, int/real), QUARTER(), DAYNAME(), DAYOFMONTH(), DAYOFWEEK(), DAYOFYEAR(), LAST_DAY(), MONTHNAME(), TO_SECONDS(), TO_DAYS(), FROM_DAYS(), WEEKOFYEAR()
JSON 函数JSON_LENGTH(), ->, ->>, JSON_EXTRACT()
转换函数CAST(int AS DOUBLE), CAST(int AS DECIMAL), CAST(int AS STRING), CAST(int AS TIME), CAST(double AS INT), CAST(double AS DECIMAL), CAST(double AS STRING), CAST(double AS TIME), CAST(string AS INT), CAST(string AS DOUBLE), CAST(string AS DECIMAL), CAST(string AS TIME), CAST(decimal AS INT), CAST(decimal AS STRING), CAST(decimal AS TIME), CAST(time AS INT), CAST(time AS DECIMAL), CAST(time AS STRING), CAST(time AS REAL)
聚合函数MIN(), MAX(), SUM(), COUNT(), AVG(), APPROX_COUNT_DISTINCT(), GROUP_CONCAT()
其他函数INET_NTOA(), INET_ATON(), INET6_NTOA(), INET6_ATON()

下推限制

  • 所有包含 Bit、Set 和 Geometry 类型的表达式均不能下推到 TiFlash
  • DATE_ADD()DATE_SUB()ADDDATE() 以及 SUBDATE() 中的 interval 类型只支持如下几种,如使用了其他类型的 interval,TiFlash 会在运行时报错。
    • DAY
    • WEEK
    • MONTH
    • YEAR
    • HOUR
    • MINUTE
    • SECOND

如查询遇到不支持的下推计算,则需要依赖 TiDB 完成剩余计算,可能会很大程度影响 TiFlash 加速效果。对于暂不支持的算子/表达式,将会在后续版本中陆续支持。

类似 MAX() 这样的函数在聚合算子中支持下推,但是在窗口函数算子中还不支持下推。

示例

以下通过一些例子对下推算子和表达式到 TiFlash 进行说明。

示例 1:下推算子到 TiFlash 存储

  1. CREATE TABLE t(id INT PRIMARY KEY, a INT);
  2. ALTER TABLE t SET TIFLASH REPLICA 1;
  3. EXPLAIN SELECT * FROM t LIMIT 3;
  4. +------------------------------+---------+--------------+---------------+--------------------------------+
  5. | id | estRows | task | access object | operator info |
  6. +------------------------------+---------+--------------+---------------+--------------------------------+
  7. | Limit_9 | 3.00 | root | | offset:0, count:3 |
  8. | └─TableReader_17 | 3.00 | root | | data:ExchangeSender_16 |
  9. | └─ExchangeSender_16 | 3.00 | mpp[tiflash] | | ExchangeType: PassThrough |
  10. | └─Limit_15 | 3.00 | mpp[tiflash] | | offset:0, count:3 |
  11. | └─TableFullScan_14 | 3.00 | mpp[tiflash] | table:t | keep order:false, stats:pseudo |
  12. +------------------------------+---------+--------------+---------------+--------------------------------+
  13. 5 rows in set (0.18 sec)

在该查询中,算子 Limit 被下推到 TiFlash 对数据进行过滤,减少了网络传输数据量,进而减少网络传输开销。具体可查看以上示例中 Limit_15 算子的 task 列,其值为 mpp[tiflash],表示该算子被下推到 TiFlash。

示例 2:下推表达式到 TiFlash 存储

  1. CREATE TABLE t(id INT PRIMARY KEY, a INT);
  2. ALTER TABLE t SET TIFLASH REPLICA 1;
  3. INSERT INTO t(id,a) VALUES (1,2),(2,4),(11,2),(12,4),(13,4),(14,7);
  4. EXPLAIN SELECT MAX(id + a) FROM t GROUP BY a;
  5. +------------------------------------+---------+--------------+---------------+---------------------------------------------------------------------------+
  6. | id | estRows | task | access object | operator info |
  7. +------------------------------------+---------+--------------+---------------+---------------------------------------------------------------------------+
  8. | TableReader_45 | 4.80 | root | | data:ExchangeSender_44 |
  9. | └─ExchangeSender_44 | 4.80 | mpp[tiflash] | | ExchangeType: PassThrough |
  10. | └─Projection_39 | 4.80 | mpp[tiflash] | | Column#3 |
  11. | └─HashAgg_37 | 4.80 | mpp[tiflash] | | group by:Column#9, funcs:max(Column#8)->Column#3 |
  12. | └─Projection_46 | 6.00 | mpp[tiflash] | | plus(test.t.id, test.t.a)->Column#8, test.t.a |
  13. | └─ExchangeReceiver_23 | 6.00 | mpp[tiflash] | | |
  14. | └─ExchangeSender_22 | 6.00 | mpp[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.a, collate: binary] |
  15. | └─TableFullScan_21 | 6.00 | mpp[tiflash] | table:t | keep order:false, stats:pseudo |
  16. +------------------------------------+---------+--------------+---------------+---------------------------------------------------------------------------+
  17. 8 rows in set (0.18 sec)

在该查询中,表达式 id + a 被下推到 TiFlash,从而能提前进行计算,减少网络传输数据量,进而减少网络传输开销,提升整体计算性能。具体可查看以上示例中 operator 列为 plus(test.t.id, test.t.a) 的行的 task 列,其值为 mpp[tiflash],表示该表达式被下推到 TiFlash。

示例 3:下推限制

  1. CREATE TABLE t(id INT PRIMARY KEY, a INT);
  2. ALTER TABLE t SET TIFLASH REPLICA 1;
  3. INSERT INTO t(id,a) VALUES (1,2),(2,4),(11,2),(12,4),(13,4),(14,7);
  4. EXPLAIN SELECT id FROM t WHERE TIME(now()+ a) < '12:00:00';
  5. +-----------------------------+---------+--------------+---------------+--------------------------------------------------------------------------------------------------+
  6. | id | estRows | task | access object | operator info |
  7. +-----------------------------+---------+--------------+---------------+--------------------------------------------------------------------------------------------------+
  8. | Projection_4 | 4.80 | root | | test.t.id |
  9. | └─Selection_6 | 4.80 | root | | lt(cast(time(cast(plus(20230110083056, test.t.a), var_string(20))), var_string(10)), "12:00:00") |
  10. | └─TableReader_11 | 6.00 | root | | data:ExchangeSender_10 |
  11. | └─ExchangeSender_10 | 6.00 | mpp[tiflash] | | ExchangeType: PassThrough |
  12. | └─TableFullScan_9 | 6.00 | mpp[tiflash] | table:t | keep order:false, stats:pseudo |
  13. +-----------------------------+---------+--------------+---------------+--------------------------------------------------------------------------------------------------+
  14. 5 rows in set, 3 warnings (0.20 sec)

分析执行计划可以发现,该查询在执行时只在 TiFlash 中进行了 TableFullScan,其他的函数计算和过滤均在 root 进行,并未下推至 TiFlash。

执行以下命令,可以查找不能下推的算子和表达式。

  1. SHOW WARNINGS;
  2. +---------+------+------------------------------------------------------------------------------------------------------------------------------------+
  3. | Level | Code | Message |
  4. +---------+------+------------------------------------------------------------------------------------------------------------------------------------+
  5. | Warning | 1105 | Scalar function 'time'(signature: Time, return type: time) is not supported to push down to storage layer now. |
  6. | Warning | 1105 | Scalar function 'cast'(signature: CastDurationAsString, return type: var_string(10)) is not supported to push down to tiflash now. |
  7. | Warning | 1105 | Scalar function 'cast'(signature: CastDurationAsString, return type: var_string(10)) is not supported to push down to tiflash now. |
  8. +---------+------+------------------------------------------------------------------------------------------------------------------------------------+
  9. 3 rows in set (0.18 sec)

可以看出,该查询的表达式无法完全下推至 TiFlash,因为 Time 函数和 Cast 函数无法下推至 TiFlash。

示例 4:窗口函数

  1. CREATE TABLE t(id INT PRIMARY KEY, c1 VARCHAR(100));
  2. ALTER TABLE t SET TIFLASH REPLICA 1;
  3. INSERT INTO t VALUES(1,"foo"),(2,"bar"),(3,"bar foo"),(10,"foo"),(20,"bar"),(30,"bar foo");
  4. EXPLAIN SELECT id, ROW_NUMBER() OVER (PARTITION BY id > 10) FROM t;
  5. +----------------------------------+----------+--------------+---------------+---------------------------------------------------------------------------------------------------------------+
  6. | id | estRows | task | access object | operator info |
  7. +----------------------------------+----------+--------------+---------------+---------------------------------------------------------------------------------------------------------------+
  8. | TableReader_30 | 10000.00 | root | | MppVersion: 1, data:ExchangeSender_29 |
  9. | └─ExchangeSender_29 | 10000.00 | mpp[tiflash] | | ExchangeType: PassThrough |
  10. | └─Projection_7 | 10000.00 | mpp[tiflash] | | test.t.id, Column#5, stream_count: 4 |
  11. | └─Window_28 | 10000.00 | mpp[tiflash] | | row_number()->Column#5 over(partition by Column#4 rows between current row and current row), stream_count: 4 |
  12. | └─Sort_14 | 10000.00 | mpp[tiflash] | | Column#4, stream_count: 4 |
  13. | └─ExchangeReceiver_13 | 10000.00 | mpp[tiflash] | | stream_count: 4 |
  14. | └─ExchangeSender_12 | 10000.00 | mpp[tiflash] | | ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#4, collate: binary], stream_count: 4 |
  15. | └─Projection_10 | 10000.00 | mpp[tiflash] | | test.t.id, gt(test.t.id, 10)->Column#4 |
  16. | └─TableFullScan_11 | 10000.00 | mpp[tiflash] | table:t | keep order:false, stats:pseudo |
  17. +----------------------------------+----------+--------------+---------------+---------------------------------------------------------------------------------------------------------------+
  18. 9 rows in set (0.0073 sec)

可以看到,Window 操作在 task 列中有一个 mpp[tiflash] 的值,表示 ROW_NUMBER() OVER (PARTITION BY id > 10) 操作能够被下推至 TiFlash。

  1. CREATE TABLE t(id INT PRIMARY KEY, c1 VARCHAR(100));
  2. ALTER TABLE t SET TIFLASH REPLICA 1;
  3. INSERT INTO t VALUES(1,"foo"),(2,"bar"),(3,"bar foo"),(10,"foo"),(20,"bar"),(30,"bar foo");
  4. EXPLAIN SELECT id, MAX(id) OVER (PARTITION BY id > 10) FROM t;
  5. +-----------------------------+----------+-----------+---------------+------------------------------------------------------------+
  6. | id | estRows | task | access object | operator info |
  7. +-----------------------------+----------+-----------+---------------+------------------------------------------------------------+
  8. | Projection_6 | 10000.00 | root | | test.t1.id, Column#5 |
  9. | └─Shuffle_14 | 10000.00 | root | | execution info: concurrency:5, data sources:[Projection_8] |
  10. | └─Window_7 | 10000.00 | root | | max(test.t1.id)->Column#5 over(partition by Column#4) |
  11. | └─Sort_13 | 10000.00 | root | | Column#4 |
  12. | └─Projection_8 | 10000.00 | root | | test.t1.id, gt(test.t1.id, 10)->Column#4 |
  13. | └─TableReader_10 | 10000.00 | root | | data:TableFullScan_9 |
  14. | └─TableFullScan_9 | 10000.00 | cop[tikv] | table:t1 | keep order:false, stats:pseudo |
  15. +-----------------------------+----------+-----------+---------------+------------------------------------------------------------+
  16. 7 rows in set (0.0010 sec)

可以看到,Window 操作在 task 列中有一个 root 的值,表示 MAX(id) OVER (PARTITION BY id > 10) 操作不能被下推至 TiFlash。这是因为,MAX() 只支持作为聚合函数下推,而不支持作为窗口函数下推。