PXF HDFS连接器支持SequenceFile格式的二进制数据。本节描述如何使用PXF读写HDFS SequenceFile数据,包括如何创建引用HDFS文件的外部表,查询和向外部表写入数据。

先决条件

在尝试从HDFS读取或向HDFS写入数据之前,请确保已满足PXF Hadoop先决条件

创建外部表

PXF HDFS连接器 hdfs:SequenceFile 配置文件支持读写HDFS中SequenceFile格式的二进制数据。 当您将记录插入可写外部表中时,您插入的数据块将写入指定目录中的一个或多个文件。

注意: 使用可写配置创建的外部表仅INSERT操作。 如果要查询插入的数据,则必须单独创建一个引用相关HDFS目录的可读外部表。

使用以下语法创建一个引用HDFS目录的Greenplum数据库外部表:

  1. CREATE [WRITABLE] EXTERNAL TABLE <table_name>
  2. ( <column_name> <data_type> [, ...] | LIKE <other_table> )
  3. LOCATION ('pxf://<path-to-hdfs-dir>
  4. ?PROFILE=hdfs:SequenceFile[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
  5. FORMAT 'CUSTOM' (<formatting-properties>)
  6. [DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

CREATE EXTERNAL TABLE命令中使用的特定关键字和值见下表中描述。

关键字
<path‑to‑hdfs‑dir>HDFS数据存储中目录或文件的绝对路径
PROFILEPROFILE 关键字必须指定为 hdfs:SequenceFile.
SERVER=<server_name>PXF用于访问数据的命名服务器配置。可选的; 如果未指定,PXF将使用default服务器。
<custom‑option><custom-option> 描述见下方.
FORMAT使用 FORMATCUSTOM’ 时指定 (FORMATTER=’pxfwritable_export’) (写入) 或 (FORMATTER=’pxfwritable_import’) (读取).
DISTRIBUTED BY如果您计划将现有Greenplum数据库表中的数据加载到可写外部表,考虑在可写外部表上使用与Greenplum表相同的分布策略或<字段名>。 这样做可以避免数据加载操作中segment节点间额外的数据移动。

SequenceFile 格式数据可以选择采用record或block压缩。 PXF hdfs:SequenceFile 配置支持以下压缩编解码器:

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.BZip2Codec

使用 hdfs:SequenceFile 配置文件写入SequenceFile格式数据时,则必须提供Java类的名称,以用于序列化/反序列化二进制数据。 这个类必须为数据模式中引用的每种数据类型提供读写方法。

您可以通过 CREATE EXTERNAL TABLE LOCATION 子句的自定义选项指定压缩编解码器和Java 序列化类。 hdfs:SequenceFile 配置文件支持以下自定义选项:

选项值描述
COMPRESSION_CODEC压缩编码器类名称。 如果此选项未提供,Greenplum 数据库不执行数据压缩。 支持的压缩编解码器包括:
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.GzipCodec
COMPRESSION_TYPE采用的压缩类型; 支持的值有 RECORD (默认) 或 BLOCK.
DATA-SCHEMA编写器序列化/反序列化类的名称。 此类所在的jar文件必须位于PXF类路径中。 hdfs:SequenceFile 配置文件需要此选项,并且没有默认值。
THREAD-SAFE确定表查询是否可以在多线程模式下运行的布尔值。 默认为 TRUE。 将此选项设置为 FALSE 以处理单个线程中所有非线程安全操作 (例如,压缩)的请求。

读写二进制数据

当您要读写HDFS中 SequenceFile 格式的数据,请使用HDFS 连接器 hdfs:SequenceFile 配置文件。 这种类型的文件由二进制键/值对组成。 SequenceFile 格式是MapReduce作业之间常见的数据传输格式。

示例: 将二进制数据写入HDFS

在此示例中,您将创建一个名为 PxfExample_CustomWritable 的Java类,该类将对先前示例中使用的示例模式中的字段进行序列化/反序列化。然后,您将使用此类访问通过hdfs:SequenceFile配置文件创建的可写外部表,该表使用默认的PXF服务器。

执行以下步骤来创建Java类和可写外部表。

  1. 准备创建示例Java类:

    1. $ mkdir -p pxfex/com/example/pxf/hdfs/writable/dataschema
    2. $ cd pxfex/com/example/pxf/hdfs/writable/dataschema
    3. $ vi PxfExample_CustomWritable.java
  2. 将以下文本复制并黏贴到 PxfExample_CustomWritable.java 文件中:

    1. package com.example.pxf.hdfs.writable.dataschema;
    2. import org.apache.hadoop.io.*;
    3. import java.io.DataInput;
    4. import java.io.DataOutput;
    5. import java.io.IOException;
    6. import java.lang.reflect.Field;
    7. /**
    8. * PxfExample_CustomWritable class - used to serialize and deserialize data with
    9. * text, int, and float data types
    10. */
    11. public class PxfExample_CustomWritable implements Writable {
    12. public String st1, st2;
    13. public int int1;
    14. public float ft;
    15. public PxfExample_CustomWritable() {
    16. st1 = new String("");
    17. st2 = new String("");
    18. int1 = 0;
    19. ft = 0.f;
    20. }
    21. public PxfExample_CustomWritable(int i1, int i2, int i3) {
    22. st1 = new String("short_string___" + i1);
    23. st2 = new String("short_string___" + i1);
    24. int1 = i2;
    25. ft = i1 * 10.f * 2.3f;
    26. }
    27. String GetSt1() {
    28. return st1;
    29. }
    30. String GetSt2() {
    31. return st2;
    32. }
    33. int GetInt1() {
    34. return int1;
    35. }
    36. float GetFt() {
    37. return ft;
    38. }
    39. @Override
    40. public void write(DataOutput out) throws IOException {
    41. Text txt = new Text();
    42. txt.set(st1);
    43. txt.write(out);
    44. txt.set(st2);
    45. txt.write(out);
    46. IntWritable intw = new IntWritable();
    47. intw.set(int1);
    48. intw.write(out);
    49. FloatWritable fw = new FloatWritable();
    50. fw.set(ft);
    51. fw.write(out);
    52. }
    53. @Override
    54. public void readFields(DataInput in) throws IOException {
    55. Text txt = new Text();
    56. txt.readFields(in);
    57. st1 = txt.toString();
    58. txt.readFields(in);
    59. st2 = txt.toString();
    60. IntWritable intw = new IntWritable();
    61. intw.readFields(in);
    62. int1 = intw.get();
    63. FloatWritable fw = new FloatWritable();
    64. fw.readFields(in);
    65. ft = fw.get();
    66. }
    67. public void printFieldTypes() {
    68. Class myClass = this.getClass();
    69. Field[] fields = myClass.getDeclaredFields();
    70. for (int i = 0; i < fields.length; i++) {
    71. System.out.println(fields[i].getType().getName());
    72. }
    73. }
    74. }
  3. 编译并为 PxfExample_CustomWritable 创建Java类JAR文件。 为您的Hadoop发行版提供一个包含 hadoop-common.jar 文件的类路径。 例如, 如果您安装了Hortonworks Data Platform Hadoop客户端:

    1. $ javac -classpath /usr/hdp/current/hadoop-client/hadoop-common.jar PxfExample_CustomWritable.java
    2. $ cd ../../../../../../
    3. $ jar cf pxfex-customwritable.jar com
    4. $ cp pxfex-customwritable.jar /tmp/

    (您的Hadoop库类路径可能不同)

  4. pxfex-customwritable.jar 文件复制到Greenplum 数据库master节点。 例如:

    1. $ scp pxfex-customwritable.jar gpadmin@gpmaster:/home/gpadmin
  5. 登录到您的Greenplum数据库master节点:

    1. $ ssh gpadmin@<gpmaster>
  6. pxfex-customwritable.jar JAR文件复制到用户运行时类库目录中,并记下位置。 例如, 如果 PXF_CONF=/usr/local/greenplum-pxf

    1. gpadmin@gpmaster$ cp /home/gpadmin/pxfex-customwritable.jar /usr/local/greenplum-pxf/lib/pxfex-customwritable.jar
  7. 将PXF配置同步到Greenplum数据库集群。 例如:

    1. gpadmin@gpmaster$ $GPHOME/pxf/bin/pxf cluster sync
  8. 重启PXF中所述,在每个Greenplum数据库segment主机上重启PXF。

  9. 使用PXF hdfs:SequenceFile 配置文件创建Greenplum数据库可写外部表。 在 DATA-SCHEMA 自定义选项 中标识您在上面创建的序列化/反序列化Java类。 创建可写外部表时,使用 BZip2 压缩并指定 BLOCK 压缩模式。

    1. postgres=# CREATE WRITABLE EXTERNAL TABLE pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
    2. LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable&COMPRESSION_TYPE=BLOCK&COMPRESSION_CODEC=org.apache.hadoop.io.compress.BZip2Codec')
    3. FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

    注意 'CUSTOM' FORMAT 格式化属性 指定为内置的 pxfwritable_export 格式化程序。

  10. 通过直接插入 pxf_tbl_seqfile 表将一些数据写入 pxf_seqfile HDFS目录中。 例如:

    1. postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Frankfurt', 'Mar', 777, 3956.98 );
    2. postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Cleveland', 'Oct', 3812, 96645.37 );
  11. 回想一下,Greenplum 数据库不支持直接查询一个可写外部表。要读取pxf_seqfile中的数据,请创建一个引用此HDFS目录的可读外部表:

    1. postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
    2. LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
    3. FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');

    当您通过 hdfs:SequenceFile 配置文件读取HDFS数据是必须指定 DATA-SCHEMA 自定义选项 。 您无需提供压缩相关的选项。

  12. 查询可读外部表 read_pxf_tbl_seqfile

    1. gpadmin=# SELECT * FROM read_pxf_tbl_seqfile ORDER BY total_sales;
    1. location | month | number_of_orders | total_sales
    2. -----------+-------+------------------+-------------
    3. Frankfurt | Mar | 777 | 3956.98
    4. Cleveland | Oct | 3812 | 96645.4
    5. (2 rows)

读取记录键

当Greenplum数据库外部表引用SequenceFile或其他以键-值对存储行的数据格式时, 您可以通过使用 recordkey 关键字作为字段名称,在Greenplum查询中访问记录键的值。

recordkey 字段类型必须与记录键类型相对应,就像其他字段必须与HDFS数据匹配一样。.

您可以将 recordkey 定义为以下任何Hadoop类型:

  • BooleanWritable
  • ByteWritable
  • DoubleWritable
  • FloatWritable
  • IntWritable
  • LongWritable
  • Text

如果没有为行定义记录键, Greenplum数据库将返回处理该行的segment的id。

示例: 使用记录键

创建一个可读外部表,以访问您在 示例: 将二进制数据写入HDFS 创建的可写外部表 pxf_tbl_seqfile 的记录键。 本例中将 recordkey 定义为 int8 类型。

  1. postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile_recordkey(recordkey int8, location text, month text, number_of_orders integer, total_sales real)
  2. LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
  3. FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
  4. gpadmin=# SELECT * FROM read_pxf_tbl_seqfile_recordkey;
  1. recordkey | location | month | number_of_orders | total_sales
  2. -----------+-------------+-------+------------------+-------------
  3. 2 | Frankfurt | Mar | 777 | 3956.98
  4. 1 | Cleveland | Oct | 3812 | 96645.4
  5. (2 rows)

当您将行插入到可写外部表时,您没有定义记录键, 因此 recordkey 标识为处理该行的segment。