Database · 最佳实践 · 高性能 Hash Join 算法实现简述

Hash Join 算法

Hash join 是利用 hash 函数来实现和加速数据库中 join 操作的一类算法。其主要优势来源于 hash 函数可以只通过一次运算就将任意键值映射到固定大小、固定值域的 hash 值。在实践中,针对等值 join 所需的等值比较,一般数据库系统会仔细选择和优化 hash 函数或函数簇,使其能够快速缩小需要和一个键值进行等值比较的其它键值的数量或范围,从而实现了通过减少计算量、内外存访问量等手段来降低 join 算法的执行开销。

Simple Hash Join

经典的 hash join 算法(又称 Simple Hash Join,SHJ)包括两步:

  • Build:选择两个输入 relation 中 cardinality 较小的一个(一般称其为 build relation),使用一个或一簇 hash 函数将其中的每一条记录的主键 key 值计算为一个 hash 值,然后根据 hash 值将该记录插入到一张表中,这张表就叫做 hash 表;
  • Probe:选择另一个 cardinality 较大的 relation (一般称为 probe relation),针对其中的每一条记录,使用和 build 中相同的 hash 函数,计算出相应的 hash 值,然后根据 hash 值在 hash 表中寻找到需要比较的记录,一一比较,得到最终结果。 经典的 SHJ 算法步骤简单直接,在过去的很长一段时间内是首选的 hash join 算法,但也需要拥抱变化。进入 21 世纪后,随着现代处理器的发展逐渐遇到了 frequency wall, memory wall 等瓶颈,处理器从高频单核架构逐渐演化为较低频的多核架构。此外,随着内存容量的不断增高,纯内存的分析型数据库查询逐渐流行,join 算法的执行开销瓶颈从 I/O 逐渐变为 CPU 和内存开销。因为 SHJ 算法中往 hash 表中插入记录和从 hash 表中读取记录往往都有大量的随机内存读写,而随机访问的代价在各个软硬件层面上都比顺序访问高,SHJ 算法在多核 CPU 处理器上逐渐尽显疲态。

    Partitioned Hash Join

    Partitioned Hash Join (分区 hash join,PHJ)算法在多核架构上逐渐取得了性能上的优势。和 SHJ 算法相比,PHJ 算法共用了完全相同的 build 和 probe 两个步骤,但是 PHJ 在 build 之前会先对输入 relation 进行 partition,然后针对每个 partition 应用 SHJ 算法。在这个算法过程中,我们有很多不同的选择:

  • 只对单个 relation 进行分区,还是对两个 relation 同时进行分区?

  • 对于一个 relation,将其分出多少个区来?
  • 给定分区总数,我们是一次性全部分配完成,还是逐级完成(比如从一个国家先分出省,再分出城市,最后分出区县)? 除此以外,在决定了上述 3 个问题以后,在算法的执行层次,也有很多优化空间可以研究(比如多并发、向量化、NUMA-aware、FPGA/GPU 加速等)。所以 hash join 算法的设计和实现在过去的很多年间都是系统界研究的热点。目前我们已经拥有众多详实和成熟的研究成果。

    常见性能问题

    Hash join 算法的常见性能问题包括同步所需的锁开销、cache miss、TLB miss、多线程间负载不均衡、NUMA 等等,下文简述一二。

  • 锁开销:在多线程的 SHJ 算法实现中,由于记录未经过分区,多个线程将操作一个相同的 hash 表,那么很有可能出现多个线程争抢同一个 hash bucket 的情况。为了实现线程间的同步,往往需要为每一个 hash bucket 加锁,那么等锁的开销就会成为一大热点。为了减少锁竞争,可以提高 hash bucket 的数量,减少线程的数量,或者使用乐观的无锁算法。此处往往存在各种 trade-off,比如更多的 hash bucket 数量对于一些 hash 表实现来说(如 linear hash 表)会带来更大的内存空间占用。

  • Cache miss:当 hash 表的数据量超过处理器 cache 大小的时候,对 hash 表的频繁访问有很大概率会遭受 cache miss,不得不访问内存。以 Intel 的某款 CPU 处理器为例,单个 CPU 上,L1 cache 有 32 KB 的数据空间(对比指令 cache),L2 cache 有 256 KB;同时存在多个 CPU 共享的 60 MB 的 L3 cache。对于 GB 级别以上的 hash 表来说,如此的 cache 大小杯水车薪。常见的有效解法就是使用 PHJ 算法,将 relation 不断分区,直至一个分区的 hash 表可以放进 cache 中,但此处分区本身也拥有较高的执行代价开销。实践中依然需要仔细处理此处的 trade-off。有的研究认为在拥有超线程能力的处理器中,因为同个 CPU core 上的多个(如 2 个)硬件线程可以轮流执行,那么可以通过执行一个线程中的计算指令来隐藏另一条线程中的访存 latency 开销,从而减少 cache miss 对总执行代价带来的影响。但对于数据库中的各种操作来说,它们绝大部分的工作往往就是读写数据,这样一来通过计算隐藏访存开销的效果就因为机会不足而十分有限了。
  • TLB miss:在一个 CPU 处理器的内存管理单元(MMU)中,有一个 Translation Lookaside Buffer (TLB)。TLB 往往缓存了最近的从虚拟内存地址到其相应的内存页的转换。如果一个访存操作命中了 TLB,那么就可以直接获得相应的内存页;反之,就必须去内存页表中查询相应的内存页,此处代价就高出很多了。由于 TLB 的 slot 条数是固定的,对于 PHJ 算法来说,如果一次分区操作的扇出(目标分区数量,fanout)高于 TLB 的 slot size,那么除去第一次访存时的 compulsory miss 以后,后续访存也会出现 TLB miss;fanout 越高,TLB miss 越多,多到我们不如我们少分一些区的地步。一种调优办法是进行 multi-pass partition,每个 pass 少分一些区,但是多跑几个 pass 来完成同样的分区总数。显然,multi-pass 会带来更多重复的内存读写,所以此处也存在 trade-off,需要根据实际情况进行优化。
  • ……

    关键实现细节举例

    上面我们简述了基本的算法概念和常见性能问题,现在我们举一个具体的性能优化的代码实现例子:如何使用 SIMD 指令对 hash 冲突的处理进行向量化。 Hash 冲突是 hash 表上的常见现象,即多个记录 hash 到了相同的 hash bucket 中。Hash 冲突对执行开销和内存空间开销的影响在不同的 hash 表实现中是不同的,但常见共性问题包括需要额外的 overflow space 来存储 hash 表本身装不下的有冲突的记录,以及冲突造成的互斥锁等待等。 向量化是常见的 hash join 实现优化技术,根据 Amdahl’s law,其加速效果取决于 hash join 算法的全过程中有多少部分能够被完完全全的向量化,数据尽可能多地存储在向量寄存器中,从而得到更高的加速比。如果对 hash 冲突的处理不能实现向量化,那么向量化的加速效果就会打折扣了。向量化指令一般属于 SIMD 类型(Single Instruction Multiple Data),如果一个向量中的多个 key 值内出现了 hash conflict,理论上就出现了至少两个 branch(无冲突 key 的处理和有冲突 key 的处理),而 SIMD 一定永远在向量上使用相同的指令,此处如何处理理论上的 branch 呢? 解决办法是使用 SIMD 的 gather 和 scatter 命令,以下是 AVX512 中处理 32位整形数据的 SIMD gether 和 scatter 的相应伪代码。

    1. __m512i _mm512_i32gather_epi32 (__m512i vindex, void const* base_addr, int scale)
    2. {
    3. FOR j := 0 to 15
    4. i := j*32
    5. m := j*32
    6. addr := base_addr + SignExtend64(vindex[m+31:m]) * ZeroExtend64(scale) * 8
    7. dst[i+31:i] := MEM[addr+31:addr]
    8. ENDFOR
    9. dst[MAX:512] := 0
    10. }
    1. void _mm512_mask_i32scatter_epi32 (void* base_addr, __mmask16 k, __m512i vindex, __m512i a, int scale)
    2. {
    3. FOR j := 0 to 15
    4. i := j*32
    5. m := j*32
    6. IF k[j]
    7. addr := base_addr + SignExtend64(vindex[m+31:m]) * ZeroExtend64(scale) * 8
    8. MEM[addr+31:addr] := a[i+31:i]
    9. FI
    10. ENDFOR
    11. }

    从中可以看出,对于一个装有 16 个 32 位整形数的 512 位向量来说,虽然表面上看一条读指令在这 16 个数(可以理解为地址 offset)应该同时并行执行,但指令内部其实是存在一个 for loop 的。也就是说,假如这个 for loop 的第 5 个和第 10 个 iteration 都往相同的 offset 写入数据,那么靠后的第 10 个 iteration 会覆盖前面的第 5 个 iteration 写入的结果。 利用这个特性,对于一个存储了 16 个 hash 值(每一个 hash 值对应 array 中的一个 offset 位置)的向量 V0 来说,我们只需要把它按照 scatter 的方式先写进内存中,再通过 gather 读回来获得 V1,然后比较 V0 和 V1 的内容,凡是不相等的 SIMD lane 即为因 hash 冲突而被覆盖的地方。我们只需利用 mask 将其标记出来,放在待处理的向量中留到下个 iteration 再插入 hash 表即可。具体的实现举例如下:

    1. if (size >= 16) do {
    2. // replace keys & payloads processed in the previous iteration with new values from the memory
    3. key = _mm512_mask_expandloadu_epi32 (key, k, &keys[i]);
    4. val = _mm512_mask_expandloadu_epi32 (val, k, &vals[i]);
    5. off = _mm512_mask_xor_epi32(off, k, off, off);
    6. i += _mm_countbits_64(_mm512_kconcatlo_64(blend_0000, k));
    7. // hash keys
    8. __m512i factors = _mm512_mask_blend_epi32(k, mask_factor_2, mask_factor_1);
    9. __m512i buckets = _mm512_mask_blend_epi32(k, mask_buckets_minus_1, mask_buckets);
    10. __m512i hash = simd_hash(key, factors, buckets);
    11. // combine with old offset and fix overflows
    12. off = _mm512_add_epi32(off, hash);
    13. k = _mm512_cmpge_epu32_mask(off, mask_buckets);
    14. off = _mm512_mask_sub_epi32(off, k, off, mask_buckets);
    15. // load keys from table and detect conflicts
    16. __m512i tab = _mm512_i32gather_epi32(off, table, 8);
    17. k = _mm512_cmpeq_epi32_mask(tab, mask_empty);
    18. _mm512_mask_i32scatter_epi32(table, k, off, mask_pack, 8);
    19. tab = _mm512_mask_i32gather_epi32(tab, k, off, table, 8);
    20. k = _mm512_mask_cmpeq_epi32_mask(k, tab, mask_pack);
    21. // mix keys and payloads in pairs
    22. __m512i key_tmp = _mm512_permutevar_epi32(mask_pack, key);
    23. __m512i val_tmp = _mm512_permutevar_epi32(mask_pack, val);
    24. __m512i lo = _mm512_mask_blend_epi32(blend_AAAA, key_tmp, _mm512_swizzle_epi32(val_tmp, _MM_SWIZ_REG_CDAB));
    25. __m512i hi = _mm512_mask_blend_epi32(blend_5555, val_tmp, _mm512_swizzle_epi32(key_tmp, _MM_SWIZ_REG_CDAB));
    26. // store valid pairs
    27. _mm512_mask_i32loscatter_epi64(table, k, off, lo, 8);
    28. __mmask16 rev_k = _mm512_kunpackb (k,k>>8);
    29. __m512i rev_off = _mm512_permute4f128_epi32(off, _MM_PERM_BADC);
    30. _mm512_mask_i32loscatter_epi64(table, rev_k, rev_off, hi, 8);
    31. off = _mm512_add_epi32(off, mask_1);
    32. } while (i <= size_minus_16);