PXF External Tables and API

You can use the PXF API to create your own connectors to access any other type of parallel data store or processing engine.

The PXF Java API lets you extend PXF functionality and add new services and formats without changing HAWQ. The API includes three classes that are extended to allow HAWQ to access an external data source: Fragmenter, Accessor, and Resolver.

The Fragmenter produces a list of data fragments that can be read in parallel from the data source. The Accessor produces a list of records from a single fragment, and the Resolver both deserializes and serializes records.

Together, the Fragmenter, Accessor, and Resolver classes implement a connector. PXF includes plug-ins for HDFS and JSON files and tables in HBase and Hive.

Creating an External Table

The syntax for an EXTERNAL TABLE that uses the PXF protocol is as follows:

  1. CREATE [READABLE|WRITABLE] EXTERNAL TABLE <table_name>
  2. ( <column_name> <data_type> [, ...] | LIKE <other_table> )
  3. LOCATION('pxf://<host>[:<port>]/<path-to-data>?<pxf-parameters>[&<custom-option>=<value>[...]]')
  4. FORMAT 'custom' (formatter='pxfwritable_import|pxfwritable_export');

where is:

  1. [FRAGMENTER=<fragmenter_class>&ACCESSOR=<accessor_class>
  2. &RESOLVER=<resolver_class>] | ?PROFILE=profile-name

Note: Not every PXF profile supports writable external tables. Refer to Writing Data to HDFS for a detailed discussion of the HDFS plug-in profiles that support this feature.

Table 1. Parameter values and description

ParameterValue and description
<host>The PXF host. While <host> may identify any PXF agent node, use the HDFS NameNode as it is guaranteed to be available in a running HDFS cluster. If HDFS High Availability is enabled, <host> must identify the HDFS NameService.
<port>The PXF port. If <port> is omitted, PXF assumes <host> identifies a High Availability HDFS Nameservice and connects to the port number designated by the pxf_service_port server configuration parameter value. Default is 51200.
<path-to-data>A directory, file name, wildcard pattern, table name, etc.
PROFILEThe profile PXF uses to access the data. PXF supports multiple plug-ins that currently expose profiles named HBase, Hive, HiveRC, HiveText, HiveORC, HiveVectorizedORC, HdfsTextSimple, HdfsTextMulti, Avro, SequenceWritable, and Json.
FRAGMENTERThe Java class the plug-in uses for fragmenting data. Used for READABLE external tables only.
ACCESSORThe Java class the plug-in uses for accessing the data. Used for READABLE and WRITABLE tables.
RESOLVERThe Java class the plug-in uses for serializing and deserializing the data. Used for READABLE and WRITABLE tables.
<custom-option>Additional values to pass to the plug-in at runtime. A plug-in can parse custom options with the PXF helper class org.apache.hawq.pxf.api.utilities.InputData

Note: When creating PXF external tables, you cannot use the HEADER option in your FORMAT specification.

About the Java Class Services and Formats

The LOCATION string in a PXF CREATE EXTERNAL TABLE statement is a URI that specifies the host and port of an external data source and the path to the data in the external data source. The query portion of the URI, introduced by the question mark (?), must include the PXF profile name or the plug-in’s FRAGMENTER (readable tables only), ACCESSOR, and RESOLVER class names.

PXF profiles are defined in the /etc/pxf/conf/pxf-profiles.xml file. Profile definitions include plug-in class names. For example, the HdfsTextSimple profile definition is:

  1. <profile>
  2. <name>HdfsTextSimple</name>
  3. <description> This profile is suitable for use when reading delimited
  4. single line records from plain text files on HDFS.
  5. </description>
  6. <plugins>
  7. <fragmenter>org.apache.hawq.pxf.plugins.hdfs.HdfsDataFragmenter</fragmenter>
  8. <accessor>org.apache.hawq.pxf.plugins.hdfs.LineBreakAccessor</accessor>
  9. <resolver>org.apache.hawq.pxf.plugins.hdfs.StringPassResolver</resolver>
  10. </plugins>
  11. </profile>

The parameters in the PXF URI are passed from HAWQ as headers to the PXF Java service. You can pass custom information to user-implemented PXF plug-ins by adding optional parameters to the LOCATION string.

The Java PXF service retrieves the source data from the external data source and converts it to a HAWQ-readable table format.

The Accessor, Resolver, and Fragmenter Java classes extend the org.apache.hawq.pxf.api.utilities.Plugin class:

  1. package org.apache.hawq.pxf.api.utilities;
  2. /**
  3. * Base class for all plug-in types (Accessor, Resolver, Fragmenter, ...).
  4. * Manages the meta data.
  5. */
  6. public class Plugin {
  7. protected InputData inputData;
  8. /**
  9. * Constructs a plug-in.
  10. *
  11. * @param input the input data
  12. */
  13. public Plugin(InputData input) {
  14. this.inputData = input;
  15. }
  16. /**
  17. * Checks if the plug-in is thread safe or not, based on inputData.
  18. *
  19. * @return true if plug-in is thread safe
  20. */
  21. public boolean isThreadSafe() {
  22. return true;
  23. }
  24. }

The parameters in the LOCATION string are available to the plug-ins through methods in the org.apache.hawq.pxf.api.utilities.InputData class. Plug-ins can look up the custom parameters added to the location string with the getUserProperty() method.

  1. /**
  2. * Common configuration available to all PXF plug-ins. Represents input data
  3. * coming from client applications, such as HAWQ.
  4. */
  5. public class InputData {
  6. /**
  7. * Constructs an InputData from a copy.
  8. * Used to create from an extending class.
  9. *
  10. * @param copy the input data to copy
  11. */
  12. public InputData(InputData copy);
  13. /**
  14. * Returns value of a user defined property.
  15. *
  16. * @param userProp the lookup user property
  17. * @return property value as a String
  18. */
  19. public String getUserProperty(String userProp);
  20. /**
  21. * Sets the byte serialization of a fragment meta data
  22. * @param location start, len, and location of the fragment
  23. */
  24. public void setFragmentMetadata(byte[] location);
  25. /** Returns the byte serialization of a data fragment */
  26. public byte[] getFragmentMetadata();
  27. /**
  28. * Gets any custom user data that may have been passed from the
  29. * fragmenter. Will mostly be used by the accessor or resolver.
  30. */
  31. public byte[] getFragmentUserData();
  32. /**
  33. * Sets any custom user data that needs to be shared across plug-ins.
  34. * Will mostly be set by the fragmenter.
  35. */
  36. public void setFragmentUserData(byte[] userData);
  37. /** Returns the number of segments in GP. */
  38. public int getTotalSegments();
  39. /** Returns the current segment ID. */
  40. public int getSegmentId();
  41. /** Returns true if there is a filter string to parse. */
  42. public boolean hasFilter();
  43. /** Returns the filter string, <tt>null</tt> if #hasFilter is <tt>false</tt> */
  44. public String getFilterString();
  45. /** Returns tuple description. */
  46. public ArrayList<ColumnDescriptor> getTupleDescription();
  47. /** Returns the number of columns in tuple description. */
  48. public int getColumns();
  49. /** Returns column index from tuple description. */
  50. public ColumnDescriptor getColumn(int index);
  51. /**
  52. * Returns the column descriptor of the recordkey column. If the recordkey
  53. * column was not specified by the user in the create table statement will
  54. * return null.
  55. */
  56. public ColumnDescriptor getRecordkeyColumn();
  57. /** Returns the data source of the required resource (i.e a file path or a table name). */
  58. public String getDataSource();
  59. /** Sets the data source for the required resource */
  60. public void setDataSource(String dataSource);
  61. /** Returns the ClassName for the java class that was defined as Accessor */
  62. public String getAccessor();
  63. /** Returns the ClassName for the java class that was defined as Resolver */
  64. public String getResolver();
  65. /**
  66. * Returns the ClassName for the java class that was defined as Fragmenter
  67. * or null if no fragmenter was defined
  68. */
  69. public String getFragmenter();
  70. /**
  71. * Returns the contents of pxf_remote_service_login set in Hawq.
  72. * Should the user set it to an empty string this function will return null.
  73. *
  74. * @return remote login details if set, null otherwise
  75. */
  76. public String getLogin();
  77. /**
  78. * Returns the contents of pxf_remote_service_secret set in Hawq.
  79. * Should the user set it to an empty string this function will return null.
  80. *
  81. * @return remote password if set, null otherwise
  82. */
  83. public String getSecret();
  84. /**
  85. * Returns true if the request is thread safe. Default true. Should be set
  86. * by a user to false if the request contains non thread-safe plug-ins or
  87. * components, such as BZip2 codec.
  88. */
  89. public boolean isThreadSafe();
  90. /**
  91. * Returns a data fragment index. plan to deprecate it in favor of using
  92. * getFragmentMetadata().
  93. */
  94. public int getDataFragment();
  95. }

Fragmenter

Note: You use the Fragmenter class to read data into HAWQ. You cannot use this class to write data out of HAWQ.

The Fragmenter is responsible for passing datasource metadata back to HAWQ. It also returns a list of data fragments to the Accessor or Resolver. Each data fragment describes some part of the requested data set. It contains the datasource name, such as the file or table name, including the hostname where it is located. For example, if the source is an HDFS file, the Fragmenter returns a list of data fragments containing an HDFS file block. Each fragment includes the location of the block. If the source data is an HBase table, the Fragmenter returns information about table regions, including their locations.

The ANALYZE command now retrieves advanced statistics for PXF readable tables by estimating the number of tuples in a table, creating a sample table from the external table, and running advanced statistics queries on the sample table in the same way statistics are collected for native HAWQ tables.

The configuration parameter pxf_enable_stat_collection controls collection of advanced statistics. If pxf_enable_stat_collection is set to false, no analysis is performed on PXF tables. An additional parameter, pxf_stat_max_fragments, controls the number of fragments sampled to build a sample table. By default pxf_stat_max_fragments is set to 100, which means that even if there are more than 100 fragments, only this number of fragments will be used in ANALYZE to sample the data. Increasing this number will result in better sampling, but can also impact performance.

When a PXF table is analyzed, any of the following conditions might result in a warning message with no statistics gathered for the table:

  • pxf_enable_stat_collection is set to off,
  • an error occurs because the table is not defined correctly,
  • the PXF service is down, or
  • getFragmentsStats() is not implemented

If ANALYZE is running over all tables in the database, the next table will be processed – a failure processing one table does not stop the command.

For a detailed explanation about HAWQ statistical data gathering, refer to the ANALYZE SQL command reference.

Note:

  • Depending on external table size, the time required to complete an ANALYZE operation can be lengthy. The boolean parameter pxf_enable_stat_collection enables statistics collection for PXF. The default value is on. Turning this parameter off (disabling PXF statistics collection) can help decrease the time needed for the ANALYZE operation.
  • You can also use pxf_stat_max_fragments to limit the number of fragments to be sampled by decreasing it from the default (100). However, if the number is too low, the sample might not be uniform and the statistics might be skewed.
  • You can also implement getFragmentsStats() to return an error. This will cause ANALYZE on a table with this Fragmenter to fail immediately, and default statistics values will be used for that table.

The following table lists the Fragmenter plug-in implementations included with the PXF API.

Table 2. Fragmenter base classes

Fragmenter class

Description

org.apache.hawq.pxf.plugins.hdfs.HdfsDataFragmenterFragmenter for HDFS, JSON files
org.apache.hawq.pxf.plugins.hbase.HBaseDataFragmenterFragmenter for HBase tables
org.apache.hawq.pxf.plugins.hive.HiveDataFragmenterFragmenter for Hive tables 
org.apache.hawq.pxf.plugins.hdfs.HiveInputFormatFragmenterFragmenter for Hive tables with RC, ORC, or text file formats 

A Fragmenter class extends org.apache.hawq.pxf.api.Fragmenter:

org.apache.hawq.pxf.api.Fragmenter

  1. package org.apache.hawq.pxf.api;
  2. /**
  3. * Abstract class that defines the splitting of a data resource into fragments
  4. * that can be processed in parallel.
  5. */
  6. public abstract class Fragmenter extends Plugin {
  7. protected List<Fragment> fragments;
  8. public Fragmenter(InputData metaData) {
  9. super(metaData);
  10. fragments = new LinkedList<Fragment>();
  11. }
  12. /**
  13. * Gets the fragments of a given path (source name and location of each
  14. * fragment). Used to get fragments of data that could be read in parallel
  15. * from the different segments.
  16. */
  17. public abstract List<Fragment> getFragments() throws Exception;
  18. /**
  19. * Default implementation of statistics for fragments. The default is:
  20. * <ul>
  21. * <li>number of fragments - as gathered by {@link #getFragments()}</li>
  22. * <li>first fragment size - 64MB</li>
  23. * <li>total size - number of fragments times first fragment size</li>
  24. * </ul>
  25. * Each fragmenter implementation can override this method to better match
  26. * its fragments stats.
  27. *
  28. * @return default statistics
  29. * @throws Exception if statistics cannot be gathered
  30. */
  31. public FragmentsStats getFragmentsStats() throws Exception {
  32. List<Fragment> fragments = getFragments();
  33. long fragmentsNumber = fragments.size();
  34. return new FragmentsStats(fragmentsNumber,
  35. FragmentsStats.DEFAULT_FRAGMENT_SIZE, fragmentsNumber
  36. * FragmentsStats.DEFAULT_FRAGMENT_SIZE);
  37. }
  38. }

getFragments() returns a string in JSON format of the retrieved fragment. For example, if the input path is a HDFS directory, the source name for each fragment should include the file name including the path for the fragment.

Class Description

The Fragmenter.getFragments() method returns a List<Fragment>:

  1. package org.apache.hawq.pxf.api;
  2. /*
  3. * Fragment holds a data fragment' information.
  4. * Fragmenter.getFragments() returns a list of fragments.
  5. */
  6. public class Fragment
  7. {
  8. private String sourceName; // File path+name, table name, etc.
  9. private int index; // Fragment index (incremented per sourceName)
  10. private String[] replicas; // Fragment replicas (1 or more)
  11. private byte[] metadata; // Fragment metadata information (starting point + length, region location, etc.)
  12. private byte[] userData; // ThirdParty data added to a fragment. Ignored if null
  13. ...
  14. }

org.apache.hawq.pxf.api.FragmentsStats

The Fragmenter.getFragmentsStats() method returns a FragmentsStats:

  1. package org.apache.hawq.pxf.api;
  2. /**
  3. * FragmentsStats holds statistics for a given path.
  4. */
  5. public class FragmentsStats {
  6. // number of fragments
  7. private long fragmentsNumber;
  8. // first fragment size
  9. private SizeAndUnit firstFragmentSize;
  10. // total fragments size
  11. private SizeAndUnit totalSize;
  12. /**
  13. * Enum to represent unit (Bytes/KB/MB/GB/TB)
  14. */
  15. public enum SizeUnit {
  16. /**
  17. * Byte
  18. */
  19. B,
  20. /**
  21. * KB
  22. */
  23. KB,
  24. /**
  25. * MB
  26. */
  27. MB,
  28. /**
  29. * GB
  30. */
  31. GB,
  32. /**
  33. * TB
  34. */
  35. TB;
  36. };
  37. /**
  38. * Container for size and unit
  39. */
  40. public class SizeAndUnit {
  41. long size;
  42. SizeUnit unit;
  43. ...

getFragmentsStats() returns a string in JSON format of statistics for the data source. For example, if the input path is a HDFS directory of 3 files, each one of 1 block, the output will be the number of fragments (3), the size of the first file, and the size of all files in that directory.

Accessor

The Accessor retrieves specific fragments and passes records back to the Resolver. For example, the HDFS plug-ins create a org.apache.hadoop.mapred.FileInputFormat and a org.apache.hadoop.mapred.RecordReader for an HDFS file and sends this to the Resolver. In the case of HBase or Hive files, the Accessor returns single rows from an HBase or Hive table. PXF includes the following Accessor implementations:

Table 3. Accessor base classes

Accessor class

Description

org.apache.hawq.pxf.plugins.hdfs.HdfsAtomicDataAccessorBase class for accessing datasources which cannot be split. These will be accessed by a single HAWQ segment
org.apache.hawq.pxf.plugins.hdfs.QuotedLineBreakAccessorAccessor for TEXT files that have records with embedded linebreaks
org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor

Base class for accessing HDFS files using RecordReaders

org.apache.hawq.pxf.plugins.hdfs.LineBreakAccessorAccessor for TEXT files (replaced the deprecated TextFileAccessor, LineReaderAccessor)
org.apache.hawq.pxf.plugins.hdfs.AvroFileAccessorAccessor for Avro files
org.apache.hawq.pxf.plugins.hdfs.SequenceFileAccessorAccessor for Sequence files
org.apache.hawq.pxf.plugins.hbase.HBaseAccessor Accessor for HBase tables 
org.apache.hawq.pxf.plugins.hive.HiveAccessorAccessor for Hive tables 
org.apache.hawq.pxf.plugins.hive.HiveLineBreakAccessorAccessor for Hive tables stored as text file format
org.apache.hawq.pxf.plugins.hive.HiveRCFileAccessorAccessor for Hive tables stored as RC file format
org.apache.hawq.pxf.plugins.hive.HiveORCAccessorAccessor for Hive tables stored as ORC format
org.apache.hawq.pxf.plugins.hive.HiveORCVectorizedAccessorAccessor for Hive tables stored as ORC format
org.apache.hawq.pxf.plugins.json.JsonAccessorAccessor for JSON files

The class must extend the org.apache.hawq.pxf.Plugin class, and implement one or both of the interfaces:

  • org.apache.hawq.pxf.api.ReadAccessor
  • org.apache.hawq.pxf.api.WriteAccessor
  1. package org.apache.hawq.pxf.api;
  2. /*
  3. * Internal interface that defines the access to data on the source
  4. * data store (e.g, a file on HDFS, a region of an HBase table, etc).
  5. * All classes that implement actual access to such data sources must
  6. * respect this interface
  7. */
  8. public interface ReadAccessor {
  9. boolean openForRead() throws Exception;
  10. OneRow readNextObject() throws Exception;
  11. void closeForRead() throws Exception;
  12. }
  1. package org.apache.hawq.pxf.api;
  2. /*
  3. * An interface for writing data into a data store
  4. * (e.g, a sequence file on HDFS).
  5. * All classes that implement actual access to such data sources must
  6. * respect this interface
  7. */
  8. public interface WriteAccessor {
  9. boolean openForWrite() throws Exception;
  10. OneRow writeNextObject(OneRow onerow) throws Exception;
  11. void closeForWrite() throws Exception;
  12. }

The Accessor calls openForRead() to read existing data. After reading the data, it calls closeForRead(). readNextObject() returns one of the following:

  • a single record, encapsulated in a OneRow object
  • null if it reaches EOF

The Accessor calls openForWrite() to write data out. After writing the data, it writes a OneRow object with writeNextObject(), and when done calls closeForWrite(). OneRow represents a key-value item.

org.apache.hawq.pxf.api.OneRow

  1. package org.apache.hawq.pxf.api;
  2. /*
  3. * Represents one row in the external system data store. Supports
  4. * the general case where one row contains both a record and a
  5. * separate key like in the HDFS key/value model for MapReduce
  6. * (Example: HDFS sequence file)
  7. */
  8. public class OneRow {
  9. /*
  10. * Default constructor
  11. */
  12. public OneRow();
  13. /*
  14. * Constructor sets key and data
  15. */
  16. public OneRow(Object inKey, Object inData);
  17. /*
  18. * Setter for key
  19. */
  20. public void setKey(Object inKey);
  21. /*
  22. * Setter for data
  23. */
  24. public void setData(Object inData);
  25. /*
  26. * Accessor for key
  27. */
  28. public Object getKey();
  29. /*
  30. * Accessor for data
  31. */
  32. public Object getData();
  33. /*
  34. * Show content
  35. */
  36. public String toString();
  37. }