5.5. Hive Connector

Overview

The Hive connector allows querying data stored in a Hive data warehouse. Hive is a combination of three components:

  • Data files in varying formats that are typically stored in the Hadoop Distributed File System (HDFS) or in Amazon S3.
  • Metadata about how the data files are mapped to schemas and tables. This metadata is stored in a database such as MySQL and is accessed via the Hive metastore service.
  • A query language called HiveQL. This query language is executed on a distributed computing framework such as MapReduce or Tez.

Presto only uses the first two components: the data and the metadata. It does not use HiveQL or any part of Hive’s execution environment.

Supported File Types

The following file types are supported for the Hive connector:

  • ORC
  • Parquet
  • Avro
  • RCFile
  • SequenceFile
  • JSON
  • Text

Configuration

The Hive connector supports Apache Hadoop 2.x and derivative distributions including Cloudera CDH 5 and Hortonworks Data Platform (HDP).

Create etc/catalog/hive.properties with the following contents to mount the hive-hadoop2 connector as the hive catalog, replacing example.net:9083 with the correct host and port for your Hive metastore Thrift service:

    connector.name=hive-hadoop2
    hive.metastore.uri=thrift://example.net:9083

Multiple Hive Clusters

You can have as many catalogs as you need, so if you have additional Hive clusters, simply add another properties file to etc/catalog with a different name (making sure it ends in .properties). For example, if you name the property file sales.properties, Presto will create a catalog named sales using the configured connector.
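The naming rule above can be sketched in a few lines of Python. This is not Presto code: catalog_name and render_hive_catalog are hypothetical helpers that only mirror the documented rule (the catalog name is the file name minus .properties) and the two-line catalog file from the Configuration section.

```python
# Illustrative sketch; hypothetical helpers, not part of Presto.
from pathlib import Path


def catalog_name(properties_file: str) -> str:
    """Return the catalog name for a file in etc/catalog.

    The name is the file name without its .properties suffix,
    so etc/catalog/sales.properties becomes the catalog "sales".
    """
    path = Path(properties_file)
    if path.suffix != ".properties":
        raise ValueError(f"{path.name} must end in .properties")
    return path.stem


def render_hive_catalog(metastore_uri: str) -> str:
    """Render the minimal hive-hadoop2 catalog file contents."""
    lines = [
        "connector.name=hive-hadoop2",
        f"hive.metastore.uri={metastore_uri}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(catalog_name("etc/catalog/sales.properties"))
    print(render_hive_catalog("thrift://example.net:9083"), end="")
```

For a second cluster, writing the output of render_hive_catalog for that cluster's metastore into etc/catalog/sales.properties would yield a catalog named sales.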

HDFS Configuration

For basic setups, Presto configures the HDFS client automatically and does not require any configuration files. In some cases, such as when using federated HDFS or NameNode high availability, it is necessary to specify additional HDFS client options in order to access your HDFS cluster. To do so, add the hive.config.resources property to reference your HDFS config files:

    hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml

Only specify additional configuration files if necessary for your setup. We also recommend reducing the configuration files to have the minimum set of required properties, as additional properties may cause problems.

The configuration files must exist on all Presto nodes. If you are referencing existing Hadoop config files, make sure to copy them to any Presto nodes that are not running Hadoop.

HDFS Username

When not using Kerberos with HDFS, Presto will access HDFS using the OS user of the Presto process. For example, if Presto is running as nobody, it will access HDFS as nobody. You can override this username by setting the HADOOP_USER_NAME system property in the Presto JVM Config, replacing hdfs_user with the appropriate username:

    -DHADOOP_USER_NAME=hdfs_user
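A minimal sketch of the user-selection rule described above, assuming the JVM Config is a one-option-per-line text file. Both helpers are hypothetical and model only the rule stated in this section (an explicit HADOOP_USER_NAME system property wins, otherwise the OS user is used); they are not Presto or Hadoop code.

```python
# Illustrative sketch; hypothetical helpers modeling only the documented rule.
import getpass
from typing import Dict, Mapping, Optional


def jvm_system_properties(jvm_config: str) -> Dict[str, str]:
    """Collect -Dkey=value entries from JVM Config text (one option per line)."""
    properties: Dict[str, str] = {}
    for raw_line in jvm_config.splitlines():
        line = raw_line.strip()
        if line.startswith("-D") and "=" in line:
            key, value = line[2:].split("=", 1)
            properties[key] = value
    return properties


def effective_hdfs_user(system_properties: Mapping[str, str],
                        os_user: Optional[str] = None) -> str:
    """User name presented to HDFS when Kerberos is not used.

    HADOOP_USER_NAME overrides the OS user of the Presto process.
    """
    override = system_properties.get("HADOOP_USER_NAME")
    if override:
        return override
    return os_user if os_user is not None else getpass.getuser()
```

With a JVM Config containing -DHADOOP_USER_NAME=hdfs_user, a Presto process running as nobody would be expected to appear to HDFS as hdfs_user.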

Accessing Hadoop clusters protected with Kerberos authentication

Kerberos authentication is supported for both HDFS and the Hive metastore. However, Kerberos authentication by ticket cache is not yet supported.

The properties that apply to Hive connector security are listed in the Hive Configuration Properties table. Please see the Hive Security Configuration section for a more detailed discussion of the security options in the Hive connector.

Hive Configuration Properties

Property Name | Description | Default
hive.metastore.uri | The URI(s) of the Hive metastore to connect to using the Thrift protocol. If multiple URIs are provided, the first URI is used by default and the rest of the URIs are fallback metastores. This property is required. Example: thrift://192.0.2.3:9083 or thrift://192.0.2.3:9083,thrift://192.0.2.4:9083 |
hive.metastore.username | The username Presto will use to access the Hive metastore. |
hive.config.resources | An optional comma-separated list of HDFS configuration files. These files must exist on the machines running Presto. Only specify this if absolutely necessary to access HDFS. Example: /etc/hdfs-site.xml |
hive.storage-format | The default file format used when creating new tables. | ORC
hive.compression-codec | The compression codec to use when writing files. | GZIP
hive.force-local-scheduling | Force splits to be scheduled on the same node as the Hadoop DataNode process serving the split data. This is useful for installations where Presto is collocated with every DataNode. | false
hive.respect-table-format | Whether new partitions are written using the existing table format (true) or the default Presto format (false). | true
hive.immutable-partitions | If true, new data cannot be inserted into existing partitions. | false
hive.max-partitions-per-writers | Maximum number of partitions per writer. | 100
hive.max-partitions-per-scan | Maximum number of partitions for a single table scan. | 100,000
hive.metastore.authentication.type | Hive metastore authentication type. Possible values are NONE or KERBEROS. | NONE
hive.metastore.service.principal | The Kerberos principal of the Hive metastore service. |
hive.metastore.client.principal | The Kerberos principal that Presto will use when connecting to the Hive metastore service. |
hive.metastore.client.keytab | Hive metastore client keytab location. |
hive.hdfs.authentication.type | HDFS authentication type. Possible values are NONE or KERBEROS. | NONE
hive.hdfs.impersonation.enabled | Enable HDFS end user impersonation. | false
hive.hdfs.presto.principal | The Kerberos principal that Presto will use when connecting to HDFS. |
hive.hdfs.presto.keytab | HDFS client keytab location. |
hive.security | See Hive Security Configuration. |
security.config-file | Path of config file to use when hive.security=file. See File Based Authorization for details. |
hive.non-managed-table-writes-enabled | Enable writes to non-managed (external) Hive tables. | false
hive.non-managed-table-creates-enabled | Enable creating non-managed (external) Hive tables. | true
hive.collect-column-statistics-on-write | Enables automatic column level statistics collection on write. See Table Statistics for details. | false
hive.s3select-pushdown.enabled | Enable query pushdown to AWS S3 Select service. | false
hive.s3select-pushdown.max-connections | Maximum number of simultaneously open connections to S3 for S3SelectPushdown. | 500
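The hive.metastore.uri row describes a primary metastore followed by fallbacks. The Python sketch below shows one way to read that value and try the URIs in order. It is not Presto's metastore client: the helper names are hypothetical, and treating a ConnectionError as the signal to move to the next URI is an assumption, since the exact failover policy is not specified here.

```python
# Illustrative sketch; hypothetical helpers, failover policy is an assumption.
from typing import Callable, List, TypeVar
from urllib.parse import urlparse

T = TypeVar("T")


def parse_metastore_uris(value: str) -> List[str]:
    """Split a hive.metastore.uri value into an ordered list of URIs.

    The first entry is the default metastore; the rest are fallbacks.
    """
    uris = [part.strip() for part in value.split(",") if part.strip()]
    if not uris:
        raise ValueError("hive.metastore.uri is required")
    for uri in uris:
        parsed = urlparse(uri)
        if parsed.scheme != "thrift" or not parsed.hostname or parsed.port is None:
            raise ValueError(f"expected thrift://host:port, got {uri!r}")
    return uris


def call_with_fallback(uris: List[str], call: Callable[[str], T]) -> T:
    """Try each metastore in order and return the first successful result.

    Assumption: a ConnectionError from one metastore means "try the next one".
    """
    last_error: Exception = ConnectionError("no metastore reachable")
    for uri in uris:
        try:
            return call(uri)
        except ConnectionError as error:
            last_error = error
    raise last_error
```

Keeping parsing separate from the call loop makes it easy to validate the property at startup while leaving the retry policy swappable.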

Amazon S3 Configuration

The Hive Connector can read and write tables that are stored in S3. This is accomplished by having a table or database location that uses an S3 prefix rather than an HDFS prefix.

Presto uses its own S3 filesystem for the URI prefixes s3://, s3n:// and s3a://.

S3 Configuration Properties

Property Name | Description
hive.s3.use-instance-credentials | Use the EC2 metadata service to retrieve API credentials (defaults to true). This works with IAM roles in EC2.
hive.s3.aws-access-key | Default AWS access key to use.
hive.s3.aws-secret-key | Default AWS secret key to use.
hive.s3.endpoint | The S3 storage endpoint server. This can be used to connect to an S3-compatible storage system instead of AWS. When using v4 signatures, it is recommended to set this to the AWS region-specific endpoint (e.g., http[s]://<bucket>.s3-<AWS-region>.amazonaws.com).
hive.s3.signer-type | Specify a different signer type for S3-compatible storage. Example: S3SignerType for v2 signer type.
hive.s3.path-style-access | Use path-style access for all requests to the S3-compatible storage. This is for S3-compatible storage that doesn’t support virtual-hosted-style access (defaults to false).
hive.s3.staging-directory | Local staging directory for data written to S3. This defaults to the Java temporary directory specified by the JVM system property java.io.tmpdir.
hive.s3.pin-client-to-current-region | Pin S3 requests to the same region as the EC2 instance where Presto is running (defaults to false).
hive.s3.ssl.enabled | Use HTTPS to communicate with the S3 API (defaults to true).
hive.s3.sse.enabled | Use S3 server-side encryption (defaults to false).
hive.s3.sse.type | The type of key management for S3 server-side encryption. Use S3 for S3-managed keys or KMS for KMS-managed keys (defaults to S3).
hive.s3.sse.kms-key-id | The KMS Key ID to use for S3 server-side encryption with KMS-managed keys. If not set, the default key is used.
hive.s3.kms-key-id | If set, use S3 client-side encryption with the AWS KMS storing the encryption keys, and use the value of this property as the KMS Key ID for newly created objects.
hive.s3.encryption-materials-provider | If set, use S3 client-side encryption and use the value of this property as the fully qualified name of a Java class which implements the AWS SDK’s EncryptionMaterialsProvider interface. If the class also implements Configurable from the Hadoop API, the Hadoop configuration will be passed in after the object has been created.
hive.s3.upload-acl-type | Canned ACL to use while uploading files to S3 (defaults to Private).
hive.s3.skip-glacier-objects | Ignore Glacier objects rather than failing the query. This will skip data that may be expected to be part of the table or partition (defaults to false).

S3 Credentials

If you are running Presto on Amazon EC2 using EMR or another facility, it is highly recommended that you set hive.s3.use-instance-credentials to true and use IAM Roles for EC2 to govern access to S3. If this is the case, your EC2 instances will need to be assigned an IAM Role which grants appropriate access to the data stored in the S3 bucket(s) you wish to use. This is much cleaner than setting AWS access and secret keys in the hive.s3.aws-access-key and hive.s3.aws-secret-key settings, and also allows EC2 to automatically rotate credentials on a regular basis without any additional work on your part.

Custom S3 Credentials Provider

You can configure a custom S3 credentials provider by setting the Hadoop configuration property presto.s3.credentials-provider to be the fully qualified class name of a custom AWS credentials provider implementation. This class must implement the AWSCredentialsProvider interface and provide a two-argument constructor that takes a java.net.URI and a Hadoop org.apache.hadoop.conf.Configuration as arguments. A custom credentials provider can be used to provide temporary credentials from STS (using STSSessionCredentialsProvider), IAM role-based credentials (using STSAssumeRoleSessionCredentialsProvider), or credentials for a specific use case (e.g., bucket/user specific credentials). This Hadoop configuration property must be set in the Hadoop configuration files referenced by the hive.config.resources Hive connector property.

Tuning Properties

The following tuning properties affect the behavior of the client used by the Presto S3 filesystem when communicating with S3. Most of these parameters affect settings on the ClientConfiguration object associated with the AmazonS3Client.

Property Name | Description | Default
hive.s3.max-error-retries | Maximum number of error retries, set on the S3 client. | 10
hive.s3.max-client-retries | Maximum number of read attempts to retry. | 5
hive.s3.max-backoff-time | Use exponential backoff starting at 1 second up to this maximum value when communicating with S3. | 10 minutes
hive.s3.max-retry-time | Maximum time to retry communicating with S3. | 10 minutes
hive.s3.connect-timeout | TCP connect timeout. | 5 seconds
hive.s3.socket-timeout | TCP socket read timeout. | 5 seconds
hive.s3.max-connections | Maximum number of simultaneous open connections to S3. | 500
hive.s3.multipart.min-file-size | Minimum file size before multi-part upload to S3 is used. | 16 MB
hive.s3.multipart.min-part-size | Minimum multi-part upload part size. | 5 MB
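Two rows in this table describe simple arithmetic: the exponential backoff bounded by hive.s3.max-backoff-time and hive.s3.max-retry-time, and the multipart thresholds. The Python sketch below illustrates both under stated assumptions. It is not the retry or upload code of Presto or the AWS SDK; the doubling factor, the rule of stopping once the total wait would exceed the retry budget, the use of 2**20 bytes per MB, and cutting every part to exactly the minimum part size are all assumptions.

```python
# Illustrative sketch; doubling, stopping rule and part splitting are assumptions.
from typing import List

MB = 1024 * 1024  # assumption: "MB" in the table means 2**20 bytes


def backoff_delays(max_attempts: int,
                   initial: float = 1.0,
                   max_backoff: float = 600.0,
                   max_retry_time: float = 600.0) -> List[float]:
    """Seconds to wait before each retry: 1, 2, 4, ... capped at max_backoff.

    Stops early once the cumulative wait would exceed max_retry_time.
    The defaults mirror the 10 minute table defaults.
    """
    delays: List[float] = []
    total = 0.0
    delay = initial
    for _ in range(max_attempts):
        wait = min(delay, max_backoff)
        if total + wait > max_retry_time:
            break
        delays.append(wait)
        total += wait
        delay *= 2
    return delays


def plan_upload(file_size: int,
                min_file_size: int = 16 * MB,
                min_part_size: int = 5 * MB) -> List[int]:
    """Return the part sizes for an upload of file_size bytes.

    Files below min_file_size go up as a single object ([file_size]).
    Larger files are cut into min_part_size parts; the last part holds
    the remainder and may be smaller.
    """
    if file_size < min_file_size:
        return [file_size]
    parts: List[int] = []
    remaining = file_size
    while remaining > 0:
        part = min(min_part_size, remaining)
        parts.append(part)
        remaining -= part
    return parts
```

For instance, with five attempts and the default limits the waits grow as 1, 2, 4, 8 and 16 seconds, far below the 10 minute cap; the cap only matters for long retry sequences.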

S3 Data Encryption

Presto supports reading and writing encrypted data in S3 using both server-side encryption with S3 managed keys and client-side encryption using either the Amazon KMS or a software plugin to manage AES encryption keys.

With S3 server-side encryption (called SSE-S3 in the Amazon documentation), the S3 infrastructure takes care of all encryption and decryption work (with the exception of SSL to the client, assuming you have hive.s3.ssl.enabled set to true). S3 also manages all the encryption keys for you. To enable this, set hive.s3.sse.enabled to true.

With S3 client-side encryption, S3 stores encrypted data and the encryption keys are managed outside of the S3 infrastructure. Data is encrypted and decrypted by Presto instead of in the S3 infrastructure. In this case, encryption keys can be managed either by using the AWS KMS or your own key management system. To use the AWS KMS for key management, set hive.s3.kms-key-id to the UUID of a KMS key. Your AWS credentials or EC2 IAM role will need to be granted permission to use the given key as well.

To use a custom encryption key management system, set hive.s3.encryption-materials-provider to the fully qualified name of a class which implements the EncryptionMaterialsProvider interface from the AWS Java SDK. This class will have to be accessible to the Hive Connector through the classpath and must be able to communicate with your custom key management system. If this class also implements the org.apache.hadoop.conf.Configurable interface from the Hadoop Java API, then the Hadoop configuration will be passed in after the object instance is created and before it is asked to provision or retrieve any encryption keys.

S3SelectPushdown

S3SelectPushdown enables pushing down projection (SELECT) and predicate (WHERE) processing to S3 Select. With S3SelectPushdown, Presto only retrieves the required data from S3 instead of entire S3 objects, reducing both latency and network usage.

Is S3 Select a good fit for my workload?

Performance of S3SelectPushdown depends on the amount of data filtered by the query. Filtering a large number of rows should result in better performance. If the query doesn’t filter any data, then pushdown may not add any additional value and the user will be charged for S3 Select requests. Thus, we recommend that you benchmark your workloads with and without S3 Select to see if using it may be suitable for your workload. By default, S3SelectPushdown is disabled, and you should enable it in production only after proper benchmarking and cost analysis. For more information on S3 Select request cost, please see Amazon S3 Cloud Storage Pricing.

Use the following guidelines to determine if S3 Select is a good fit for your workload:

  • Your query filters out more than half of the original data set.
  • Your query filter predicates use columns that have a data type supported by Presto and S3 Select. The TIMESTAMP, REAL, and DOUBLE data types are not supported by S3 Select Pushdown. We recommend using the decimal data type for numerical data. For more information about supported data types for S3 Select, see the Data Types documentation.
  • Your network connection between Amazon S3 and the Amazon EMR cluster has good transfer speed and available bandwidth. Amazon S3 Select does not compress HTTP responses, so the response size may increase for compressed input files.
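The first two guidelines can be turned into a quick pre-benchmark check. The Python sketch below is a hypothetical helper, not part of Presto: it uses row counts as a stand-in for "the original data set" (an assumption), and it cannot judge the third guideline, network bandwidth, which still has to be measured.

```python
# Illustrative sketch; hypothetical helper, rows stand in for "data set".
from dataclasses import dataclass, field
from typing import List

# Types listed above as unsupported by S3 Select Pushdown.
UNSUPPORTED_PREDICATE_TYPES = {"TIMESTAMP", "REAL", "DOUBLE"}


@dataclass
class QueryProfile:
    rows_scanned: int
    rows_returned: int
    predicate_column_types: List[str] = field(default_factory=list)


def looks_like_s3_select_fit(profile: QueryProfile) -> bool:
    """Apply the first two guidelines; bandwidth must be judged separately."""
    if profile.rows_scanned <= 0:
        return False
    filtered_out = 1 - profile.rows_returned / profile.rows_scanned
    if filtered_out <= 0.5:  # must filter out more than half of the rows
        return False
    types = {t.strip().upper() for t in profile.predicate_column_types}
    return not (types & UNSUPPORTED_PREDICATE_TYPES)
```

A positive result only suggests that a benchmark is worthwhile; the cost analysis recommended above still applies.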

Considerations and Limitations

  • Only objects stored in CSV format are supported. Objects can be uncompressed or optionally compressed with gzip or bzip2.
  • The “AllowQuotedRecordDelimiters” property is not supported. If this property is specified, the query fails.
  • Amazon S3 server-side encryption with customer-provided encryption keys (SSE-C) and client-side encryption are not supported.
  • S3 Select Pushdown is not a substitute for using columnar or compressed file formats such as ORC and Parquet.
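The limitations above can be expressed as an eligibility check for a single object. This Python sketch is hypothetical: the string labels for compression and encryption modes ("gzip", "SSE-C", "client-side", and so on) are invented for illustration and do not come from any Presto or AWS API. It raises an error for AllowQuotedRecordDelimiters to mirror the documented behavior that the query fails.

```python
# Illustrative sketch; the compression/encryption labels are hypothetical.
from typing import Optional

SUPPORTED_COMPRESSION = {"gzip", "bzip2"}
# Hypothetical labels for the encryption modes named above.
UNSUPPORTED_ENCRYPTION = {"SSE-C", "CLIENT-SIDE"}


def s3_select_can_read(file_format: str,
                       compression: Optional[str] = None,
                       encryption: Optional[str] = None,
                       allow_quoted_record_delimiters: bool = False) -> bool:
    """Check an object against the documented S3 Select Pushdown limitations."""
    if allow_quoted_record_delimiters:
        # Documented behavior: the query fails instead of skipping pushdown.
        raise ValueError("AllowQuotedRecordDelimiters is not supported")
    if file_format.upper() != "CSV":
        return False
    if compression is not None and compression.lower() not in SUPPORTED_COMPRESSION:
        return False
    if encryption is not None and encryption.upper() in UNSUPPORTED_ENCRYPTION:
        return False
    return True
```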

Enabling S3 Select Pushdown

You can enable S3 Select Pushdown using the s3_select_pushdown_enabled Hive session property or using the hive.s3select-pushdown.enabled configuration property. The session property will override the config property, allowing you to enable or disable pushdown on a per-query basis.
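The precedence rule (session property over catalog configuration, which defaults to false) is sketched below in Python. The helpers are hypothetical and only restate the rule; they do not reproduce Presto's property handling. In a Presto session the override would typically be issued as SET SESSION hive.s3_select_pushdown_enabled = true, assuming the catalog is named hive.

```python
# Illustrative sketch; hypothetical helpers restating the precedence rule.
from typing import Mapping

CONFIG_KEY = "hive.s3select-pushdown.enabled"
SESSION_KEY = "s3_select_pushdown_enabled"


def parse_bool(value: str) -> bool:
    """Accept true/false in any letter case; reject anything else."""
    normalized = value.strip().lower()
    if normalized not in ("true", "false"):
        raise ValueError(f"not a boolean: {value!r}")
    return normalized == "true"


def s3_select_pushdown_enabled(catalog_config: Mapping[str, str],
                               session_properties: Mapping[str, str]) -> bool:
    """Session property wins when set; otherwise the config (default false)."""
    if SESSION_KEY in session_properties:
        return parse_bool(session_properties[SESSION_KEY])
    return parse_bool(catalog_config.get(CONFIG_KEY, "false"))
```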

Understanding and Tuning the Maximum Connections

Presto can use its native S3 file system or EMRFS. When using the native FS, the maximum number of connections is configured via the hive.s3.max-connections configuration property. When using EMRFS, the maximum number of connections is configured via the fs.s3.maxConnections Hadoop configuration property.

S3 Select Pushdown bypasses the file systems when accessing Amazon S3 for predicate operations. In this case, the value of hive.s3select-pushdown.max-connections determines the maximum number of client connections allowed for those operations from worker nodes.

If your workload experiences the error Timeout waiting for connection from pool, increase the value of both hive.s3select-pushdown.max-connections and the maximum connections configuration for the file system you are using.

Table Statistics

The Hive connector automatically collects basic statistics (numFiles, numRows, rawDataSize, totalSize) on INSERT and CREATE TABLE AS operations.

The Hive connector can also collect column level statistics:

Column Type | Collectible Statistics
TINYINT | number of nulls, number of distinct values, min/max values
SMALLINT | number of nulls, number of distinct values, min/max values
INTEGER | number of nulls, number of distinct values, min/max values
BIGINT | number of nulls, number of distinct values, min/max values
DOUBLE | number of nulls, number of distinct values, min/max values
REAL | number of nulls, number of distinct values, min/max values
DECIMAL | number of nulls, number of distinct values, min/max values
DATE | number of nulls, number of distinct values, min/max values
TIMESTAMP | number of nulls, number of distinct values, min/max values
VARCHAR | number of nulls, number of distinct values
CHAR | number of nulls, number of distinct values
VARBINARY | number of nulls
BOOLEAN | number of nulls, number of true/false values
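To make the table concrete, the Python sketch below computes the listed statistics for an in-memory list of values, with None standing for SQL NULL. It is a hypothetical helper, not the connector's implementation: it counts distinct values exactly, whereas a real engine may estimate them, and it expects bare type names such as DECIMAL rather than parameterized ones.

```python
# Illustrative sketch; exact counting, bare type names only.
from typing import Any, Dict, List, Optional

MIN_MAX_TYPES = {"TINYINT", "SMALLINT", "INTEGER", "BIGINT", "DOUBLE",
                 "REAL", "DECIMAL", "DATE", "TIMESTAMP"}
DISTINCT_TYPES = MIN_MAX_TYPES | {"VARCHAR", "CHAR"}


def column_statistics(column_type: str,
                      values: List[Optional[Any]]) -> Dict[str, Any]:
    """Compute the statistics the table lists for one column type.

    None stands for SQL NULL. Distinct values are counted exactly here.
    """
    kind = column_type.upper()
    non_null = [v for v in values if v is not None]
    stats: Dict[str, Any] = {"nulls": len(values) - len(non_null)}
    if kind == "BOOLEAN":
        stats["true_count"] = sum(1 for v in non_null if v is True)
        stats["false_count"] = sum(1 for v in non_null if v is False)
    if kind in DISTINCT_TYPES:
        stats["distinct_values"] = len(set(non_null))
    if kind in MIN_MAX_TYPES and non_null:
        stats["min"] = min(non_null)
        stats["max"] = max(non_null)
    return stats
```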

Automatic column level statistics collection on write is controlled by the collect_column_statistics_on_write catalog session property.

Collecting table and column statistics

The Hive connector supports collection of table and partition statistics via the ANALYZE statement. When analyzing a partitioned table, the partitions to analyze can be specified via the optional partitions property, which is an array containing the values of the partition keys in the order they are declared in the table schema:

    ANALYZE hive.sales WITH (
      partitions = ARRAY[
        ARRAY['partition1_value1', 'partition1_value2'],
        ARRAY['partition2_value1', 'partition2_value2']]);

This query collects statistics for two partitions, with the following partition key values:

  • partition1_value1, partition1_value2
  • partition2_value1, partition2_value2
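When the partitions to analyze come from a script, the statement can be generated rather than typed by hand. The Python sketch below is a hypothetical helper that reproduces the statement shape shown above; it quotes partition values as SQL string literals but inserts the table name as-is, so the table name must come from a trusted source.

```python
# Illustrative sketch; hypothetical helper, table name is not escaped.
from typing import Sequence


def sql_string_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def analyze_statement(table: str,
                      partitions: Sequence[Sequence[str]] = ()) -> str:
    """Build an ANALYZE statement, optionally limited to some partitions.

    Each inner sequence holds one partition's key values, in the order the
    partition keys are declared in the table schema.
    """
    if not partitions:
        return f"ANALYZE {table}"
    widths = {len(p) for p in partitions}
    if len(widths) != 1:
        raise ValueError("every partition needs the same number of key values")
    rows = ", ".join(
        "ARRAY[" + ", ".join(sql_string_literal(v) for v in p) + "]"
        for p in partitions
    )
    return f"ANALYZE {table} WITH (partitions = ARRAY[{rows}])"
```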

Schema Evolution

Hive allows the partitions in a table to have a different schema than the table. This occurs when the column types of a table are changed after partitions already exist (that use the original column types). The Hive connector supports this by allowing the same conversions as Hive:

  • varchar to and from tinyint, smallint, integer and bigint
  • real to double
  • Widening conversions for integers, such as tinyint to smallint

Any conversion failure will result in null, which is the same behavior as Hive. For example, converting the string 'foo' to a number, or converting the string '1234' to a tinyint (which has a maximum value of 127), results in null.
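The null-on-failure rule for varchar to integer conversions, and the meaning of "widening", can be sketched in Python using the value ranges of the integer types. The helpers are hypothetical, and the strict "optional sign followed by digits" parsing is an assumption standing in for Hive's own parser, which may accept or reject other inputs.

```python
# Illustrative sketch; strict digit parsing is an assumption.
import re
from typing import Optional

# Value ranges of the integer types.
INTEGER_RANGES = {
    "tinyint": (-2**7, 2**7 - 1),
    "smallint": (-2**15, 2**15 - 1),
    "integer": (-2**31, 2**31 - 1),
    "bigint": (-2**63, 2**63 - 1),
}
_INTEGER_TEXT = re.compile(r"[+-]?[0-9]+")


def varchar_to_integer(value: Optional[str], target: str) -> Optional[int]:
    """Coerce a varchar value to an integer type, or return None on failure."""
    if value is None:
        return None
    if not _INTEGER_TEXT.fullmatch(value):
        return None  # not a number, e.g. 'foo'
    number = int(value)
    low, high = INTEGER_RANGES[target]
    return number if low <= number <= high else None  # e.g. '1234' as tinyint


def can_widen(source: str, target: str) -> bool:
    """True if every value of the source integer type fits in the target type."""
    src_low, src_high = INTEGER_RANGES[source]
    dst_low, dst_high = INTEGER_RANGES[target]
    return dst_low <= src_low and src_high <= dst_high
```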

Avro Schema Evolution

Presto supports querying and manipulating Hive tables with the Avro storage format, which has the schema set based on an Avro schema file/literal. It is also possible to create tables in Presto which infer the schema from a valid Avro schema file located locally or remotely in HDFS or on a web server.

To specify that an Avro schema should be used for interpreting a table’s data, use the avro_schema_url table property. The schema can be placed remotely in HDFS (e.g. avro_schema_url = 'hdfs://user/avro/schema/avro_data.avsc'), S3 (e.g. avro_schema_url = 's3n://schema_bucket/schema/avro_data.avsc'), a web server (e.g. avro_schema_url = 'http://example.org/schema/avro_data.avsc') as well as the local file system. The URL where the schema is located must be accessible from the Hive metastore and the Presto coordinator and worker nodes.

The table created in Presto using avro_schema_url behaves the same way as a Hive table with avro.schema.url or avro.schema.literal set.

Example:

    CREATE TABLE hive.avro.avro_data (
      id bigint
    )
    WITH (
      format = 'AVRO',
      avro_schema_url = '/usr/local/avro_data.avsc'
    )

The columns listed in the DDL (id in the above example) will be ignored if avro_schema_url is specified. The table schema will match the schema in the Avro schema file. Before any read operation, the Avro schema is accessed, so the query result reflects any changes in the schema. Thus Presto takes advantage of Avro’s backward compatibility abilities.

If the schema of the table changes in the Avro schema file, the new schema can still be used to read old data. Newly added or renamed fields must have a default value in the Avro schema file.

The schema evolution behavior is as follows:

  • Column added in new schema: Data created with an older schema will produce a default value when the table is using the new schema.
  • Column removed in new schema: Data created with an older schema will no longer output the data from the column that was removed.
  • Column is renamed in the new schema: This is equivalent to removing the column and adding a new one, and data created with an older schema will produce a default value when the table is using the new schema.
  • Changing type of column in the new schema: If the type coercion is supported by Avro or the Hive connector, then the conversion happens. An error is thrown for incompatible types.
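The first three rules can be illustrated by projecting an old record onto a new Avro record schema, written as the usual JSON-style dictionary with a "fields" list. The Python sketch below is hypothetical and is not the Avro library's schema resolution: it treats a rename exactly as the rules above describe (drop plus add with default) and does not model type changes.

```python
# Illustrative sketch; not Avro's resolver, type changes are not modeled.
from typing import Any, Dict, Mapping


def read_with_new_schema(record: Mapping[str, Any],
                         new_schema: Mapping[str, Any]) -> Dict[str, Any]:
    """Project a record written with an old schema onto a new record schema.

    - fields present in both are copied as-is
    - added (or renamed) fields take their "default" from the new schema
    - fields absent from the new schema are dropped
    """
    result: Dict[str, Any] = {}
    for field in new_schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            result[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is new but has no default value")
    return result
```

The error branch corresponds to the requirement that newly added or renamed fields carry a default value in the Avro schema file.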

Limitations

The following operations are not supported when avro_schema_url is set:

  • CREATE TABLE AS is not supported.
  • Partitioning (partitioned_by) or bucketing (bucketed_by) columns are not supported in CREATE TABLE.
  • ALTER TABLE commands modifying columns are not supported.

Procedures

  • system.create_empty_partition(schema_name, table_name, partition_columns, partition_values)

Create an empty partition in the specified table.

Examples

The Hive connector supports querying and manipulating Hive tables and schemas (databases). While some uncommon operations will need to be performed using Hive directly, most operations can be performed using Presto.

Create a new Hive schema named web that will store tables in an S3 bucket named my-bucket:

    CREATE SCHEMA hive.web
    WITH (location = 's3://my-bucket/')

Create a new Hive table named page_views in the web schema that is stored using the ORC file format, partitioned by date and country, and bucketed by user into 50 buckets (note that Hive requires the partition columns to be the last columns in the table):

    CREATE TABLE hive.web.page_views (
      view_time timestamp,
      user_id bigint,
      page_url varchar,
      ds date,
      country varchar
    )
    WITH (
      format = 'ORC',
      partitioned_by = ARRAY['ds', 'country'],
      bucketed_by = ARRAY['user_id'],
      bucket_count = 50
    )

Drop a partition from the page_views table:

    DELETE FROM hive.web.page_views
    WHERE ds = DATE '2016-08-09'
      AND country = 'US'

Add an empty partition to the page_views table:

    CALL system.create_empty_partition(
        schema_name => 'web',
        table_name => 'page_views',
        partition_columns => ARRAY['ds', 'country'],
        partition_values => ARRAY['2016-08-09', 'US']);

Query the page_views table:

    SELECT * FROM hive.web.page_views

List the partitions of the page_views table:

    SELECT * FROM hive.web."page_views$partitions"

Create an external Hive table named request_logs that points at existing data in S3:

    CREATE TABLE hive.web.request_logs (
      request_time timestamp,
      url varchar,
      ip varchar,
      user_agent varchar
    )
    WITH (
      format = 'TEXTFILE',
      external_location = 's3://my-bucket/data/logs/'
    )

Drop the external table request_logs. This only drops the metadata for the table. The referenced data directory is not deleted:

    DROP TABLE hive.web.request_logs

Drop a schema:

    DROP SCHEMA hive.web

Hive Connector Limitations

DELETE is only supported if the WHERE clause matches entire partitions.
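Because every row in a partition shares the same partition key values, a WHERE clause that references only partition columns always selects whole partitions. The Python sketch below uses that observation as a rough pre-check before issuing a DELETE. It is a hypothetical helper, not Presto's planner logic: Presto makes the actual decision and may still reject a statement this check accepts, and statements without a WHERE clause are deliberately left out of scope.

```python
# Illustrative sketch; a rough pre-check, not Presto's planner logic.
from typing import Iterable


def delete_matches_whole_partitions(partition_columns: Iterable[str],
                                    where_columns: Iterable[str]) -> bool:
    """Does the WHERE clause reference only partition keys?

    Column names are compared case-insensitively.
    """
    partition_keys = {c.lower() for c in partition_columns}
    referenced = {c.lower() for c in where_columns}
    if not referenced:
        raise ValueError("this sketch only handles DELETE statements with a WHERE clause")
    return referenced <= partition_keys
```

For the page_views example, a WHERE clause on ds and country passes this check, while adding a condition on user_id would not.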