HAWQ InputFormat for MapReduce

MapReduce is a programming model developed by Google for processing and generating large data sets on an array of commodity servers. You can use the HAWQ InputFormat class to enable MapReduce jobs to access HAWQ data stored in HDFS.

To use HAWQ InputFormat, you need only provide the URL of the database to connect to and the name of the table you want to access. HAWQ InputFormat fetches only the metadata of the database and table of interest, which is much less data than the table data itself. After getting the metadata, HAWQ InputFormat determines where and how the table data is stored in HDFS. It reads and parses those HDFS files and processes the parsed table tuples directly inside a Map task.
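
For orientation, the following minimal sketch shows this usage pattern; the database URL, table name, and null credentials are placeholder values, and a complete, runnable example appears later in this chapter.

  // Minimal sketch (placeholder values): point HAWQInputFormat at a database URL and table.
  Job job = Job.getInstance(new Configuration());
  job.setInputFormatClass(HAWQInputFormat.class);
  HAWQInputFormat.setInput(job.getConfiguration(),
      "localhost:5432/postgres",   // database URL
      null, null,                  // username and password
      "employees");                // table to read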

This chapter describes the data types that HAWQ InputFormat supports, a complete MapReduce example, and the setInput() interfaces used to access HAWQ data.

Supported Data Types

HAWQ InputFormat supports the following data types:

SQL/HAWQ                | JDBC/JAVA                                        | setXXX        | getXXX
------------------------|--------------------------------------------------|---------------|--------------
DECIMAL/NUMERIC         | java.math.BigDecimal                             | setBigDecimal | getBigDecimal
FLOAT8/DOUBLE PRECISION | double                                           | setDouble     | getDouble
INT8/BIGINT             | long                                             | setLong       | getLong
INTEGER/INT4/INT        | int                                              | setInt        | getInt
FLOAT4/REAL             | float                                            | setFloat      | getFloat
SMALLINT/INT2           | short                                            | setShort      | getShort
BOOL/BOOLEAN            | boolean                                          | setBoolean    | getBoolean
VARCHAR/CHAR/TEXT       | String                                           | setString     | getString
DATE                    | java.sql.Date                                    | setDate       | getDate
TIME/TIMETZ             | java.sql.Time                                    | setTime       | getTime
TIMESTAMP/TIMESTAMPTZ   | java.sql.Timestamp                               | setTimestamp  | getTimestamp
ARRAY                   | java.sql.Array                                   | setArray      | getArray
BIT/VARBIT              | com.pivotal.hawq.mapreduce.datatype.HAWQVarbit   | setVarbit     | getVarbit
BYTEA                   | byte[]                                           | setByte       | getByte
INTERVAL                | com.pivotal.hawq.mapreduce.datatype.HAWQInterval | setInterval   | getInterval
POINT                   | com.pivotal.hawq.mapreduce.datatype.HAWQPoint    | setPoint      | getPoint
LSEG                    | com.pivotal.hawq.mapreduce.datatype.HAWQLseg     | setLseg       | getLseg
BOX                     | com.pivotal.hawq.mapreduce.datatype.HAWQBox      | setBox        | getBox
CIRCLE                  | com.pivotal.hawq.mapreduce.datatype.HAWQCircle   | setCircle     | getCircle
PATH                    | com.pivotal.hawq.mapreduce.datatype.HAWQPath     | setPath       | getPath
POLYGON                 | com.pivotal.hawq.mapreduce.datatype.HAWQPolygon  | setPolygon    | getPolygon
MACADDR                 | com.pivotal.hawq.mapreduce.datatype.HAWQMacaddr  | setMacaddr    | getMacaddr
INET                    | com.pivotal.hawq.mapreduce.datatype.HAWQInet     | setInet       | getInet
CIDR                    | com.pivotal.hawq.mapreduce.datatype.HAWQCIDR     | setCIDR       | getCIDR
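
As an illustration of how these getters are used inside a mapper, the following sketch reads several column types from a hypothetical events table (the table layout and column indexes are assumptions for this example, and the class uses the same imports as the full example in the next section); field indexes passed to the getters are 1-based.

  // Sketch only. Assumes a hypothetical table:
  //     CREATE TABLE events (id INTEGER, name TEXT, amount NUMERIC, created TIMESTAMP);
  public static class TypedMapper extends
          Mapper<Void, HAWQRecord, IntWritable, Text> {
      public void map(Void key, HAWQRecord value, Context context)
              throws IOException, InterruptedException {
          try {
              int id = value.getInt(1);                             // INTEGER/INT4/INT
              String name = value.getString(2);                     // VARCHAR/CHAR/TEXT
              java.math.BigDecimal amount = value.getBigDecimal(3); // DECIMAL/NUMERIC
              java.sql.Timestamp created = value.getTimestamp(4);   // TIMESTAMP
              context.write(new IntWritable(id),
                      new Text(name + "," + amount + "," + created));
          } catch (HAWQException e) {
              // The typed getters throw HAWQException on type or index errors.
              throw new IOException(e.getMessage());
          }
      }
  }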

HAWQ InputFormat Example

The following example shows how you can use the HAWQInputFormat class to access HAWQ table data from MapReduce jobs.

  package com.mycompany.app;

  import com.pivotal.hawq.mapreduce.HAWQException;
  import com.pivotal.hawq.mapreduce.HAWQInputFormat;
  import com.pivotal.hawq.mapreduce.HAWQRecord;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  import java.io.IOException;

  public class HAWQInputFormatDemoDriver extends Configured
          implements Tool {

      // The example reads the following table:
      // CREATE TABLE employees (
      //     id INTEGER NOT NULL, name VARCHAR(32) NOT NULL);

      public static class DemoMapper extends
              Mapper<Void, HAWQRecord, IntWritable, Text> {
          int id = 0;
          String name = null;

          public void map(Void key, HAWQRecord value, Context context)
                  throws IOException, InterruptedException {
              try {
                  // Field indexes are 1-based and follow the table column order.
                  id = value.getInt(1);
                  name = value.getString(2);
              } catch (HAWQException hawqE) {
                  throw new IOException(hawqE.getMessage());
              }
              context.write(new IntWritable(id), new Text(name));
          }
      }

      private static int printUsage() {
          System.out.println("HAWQInputFormatDemoDriver "
                  + "<database_url> <table_name> <output_path> [username] [password]");
          ToolRunner.printGenericCommandUsage(System.out);
          return 2;
      }

      public int run(String[] args) throws Exception {
          if (args.length < 3) {
              return printUsage();
          }
          Job job = Job.getInstance(getConf());
          job.setJobName("hawq-inputformat-demo");
          job.setJarByClass(HAWQInputFormatDemoDriver.class);
          job.setMapperClass(DemoMapper.class);
          job.setMapOutputValueClass(Text.class);
          job.setOutputValueClass(Text.class);

          String db_url = args[0];
          String table_name = args[1];
          String output_path = args[2];
          String user_name = null;
          if (args.length > 3) {
              user_name = args[3];
          }
          String password = null;
          if (args.length > 4) {
              password = args[4];
          }

          // Read the HAWQ table directly; this map-only job needs no reducers.
          job.setInputFormatClass(HAWQInputFormat.class);
          HAWQInputFormat.setInput(job.getConfiguration(), db_url,
                  user_name, password, table_name);
          FileOutputFormat.setOutputPath(job, new Path(output_path));
          job.setNumReduceTasks(0);
          int res = job.waitForCompletion(true) ? 0 : 1;
          return res;
      }

      public static void main(String[] args) throws Exception {
          int res = ToolRunner.run(new Configuration(),
                  new HAWQInputFormatDemoDriver(), args);
          System.exit(res);
      }
  }

To compile and run the example:

  1. Create a work directory:

    $ mkdir mrwork
    $ cd mrwork
  2. Copy and paste the Java code above into a .java file.

    $ mkdir -p com/mycompany/app
    $ cd com/mycompany/app
    $ vi HAWQInputFormatDemoDriver.java
  3. Note the following dependencies required for compilation:

    1. HAWQInputFormat jars (located in the $GPHOME/lib/postgresql/hawq-mr-io directory):
      • hawq-mapreduce-common.jar
      • hawq-mapreduce-ao.jar
      • hawq-mapreduce-parquet.jar
      • hawq-mapreduce-tool.jar
    2. Required third-party jars (located in the $GPHOME/lib/postgresql/hawq-mr-io/lib directory):
      • parquet-common-1.1.0.jar
      • parquet-format-1.1.0.jar
      • parquet-hadoop-1.1.0.jar
      • postgresql-n.n-n-jdbc4.jar
      • snakeyaml-n.n.jar
    3. Hadoop MapReduce-related jars (located in the installation directory of your Hadoop distribution).
  4. Compile the Java program. Adjust the classpath in the following command to match the Hadoop and HAWQ installation paths on your system:

    javac -classpath /usr/hdp/2.4.2.0-258/hadoop-mapreduce/*:/usr/local/hawq/lib/postgresql/hawq-mr-io/*:/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/*:/usr/hdp/current/hadoop-client/* HAWQInputFormatDemoDriver.java
  5. Build the JAR file.

    $ cd ../../..
    $ jar cf my-app.jar com
    $ cp my-app.jar /tmp
  6. Verify that HAWQ and HDFS are installed and that your HAWQ cluster is running.

  7. Create a sample table:

    1. Log in to HAWQ:

      $ psql -d postgres
    2. Create the table:

      CREATE TABLE employees (
        id INTEGER NOT NULL,
        name TEXT NOT NULL);

      Or create a Parquet-format table:

      CREATE TABLE employees ( id INTEGER NOT NULL, name TEXT NOT NULL) WITH (APPENDONLY=true, ORIENTATION=parquet);
    3. Insert one tuple:

      INSERT INTO employees VALUES (1, 'Paul');
      \q
  8. Ensure that the pg_hba.conf configuration file is set up to allow the gpadmin user to access the postgres database.

  9. Use a shell script such as the following to run the MapReduce job:

    #!/bin/bash
    # set up environment variables
    HAWQMRLIB=/usr/local/hawq/lib/postgresql/hawq-mr-io
    export HADOOP_CLASSPATH=$HAWQMRLIB/hawq-mapreduce-ao.jar:$HAWQMRLIB/hawq-mapreduce-common.jar:$HAWQMRLIB/hawq-mapreduce-tool.jar:$HAWQMRLIB/hawq-mapreduce-parquet.jar:$HAWQMRLIB/lib/postgresql-9.2-1003-jdbc4.jar:$HAWQMRLIB/lib/snakeyaml-1.12.jar:$HAWQMRLIB/lib/parquet-hadoop-1.1.0.jar:$HAWQMRLIB/lib/parquet-common-1.1.0.jar:$HAWQMRLIB/lib/parquet-format-1.0.0.jar
    export LIBJARS=$HAWQMRLIB/hawq-mapreduce-ao.jar,$HAWQMRLIB/hawq-mapreduce-common.jar,$HAWQMRLIB/hawq-mapreduce-tool.jar,$HAWQMRLIB/lib/postgresql-9.2-1003-jdbc4.jar,$HAWQMRLIB/lib/snakeyaml-1.12.jar,$HAWQMRLIB/hawq-mapreduce-parquet.jar,$HAWQMRLIB/lib/parquet-hadoop-1.1.0.jar,$HAWQMRLIB/lib/parquet-common-1.1.0.jar,$HAWQMRLIB/lib/parquet-format-1.0.0.jar
    # usage: hadoop jar JARFILE CLASSNAME -libjars JARS <database_url> <table_name> <output_path_on_HDFS>
    # - writing output to HDFS, so run as the hdfs user
    # - if not using the default postgres port, replace 5432 with the port number for your HAWQ cluster
    HADOOP_USER_NAME=hdfs hadoop jar /tmp/my-app.jar com.mycompany.app.HAWQInputFormatDemoDriver -libjars $LIBJARS localhost:5432/postgres employees /tmp/employees

    The MapReduce job output is written to the /tmp/employees directory on the HDFS file system.

  10. Use the following commands to check the result of the MapReduce job:

    $ sudo -u hdfs hdfs dfs -ls /tmp/employees
    $ sudo -u hdfs hdfs dfs -cat /tmp/employees/*

    The output will appear as follows:

    1 Paul
  11. If you want to run the example again, first delete the output files and directory:

    $ sudo -u hdfs hdfs dfs -rm /tmp/employees/*
    $ sudo -u hdfs hdfs dfs -rmdir /tmp/employees

Accessing HAWQ Data

You can access HAWQ data using the HAWQInputFormat.setInput() interface. The API signature you use depends on whether or not HAWQ is running:

  • When HAWQ is running, use HAWQInputFormat.setInput(Configuration conf, String db_url, String username, String password, String tableName).
  • When HAWQ is not running, first extract the table metadata to a file with the Metadata Export Tool and then use HAWQInputFormat.setInput(Configuration conf, String pathStr).

HAWQ is Running

  /**
   * Initializes the map part of the job with the appropriate input settings
   * by connecting to the database.
   *
   * @param conf
   *          The map-reduce job configuration
   * @param db_url
   *          The database URL to connect to
   * @param username
   *          The username for setting up a connection to the database
   * @param password
   *          The password for setting up a connection to the database
   * @param tableName
   *          The name of the table to access
   * @throws Exception
   */
  public static void setInput(Configuration conf, String db_url,
      String username, String password, String tableName)
      throws Exception;

HAWQ is not Running

Use the metadata export tool, hawq extract, to export the metadata of the target table into a local YAML file:

  $ hawq extract [-h hostname] [-p port] [-U username] [-d database] [-o output_file] [-W] <tablename>

Using the extracted metadata, access HAWQ data through the following interface. Pass the complete path to the .yaml file in the pathStr argument.

  /**
   * Initializes the map part of the job with the appropriate input settings
   * by reading the metadata file stored in the local filesystem.
   *
   * To generate the metadata file, run hawq extract first.
   *
   * @param conf
   *          The map-reduce job configuration
   * @param pathStr
   *          The metadata file path in the local filesystem, e.g.
   *          /home/gpadmin/metadata/postgres_test
   * @throws Exception
   */
  public static void setInput(Configuration conf, String pathStr)
      throws Exception;
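
As a usage sketch (the YAML path, database name, and table name below are placeholder values), a driver might first export the table metadata with hawq extract and then configure the job from the exported file instead of a live database connection:

  // On a host with the HAWQ client tools (placeholder paths and names):
  //     $ hawq extract -d postgres -o /home/gpadmin/metadata/employees.yaml employees
  // Then, in the MapReduce driver, configure the input from the exported metadata file:
  Job job = Job.getInstance(new Configuration());
  job.setInputFormatClass(HAWQInputFormat.class);
  HAWQInputFormat.setInput(job.getConfiguration(),
      "/home/gpadmin/metadata/employees.yaml");   // local path to the extracted YAML file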