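Dynamic Filtering

This section describes the dynamic filtering feature of openLooKeng. Dynamic filtering is suited to highly selective join scenarios, in which most rows of the probe-side table are read and then discarded because they do not match the join condition. At query run time, openLooKeng generates dynamic filter conditions from the join condition and the data read from the build side, and applies them as additional filter conditions in the table scan stage of the probe-side table. This reduces the amount of probe-table data that takes part in the join, and so reduces I/O reads and network transfer.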
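The mechanism described above can be illustrated with a minimal Python sketch. This is a conceptual model, not openLooKeng code: the function names `build_dynamic_filter` and `scan_probe` are hypothetical, a plain Python set stands in for the real filter structure, and the sample rows are those of table1 and table2 from the execution plan example later in this section.

```python
# Conceptual sketch of dynamic filtering (not openLooKeng's implementation).
# Function names are hypothetical; a set stands in for the real filter structure.

def build_dynamic_filter(build_rows, key):
    """Collect the join-key values produced by the build side."""
    return {row[key] for row in build_rows}

def scan_probe(probe_rows, key, dynamic_filter):
    """Probe-side table scan with the dynamic filter applied as an extra predicate."""
    return [row for row in probe_rows if row[key] in dynamic_filter]

# Probe side: table1(id, year). Build side: table2(id, total).
table1 = [{"id": 1, "year": "2019"}, {"id": 2, "year": "2020"}, {"id": 3, "year": "2021"}]
table2 = [{"id": 1, "total": 100}, {"id": 2, "total": 200}]

# Static predicate on the build side: t2.total = 200
build_rows = [row for row in table2 if row["total"] == 200]

dynamic_filter = build_dynamic_filter(build_rows, "id")
probe_rows = scan_probe(table1, "id", dynamic_filter)

# Only the surviving probe rows take part in the join itself.
joined = [(p["id"], p["year"]) for p in probe_rows for b in build_rows if p["id"] == b["id"]]

print(probe_rows)  # [{'id': 2, 'year': '2020'}]
print(joined)      # [(2, '2020')]
```

In this sketch two of the three probe rows are dropped during the scan, before any exchange or join work is done on them, which is the saving that dynamic filtering aims for.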

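Applicable Scenarios

openLooKeng dynamic filtering mainly applies to highly selective join scenarios, including partition pruning for partitioned tables and row filtering for non-partitioned tables. It currently applies to inner join, semi join, and right join, and is supported by the Hive connector, the DC connector, and the Memory connector.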

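Usage

The dynamic filtering feature of openLooKeng depends on the distributed cache component. Refer to the State Store section to configure it.

The following parameters need to be configured in /etc/config.properties: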

  enable-dynamic-filtering=true
  dynamic-filtering-data-type=BLOOM_FILTER
  dynamic-filtering-max-per-driver-size=100MB
  dynamic-filtering-max-per-driver-row-count=10000
  dynamic-filtering-bloom-filter-fpp=0.1

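The dynamic filtering properties are described below:

  • enable-dynamic-filtering: whether to enable the dynamic filtering feature.
  • dynamic-filtering-wait-time: the maximum time to wait for dynamic filter conditions to be generated. The default is 1s.
  • dynamic-filtering-data-type: the dynamic filter type. The options are BLOOM_FILTER and HASHSET. The default is BLOOM_FILTER.
  • dynamic-filtering-max-size: the upper limit on the size of each dynamic filter. If the estimated size exceeds this value, the cost-based optimizer does not generate the corresponding dynamic filter. The default is 1000000.
  • dynamic-filtering-max-per-driver-size: the upper limit on the size of the data that each driver can collect. The default is 1MB.
  • dynamic-filtering-max-per-driver-row-count: the upper limit on the number of rows that each driver can collect. The default is 10000.
  • dynamic-filtering-bloom-filter-fpp: the false positive probability (FPP) of the Bloom filter used by dynamic filtering. The default is 0.1.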
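To make the BLOOM_FILTER type and the FPP setting more concrete, here is a small, self-contained Bloom filter in Python. It is a textbook sketch, not the implementation used by openLooKeng: the class `BloomFilter`, its sizing formulas, and the SHA-256 double-hashing scheme are all assumptions chosen for illustration. It shows the two properties that matter for dynamic filtering: a key that was added is never rejected, and a key that was not added is wrongly accepted with a probability close to the configured FPP.

```python
import hashlib
import math

class BloomFilter:
    """Textbook Bloom filter (illustrative only, not openLooKeng's code)."""

    def __init__(self, expected_items, fpp):
        # Standard sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hashes.
        self.num_bits = max(1, math.ceil(-expected_items * math.log(fpp) / (math.log(2) ** 2)))
        self.num_hashes = max(1, round(self.num_bits / expected_items * math.log(2)))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, value):
        # Double hashing: derive all bit positions from two halves of one digest.
        digest = hashlib.sha256(repr(value).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, value):
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(value))

# Build-side keys 0..9999 are added; keys 10000..19999 were never added.
bloom = BloomFilter(expected_items=10000, fpp=0.1)
for key in range(10000):
    bloom.add(key)

no_false_negatives = all(bloom.might_contain(k) for k in range(10000))
false_positive_rate = sum(bloom.might_contain(k) for k in range(10000, 20000)) / 10000

print(bloom.num_bits, bloom.num_hashes)
print(no_false_negatives)    # True
print(false_positive_rate)   # close to the configured 0.1
```

A false positive only lets a non-matching probe row through the scan; the join still discards it, so the query result is unaffected. A lower FPP filters more rows at the cost of a larger filter, whereas an exact structure such as a hash set has no false positives but grows with the number of distinct keys.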

To use dynamic filtering with the Hive connector, modify catalog/hive.properties as follows:

  hive.dynamic-filter-partition-filtering=true
  hive.dynamic-filtering-row-filtering-threshold=5000

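The properties above are described as follows:

  • hive.dynamic-filter-partition-filtering: whether to use dynamic filter conditions to pre-filter by partition value. The default is false.
  • hive.dynamic-filtering-row-filtering-threshold: row filtering is applied if the size of the dynamic filter is below this threshold. The default is 2000.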
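The two Hive settings can be pictured with the following Python sketch. It is only a rough model under stated assumptions: the functions `select_partitions` and `should_filter_rows` are hypothetical, the partition values are made up, and the "size" of a dynamic filter is taken to mean its number of entries, which the description above does not confirm.

```python
# Rough model of the two Hive settings (hypothetical helpers, not connector code).

def select_partitions(partition_values, dynamic_filter):
    """Partition pruning: skip partitions whose key value cannot match the filter."""
    return [value for value in partition_values if value in dynamic_filter]

def should_filter_rows(dynamic_filter, threshold):
    """Row filtering is applied only while the filter stays below the threshold.
    Assumption: filter size is measured as the number of entries."""
    return len(dynamic_filter) < threshold

# Made-up example: a table partitioned by year, joined on the partition column.
partitions = ["2019", "2020", "2021"]
small_filter = {"2020"}
large_filter = set(range(6000))

print(select_partitions(partitions, small_filter))        # ['2020']
print(should_filter_rows(small_filter, threshold=5000))   # True
print(should_filter_rows(large_filter, threshold=5000))   # False
```

Under these assumptions, partition pruning avoids reading whole partitions, while the threshold keeps row-level checks from being applied when the filter has grown large.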

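Execution Plan

The following examples show how dynamic filter conditions are applied to SQL statements; they are marked as dynamicFilter in the execution plan. You can use the explain command to check whether dynamic filtering is applied, or check in the live plan of the web UI whether the current execution applies it.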

  create table table1 (id integer, year varchar);
  create table table2 (id integer, total integer);
  insert into table1 values (1, '2019'), (2, '2020'), (3, '2021');
  insert into table2 values (1, 100), (2, 200);

Inner join:

  explain select t1.id, t1.year from table1 t1, table2 t2 where t1.id = t2.id and t2.total = 200;
  Query Plan
  ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  Output[id, year]
      Layout: [id:integer, year:varchar]
      Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
  └─ RemoteExchange[GATHER]
         Layout: [year:varchar, id:integer]
         Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     └─ InnerJoin[("id" = "id_0")][$hashvalue, $hashvalue_9]
            Layout: [id:integer, year:varchar]
            Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
            Distribution: PARTITIONED
            dynamicFilterAssignments = {id_0 -> 238}
        ├─ RemoteExchange[REPARTITION][$hashvalue]
               Layout: [id:integer, year:varchar, $hashvalue:bigint]
               Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
           └─ ScanFilterProject[table = memory:0, filterPredicate = true, dynamicFilter = {238 -> "id"}]
                  Layout: [id:integer, year:varchar, $hashvalue_8:bigint]
                  Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                  $hashvalue_8 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("id"), 0))
                  year := 1
                  id := 0
        └─ LocalExchange[HASH][$hashvalue_9] ("id_0")
               Layout: [id_0:integer, $hashvalue_9:bigint]
               Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
           └─ RemoteExchange[REPARTITION][$hashvalue_10]
                  Layout: [id_0:integer, $hashvalue_10:bigint]
                  Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
              └─ ScanFilterProject[table = memory:5, filterPredicate = ("total" = 200)]
                     Layout: [id_0:integer, $hashvalue_11:bigint]
                     Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                     $hashvalue_11 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
                     total := 1
                     id_0 := 0

Semi join:

  explain select * from table1 where id in (select id from table2);
  Query Plan
  -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  Output[id, year]
      Layout: [id:integer, year:varchar]
      Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
  └─ RemoteExchange[GATHER]
         Layout: [id:integer, year:varchar]
         Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     └─ FilterProject[filterPredicate = "expr_6"]
            Layout: [id:integer, year:varchar]
            Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}/{rows: ? (?), cpu: ?, memory: ?, network: ?}
        └─ Project[]
               Layout: [id:integer, year:varchar, expr_6:boolean]
               Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
           └─ SemiJoin[id = id_1][$hashvalue, $hashvalue_16]
                  Layout: [id:integer, year:varchar, $hashvalue:bigint, expr_6:boolean]
                  Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
                  Distribution: PARTITIONED
                  dynamicFilterId: 279
              ├─ RemoteExchange[REPARTITION][$hashvalue]
                     Layout: [id:integer, year:varchar, $hashvalue:bigint]
                     Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
                 └─ ScanFilterProject[table = memory:0, filterPredicate = true, dynamicFilter = {279 -> "id"}]
                        Layout: [id:integer, year:varchar, $hashvalue_15:bigint]
                        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                        $hashvalue_15 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("id"), 0))
                        year := 1
                        id := 0
              └─ LocalExchange[SINGLE] ()
                     Layout: [id_1:integer, $hashvalue_16:bigint]
                     Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
                 └─ RemoteExchange[REPARTITION - REPLICATE NULLS AND ANY][$hashvalue_17]
                        Layout: [id_1:integer, $hashvalue_17:bigint]
                        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
                    └─ ScanProject[table = memory:5]
                           Layout: [id_1:integer, $hashvalue_18:bigint]
                           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                           $hashvalue_18 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("id_1"), 0))
                           id_1 := 0
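
In both plans, the probe-side scan carries a dynamicFilter entry whose ID matches the one declared on the join node (238 for the inner join, 279 for the semi join). If you want to check explain output in a script rather than by eye, a sketch like the following can help. The helper `find_dynamic_filters` is hypothetical, and the pattern assumes the single-entry `dynamicFilter = {id -> "column"}` form shown above; plans with several filters on one scan may be printed differently.

```python
import re

# Matches the single-entry form shown in the plans above,
# for example: dynamicFilter = {279 -> "id"}
DYNAMIC_FILTER_PATTERN = re.compile(r'dynamicFilter = \{(\d+) -> "([^"]+)"\}')

def find_dynamic_filters(plan_text):
    """Return (filter id, probe column) pairs found in the text of an explain plan."""
    return [(int(filter_id), column)
            for filter_id, column in DYNAMIC_FILTER_PATTERN.findall(plan_text)]

# Excerpt of the semi join plan from this section.
plan_excerpt = '''
SemiJoin[id = id_1][$hashvalue, $hashvalue_16]
    dynamicFilterId: 279
    ScanFilterProject[table = memory:0, filterPredicate = true, dynamicFilter = {279 -> "id"}]
    ScanProject[table = memory:5]
'''

print(find_dynamic_filters(plan_excerpt))           # [(279, 'id')]
print(find_dynamic_filters("ScanProject[table]"))   # []
```

An empty result suggests that no scan in the plan received a dynamic filter, for example because the feature is disabled or the join type is not supported.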