HybridDB(基于Greenplum)经过长达四个月时间的公测,终于开始商业化的征程、为我们客户提供计算分析能力。

在这之前,我们团队做了许多技术、产品上的打磨,其中OSS的高效访问与处理是其中较为重要的一环。这个功能可以给用户在数据流转方面带来质的变化。

缘起

在传统的OLAP方案中,链路是比较长的,数据流转的代价较为高昂。而且往往常用的数据同步工具未必能够满足需求,复杂的分析在同步上会需要一些功能的定制。而且资源的不够弹性、管理上的诸多麻烦,也带来成本的上升。

那么在云环境中,这一情况可能发生变化,数据的同步、冷备或许会变得更加简单,在线和离线数据的界线或距离可能会变得不那么明显。

oss_ext 与 oss_fdw

HybridDB和PostgreSQL分别是通过oss_ext和oss_fdw来实现对OSS的读写操作。

  • oss_ext

    HybridDB是在PostgreSQL的基础上修改,因此和PostgreSQL一样拥有良好、可靠、高效的扩展性。基于HybridDB插件机制,我们开发了oss_ext这个插件,用于在PostgreSQL和HybridDB中无缝访问OSS。

    HybridDB的外表功能与PG稍有不同,因此用法上也有细微差别,分为读写两种,详细请参考高速 OSS 并行导入导出。下面我们会以具体例子来介绍。

  • oss_fdw

    PostgreSQL的Foreign Data Wrapper是一套很强大的机制,可以允许用户以SQL方式访问各种各样的数据源,OSS也不在话下。详细的用法参考从 OSS 装载数据到 PostgreSQL

注:OSS(Object Storage Service)是阿里云对外提供的海量、安全和高可靠的云存储服务,容量和处理能力的弹性扩展,按实际容量付费,比较适合做数据的冷备。详细请参考对象存储OSS

OSS –> HybridDB 并行加载

HybridDB是多节点的分布式数据库,在读写OSS的时候,每个节点都会建立一个到OSS的连接,并行读写。如图所示:

并行导入

数据在导入后,会在每个节点计算HASH值后做分布,相比通过Master的写入,性能可以随节点数线性提升。

  1. create extension oss_ext;
  2. create READABLE external table singlepath (id int, name text) location('oss://oss-cn-beijing.aliyuncs.com dir=loaddata/ id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';

这里的dir参数会指定一个目录loaddata,当前对该表进行查询的时候,会将OSS中该目录下所有文本全部加载进来,但不包括次级目录。执行结果如下:

  1. gpdb=> select * from singlepath;
  2. id | name
  3. ----+---------
  4. 0 | zero
  5. 1 | first
  6. 2 | second
  7. (3 rows)

现在在OSS中的目录结构是这样的,两个”output”中的内容是一样的:

  1. loaddata/
  2. -- secondary/
  3. -- output
  4. -- output

创建可以读取层级目录的外部表:

  1. create READABLE external table hyrachi (id int, name text) location('oss://oss-cn-beijing.aliyuncs.com prefix=loaddata/ id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';

查询的结果如下:

  1. gpdb=> select * from hyrachi;
  2. id | name
  3. ----+---------
  4. 0 | zero
  5. 1 | first
  6. 2 | second
  7. 0 | zero
  8. 1 | first
  9. 2 | second
  10. (6 rows)

关于数据导入,一些最佳实践的建议:

  • 文件数最好是集群总CPU的整数倍,可以充分利用整个HybridDB集群的资源,最大化的并行导入
  • 单个文件不宜过小也不宜过大。OSS上文件打开的时间消耗要大于普通的文件打开,文件过小会导致打开时间占比过高,也会导致文件数过多而影响加载;而过大则会导致单个文件的导入时间过久
  • 目录(多个文件)的加载,不同的文件之间大小不宜差别过大(如一个几M、一个几G等),会引起木桶效应
  • 可以通过管理文件和目录实现对数据的过滤、分割等一系列操作

PostgreSQl –> OSS

相比于HybridDB的oss_ext,PostgreSQL的oss_fdw要更强大一些,可以直接进行读写。

执行以下步骤创建相应外表:

  1. create extension oss_fdw ;
  2. CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw OPTIONS (host 'oss-cn-beijing.aliyuncs.com' , id 'xxxx', key 'xxxxx', bucket 'ext-buffer');
  3. create FOREIGN TABLE singpath (id int, name text) SERVER ossserver OPTIONS (dir 'loaddata/', delimiter ',', format 'csv');

查询结果如下:

  1. postgres=> select * from singpath;
  2. id | name
  3. ----+---------
  4. 0 | zero
  5. 1 | first
  6. 2 | second
  7. (3 rows)

从另外一张表中写入到OSS外表,从而写入到OSS:

  1. postgres=> select * from oss_examp ;
  2. id | name
  3. ----+-------
  4. 0 | zero
  5. 1 | first
  6. (2 rows)
  7. postgres=> insert into singpath select id, name from oss_examp where id = 0;
  8. NOTICE: begin writiing data to oss directory loaddata/, with block size 32 MB and oss file size 1024 MB
  9. INSERT 0 2
  10. postgres=> select * from singpath ;
  11. NOTICE: a total of 2 files will be loaded
  12. id | name
  13. ----+---------
  14. 0 | zero
  15. 1 | first
  16. 2 | second
  17. 0 | zero
  18. (5 rows)

利用对OSS外表的读写这一强大的功能,可以很方便地通过SQL实现对数据的清洗、过滤、导出等功能。上面的”insert into … select …” 语句中的select子句可以换成任意条件的查询,从而达到数据处理的目的。

PostgreSQL –> OSS –> HybridDB 数据同步

非常重要的一点是:PostgreSQL和HybridDB在接口上基本兼容,基本可以共用。

通过oss_fdw,PostgreSQL可以通过SQL实现很方便地将数据流到OSS中;利用oss_ext,HybridDB则可以通过SQL直接、高效地读取OSS,实现数据的流入。架构如下:

数据同步

通过在OSS上合理控制目录结构并写入,可以实现PostgreSQL与HybridDB的无缝同步。这里从以下几个例子予以说明:

  • 增量导入
  • 数据的分柝与快速分析
  • 不同PG实例中相同表的数据合并

增量导入

增量导入一直是被问的比较多的问题,这对于异构数据库一直是一个大的难题。利用oss_ext和oss_fdw则可以基本上通过SQL语句就可以完成需要的工作。

在PG中,建立外表,根据业务需要,定期不定期的根据一定条件将数据同步到OSS的特定目录。例如现在有一张售卖表,每天都有新的售卖记录,需要每天定时统计今天的业务量:

  1. create TABLE sales(id int, name text, deal_day date);

这里是按天来存储数据,一张外表对应OSS中sales目录下的一个子目录,当需要导入新的一天的数据的时候,只需要执行下面两条SQL:

  1. create FOREIGN TABLE sales_2016_12_13 (id int, name text, deal_day date) SERVER ossserver OPTIONS (dir 'sales/2016_12_13/', delimiter ',', format 'csv');
  2. insert into sales_2016_12_13 select * from sales where deal_day == '2016-12-13';

然后,在GP端,可以提前建立好一张总表:

  1. create TABLE sales(id int, name text, deal_day date) distributed by (id);

同样两条SQL,将数据加载进来:

  1. create READABLE external table sales_2016_12_13 (id int, name text) location('oss://oss-cn-beijing.aliyuncs.com dir=sales/2016-12-13/ id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';
  2. insert into sales select * from sales_2016_12_13;

也可以一次性完整的加载:

  1. create READABLE external table sales_ext (id int, name text) location('oss://oss-cn-beijing.aliyuncs.com prefix=sales/ id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';
  2. insert into sales select * from sales_ext;

注:请注意这里的location定义中的”dir” 换成了”prefix”。

将上面的逻辑整理成一个Python脚本就是:

  1. #! /bin/env python
  2. import time
  3. import json
  4. import psycopg2
  5. import datetime
  6. PREFIX = "sales"
  7. def sync_day():
  8. now_date = datetime.datetime.now()
  9. today = "%s_%s_%s" % (now_date.year, now_date.month, now_date.day)
  10. table_name = "%s_%s" % (PREFIX, today)
  11. oss_dir = "%s/%s/" % (PREFIX, today)
  12. print today
  13. print table_name
  14. print oss_dir
  15. pgsql_conn_string = "host=apsaradbsamplepgsqlinstance.pg.rds.aliyuncs.com port=3569 dbname=postgres password= sample user=sample"
  16. pgsql_create_table = "create FOREIGN TABLE %s (id int, name text, deal_day date) SERVER ossserver OPTIONS (dir '%s', delimiter ',', format 'csv');" % ( table_name, oss_dir)
  17. pgsql_insert = "insert into %s select * from %s where deal_day = '%s';" % (table_name, PREFIX, today)
  18. gpdb_conn_string = "host=apsaradbsamplegpdbinstance.gpdb.rds.aliyuncs.com port=3569 dbname=gpdb user=sample password=sample"
  19. gpdb_create_table = "create READABLE external table %s(id int, name text, deal_day date) location('oss://oss-cn-beijing.aliyuncs.com dir=%s id=xxxx key=xxxxx bucket=ext-buffer') format 'csv';" % (table_name, oss_dir)
  20. gpdb_insert = "insert into %s select * from %s where deal_day = '%s';" % (PREFIX, table_name, today)
  21. print pgsql_create_table
  22. execute(pgsql_create_table, pgsql_conn_string)
  23. print pgsql_insert
  24. execute(pgsql_insert, pgsql_conn_string)
  25. print gpdb_create_table
  26. execute(gpdb_create_table, gpdb_conn_string)
  27. print gpdb_insert
  28. execute(gpdb_insert, gpdb_conn_string)
  29. def execute(sql, conn_string):
  30. conn = None
  31. try:
  32. conn = psycopg2.connect(conn_string)
  33. conn.autocommit = True
  34. cur = conn.cursor()
  35. cur.execute(sql)
  36. except Exception as e:
  37. if conn:
  38. try:
  39. conn.close()
  40. except:
  41. pass
  42. time.sleep(10)
  43. print e
  44. return None
  45. def main():
  46. sync_day()
  47. if __name__ == "__main__":
  48. main()

更完善一些的话,可以:

  1. 加上一些错误处理
  2. 加上对OSS的管理部分

数据的分柝与快速分析

这里的“数据的分柝”是指根据原始数据的某个字段或维度,将数据拆分成多份,分开存储与计算。相比要更简单一些,将上面的例子中稍做修改即可。比如,在PG中,将数据依据条件导出到不同的OSS目录:

  1. create table sales(id int, name text, deal_day date, site_id integer);
  2. create FOREIGN TABLE sales_site_0 (id int, name text, deal_day date, site_id integer) SERVER ossserver OPTIONS (dir 'sales/site_0/', delimiter ',', format 'csv');
  3. create FOREIGN TABLE sales_site_1 (id int, name text, deal_day date, site_id integer) SERVER ossserver OPTIONS (dir 'sales/site_1/', delimiter ',', format 'csv');
  4. create FOREIGN TABLE sales_site_2 (id int, name text, deal_day date, site_id integer) SERVER ossserver OPTIONS (dir 'sales/site_2/', delimiter ',', format 'csv');
  5. insert into sales_site_0 select * from sales where site_id = 0;
  6. insert into sales_site_1 select * from sales where site_id = 1;
  7. insert into sales_site_2 select * from sales where site_id = 2;

导入到OSS中的目录结构是:

  1. sales
  2. --> site_0
  3. --> file1
  4. --> file2
  5. --> file3
  6. --> site_1
  7. --> file1
  8. --> file2
  9. --> site_2
  10. --> file1
  11. --> file2
  12. --> file3

那么,可以将site_0、site_1、site_2分别导入不同的GP实例,或同一个GP实例的不同表中,以达到数据拆分的目的。

当数据量不是特别大的时候,也可以通过下面语句直接进行分析而不必将数据完全导入,以实现快速的检查性分析:

  1. select * from sales_2016_12_13 where deal_day != '2016-12-13';

请注意:这种快速分析的方式,会先将所有数据加载到内存进行计算,不适合大于总内存数据量的场景。

不同PG实例中相同表的数据合并

在了解了“增量同步”的部分之后,数据的合并也是比较简单的。这里的“数据合并”是指:多个不同的PG实例中,具有相同的表结构的数据,最后需要汇总进行分析、计算,比如分库分表。

那么这个时候,只需将不同的PG实例中的数据写入到同一个OSS目录中、然后在GP中递归加载(prefix参数)上一层目录即可。这里需要注意的有以下几点:

  1. 不同的PG实例中的表结构需要相同,不然GP在加载的时候会报错(某些文件会报字段不存在、或字段多了)
  2. 写入同一个OSS目录中,可以继续分层级,但层级不宜过多

总结

从上面的例子中可以看出:HybridDB + OSS + PostgreSQL,可以有效覆盖OLTP和OLAP一体化的场景。当前在使用上,需要用户做一些自行的定义,同时也提供了足够的灵活度。而在后面,我们可能会进一步进行一体化打造,比如PG到GP的逻辑复制、MySQL到GP的逻辑复制等。让我们在使用体验上更加顺滑。

很荣幸的,见到了很多客户对我们的认可与信赖,甚至有一些客户不断催促我们尽快的商业化,因为他们的业务也要跟着正式上线。这是一份沉甸甸的信任,同时也让我们心存忐忑与敬畏。