Quick-Start Guide

本指南通过使用spark-shell简要介绍了Hudi功能。使用Spark数据源,我们将通过代码段展示如何插入和更新Hudi的默认存储类型数据集: 写时复制。每次写操作之后,我们还将展示如何读取快照和增量数据。

Scala 示例

设置spark-shell

Hudi适用于Spark-2.x版本。您可以按照此处的说明设置spark。 在提取的目录中,使用spark-shell运行Hudi:

  1. bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

设置表名、基本路径和数据生成器来为本指南生成记录。

  1. import org.apache.hudi.QuickstartUtils._
  2. import scala.collection.JavaConversions._
  3. import org.apache.spark.sql.SaveMode._
  4. import org.apache.hudi.DataSourceReadOptions._
  5. import org.apache.hudi.DataSourceWriteOptions._
  6. import org.apache.hudi.config.HoodieWriteConfig._
  7. val tableName = "hudi_cow_table"
  8. val basePath = "file:///tmp/hudi_cow_table"
  9. val dataGen = new DataGenerator

数据生成器 可以基于行程样本模式 生成插入和更新的样本。

插入数据

生成一些新的行程样本,将其加载到DataFrame中,然后将DataFrame写入Hudi数据集中,如下所示。

  1. val inserts = convertToStringList(dataGen.generateInserts(10))
  2. val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  3. df.write.format("org.apache.hudi").
  4. options(getQuickstartWriteConfigs).
  5. option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  6. option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  7. option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  8. option(TABLE_NAME, tableName).
  9. mode(Overwrite).
  10. save(basePath);

mode(Overwrite)覆盖并重新创建数据集(如果已经存在)。 您可以检查在/tmp/hudi_cow_table/<region>/<country>/<city>/下生成的数据。我们提供了一个记录键 (schema中的uuid),分区字段(region/country/city)和组合逻辑(schema中的ts) 以确保行程记录在每个分区中都是唯一的。更多信息请参阅 对Hudi中的数据进行建模, 有关将数据提取到Hudi中的方法的信息,请参阅写入Hudi数据集。 这里我们使用默认的写操作:插入更新。 如果您的工作负载没有更新,也可以使用更快的插入批量插入操作。 想了解更多信息,请参阅写操作

查询数据

将数据文件加载到DataFrame中。

  1. val roViewDF = spark.
  2. read.
  3. format("org.apache.hudi").
  4. load(basePath + "/*/*/*/*")
  5. //load(basePath) 如果使用 "/partitionKey=partitionValue" 文件夹命名格式,Spark将自动识别分区信息
  6. roViewDF.registerTempTable("hudi_ro_table")
  7. spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
  8. spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show()

该查询提供已提取数据的读取优化视图。由于我们的分区路径(region/country/city)是嵌套的3个级别 从基本路径开始,我们使用了load(basePath + "/*/*/*/*")。 有关支持的所有存储类型和视图的更多信息,请参考存储类型和视图

更新数据

这类似于插入新数据。使用数据生成器生成对现有行程的更新,加载到DataFrame中并将DataFrame写入hudi数据集。

  1. val updates = convertToStringList(dataGen.generateUpdates(10))
  2. val df = spark.read.json(spark.sparkContext.parallelize(updates, 2));
  3. df.write.format("org.apache.hudi").
  4. options(getQuickstartWriteConfigs).
  5. option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  6. option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  7. option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  8. option(TABLE_NAME, tableName).
  9. mode(Append).
  10. save(basePath);

注意,保存模式现在为追加。通常,除非您是第一次尝试创建数据集,否则请始终使用追加模式。 查询现在再次查询数据将显示更新的行程。每个写操作都会生成一个新的由时间戳表示的commit 。在之前提交的相同的_hoodie_record_key中寻找_hoodie_commit_time, rider, driver字段变更。

增量查询

Hudi还提供了获取给定提交时间戳以来已更改的记录流的功能。 这可以通过使用Hudi的增量视图并提供所需更改的开始时间来实现。 如果我们需要给定提交之后的所有更改(这是常见的情况),则无需指定结束时间。

  1. // reload data
  2. spark.
  3. read.
  4. format("org.apache.hudi").
  5. load(basePath + "/*/*/*/*").
  6. createOrReplaceTempView("hudi_ro_table")
  7. val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
  8. val beginTime = commits(commits.length - 2) // commit time we are interested in
  9. // 增量查询数据
  10. val incViewDF = spark.
  11. read.
  12. format("org.apache.hudi").
  13. option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
  14. option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  15. load(basePath);
  16. incViewDF.registerTempTable("hudi_incr_table")
  17. spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

这将提供在开始时间提交之后发生的所有更改,其中包含票价大于20.0的过滤器。关于此功能的独特之处在于,它现在使您可以在批量数据上创作流式管道。

特定时间点查询

让我们看一下如何查询特定时间的数据。可以通过将结束时间指向特定的提交时间,将开始时间指向”000”(表示最早的提交时间)来表示特定时间。

  1. val beginTime = "000" // Represents all commits > this time.
  2. val endTime = commits(commits.length - 2) // commit time we are interested in
  3. // 增量查询数据
  4. val incViewDF = spark.read.format("org.apache.hudi").
  5. option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
  6. option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  7. option(END_INSTANTTIME_OPT_KEY, endTime).
  8. load(basePath);
  9. incViewDF.registerTempTable("hudi_incr_table")
  10. spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

删除数据

删除传入的 HoodieKeys 的记录。

  1. // spark-shell
  2. // 获取记录总数
  3. spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
  4. // 拿到两条将要删除的数据
  5. val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
  6. // 执行删除
  7. val deletes = dataGen.generateDeletes(ds.collectAsList())
  8. val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
  9. df.write.format("hudi").
  10. options(getQuickstartWriteConfigs).
  11. option(OPERATION_OPT_KEY,"delete").
  12. option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  13. option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  14. option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  15. option(TABLE_NAME, tableName).
  16. mode(Append).
  17. save(basePath)
  18. // 向之前一样运行查询
  19. val roAfterDeleteViewDF = spark.
  20. read.
  21. format("hudi").
  22. load(basePath + "/*/*/*/*")
  23. roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
  24. // 应返回 (total - 2) 条记录
  25. spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

注意: 删除操作只支持 Append 模式。

Pyspark 示例

设置spark-shell

Hudi适用于Spark-2.x版本。您可以按照此处的说明设置spark。 在提取的目录中,使用spark-shell运行Hudi:

  1. # pyspark
  2. export PYSPARK_PYTHON=$(which python3)
  3. spark-2.4.4-bin-hadoop2.7/bin/pyspark \
  4. --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.3,org.apache.spark:spark-avro_2.11:2.4.4 \
  5. --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

请注意以下事项:

  • 需要通过 —packages 指定 spark-avro, 因为默认情况下 spark-shell 不包含该模块
  • spark-avro 和 spark 的版本必须匹配 (上面两个我们都使用了2.4.4)
  • 我们使用了基于 scala 2.11 构建的 hudi-spark-bundle, 因为使用的 spark-avro 也是基于 scala 2.11的. 如果使用了 spark-avro_2.12, 相应的, 需要使用 hudi-spark-bundle_2.12.

设置表名、基本路径和数据生成器来为本指南生成记录。

  1. # pyspark
  2. tableName = "hudi_trips_cow"
  3. basePath = "file:///tmp/hudi_trips_cow"
  4. dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

数据生成器 可以基于行程样本模式 生成插入和更新的样本。

插入数据

生成一些新的行程样本,将其加载到DataFrame中,然后将DataFrame写入Hudi数据集中,如下所示。

  1. # pyspark
  2. inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
  3. df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  4. hudi_options = {
  5. 'hoodie.table.name': tableName,
  6. 'hoodie.datasource.write.recordkey.field': 'uuid',
  7. 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  8. 'hoodie.datasource.write.table.name': tableName,
  9. 'hoodie.datasource.write.operation': 'insert',
  10. 'hoodie.datasource.write.precombine.field': 'ts',
  11. 'hoodie.upsert.shuffle.parallelism': 2,
  12. 'hoodie.insert.shuffle.parallelism': 2
  13. }
  14. df.write.format("hudi"). \
  15. options(**hudi_options). \
  16. mode("overwrite"). \
  17. save(basePath)

mode(Overwrite) 覆盖并重新创建数据集(如果已经存在)。 您可以检查在/tmp/hudi_cow_table/<region>/<country>/<city>/下生成的数据。我们提供了一个记录键 (schema中的uuid),分区字段(region/country/city)和组合逻辑(schema中的ts) 以确保行程记录在每个分区中都是唯一的。更多信息请参阅 对Hudi中的数据进行建模, 有关将数据提取到Hudi中的方法的信息,请参阅写入Hudi数据集。 这里我们使用默认的写操作:插入更新。 如果您的工作负载没有更新,也可以使用更快的插入批量插入操作。 想了解更多信息,请参阅写操作

查询数据

将数据文件加载到DataFrame中。

  1. # pyspark
  2. tripsSnapshotDF = spark. \
  3. read. \
  4. format("hudi"). \
  5. load(basePath + "/*/*/*/*")
  6. # load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
  7. tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
  8. spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
  9. spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

该查询提供已提取数据的读取优化视图。由于我们的分区路径(region/country/city)是嵌套的3个级别 从基本路径开始,我们使用了load(basePath + "/*/*/*/*")。 有关支持的所有存储类型和视图的更多信息,请参考存储类型和视图

更新数据

这类似于插入新数据。使用数据生成器生成对现有行程的更新,加载到DataFrame中并将DataFrame写入hudi数据集。

  1. # pyspark
  2. updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
  3. df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
  4. df.write.format("hudi"). \
  5. options(**hudi_options). \
  6. mode("append"). \
  7. save(basePath)

注意,保存模式现在为追加。通常,除非您是第一次尝试创建数据集,否则请始终使用追加模式。 查询现在再次查询数据将显示更新的行程。每个写操作都会生成一个新的由时间戳表示的commit 。在之前提交的相同的_hoodie_record_key中寻找_hoodie_commit_time, rider, driver字段变更。

增量查询

Hudi还提供了获取给定提交时间戳以来已更改的记录流的功能。 这可以通过使用Hudi的增量视图并提供所需更改的开始时间来实现。 如果我们需要给定提交之后的所有更改(这是常见的情况),则无需指定结束时间。

  1. # pyspark
  2. # 加载数据
  3. spark. \
  4. read. \
  5. format("hudi"). \
  6. load(basePath + "/*/*/*/*"). \
  7. createOrReplaceTempView("hudi_trips_snapshot")
  8. commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").limit(50).collect()))
  9. beginTime = commits[len(commits) - 2] # commit time we are interested in
  10. # 增量的查询数据
  11. incremental_read_options = {
  12. 'hoodie.datasource.query.type': 'incremental',
  13. 'hoodie.datasource.read.begin.instanttime': beginTime,
  14. }
  15. tripsIncrementalDF = spark.read.format("hudi"). \
  16. options(**incremental_read_options). \
  17. load(basePath)
  18. tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
  19. spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

这将提供在开始时间提交之后发生的所有更改,其中包含票价大于20.0的过滤器。关于此功能的独特之处在于,它现在使您可以在批量数据上创作流式管道。

特定时间点查询

让我们看一下如何查询特定时间的数据。可以通过将结束时间指向特定的提交时间,将开始时间指向”000”(表示最早的提交时间)来表示特定时间。

  1. # pyspark
  2. beginTime = "000" # 代表所有大于该时间的 commits.
  3. endTime = commits[len(commits) - 2] # 我们感兴趣的提交时间
  4. # 特定时间查询
  5. point_in_time_read_options = {
  6. 'hoodie.datasource.query.type': 'incremental',
  7. 'hoodie.datasource.read.end.instanttime': endTime,
  8. 'hoodie.datasource.read.begin.instanttime': beginTime
  9. }
  10. tripsPointInTimeDF = spark.read.format("hudi"). \
  11. options(**point_in_time_read_options). \
  12. load(basePath)
  13. tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
  14. spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

删除数据

删除传入的 HoodieKeys 的记录。

注意: 删除操作只支持 Append 模式。

  1. # pyspark
  2. # 获取记录总数
  3. spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
  4. # 拿到两条将被删除的记录
  5. ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
  6. # 执行删除
  7. hudi_delete_options = {
  8. 'hoodie.table.name': tableName,
  9. 'hoodie.datasource.write.recordkey.field': 'uuid',
  10. 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  11. 'hoodie.datasource.write.table.name': tableName,
  12. 'hoodie.datasource.write.operation': 'delete',
  13. 'hoodie.datasource.write.precombine.field': 'ts',
  14. 'hoodie.upsert.shuffle.parallelism': 2,
  15. 'hoodie.insert.shuffle.parallelism': 2
  16. }
  17. from pyspark.sql.functions import lit
  18. deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
  19. df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
  20. df.write.format("hudi"). \
  21. options(**hudi_delete_options). \
  22. mode("append"). \
  23. save(basePath)
  24. # 向之前一样运行查询
  25. roAfterDeleteViewDF = spark. \
  26. read. \
  27. format("hudi"). \
  28. load(basePath + "/*/*/*/*")
  29. roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
  30. # 应返回 (total - 2) 条记录
  31. spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()

从这开始下一步?

您也可以通过自己构建hudi来快速开始, 并在spark-shell命令中使用--jars <path to hudi_code>/packaging/hudi-spark-bundle/target/hudi-spark-bundle-*.*.*-SNAPSHOT.jar, 而不是--packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating

这里我们使用Spark演示了Hudi的功能。但是,Hudi可以支持多种存储类型/视图,并且可以从Hive,Spark,Presto等查询引擎中查询Hudi数据集。 我们制作了一个基于Docker设置、所有依赖系统都在本地运行的演示视频, 我们建议您复制相同的设置然后按照这里的步骤自己运行这个演示。 另外,如果您正在寻找将现有数据迁移到Hudi的方法,请参考迁移指南