Docker Demo

A Demo using docker containers

Lets use a real world example to see how hudi works end to end. For this purpose, a self contained data infrastructure is brought up in a local docker cluster within your computer.

The steps have been tested on a Mac laptop

Prerequisites

  • Docker Setup : For Mac, Please follow the steps as defined in [https://docs.docker.com/v17.12/docker-for-mac/install/\]. For running Spark-SQL queries, please ensure atleast 6 GB and 4 CPUs are allocated to Docker (See Docker -> Preferences -> Advanced). Otherwise, spark-SQL queries could be killed because of memory issues.
  • kafkacat : A command-line utility to publish/consume from kafka topics. Use brew install kafkacat to install kafkacat
  • /etc/hosts : The demo references many services running in container by the hostname. Add the following settings to /etc/hosts
  1. 127.0.0.1 adhoc-1
  2. 127.0.0.1 adhoc-2
  3. 127.0.0.1 namenode
  4. 127.0.0.1 datanode1
  5. 127.0.0.1 hiveserver
  6. 127.0.0.1 hivemetastore
  7. 127.0.0.1 kafkabroker
  8. 127.0.0.1 sparkmaster
  9. 127.0.0.1 zookeeper

Also, this has not been tested on some environments like Docker on Windows.

Setting up Docker Cluster

Build Hudi

The first step is to build hudi

  1. cd <HUDI_WORKSPACE>
  2. mvn package -DskipTests

Bringing up Demo Cluster

The next step is to run the docker compose script and setup configs for bringing up the cluster. This should pull the docker images from docker hub and setup docker cluster.

  1. cd docker
  2. ./setup_demo.sh
  3. ....
  4. ....
  5. ....
  6. Stopping spark-worker-1 ... done
  7. Stopping hiveserver ... done
  8. Stopping hivemetastore ... done
  9. Stopping historyserver ... done
  10. .......
  11. ......
  12. Creating network "hudi_demo" with the default driver
  13. Creating hive-metastore-postgresql ... done
  14. Creating namenode ... done
  15. Creating zookeeper ... done
  16. Creating kafkabroker ... done
  17. Creating hivemetastore ... done
  18. Creating historyserver ... done
  19. Creating hiveserver ... done
  20. Creating datanode1 ... done
  21. Creating presto-coordinator-1 ... done
  22. Creating sparkmaster ... done
  23. Creating presto-worker-1 ... done
  24. Creating adhoc-1 ... done
  25. Creating adhoc-2 ... done
  26. Creating spark-worker-1 ... done
  27. Copying spark default config and setting up configs
  28. Copying spark default config and setting up configs
  29. Copying spark default config and setting up configs
  30. $ docker ps

At this point, the docker cluster will be up and running. The demo cluster brings up the following services

  • HDFS Services (NameNode, DataNode)
  • Spark Master and Worker
  • Hive Services (Metastore, HiveServer2 along with PostgresDB)
  • Kafka Broker and a Zookeeper Node (Kafka will be used as upstream source for the demo)
  • Adhoc containers to run Hudi/Hive CLI commands

Demo

Stock Tracker data will be used to showcase both different Hudi Views and the effects of Compaction.

Take a look at the directory docker/demo/data. There are 2 batches of stock data - each at 1 minute granularity. The first batch contains stocker tracker data for some stock symbols during the first hour of trading window (9:30 a.m to 10:30 a.m). The second batch contains tracker data for next 30 mins (10:30 - 11 a.m). Hudi will be used to ingest these batches to a dataset which will contain the latest stock tracker data at hour level granularity. The batches are windowed intentionally so that the second batch contains updates to some of the rows in the first batch.

Step 1 : Publish the first batch to Kafka

Upload the first batch to Kafka topic ‘stock ticks’ cat docker/demo/data/batch_1.json | kafkacat -b kafkabroker -t stock_ticks -P

To check if the new topic shows up, use

  1. kafkacat -b kafkabroker -L -J | jq .
  2. {
  3. "originating_broker": {
  4. "id": 1001,
  5. "name": "kafkabroker:9092/1001"
  6. },
  7. "query": {
  8. "topic": "*"
  9. },
  10. "brokers": [
  11. {
  12. "id": 1001,
  13. "name": "kafkabroker:9092"
  14. }
  15. ],
  16. "topics": [
  17. {
  18. "topic": "stock_ticks",
  19. "partitions": [
  20. {
  21. "partition": 0,
  22. "leader": 1001,
  23. "replicas": [
  24. {
  25. "id": 1001
  26. }
  27. ],
  28. "isrs": [
  29. {
  30. "id": 1001
  31. }
  32. ]
  33. }
  34. ]
  35. }
  36. ]
  37. }

Step 2: Incrementally ingest data from Kafka topic

Hudi comes with a tool named DeltaStreamer. This tool can connect to variety of data sources (including Kafka) to pull changes and apply to Hudi dataset using upsert/insert primitives. Here, we will use the tool to download json data from kafka topic and ingest to both COW and MOR tables we initialized in the previous step. This tool automatically initializes the datasets in the file-system if they do not exist yet.

  1. docker exec -it adhoc-2 /bin/bash
  2. # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow dataset in HDFS
  3. spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
  4. # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor dataset in HDFS
  5. spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /var/demo/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
  6. # As part of the setup (Look at setup_demo.sh), the configs needed for DeltaStreamer is uploaded to HDFS. The configs
  7. # contain mostly Kafa connectivity settings, the avro-schema to be used for ingesting along with key and partitioning fields.
  8. exit

You can use HDFS web-browser to look at the datasets http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_cow.

You can explore the new partition folder created in the dataset along with a “deltacommit” file under .hoodie which signals a successful commit.

There will be a similar setup when you browse the MOR dataset http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_mor

Step 3: Sync with Hive

At this step, the datasets are available in HDFS. We need to sync with Hive to create new Hive tables and add partitions inorder to run Hive queries against those datasets.

  1. docker exec -it adhoc-2 /bin/bash
  2. # THis command takes in HIveServer URL and COW Hudi Dataset location in HDFS and sync the HDFS state to Hive
  3. /var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by dt --base-path /user/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow
  4. .....
  5. 2018-09-24 22:22:45,568 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_cow
  6. .....
  7. # Now run hive-sync for the second data-set in HDFS using Merge-On-Read (MOR storage)
  8. /var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by dt --base-path /user/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor
  9. ...
  10. 2018-09-24 22:23:09,171 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_mor
  11. ...
  12. 2018-09-24 22:23:09,559 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_mor_rt
  13. ....
  14. exit

After executing the above command, you will notice

  1. A hive table named stock_ticks_cow created which provides Read-Optimized view for the Copy On Write dataset.
  2. Two new tables stock_ticks_mor and stock_ticks_mor_rt created for the Merge On Read dataset. The former provides the ReadOptimized view for the Hudi dataset and the later provides the realtime-view for the dataset.

Step 4 (a): Run Hive Queries

Run a hive query to find the latest timestamp ingested for stock symbol ‘GOOG’. You will notice that both read-optimized (for both COW and MOR dataset)and realtime views (for MOR dataset)give the same value “10:29 a.m” as Hudi create a parquet file for the first batch of data.

  1. docker exec -it adhoc-2 /bin/bash
  2. beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
  3. # List Tables
  4. 0: jdbc:hive2://hiveserver:10000> show tables;
  5. +---------------------+--+
  6. | tab_name |
  7. +---------------------+--+
  8. | stock_ticks_cow |
  9. | stock_ticks_mor |
  10. | stock_ticks_mor_rt |
  11. +---------------------+--+
  12. 2 rows selected (0.801 seconds)
  13. 0: jdbc:hive2://hiveserver:10000>
  14. # Look at partitions that were added
  15. 0: jdbc:hive2://hiveserver:10000> show partitions stock_ticks_mor_rt;
  16. +----------------+--+
  17. | partition |
  18. +----------------+--+
  19. | dt=2018-08-31 |
  20. +----------------+--+
  21. 1 row selected (0.24 seconds)
  22. # COPY-ON-WRITE Queries:
  23. =========================
  24. 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
  25. +---------+----------------------+--+
  26. | symbol | _c1 |
  27. +---------+----------------------+--+
  28. | GOOG | 2018-08-31 10:29:00 |
  29. +---------+----------------------+--+
  30. Now, run a projection query:
  31. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
  32. +----------------------+---------+----------------------+---------+------------+-----------+--+
  33. | _hoodie_commit_time | symbol | ts | volume | open | close |
  34. +----------------------+---------+----------------------+---------+------------+-----------+--+
  35. | 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  36. | 20180924221953 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
  37. +----------------------+---------+----------------------+---------+------------+-----------+--+
  38. # Merge-On-Read Queries:
  39. ==========================
  40. Lets run similar queries against M-O-R dataset. Lets look at both
  41. ReadOptimized and Realtime views supported by M-O-R dataset
  42. # Run against ReadOptimized View. Notice that the latest timestamp is 10:29
  43. 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
  44. WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  45. +---------+----------------------+--+
  46. | symbol | _c1 |
  47. +---------+----------------------+--+
  48. | GOOG | 2018-08-31 10:29:00 |
  49. +---------+----------------------+--+
  50. 1 row selected (6.326 seconds)
  51. # Run against Realtime View. Notice that the latest timestamp is again 10:29
  52. 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
  53. WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  54. +---------+----------------------+--+
  55. | symbol | _c1 |
  56. +---------+----------------------+--+
  57. | GOOG | 2018-08-31 10:29:00 |
  58. +---------+----------------------+--+
  59. 1 row selected (1.606 seconds)
  60. # Run projection query against Read Optimized and Realtime tables
  61. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
  62. +----------------------+---------+----------------------+---------+------------+-----------+--+
  63. | _hoodie_commit_time | symbol | ts | volume | open | close |
  64. +----------------------+---------+----------------------+---------+------------+-----------+--+
  65. | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  66. | 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
  67. +----------------------+---------+----------------------+---------+------------+-----------+--+
  68. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
  69. +----------------------+---------+----------------------+---------+------------+-----------+--+
  70. | _hoodie_commit_time | symbol | ts | volume | open | close |
  71. +----------------------+---------+----------------------+---------+------------+-----------+--+
  72. | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  73. | 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
  74. +----------------------+---------+----------------------+---------+------------+-----------+--+
  75. exit
  76. exit

Step 4 (b): Run Spark-SQL Queries

Hudi support Spark as query processor just like Hive. Here are the same hive queries running in spark-sql

  1. docker exec -it adhoc-1 /bin/bash
  2. $SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --master local[2] --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0
  3. ...
  4. Welcome to
  5. ____ __
  6. / __/__ ___ _____/ /__
  7. _\ \/ _ \/ _ `/ __/ '_/
  8. /___/ .__/\_,_/_/ /_/\_\ version 2.3.1
  9. /_/
  10. Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
  11. Type in expressions to have them evaluated.
  12. Type :help for more information.
  13. scala>
  14. scala> spark.sql("show tables").show(100, false)
  15. +--------+------------------+-----------+
  16. |database|tableName |isTemporary|
  17. +--------+------------------+-----------+
  18. |default |stock_ticks_cow |false |
  19. |default |stock_ticks_mor |false |
  20. |default |stock_ticks_mor_rt|false |
  21. +--------+------------------+-----------+
  22. # Copy-On-Write Table
  23. ## Run max timestamp query against COW table
  24. scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
  25. [Stage 0:> (0 + 1) / 1]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  26. SLF4J: Defaulting to no-operation (NOP) logger implementation
  27. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  28. +------+-------------------+
  29. |symbol|max(ts) |
  30. +------+-------------------+
  31. |GOOG |2018-08-31 10:29:00|
  32. +------+-------------------+
  33. ## Projection Query
  34. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)
  35. +-------------------+------+-------------------+------+---------+--------+
  36. |_hoodie_commit_time|symbol|ts |volume|open |close |
  37. +-------------------+------+-------------------+------+---------+--------+
  38. |20180924221953 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
  39. |20180924221953 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
  40. +-------------------+------+-------------------+------+---------+--------+
  41. # Merge-On-Read Queries:
  42. ==========================
  43. Lets run similar queries against M-O-R dataset. Lets look at both
  44. ReadOptimized and Realtime views supported by M-O-R dataset
  45. # Run against ReadOptimized View. Notice that the latest timestamp is 10:29
  46. scala> spark.sql("select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'").show(100, false)
  47. +------+-------------------+
  48. |symbol|max(ts) |
  49. +------+-------------------+
  50. |GOOG |2018-08-31 10:29:00|
  51. +------+-------------------+
  52. # Run against Realtime View. Notice that the latest timestamp is again 10:29
  53. scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
  54. +------+-------------------+
  55. |symbol|max(ts) |
  56. +------+-------------------+
  57. |GOOG |2018-08-31 10:29:00|
  58. +------+-------------------+
  59. # Run projection query against Read Optimized and Realtime tables
  60. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'").show(100, false)
  61. +-------------------+------+-------------------+------+---------+--------+
  62. |_hoodie_commit_time|symbol|ts |volume|open |close |
  63. +-------------------+------+-------------------+------+---------+--------+
  64. |20180924222155 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
  65. |20180924222155 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
  66. +-------------------+------+-------------------+------+---------+--------+
  67. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
  68. +-------------------+------+-------------------+------+---------+--------+
  69. |_hoodie_commit_time|symbol|ts |volume|open |close |
  70. +-------------------+------+-------------------+------+---------+--------+
  71. |20180924222155 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
  72. |20180924222155 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
  73. +-------------------+------+-------------------+------+---------+--------+

Step 4 (c): Run Presto Queries

Here are the Presto queries for similar Hive and Spark queries. Currently, Hudi does not support Presto queries on realtime views.

  1. docker exec -it presto-worker-1 presto --server presto-coordinator-1:8090
  2. presto> show catalogs;
  3. Catalog
  4. -----------
  5. hive
  6. jmx
  7. localfile
  8. system
  9. (4 rows)
  10. Query 20190817_134851_00000_j8rcz, FINISHED, 1 node
  11. Splits: 19 total, 19 done (100.00%)
  12. 0:04 [0 rows, 0B] [0 rows/s, 0B/s]
  13. presto> use hive.default;
  14. USE
  15. presto:default> show tables;
  16. Table
  17. --------------------
  18. stock_ticks_cow
  19. stock_ticks_mor
  20. stock_ticks_mor_rt
  21. (3 rows)
  22. Query 20190822_181000_00001_segyw, FINISHED, 2 nodes
  23. Splits: 19 total, 19 done (100.00%)
  24. 0:05 [3 rows, 99B] [0 rows/s, 18B/s]
  25. # COPY-ON-WRITE Queries:
  26. =========================
  27. presto:default> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
  28. symbol | _col1
  29. --------+---------------------
  30. GOOG | 2018-08-31 10:29:00
  31. (1 row)
  32. Query 20190822_181011_00002_segyw, FINISHED, 1 node
  33. Splits: 49 total, 49 done (100.00%)
  34. 0:12 [197 rows, 613B] [16 rows/s, 50B/s]
  35. presto:default> select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
  36. _hoodie_commit_time | symbol | ts | volume | open | close
  37. ---------------------+--------+---------------------+--------+-----------+----------
  38. 20190822180221 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
  39. 20190822180221 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085
  40. (2 rows)
  41. Query 20190822_181141_00003_segyw, FINISHED, 1 node
  42. Splits: 17 total, 17 done (100.00%)
  43. 0:02 [197 rows, 613B] [109 rows/s, 341B/s]
  44. # Merge-On-Read Queries:
  45. ==========================
  46. Lets run similar queries against M-O-R dataset.
  47. # Run against ReadOptimized View. Notice that the latest timestamp is 10:29
  48. presto:default> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
  49. symbol | _col1
  50. --------+---------------------
  51. GOOG | 2018-08-31 10:29:00
  52. (1 row)
  53. Query 20190822_181158_00004_segyw, FINISHED, 1 node
  54. Splits: 49 total, 49 done (100.00%)
  55. 0:02 [197 rows, 613B] [110 rows/s, 343B/s]
  56. presto:default> select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
  57. _hoodie_commit_time | symbol | ts | volume | open | close
  58. ---------------------+--------+---------------------+--------+-----------+----------
  59. 20190822180250 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
  60. 20190822180250 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085
  61. (2 rows)
  62. Query 20190822_181256_00006_segyw, FINISHED, 1 node
  63. Splits: 17 total, 17 done (100.00%)
  64. 0:02 [197 rows, 613B] [92 rows/s, 286B/s]
  65. presto:default> exit

Step 5: Upload second batch to Kafka and run DeltaStreamer to ingest

Upload the second batch of data and ingest this batch using delta-streamer. As this batch does not bring in any new partitions, there is no need to run hive-sync

  1. cat docker/demo/data/batch_2.json | kafkacat -b kafkabroker -t stock_ticks -P
  2. # Within Docker container, run the ingestion command
  3. docker exec -it adhoc-2 /bin/bash
  4. # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow dataset in HDFS
  5. spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
  6. # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor dataset in HDFS
  7. spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /var/demo/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
  8. exit

With Copy-On-Write table, the second ingestion by DeltaStreamer resulted in a new version of Parquet file getting created. See http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_cow/2018/08/31

With Merge-On-Read table, the second ingestion merely appended the batch to an unmerged delta (log) file. Take a look at the HDFS filesystem to get an idea: http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_mor/2018/08/31

Step 6(a): Run Hive Queries

With Copy-On-Write table, the read-optimized view immediately sees the changes as part of second batch once the batch got committed as each ingestion creates newer versions of parquet files.

With Merge-On-Read table, the second ingestion merely appended the batch to an unmerged delta (log) file. This is the time, when ReadOptimized and Realtime views will provide different results. ReadOptimized view will still return “10:29 am” as it will only read from the Parquet file. Realtime View will do on-the-fly merge and return latest committed data which is “10:59 a.m”.

  1. docker exec -it adhoc-2 /bin/bash
  2. beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
  3. # Copy On Write Table:
  4. 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
  5. WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  6. +---------+----------------------+--+
  7. | symbol | _c1 |
  8. +---------+----------------------+--+
  9. | GOOG | 2018-08-31 10:59:00 |
  10. +---------+----------------------+--+
  11. 1 row selected (1.932 seconds)
  12. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
  13. +----------------------+---------+----------------------+---------+------------+-----------+--+
  14. | _hoodie_commit_time | symbol | ts | volume | open | close |
  15. +----------------------+---------+----------------------+---------+------------+-----------+--+
  16. | 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  17. | 20180924224524 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  18. +----------------------+---------+----------------------+---------+------------+-----------+--+
  19. As you can notice, the above queries now reflect the changes that came as part of ingesting second batch.
  20. # Merge On Read Table:
  21. # Read Optimized View
  22. 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
  23. WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  24. +---------+----------------------+--+
  25. | symbol | _c1 |
  26. +---------+----------------------+--+
  27. | GOOG | 2018-08-31 10:29:00 |
  28. +---------+----------------------+--+
  29. 1 row selected (1.6 seconds)
  30. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
  31. +----------------------+---------+----------------------+---------+------------+-----------+--+
  32. | _hoodie_commit_time | symbol | ts | volume | open | close |
  33. +----------------------+---------+----------------------+---------+------------+-----------+--+
  34. | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  35. | 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
  36. +----------------------+---------+----------------------+---------+------------+-----------+--+
  37. # Realtime View
  38. 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
  39. WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  40. +---------+----------------------+--+
  41. | symbol | _c1 |
  42. +---------+----------------------+--+
  43. | GOOG | 2018-08-31 10:59:00 |
  44. +---------+----------------------+--+
  45. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
  46. +----------------------+---------+----------------------+---------+------------+-----------+--+
  47. | _hoodie_commit_time | symbol | ts | volume | open | close |
  48. +----------------------+---------+----------------------+---------+------------+-----------+--+
  49. | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  50. | 20180924224537 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  51. +----------------------+---------+----------------------+---------+------------+-----------+--+
  52. exit
  53. exit

Step 6(b): Run Spark SQL Queries

Running the same queries in Spark-SQL:

  1. docker exec -it adhoc-1 /bin/bash
  2. bash-4.4# $SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --master local[2] --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0
  3. # Copy On Write Table:
  4. scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
  5. +------+-------------------+
  6. |symbol|max(ts) |
  7. +------+-------------------+
  8. |GOOG |2018-08-31 10:59:00|
  9. +------+-------------------+
  10. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)
  11. +----------------------+---------+----------------------+---------+------------+-----------+--+
  12. | _hoodie_commit_time | symbol | ts | volume | open | close |
  13. +----------------------+---------+----------------------+---------+------------+-----------+--+
  14. | 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  15. | 20180924224524 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  16. +----------------------+---------+----------------------+---------+------------+-----------+--+
  17. As you can notice, the above queries now reflect the changes that came as part of ingesting second batch.
  18. # Merge On Read Table:
  19. # Read Optimized View
  20. scala> spark.sql("select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'").show(100, false)
  21. +---------+----------------------+--+
  22. | symbol | _c1 |
  23. +---------+----------------------+--+
  24. | GOOG | 2018-08-31 10:29:00 |
  25. +---------+----------------------+--+
  26. 1 row selected (1.6 seconds)
  27. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'").show(100, false)
  28. +----------------------+---------+----------------------+---------+------------+-----------+--+
  29. | _hoodie_commit_time | symbol | ts | volume | open | close |
  30. +----------------------+---------+----------------------+---------+------------+-----------+--+
  31. | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  32. | 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
  33. +----------------------+---------+----------------------+---------+------------+-----------+--+
  34. # Realtime View
  35. scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
  36. +---------+----------------------+--+
  37. | symbol | _c1 |
  38. +---------+----------------------+--+
  39. | GOOG | 2018-08-31 10:59:00 |
  40. +---------+----------------------+--+
  41. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
  42. +----------------------+---------+----------------------+---------+------------+-----------+--+
  43. | _hoodie_commit_time | symbol | ts | volume | open | close |
  44. +----------------------+---------+----------------------+---------+------------+-----------+--+
  45. | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  46. | 20180924224537 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  47. +----------------------+---------+----------------------+---------+------------+-----------+--+
  48. exit
  49. exit

Step 6(c): Run Presto Queries

Running the same queries on Presto for ReadOptimized views.

  1. docker exec -it presto-worker-1 presto --server presto-coordinator-1:8090
  2. presto> use hive.default;
  3. USE
  4. # Copy On Write Table:
  5. presto:default>select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
  6. symbol | _col1
  7. --------+---------------------
  8. GOOG | 2018-08-31 10:59:00
  9. (1 row)
  10. Query 20190822_181530_00007_segyw, FINISHED, 1 node
  11. Splits: 49 total, 49 done (100.00%)
  12. 0:02 [197 rows, 613B] [125 rows/s, 389B/s]
  13. presto:default>select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
  14. _hoodie_commit_time | symbol | ts | volume | open | close
  15. ---------------------+--------+---------------------+--------+-----------+----------
  16. 20190822180221 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
  17. 20190822181433 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215
  18. (2 rows)
  19. Query 20190822_181545_00008_segyw, FINISHED, 1 node
  20. Splits: 17 total, 17 done (100.00%)
  21. 0:02 [197 rows, 613B] [106 rows/s, 332B/s]
  22. As you can notice, the above queries now reflect the changes that came as part of ingesting second batch.
  23. # Merge On Read Table:
  24. # Read Optimized View
  25. presto:default> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
  26. symbol | _col1
  27. --------+---------------------
  28. GOOG | 2018-08-31 10:29:00
  29. (1 row)
  30. Query 20190822_181602_00009_segyw, FINISHED, 1 node
  31. Splits: 49 total, 49 done (100.00%)
  32. 0:01 [197 rows, 613B] [139 rows/s, 435B/s]
  33. presto:default>select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
  34. _hoodie_commit_time | symbol | ts | volume | open | close
  35. ---------------------+--------+---------------------+--------+-----------+----------
  36. 20190822180250 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
  37. 20190822180250 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085
  38. (2 rows)
  39. Query 20190822_181615_00010_segyw, FINISHED, 1 node
  40. Splits: 17 total, 17 done (100.00%)
  41. 0:01 [197 rows, 613B] [154 rows/s, 480B/s]
  42. presto:default> exit

Step 7 : Incremental Query for COPY-ON-WRITE Table

With 2 batches of data ingested, lets showcase the support for incremental queries in Hudi Copy-On-Write datasets

Lets take the same projection query example

  1. docker exec -it adhoc-2 /bin/bash
  2. beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
  3. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
  4. +----------------------+---------+----------------------+---------+------------+-----------+--+
  5. | _hoodie_commit_time | symbol | ts | volume | open | close |
  6. +----------------------+---------+----------------------+---------+------------+-----------+--+
  7. | 20180924064621 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  8. | 20180924065039 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  9. +----------------------+---------+----------------------+---------+------------+-----------+--+

As you notice from the above queries, there are 2 commits - 20180924064621 and 20180924065039 in timeline order. When you follow the steps, you will be getting different timestamps for commits. Substitute them in place of the above timestamps.

To show the effects of incremental-query, let us assume that a reader has already seen the changes as part of ingesting first batch. Now, for the reader to see effect of the second batch, he/she has to keep the start timestamp to the commit time of the first batch (20180924064621) and run incremental query

Hudi incremental mode provides efficient scanning for incremental queries by filtering out files that do not have any candidate rows using hudi-managed metadata.

  1. docker exec -it adhoc-2 /bin/bash
  2. beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
  3. 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL;
  4. No rows affected (0.009 seconds)
  5. 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.max.commits=3;
  6. No rows affected (0.009 seconds)
  7. 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.start.timestamp=20180924064621;

With the above setting, file-ids that do not have any updates from the commit 20180924065039 is filtered out without scanning. Here is the incremental query :

  1. 0: jdbc:hive2://hiveserver:10000>
  2. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20180924064621';
  3. +----------------------+---------+----------------------+---------+------------+-----------+--+
  4. | _hoodie_commit_time | symbol | ts | volume | open | close |
  5. +----------------------+---------+----------------------+---------+------------+-----------+--+
  6. | 20180924065039 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  7. +----------------------+---------+----------------------+---------+------------+-----------+--+
  8. 1 row selected (0.83 seconds)
  9. 0: jdbc:hive2://hiveserver:10000>

Incremental Query with Spark SQL:

  1. docker exec -it adhoc-1 /bin/bash
  2. bash-4.4# $SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --master local[2] --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0
  3. Welcome to
  4. ____ __
  5. / __/__ ___ _____/ /__
  6. _\ \/ _ \/ _ `/ __/ '_/
  7. /___/ .__/\_,_/_/ /_/\_\ version 2.3.1
  8. /_/
  9. Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
  10. Type in expressions to have them evaluated.
  11. Type :help for more information.
  12. scala> import org.apache.hudi.DataSourceReadOptions
  13. import org.apache.hudi.DataSourceReadOptions
  14. # In the below query, 20180925045257 is the first commit's timestamp
  15. scala> val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20180924064621").load("/user/hive/warehouse/stock_ticks_cow")
  16. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  17. SLF4J: Defaulting to no-operation (NOP) logger implementation
  18. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  19. hoodieIncViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 15 more fields]
  20. scala> hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")
  21. warning: there was one deprecation warning; re-run with -deprecation for details
  22. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);
  23. +----------------------+---------+----------------------+---------+------------+-----------+--+
  24. | _hoodie_commit_time | symbol | ts | volume | open | close |
  25. +----------------------+---------+----------------------+---------+------------+-----------+--+
  26. | 20180924065039 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  27. +----------------------+---------+----------------------+---------+------------+-----------+--+

Step 8: Schedule and Run Compaction for Merge-On-Read dataset

Lets schedule and run a compaction to create a new version of columnar file so that read-optimized readers will see fresher data. Again, You can use Hudi CLI to manually schedule and run compaction

  1. docker exec -it adhoc-1 /bin/bash
  2. root@adhoc-1:/opt# /var/hoodie/ws/hudi-cli/hudi-cli.sh
  3. ============================================
  4. * *
  5. * _ _ _ _ *
  6. * | | | | | | (_) *
  7. * | |__| | __| | - *
  8. * | __ || | / _` | || *
  9. * | | | || || (_| | || *
  10. * |_| |_|\___/ \____/ || *
  11. * *
  12. ============================================
  13. Welcome to Hoodie CLI. Please type help if you are looking for help.
  14. hudi->connect --path /user/hive/warehouse/stock_ticks_mor
  15. 18/09/24 06:59:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  16. 18/09/24 06:59:35 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor
  17. 18/09/24 06:59:35 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]]
  18. 18/09/24 06:59:35 INFO table.HoodieTableConfig: Loading dataset properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties
  19. 18/09/24 06:59:36 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ from /user/hive/warehouse/stock_ticks_mor
  20. Metadata for table stock_ticks_mor loaded
  21. # Ensure no compactions are present
  22. hoodie:stock_ticks_mor->compactions show all
  23. 18/09/24 06:59:54 INFO timeline.HoodieActiveTimeline: Loaded instants [[20180924064636__clean__COMPLETED], [20180924064636__deltacommit__COMPLETED], [20180924065057__clean__COMPLETED], [20180924065057__deltacommit__COMPLETED]]
  24. ___________________________________________________________________
  25. | Compaction Instant Time| State | Total FileIds to be Compacted|
  26. |==================================================================|
  27. # Schedule a compaction. This will use Spark Launcher to schedule compaction
  28. hoodie:stock_ticks_mor->compaction schedule
  29. ....
  30. Compaction successfully completed for 20180924070031
  31. # Now refresh and check again. You will see that there is a new compaction requested
  32. hoodie:stock_ticks->connect --path /user/hive/warehouse/stock_ticks_mor
  33. 18/09/24 07:01:16 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor
  34. 18/09/24 07:01:16 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]]
  35. 18/09/24 07:01:16 INFO table.HoodieTableConfig: Loading dataset properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties
  36. 18/09/24 07:01:16 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ from /user/hive/warehouse/stock_ticks_mor
  37. Metadata for table stock_ticks_mor loaded
  38. hoodie:stock_ticks_mor->compactions show all
  39. 18/09/24 06:34:12 INFO timeline.HoodieActiveTimeline: Loaded instants [[20180924041125__clean__COMPLETED], [20180924041125__deltacommit__COMPLETED], [20180924042735__clean__COMPLETED], [20180924042735__deltacommit__COMPLETED], [==>20180924063245__compaction__REQUESTED]]
  40. ___________________________________________________________________
  41. | Compaction Instant Time| State | Total FileIds to be Compacted|
  42. |==================================================================|
  43. | 20180924070031 | REQUESTED| 1 |
  44. # Execute the compaction. The compaction instant value passed below must be the one displayed in the above "compactions show all" query
  45. hoodie:stock_ticks_mor->compaction run --compactionInstant 20180924070031 --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
  46. ....
  47. Compaction successfully completed for 20180924070031
  48. ## Now check if compaction is completed
  49. hoodie:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
  50. 18/09/24 07:03:00 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor
  51. 18/09/24 07:03:00 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]]
  52. 18/09/24 07:03:00 INFO table.HoodieTableConfig: Loading dataset properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties
  53. 18/09/24 07:03:00 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ from /user/hive/warehouse/stock_ticks_mor
  54. Metadata for table stock_ticks_mor loaded
  55. hoodie:stock_ticks->compactions show all
  56. 18/09/24 07:03:15 INFO timeline.HoodieActiveTimeline: Loaded instants [[20180924064636__clean__COMPLETED], [20180924064636__deltacommit__COMPLETED], [20180924065057__clean__COMPLETED], [20180924065057__deltacommit__COMPLETED], [20180924070031__commit__COMPLETED]]
  57. ___________________________________________________________________
  58. | Compaction Instant Time| State | Total FileIds to be Compacted|
  59. |==================================================================|
  60. | 20180924070031 | COMPLETED| 1 |

Step 9: Run Hive Queries including incremental queries

You will see that both ReadOptimized and Realtime Views will show the latest committed data. Lets also run the incremental query for MOR table. From looking at the below query output, it will be clear that the fist commit time for the MOR table is 20180924064636 and the second commit time is 20180924070031

  1. docker exec -it adhoc-2 /bin/bash
  2. beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
  3. # Read Optimized View
  4. 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
  5. WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  6. +---------+----------------------+--+
  7. | symbol | _c1 |
  8. +---------+----------------------+--+
  9. | GOOG | 2018-08-31 10:59:00 |
  10. +---------+----------------------+--+
  11. 1 row selected (1.6 seconds)
  12. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
  13. +----------------------+---------+----------------------+---------+------------+-----------+--+
  14. | _hoodie_commit_time | symbol | ts | volume | open | close |
  15. +----------------------+---------+----------------------+---------+------------+-----------+--+
  16. | 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  17. | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  18. +----------------------+---------+----------------------+---------+------------+-----------+--+
  19. # Realtime View
  20. 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
  21. WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  22. +---------+----------------------+--+
  23. | symbol | _c1 |
  24. +---------+----------------------+--+
  25. | GOOG | 2018-08-31 10:59:00 |
  26. +---------+----------------------+--+
  27. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
  28. +----------------------+---------+----------------------+---------+------------+-----------+--+
  29. | _hoodie_commit_time | symbol | ts | volume | open | close |
  30. +----------------------+---------+----------------------+---------+------------+-----------+--+
  31. | 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  32. | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  33. +----------------------+---------+----------------------+---------+------------+-----------+--+
  34. # Incremental View:
  35. 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL;
  36. No rows affected (0.008 seconds)
  37. # Max-Commits covers both second batch and compaction commit
  38. 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.max.commits=3;
  39. No rows affected (0.007 seconds)
  40. 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.start.timestamp=20180924064636;
  41. No rows affected (0.013 seconds)
  42. # Query:
  43. 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG' and `_hoodie_commit_time` > '20180924064636';
  44. +----------------------+---------+----------------------+---------+------------+-----------+--+
  45. | _hoodie_commit_time | symbol | ts | volume | open | close |
  46. +----------------------+---------+----------------------+---------+------------+-----------+--+
  47. | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  48. +----------------------+---------+----------------------+---------+------------+-----------+--+
  49. exit
  50. exit

Step 10: Read Optimized and Realtime Views for MOR with Spark-SQL after compaction

  1. docker exec -it adhoc-1 /bin/bash
  2. bash-4.4# $SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --master local[2] --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0
  3. # Read Optimized View
  4. scala> spark.sql("select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'").show(100, false)
  5. +---------+----------------------+--+
  6. | symbol | _c1 |
  7. +---------+----------------------+--+
  8. | GOOG | 2018-08-31 10:59:00 |
  9. +---------+----------------------+--+
  10. 1 row selected (1.6 seconds)
  11. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'").show(100, false)
  12. +----------------------+---------+----------------------+---------+------------+-----------+--+
  13. | _hoodie_commit_time | symbol | ts | volume | open | close |
  14. +----------------------+---------+----------------------+---------+------------+-----------+--+
  15. | 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  16. | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  17. +----------------------+---------+----------------------+---------+------------+-----------+--+
  18. # Realtime View
  19. scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
  20. +---------+----------------------+--+
  21. | symbol | _c1 |
  22. +---------+----------------------+--+
  23. | GOOG | 2018-08-31 10:59:00 |
  24. +---------+----------------------+--+
  25. scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
  26. +----------------------+---------+----------------------+---------+------------+-----------+--+
  27. | _hoodie_commit_time | symbol | ts | volume | open | close |
  28. +----------------------+---------+----------------------+---------+------------+-----------+--+
  29. | 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
  30. | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
  31. +----------------------+---------+----------------------+---------+------------+-----------+--+

Step 11: Presto queries over Read Optimized View on MOR dataset after compaction

  1. docker exec -it presto-worker-1 presto --server presto-coordinator-1:8090
  2. presto> use hive.default;
  3. USE
  4. # Read Optimized View
  5. resto:default> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
  6. symbol | _col1
  7. --------+---------------------
  8. GOOG | 2018-08-31 10:59:00
  9. (1 row)
  10. Query 20190822_182319_00011_segyw, FINISHED, 1 node
  11. Splits: 49 total, 49 done (100.00%)
  12. 0:01 [197 rows, 613B] [133 rows/s, 414B/s]
  13. presto:default> select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
  14. _hoodie_commit_time | symbol | ts | volume | open | close
  15. ---------------------+--------+---------------------+--------+-----------+----------
  16. 20190822180250 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
  17. 20190822181944 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215
  18. (2 rows)
  19. Query 20190822_182333_00012_segyw, FINISHED, 1 node
  20. Splits: 17 total, 17 done (100.00%)
  21. 0:02 [197 rows, 613B] [98 rows/s, 307B/s]
  22. presto:default>

This brings the demo to an end.

Testing Hudi in Local Docker environment

You can bring up a hadoop docker environment containing Hadoop, Hive and Spark services with support for hudi.

  1. $ mvn pre-integration-test -DskipTests

The above command builds docker images for all the services with current Hudi source installed at /var/hoodie/ws and also brings up the services using a compose file. We currently use Hadoop (v2.8.4), Hive (v2.3.3) and Spark (v2.3.1) in docker images.

To bring down the containers

  1. $ cd hudi-integ-test
  2. $ mvn docker-compose:down

If you want to bring up the docker containers, use

  1. $ cd hudi-integ-test
  2. $ mvn docker-compose:up -DdetachedMode=true

Hudi is a library that is operated in a broader data analytics/ingestion environment involving Hadoop, Hive and Spark. Interoperability with all these systems is a key objective for us. We are actively adding integration-tests under hudi-integ-test/src/test/java that makes use of this docker environment (See hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java )

Building Local Docker Containers:

The docker images required for demo and running integration test are already in docker-hub. The docker images and compose scripts are carefully implemented so that they serve dual-purpose

  1. The docker images have inbuilt hudi jar files with environment variable pointing to those jars (HUDI_HADOOP_BUNDLE, …)
  2. For running integration-tests, we need the jars generated locally to be used for running services within docker. The docker-compose scripts (see docker/compose/docker-compose_hadoop284_hive233_spark231.yml) ensures local jars override inbuilt jars by mounting local HUDI workspace over the docker location
  3. As these docker containers have mounted local HUDI workspace, any changes that happen in the workspace would automatically reflect in the containers. This is a convenient way for developing and verifying Hudi for developers who do not own a distributed environment. Note that this is how integration tests are run.

This helps avoid maintaining separate docker images and avoids the costly step of building HUDI docker images locally. But if users want to test hudi from locations with lower network bandwidth, they can still build local images run the script docker/build_local_docker_images.sh to build local docker images before running docker/setup_demo.sh

Here are the commands:

  1. cd docker
  2. ./build_local_docker_images.sh
  3. .....
  4. [INFO] Reactor Summary:
  5. [INFO]
  6. [INFO] hoodie ............................................. SUCCESS [ 1.709 s]
  7. [INFO] hudi-common ...................................... SUCCESS [ 9.015 s]
  8. [INFO] hudi-hadoop-mr ................................... SUCCESS [ 1.108 s]
  9. [INFO] hudi-client ...................................... SUCCESS [ 4.409 s]
  10. [INFO] hudi-hive ........................................ SUCCESS [ 0.976 s]
  11. [INFO] hudi-spark ....................................... SUCCESS [ 26.522 s]
  12. [INFO] hudi-utilities ................................... SUCCESS [ 16.256 s]
  13. [INFO] hudi-cli ......................................... SUCCESS [ 11.341 s]
  14. [INFO] hudi-hadoop-mr-bundle ............................ SUCCESS [ 1.893 s]
  15. [INFO] hudi-hive-bundle ................................. SUCCESS [ 14.099 s]
  16. [INFO] hudi-spark-bundle ................................ SUCCESS [ 58.252 s]
  17. [INFO] hudi-hadoop-docker ............................... SUCCESS [ 0.612 s]
  18. [INFO] hudi-hadoop-base-docker .......................... SUCCESS [04:04 min]
  19. [INFO] hudi-hadoop-namenode-docker ...................... SUCCESS [ 6.142 s]
  20. [INFO] hudi-hadoop-datanode-docker ...................... SUCCESS [ 7.763 s]
  21. [INFO] hudi-hadoop-history-docker ....................... SUCCESS [ 5.922 s]
  22. [INFO] hudi-hadoop-hive-docker .......................... SUCCESS [ 56.152 s]
  23. [INFO] hudi-hadoop-sparkbase-docker ..................... SUCCESS [01:18 min]
  24. [INFO] hudi-hadoop-sparkmaster-docker ................... SUCCESS [ 2.964 s]
  25. [INFO] hudi-hadoop-sparkworker-docker ................... SUCCESS [ 3.032 s]
  26. [INFO] hudi-hadoop-sparkadhoc-docker .................... SUCCESS [ 2.764 s]
  27. [INFO] hudi-integ-test .................................. SUCCESS [ 1.785 s]
  28. [INFO] ------------------------------------------------------------------------
  29. [INFO] BUILD SUCCESS
  30. [INFO] ------------------------------------------------------------------------
  31. [INFO] Total time: 09:15 min
  32. [INFO] Finished at: 2018-09-10T17:47:37-07:00
  33. [INFO] Final Memory: 236M/1848M
  34. [INFO] ------------------------------------------------------------------------