背景

20亿用户,每个用户1000个标签,基于任意标签组合圈选、透视(业务上的需求是一次最多计算100个标签的组合)。

相当于要处理2000亿记录。

1、实时求标签组合的记录数。(即满足标签组合的用户有多少)

2、用户ID。(级满足标签组合的用户ID。)

要求实时响应。

通常你肯定会想,这个至少需要上百台机器来支撑。

但是我要给你一个惊喜,这个数据量,一台RDS PG实例即可。怎么做呢?听我道来,用最少的资源解决业务问题,用到RDS PG黑科技。

RDS PG 解决方案

方案如下:

《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》

优化方案,提高响应速度

1、bitmap切段

2、计算满足条件的USER COUNT值时,并行计算(使用dblink异步调用)

3、求用户ID时,使用游标,流式返回。

DEMO

1、需要用到的插件

  1. create extension dblink;
  2. create extension varbitx;

2、创建标签表,切段,例如20亿个用户,切成400段,每一段5000万个用户BIT。

  1. postgres=# create table t_bitmap (
  2. tagid int, -- 标签ID
  3. ofid int, -- 偏移值, 乘以5000
  4. v varbit -- userid 比特
  5. );
  6. CREATE TABLE

3、创建索引(约束)

  1. create unique index idx_t_bitmap_1 on t_bitmap (tagid, ofid);

4、创建1000个标签的BITMAP数据,每一个标签400条,每条的BIT长度为5000万位。

  1. postgres=# do language plpgsql $$
  2. declare v varbit := repeat('1',5000000)::varbit;
  3. begin
  4. for i in 1..100 loop
  5. for x in 0..399 loop
  6. insert into t_bitmap values (i, x, v);
  7. end loop;
  8. end loop;
  9. end;
  10. $$;
  11. DO
  12. Time: 150468.359 ms (02:30.468)

5、创建生成dblink连接的函数,重复创建不报错。

  1. create or replace function conn(
  2. name, -- dblink名字
  3. text -- 连接串,URL
  4. ) returns void as $$
  5. declare
  6. begin
  7. perform dblink_connect($1, $2);
  8. return;
  9. exception when others then
  10. return;
  11. end;
  12. $$ language plpgsql strict;

6、AND标签组合的并行计算函数(dblink 异步并行),返回USERID透视数。

  1. create or replace function get_bitcount_and(
  2. and_tagids int[], -- 输入标签ID数组
  3. v_bit int, -- 10的比特个数
  4. conn text, -- 连接串
  5. OUT cnt int8 -- 返回值, 多少个10
  6. ) returns setof int8 as $$
  7. declare
  8. begin
  9. for i in 0..399 loop -- 生成400个链接,因为每行5000万,20亿个BIT,刚好400条。并LOOP
  10. perform conn('link'||i, conn); -- 连接
  11. perform dblink_get_result('link'||i); -- 消耗掉上一次异步连接的结果,否则会报错。
  12. -- 发送异步DBLINK调用
  13. -- 每次操作一个bit分段,返回BIT01的位数
  14. perform dblink_send_query('link'||i, format('select bit_count(bit_and(v), %s) from t_bitmap where tagid = any (%L) and ofid=%s', v_bit, and_tagids, i));
  15. end loop;
  16. for i in 0..399 loop
  17. -- 返回异步调用结果,包括所有分段
  18. return query SELECT * FROM dblink_get_result('link'||i) as t(cnt int8);
  19. end loop;
  20. end;
  21. $$ language plpgsql strict;

7、OR标签组合的并行计算函数(dblink 异步并行),返回USERID透视数。

  1. create or replace function get_bitcount_or(
  2. or_tagids int[],
  3. v_bit int,
  4. conn text, -- 连接串
  5. OUT cnt int8
  6. ) returns setof int8 as $$
  7. declare
  8. begin
  9. for i in 0..399 loop
  10. perform conn('link'||i, conn);
  11. perform dblink_get_result('link'||i);
  12. perform dblink_send_query('link'||i, format('select bit_count(bit_or(v), %s) from t_bitmap where tagid = any (%L) and ofid=%s', v_bit, or_tagids, i));
  13. end loop;
  14. for i in 0..399 loop
  15. return query SELECT * FROM dblink_get_result('link'||i) as t(cnt int8);
  16. end loop;
  17. end;
  18. $$ language plpgsql strict;

8、AND,OR 标签组合的并行计算函数(dblink 异步并行),返回USERID透视数。

  1. create or replace function get_bitcount_and_or(
  2. and_tagids int[],
  3. or_tagids int[],
  4. v_bit int,
  5. conn text, -- 连接串
  6. OUT cnt int8
  7. ) returns setof int8 as $$
  8. declare
  9. begin
  10. for i in 0..399 loop
  11. perform conn('link'||i, conn);
  12. perform dblink_get_result('link'||i);
  13. perform dblink_send_query('link'||i, format('
  14. with t1 as (select bit_and(v) b from t_bitmap where tagid = any (%L) and ofid=%s),
  15. t2 as (select bit_or(v) b from t_bitmap where tagid = any (%L) and ofid=%s)
  16. select bit_count(bitor(t1.b, t2.b), %s) from t1,t2',
  17. and_tagids, i, or_tagids, i, v_bit));
  18. end loop;
  19. for i in 0..399 loop
  20. return query SELECT * FROM dblink_get_result('link'||i) as t(cnt int8);
  21. end loop;
  22. end;
  23. $$ language plpgsql strict;
  1. -- 更复杂的QUERY,可以自行修改函数。实际业务中这种需求较少。
  2. -- (a and b andc or d) or (a and c) or (d and not b)..........

9、计数透视的性能如下,50个标签组合,仅1.5秒,100个标签组合,仅2.6秒:

我们统计2000亿个user_tags组合(每个用户一条记录,每条记录1000个标签时的换算),仅仅需要2.6秒。

  1. 一个标签:
  2. postgres=# select sum(cnt) from (select get_bitcount_and(array_agg(id),1,'dbname=postgres user=postgres') cnt from generate_series(1,1) t(id)) t;
  3. sum
  4. ------------
  5. 2000000000
  6. (1 row)
  7. Time: 791.392 ms
  8. 10个标签组合:
  9. postgres=# select sum(cnt) from (select get_bitcount_and(array_agg(id),1,'dbname=postgres user=postgres') cnt from generate_series(1,10) t(id)) t;
  10. sum
  11. ------------
  12. 2000000000
  13. (1 row)
  14. Time: 847.427 ms
  15. 50个标签组合:
  16. postgres=# select sum(cnt) from (select get_bitcount_and(array_agg(id),1,'dbname=postgres user=postgres') cnt from generate_series(1,50) t(id)) t;
  17. sum
  18. ------------
  19. 2000000000
  20. (1 row)
  21. Time: 1478.847 ms (00:01.479)
  22. 100个标签组合:
  23. postgres=# select sum(cnt) from (select get_bitcount_and(array_agg(id),1,'dbname=postgres user=postgres') cnt from generate_series(1,100) t(id)) t;
  24. sum
  25. ------------
  26. 2000000000
  27. (1 row)
  28. Time: 2574.761 ms (00:02.575)

10、AND 、 OR组合性能如下,性能一样:

  1. postgres=# select sum(cnt) from (select get_bitcount_and_or(array_agg(case mod(id,2) when 0 then id end), array_agg(case mod(id,2) when 1 then id end), 1,'dbname=postgres user=postgres') cnt from generate_series(1,1) t(id)) t;
  2. sum
  3. -----
  4. (1 row)
  5. Time: 854.934 ms
  6. postgres=# select sum(cnt) from (select get_bitcount_and_or(array_agg(case mod(id,2) when 0 then id end), array_agg(case mod(id,2) when 1 then id end), 1,'dbname=postgres user=postgres') cnt from generate_series(1,10) t(id)) t;
  7. sum
  8. ------------
  9. 2000000000
  10. (1 row)
  11. Time: 889.472 ms
  12. postgres=# select sum(cnt) from (select get_bitcount_and_or(array_agg(case mod(id,2) when 0 then id end), array_agg(case mod(id,2) when 1 then id end), 1,'dbname=postgres user=postgres') cnt from generate_series(1,50) t(id)) t;
  13. sum
  14. ------------
  15. 2000000000
  16. (1 row)
  17. Time: 1519.031 ms (00:01.519)
  18. postgres=# select sum(cnt) from (select get_bitcount_and_or(array_agg(case mod(id,2) when 0 then id end), array_agg(case mod(id,2) when 1 then id end), 1,'dbname=postgres user=postgres') cnt from generate_series(1,100) t(id)) t;
  19. sum
  20. ------------
  21. 2000000000
  22. (1 row)
  23. Time: 2597.701 ms (00:02.598)

11、求USERID,AND 函数如下,我们为了达到高速响应,使用游标返回。

  1. create or replace function get_pos_and(
  2. and_tagids int[], -- 标签组合
  3. v_bit int -- 10BIT位,返回游标,游标包含ofid与位置下标(当然了,这个翻译动作也可以交给程序,那么返回BITofid即可)
  4. ) returns setof refcursor as $$
  5. declare
  6. ref refcursor[]; -- 返回游标数组
  7. res refcursor; -- 返回游标
  8. sql text; -- 游标对应的SQL,即取USERID位置的SQL
  9. begin
  10. for x in 1..400 loop -- 生成400个游标名
  11. ref[x] := 'cur'||x;
  12. end loop;
  13. for i in 0..399 loop
  14. -- 使用0399的偏移值, 乘以5000万系数。
  15. -- 赋予游标名
  16. res := ref[i+1];
  17. -- 生成游标对应的动态SQL(ofid, bit位置),注意bit位置可以不翻译,交给程序翻译也没问题。程序翻译的话,翻译好之后,再使用in查询字典
  18. -- select uid from uid_mapping where pos in (pos_array);
  19. -- 1亿,in 100万, 380毫秒
  20. -- [《HTAP数据库 PostgreSQL 场景与性能测试之 25 - (OLTP) IN , EXISTS 查询》](201711/20171107_26.md)
  21. sql := format('select %s, bit_posite(bit_and(v), %s, true) from t_bitmap where tagid = any (%L) and ofid=%s', i, v_bit, and_tagids, i);
  22. -- 打开游标
  23. open res for execute sql ;
  24. -- 返回游标
  25. return next res;
  26. end loop;
  27. end;
  28. $$ language plpgsql strict;

12、求USERID,OR 函数如下,我们为了达到高速响应,使用游标返回。

  1. create or replace function get_pos_or(
  2. or_tagids int[],
  3. v_bit int
  4. ) returns setof refcursor as $$
  5. declare
  6. ref refcursor[];
  7. res refcursor;
  8. sql text;
  9. begin
  10. for x in 1..400 loop
  11. ref[x] := 'cur'||x;
  12. end loop;
  13. for i in 0..399 loop
  14. res := ref[i+1];
  15. sql := format('select %s, bit_posite(bit_or(v), %s, true) from t_bitmap where tagid = any (%L) and ofid=%s', i, v_bit, or_tagids, i);
  16. open res for execute sql ;
  17. return next res;
  18. end loop;
  19. end;
  20. $$ language plpgsql strict;

13、求USERID,AND OR 函数如下,我们为了达到高速响应,使用游标返回。

  1. create or replace function get_pos_and_or(
  2. and_tagids int[],
  3. or_tagids int[],
  4. v_bit int
  5. ) returns setof refcursor as $$
  6. declare
  7. ref refcursor[];
  8. res refcursor;
  9. sql text;
  10. begin
  11. for x in 1..400 loop
  12. ref[x] := 'cur'||x;
  13. end loop;
  14. for i in 0..399 loop
  15. res := ref[i+1];
  16. sql := format('with t1 as
  17. (select bit_and(v) v from t_bitmap where tagid = any (%L) and ofid=%s),
  18. t2 as
  19. (select bit_or(v) v from t_bitmap where tagid = any (%L) and ofid=%s)
  20. select %s, bit_posite(bitor(t1.v, t2.v), %s, true) from t1,t2',
  21. and_tagids, i, or_tagids, i, i, v_bit);
  22. open res for execute sql ;
  23. return next res;
  24. end loop;
  25. end;
  26. $$ language plpgsql strict;

14、求USERID例子,88毫秒响应,极端速度。

  1. postgres=# begin;
  2. BEGIN
  3. Time: 0.031 ms
  4. postgres=# select * from get_pos_and_or(array[1,2,3], array[4,5,6], 1);
  5. get_pos_and_or
  6. ----------------
  7. cur1
  8. cur2
  9. cur3
  10. cur4
  11. cur5
  12. cur6
  13. cur7
  14. ....
  15. cur399
  16. cur400
  17. (400 rows)
  18. Time: 88.069 ms

获取游标值,5000万ID,仅692毫秒:

  1. fetch 1 from cur1;
  2. Time: 692.408 ms

15、如果我们把位置翻译放到客户端做,那么只需要获取结果BITMAP,那就更快了,224毫秒就可以获取5000万BIT走。 这块也能做成并发,每个客户端获取不同的ofid。

  1. CREATE OR REPLACE FUNCTION public.get_pos_and(and_tagids integer[])
  2. RETURNS SETOF refcursor
  3. LANGUAGE plpgsql
  4. STRICT
  5. AS $function$
  6. declare
  7. ref refcursor[];
  8. res refcursor;
  9. sql text;
  10. begin
  11. for x in 1..400 loop
  12. ref[x] := 'cur'||x;
  13. end loop;
  14. for i in 0..399 loop
  15. res := ref[i+1];
  16. -- sql := format('select %s, bit_posite(bit_and(v), %s, true) from t_bitmap where tagid = any (%L) and ofid=%s', i, v_bit, and_tagids, i);
  17. sql := format('select %s, bit_and(v) from t_bitmap where tagid = any (%L) and ofid=%s', i, and_tagids, i);
  18. open res for execute sql ;
  19. return next res;
  20. end loop;
  21. end;
  22. $function$;
  23. postgres=# \timing
  24. Timing is on.
  25. postgres=# begin;
  26. BEGIN
  27. Time: 0.045 ms
  28. postgres=# select get_pos_and(array_agg(id)) from generate_series(1,100) t(id);
  29. get_pos_and
  30. -------------
  31. cur1
  32. cur2
  33. cur3
  34. ...
  35. cur397
  36. cur398
  37. cur399
  38. cur400
  39. (400 rows)
  40. fetch 1 from cur1;
  41. Time: 224.776 ms

16、如果要求包含某标签,但是不包含某标签的用户,同样使用BIT操作即可。

例子:

  1. 包含b1,同时不包含b2的用户
  2. postgres=# select b1 & bitxor(b1,b2) from (values (bit'11001100', bit'11000001')) as t(b1,b2);
  3. ?column?
  4. ----------
  5. 00001100
  6. (1 row)
  7. 使用这个方法,新增一个UDF即可。

小结

varbitx是阿里云RDS PG提供的一个插件,使用它,单个RDS PG就可以实现万亿级别USER_TAGS的实时圈选。

使用BITMAP分段、DBLINK异步查询、游标等技术,提高性能。

性能指标:

1、求COUNT,2000亿(20亿用户,100个标签组合)USER_IDS,响应速度2.6秒。

2、求USERID明细,返回5000万用户ID位置,仅692毫秒。

3、求USERID明细,如果只返回BITMAP,5000万个BIT仅需224毫秒。

参考

《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》