TiSpark 用户指南

TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优势,和 TiDB 一起为用户一站式解决 HTAP (Hybrid Transactional/Analytical Processing) 的需求。TiSpark 依赖于 TiKV 集群和 Placement Driver (PD),也需要你搭建一个 Spark 集群。

本文简单介绍如何部署和使用 TiSpark。本文假设你对 Spark 有基本认知。你可以参阅 Apache Spark 官网了解 Spark 的相关信息。

概述

TiSpark 是将 Spark SQL 直接运行在分布式存储引擎 TiKV 上的 OLAP 解决方案。其架构图如下:

TiSpark Architecture

  • TiSpark 深度整合了 Spark Catalyst 引擎,可以对计算提供精确的控制,使 Spark 能够高效的读取 TiKV 中的数据,提供索引支持以实现高速的点查。
  • 通过多种计算下推减少 Spark SQL 需要处理的数据大小,以加速查询;利用 TiDB 的内建的统计信息选择更优的查询计划。
  • 从数据集群的角度看,TiSpark + TiDB 可以让用户无需进行脆弱和难以维护的 ETL,直接在同一个平台进行事务和分析两种工作,简化了系统架构和运维。
  • 用户借助 TiSpark 项目可以在 TiDB 上使用 Spark 生态圈提供的多种工具进行数据处理。例如,使用 TiSpark 进行数据分析和 ETL;使用 TiKV 作为机器学习的数据源;借助调度系统产生定时报表等等。
  • 除此之外,TiSpark 还提供了分布式写入 TiKV 的功能。相比使用 Spark 结合 JDBC 的方式写入 TiDB,分布式写入 TiKV 可以实现事务(要么全部数据写入成功,要么全部都写入失败),并且写入速度会更快。

环境准备

现有 TiSpark 2.x 版本支持 Spark 2.3.x 和 Spark 2.4.x。如果你希望使用 Spark 2.1.x 版本,需使用 TiSpark 1.x。

TiSpark 需要 JDK 1.8+ 以及 Scala 2.11(Spark2.0+ 默认 Scala 版本)。

TiSpark 可以在 YARN,Mesos,Standalone 等任意 Spark 模式下运行。

推荐配置

本部分描述了 TiKV 与 TiSpark 集群分开部署、Spark 与 TiSpark 集群独立部署,以及 TiSpark 与 TiKV 集群混合部署的建议配置。

TiKV 与 TiSpark 集群分开部署的配置

对于 TiKV 与 TiSpark 分开部署的场景,可以参考如下建议配置:

  • 硬件配置建议

    普通场景可以参考 TiDB 和 TiKV 硬件配置建议,但是如果是偏重分析的场景,可以将 TiKV 节点增加到至少 64G 内存。

Spark 与 TiSpark 集群独立部署的配置

关于 Spark 的详细硬件推荐配置请参考官网,如下是 TiSpark 所需环境的简单描述:

Spark 推荐 32G 内存以上的配额。请在配置中预留 25% 的内存给操作系统。

Spark 推荐每台计算节点配备 CPU 累计 8 到 16 核以上。你可以初始设定分配所有 CPU 核给 Spark。

Spark 的具体配置方式也请参考官方说明。以下为根据 spark-env.sh 配置的范例:

  1. SPARK_EXECUTOR_CORES: 5
  2. SPARK_EXECUTOR_MEMORY: 10g
  3. SPARK_WORKER_CORES: 5
  4. SPARK_WORKER_MEMORY: 10g

spark-defaults.conf 中,增加如下配置:

  1. spark.tispark.pd.addresses $your_pd_servers
  2. spark.sql.extensions org.apache.spark.sql.TiExtensions

CDH spark 版本中添加如下配置:

  1. spark.tispark.pd.addresses=$your_pd_servers
  2. spark.sql.extensions=org.apache.spark.sql.TiExtensions

your_pd_servers 是用逗号分隔的 PD 地址,每个地址使用 地址:端口 的格式。

例如你有一组 PD 在10.16.20.110.16.20.210.16.20.3,那么 PD 配置格式是10.16.20.1:2379,10.16.20.2:2379,10.16.20.3:2379

TiSpark 与 TiKV 集群混合部署的配置

对于 TiKV 与 TiSpark 混合部署的场景,需在原有 TiKV 预留资源之外累加 Spark 所需部分,并分配 25% 的内存作为系统本身占用。

部署 TiSpark

TiSpark 的 jar 包可以在 TiSpark Releases 页面下载对应版本的 jar 包并拷贝到合适的目录。

已有 Spark 集群的部署方式

如果在已有 Spark 集群上运行 TiSpark,无需重启集群。可以使用 Spark 的 --jars 参数将 TiSpark 作为依赖引入:

  1. spark-shell --jars $TISPARK_FOLDER/tispark-${name_with_version}.jar

没有 Spark 集群的部署方式

如果没有使用中的 Spark 集群,推荐使用 Spark Standalone 方式部署。这里简单介绍下 Standalone 部署方式。如果遇到问题,可以去官网寻求帮助;也欢迎在 GitHub 上提 issue

下载安装包并安装

你可以在 Download Apache Spark™ 页面下载 Apache Spark。

对于 Standalone 模式且无需 Hadoop 支持,则选择 Spark 2.3.x 或者 Spark 2.4.x 且带有 Hadoop 依赖的 Pre-build with Apache Hadoop 2.x 任意版本。如有需要配合使用的 Hadoop 集群,则选择对应的 Hadoop 版本号。你也可以选择从源代码自行构建以配合官方 Hadoop 2.x 之前的版本。

如果你已经有了 Spark 二进制文件,并且当前 PATH 为 SPARKPATH,需将 TiSpark jar 包拷贝到 ${SPARKPATH}/jars 目录下。

启动 Master

在选中的 Spark Master 节点执行如下命令:

  1. cd $SPARKPATH
  1. ./sbin/start-master.sh

在这步完成以后,屏幕上会打印出一个 log 文件。检查 log 文件确认 Spark-Master 是否启动成功。你可以打开 http://spark-master-hostname:8080 查看集群信息(如果你没有改动 Spark-Master 默认 Port Numebr)。在启动 Spark-Worker 的时候,也可以通过这个面板来确认 Worker 是否已经加入集群。

启动 Worker

类似地,可以用如下命令启动 Spark-Worker 节点:

  1. ./sbin/start-slave.sh spark://spark-master-hostname:7077

命令返回以后,即可通过刚才的面板查看这个 Worker 是否已经正确地加入了 Spark 集群。在所有 Worker 节点重复刚才的命令。确认所有的 Worker 都可以正确连接 Master,这样你就拥有了一个 Standalone 模式的 Spark 集群。

Spark SQL shell 和 JDBC 服务器

当前版本的 TiSpark 可以直接使用 spark-sql 和 Spark 的 ThriftServer JDBC 服务器。

一个使用范例

假设你已经按照上述步骤成功启动了 TiSpark 集群,下面简单介绍如何使用 Spark SQL 来做 OLAP 分析。这里我们用名为 tpch 数据库中的 lineitem 表作为范例。

假设你的 PD 节点位于 192.168.1.100,端口为 2379,在 $SPARK_HOME/conf/spark-defaults.conf 加入:

  1. spark.tispark.pd.addresses 192.168.1.100:2379
  1. spark.sql.extensions org.apache.spark.sql.TiExtensions

然后在 Spark-Shell 里像原生 Spark 一样输入下面的命令:

  1. spark.sql("use tpch")
  1. spark.sql("select count(*) from lineitem").show

结果为:

  1. +-------------+
  2. | Count (1) |
  3. +-------------+
  4. | 600000000 |
  5. +-------------+

Spark SQL 交互 Shell 和原生 Spark 一致:

  1. spark-sql> use tpch;
  1. Time taken: 0.015 seconds
  1. spark-sql> select count(*) from lineitem;
  1. 2000
  2. Time taken: 0.673 seconds, Fetched 1 row(s)

SQuirreLSQL 和 hive-beeline 可以使用 JDBC 连接 Thrift 服务器。例如,使用 beeline 连接:

  1. ./beeline
  1. Beeline version 1.2.2 by Apache Hive
  1. beeline> !connect jdbc:hive2://localhost:10000
  1. 1: jdbc:hive2://localhost:10000> use testdb;
  1. +---------+--+
  2. | Result |
  3. +---------+--+
  4. +---------+--+
  5. No rows selected (0.013 seconds)
  1. select count(*) from account;
  1. +-----------+--+
  2. | count(1) |
  3. +-----------+--+
  4. | 1000000 |
  5. +-----------+--+
  6. 1 row selected (1.97 seconds)

和 Hive 一起使用 TiSpark

TiSpark 可以和 Hive 混合使用。在启动 Spark 之前,需要添加 HADOOP_CONF_DIR 环境变量指向 Hadoop 配置目录并且将 hive-site.xml 拷贝到 $SPARK_HOME/conf 目录下。

  1. val tisparkDF = spark.sql("select * from tispark_table").toDF
  2. tisparkDF.write.saveAsTable("hive_table") // save table to hive
  3. spark.sql("select * from hive_table a, tispark_table b where a.col1 = b.col1").show // join table across Hive and Tispark

通过 TiSpark 将 DataFrame 批量写入 TiDB

TiSpark 从 v2.3 版本开始原生支持将 DataFrame 批量写入 TiDB 集群,该写入模式通过 TiKV 的两阶段提交协议实现。

TiSpark 批量写入相比 Spark + JDBC 写入,有以下特点:

比较的方面 TiSpark 批量写入 Spark + JDBC 写入
原子性 DataFrame 的数据要么全部写入成功,要么全部写入失败 如果在写入过程中 spark 任务失败退出,会出现部分数据写入成功的情况
隔离性 写入过程中其他事务对正在写入的数据不可见 写入过程中其他事务能看到部分写入成功的数据
错误恢复 失败后只需要重新运行 Spark 程序 需要业务来实现幂等,例如失败后需要先清理部分写入成功的数据,再重新运行 Spark 程序,并且需要设置 spark.task.maxFailures=1,防止 task 内重试导致数据重复
速度 直接写入 TiKV,速度更快 通过 TiDB 再写入 TiKV,对速度会有影响

以下通过 scala API 演示如何使用 TiSpark 批量写入:

  1. // select data to write
  2. val df = spark.sql("select * from tpch.ORDERS")
  3. // write data to tidb
  4. df.write.
  5. format("tidb").
  6. option("tidb.addr", "127.0.0.1").
  7. option("tidb.port", "4000").
  8. option("tidb.user", "root").
  9. option("tidb.password", "").
  10. option("database", "tpch").
  11. option("table", "target_orders").
  12. mode("append").
  13. save()

如果写入的数据量比较大,且写入时间超过 10 分钟,则需要保证 GC 时间大于写入时间。

  1. update mysql.tidb set VARIABLE_VALUE="6h" where VARIABLE_NAME="tikv_gc_life_time";

详细使用手册请参考该文档

通过 JDBC 将 DataFrame 写入 TiDB

除了使用 TiSpark 将 DataFrame 批量写入 TiDB 集群以外,也可以使用 Spark 原生的 JDBC 支持进行写入:

  1. import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
  2. val customer = spark.sql("select * from customer limit 100000")
  3. // you might repartition source to make it balance across nodes
  4. // and increase concurrency
  5. val df = customer.repartition(32)
  6. df.write
  7. .mode(saveMode = "append")
  8. .format("jdbc")
  9. .option("driver", "com.mysql.jdbc.Driver")
  10. // replace host and port as your and be sure to use rewrite batch
  11. .option("url", "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true")
  12. .option("useSSL", "false")
  13. // As tested, 150 is good practice
  14. .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150)
  15. .option("dbtable", s"cust_test_select") // database name and table name here
  16. .option("isolationLevel", "NONE") // recommended to set isolationLevel to NONE if you have a large DF to load.
  17. .option("user", "root") // TiDB user here
  18. .save()

推荐将 isolationLevel 设置为 NONE,否则单一大事务有可能造成 TiDB 服务器内存溢出。

注意:

TiSpark 使用 JDBC 时默认 isolationLevelREAD_UNCOMMITTED,会造成事务隔离级别不支持的错误。推荐将 isolationLevel 设置为 NONE

统计信息

TiSpark 可以使用 TiDB 的统计信息:

  1. 选择代价最低的索引或扫表访问
  2. 估算数据大小以决定是否进行广播优化

如果希望使用统计信息支持,需要确保所涉及的表已经被分析。请阅读这份文档了解如何进行表分析。

从 TiSpark 2.0 开始,统计信息将会默认被读取。

统计信息将在 Spark Driver 进行缓存,请确定 Driver 内存足够缓存统计信息。可以在spark-defaults.conf中开启或关闭统计信息读取:

Property Name Default Description
spark.tispark.statistics.auto_load true 是否默认进行统计信息读取

TiSpark FAQ

  • Q. 是独立部署还是和现有 Spark/Hadoop 集群共用资源?

    A. 可以利用现有 Spark 集群无需单独部署,但是如果现有集群繁忙,TiSpark 将无法达到理想速度。

  • Q. 是否可以和 TiKV 混合部署?

    A. 如果 TiDB 以及 TiKV 负载较高且运行关键的线上任务,请考虑单独部署 TiSpark;并且考虑使用不同的网卡保证 OLTP 的网络资源不被侵占而影响线上业务。如果线上业务要求不高或者机器负载不大,可以考虑与 TiKV 混合部署。

  • Q. Spark 执行中报 warning:WARN ObjectStore:568 - Failed to get database

    A. Warning 忽略即可,原因是 Spark 找不到对应的 hive 库,因为这个库是在 TIKV 中,而不是在 hive 中。可以考虑调整 log4j 日志,将该参数添加到 spark 下 conf 里 log4j 文件(如果后缀是 template 那先 mv 成后缀 properties)。

  • Q. Spark 执行中报 java.sql.BatchUpdateException: Data Truncated

    A. 写入的数据长度超过了数据库定义的数据类型的长度,可以确认 target table 的字段长度,进行调整。

  • Q. TiSpark 任务是否默认读取 Hive 的元数据?

    A. TiSpark 通过读取 hive-site 里的 meta 来搜寻 hive 的库。如果搜寻不到,就通过读取 tidb meta 搜寻 tidb 库。如果不需要该行为,可不在 hive site 中配置 hive 的 meta。

  • Q. TiSpark 执行 Spark 任务时报:”Error:java.io.InvalidClassException: com.pingcap.tikv.region.TiRegion; local class incompatible: stream classdesc serialVersionUID …”

    A. 该报错日志中显示 serialVersionUID 冲突,说明存在不同版本的 class 和 TiRegion。因为 TiRegion 是 TiSpark 独有的,所以可能存在多个版本的 TiSpark 包。要解决该报错,请确保集群中各节点的 TiSpark 依赖包版本一致。