JDBC Driver

This document walks through examples of using SparkSQL with a JDBC driver to work with the SequoiaDB database.

Connecting SparkSQL to SequoiaDB

SparkSQL can operate on SequoiaDB through the JDBC driver; the connection is made to Spark's Thrift JDBC/ODBC server, which listens on port 10000 by default.

Preparation

  1. Download and install Spark and SequoiaDB, copy the Spark-SequoiaDB connector and the SequoiaDB Java driver jar packages into the jars directory under the Spark installation path, then start the Thrift server (sbin/start-thriftserver.sh under the Spark installation path) so that JDBC clients can connect

  2. Create a Java project and import the SparkSQL JDBC driver dependencies, which can be downloaded with Maven. A reference configuration:

    <dependencies>
      <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>$version</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>$version</version>
      </dependency>
    </dependencies>

Example

Assume SequoiaDB has a collection test.test containing the following records (a sketch for loading this data with the Java driver follows the listing):

  > db.test.test.find()
  {
    "_id": {
      "$oid": "5d5911f41125bc9c9aa2bc0b"
    },
    "c1": 0,
    "c2": "mary",
    "c3": 15
  }
  {
    "_id": {
      "$oid": "5d5912041125bc9c9aa2bc0c"
    },
    "c1": 1,
    "c2": "lili",
    "c3": 25
  }
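
The records above can be produced with the SequoiaDB Java driver. The following is a minimal, hedged sketch: the class name PrepareTestData is illustrative, and the coordinator address server1:11810, the empty credentials, and the assumption that the collection space and collection already exist all mirror the OPTIONS used in the CREATE TABLE statement later in this section.

  package com.spark.samples;

  import org.bson.BasicBSONObject;

  import com.sequoiadb.base.CollectionSpace;
  import com.sequoiadb.base.DBCollection;
  import com.sequoiadb.base.Sequoiadb;

  public class PrepareTestData {
      public static void main(String[] args) {
          // Connect to SequoiaDB; address and empty credentials are assumptions
          Sequoiadb db = new Sequoiadb("server1:11810", "", "");
          try {
              CollectionSpace cs = db.getCollectionSpace("test");
              DBCollection cl = cs.getCollection("test");
              // Insert the two sample records shown above; _id is generated automatically
              cl.insert(new BasicBSONObject("c1", 0).append("c2", "mary").append("c3", 15));
              cl.insert(new BasicBSONObject("c1", 1).append("c2", "lili").append("c3", 25));
          } finally {
              db.disconnect();
          }
      }
  }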

Write and run the sample code

  package com.spark.samples;

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;

  public class HiveJdbcClient {
      public static void main(String[] args) throws ClassNotFoundException {
          // Class name of the JDBC driver
          Class.forName("org.apache.hive.jdbc.HiveDriver");
          try {
              // Connect to SparkSQL, assuming the Spark service runs on host sparkServer
              Connection connection = DriverManager.getConnection("jdbc:hive2://sparkServer:10000/default", "", "");
              System.out.println("connection success!");
              Statement statement = connection.createStatement();
              // Create a table mapped to the collection test.test in SequoiaDB
              String crtTableName = "test";
              statement.execute("CREATE TABLE " + crtTableName
                      + " ( c1 int, c2 string, c3 int ) USING com.sequoiadb.spark OPTIONS ( host 'server1:11810,server2:11810', "
                      + "collectionspace 'test', collection 'test', username '', password '')");
              // Query the table; this returns the records of test.test in SequoiaDB
              String sql = "select * from " + crtTableName;
              System.out.println("Running:" + sql);
              ResultSet resultSet = statement.executeQuery(sql);
              while (resultSet.next()) {
                  System.out.println(resultSet.getString(1) + "\t" + resultSet.getString(2) + "\t" + resultSet.getString(3));
              }
              statement.close();
              connection.close();
          } catch (SQLException e) {
              e.printStackTrace();
          }
      }
  }

The output is as follows:

  connection success!
  Running:select * from test
  1 lili 25
  0 mary 15
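
In addition to Statement, the hive-jdbc driver also supports parameterized queries via PreparedStatement. Below is a hedged sketch against the same assumed host sparkServer and the mapping table created above; the class name HiveJdbcPreparedClient and the filter value 20 are illustrative.

  package com.spark.samples;

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;

  public class HiveJdbcPreparedClient {
      public static void main(String[] args) throws Exception {
          Class.forName("org.apache.hive.jdbc.HiveDriver");
          try (Connection connection = DriverManager.getConnection("jdbc:hive2://sparkServer:10000/default", "", "");
               PreparedStatement ps = connection.prepareStatement("select c1, c2, c3 from test where c3 > ?")) {
              ps.setInt(1, 20); // bind the filter value
              try (ResultSet rs = ps.executeQuery()) {
                  while (rs.next()) {
                      System.out.println(rs.getInt(1) + "\t" + rs.getString(2) + "\t" + rs.getInt(3));
                  }
              }
          }
      }
  }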

Connecting SparkSQL to SequoiaSQL

Through DataFrames, SparkSQL can use JDBC to read from and write to SequoiaSQL-MySQL or SequoiaSQL-PGSQL.
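
Besides the read().jdbc(url, table, properties) form used in the full example below, the same load can be expressed with the option-style DataFrameReader API. A minimal sketch, assuming the read instance and the sparktest credentials prepared in the next section; the class name JdbcReadSketch is illustrative.

  package com.sequoiadb.test;

  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SparkSession;

  public class JdbcReadSketch {
      public static void main(String[] args) {
          SparkSession spark = SparkSession.builder().appName("JdbcReadSketch").getOrCreate();
          // Equivalent to spark.read().jdbc(readUrl, "sparktest.people", dbProperties)
          Dataset<Row> df = spark.read()
                  .format("jdbc")
                  .option("url", "jdbc:mysql://192.168.30.81/sparktest")
                  .option("dbtable", "sparktest.people")
                  .option("user", "sparktest")
                  .option("password", "sparktest")
                  .load();
          df.show();
          spark.stop();
      }
  }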

Preparation

  1. Download the corresponding JDBC driver and copy it into the SPARK_HOME/jars directory on the Spark cluster

  2. On the read instance, create the test database and test user, grant privileges, and load the sample data; on the write instance, create the test database and test user and grant privileges (a plain-JDBC connectivity check follows the SQL below)

    -- Create test database
    create database sparktest;
    -- Create a user representing your Spark cluster
    create user 'sparktest'@'%' identified by 'sparktest';
    -- Add privileges for the Spark cluster
    grant create, delete, drop, insert, select, update on sparktest.* to 'sparktest'@'%';
    flush privileges;
    -- Create a test table of physical characteristics
    use sparktest;
    create table people (
      id int(10) not null auto_increment,
      name char(50) not null,
      is_male tinyint(1) not null,
      height_in int(4) not null,
      weight_lb int(4) not null,
      primary key (id),
      key (id)
    );
    -- Create sample data to load into a DataFrame
    insert into people values (null, 'Alice', 0, 60, 125);
    insert into people values (null, 'Brian', 1, 64, 131);
    insert into people values (null, 'Charlie', 1, 74, 183);
    insert into people values (null, 'Doris', 0, 58, 102);
    insert into people values (null, 'Ellen', 0, 66, 140);
    insert into people values (null, 'Frank', 1, 66, 151);
    insert into people values (null, 'Gerard', 1, 68, 190);
    insert into people values (null, 'Harold', 1, 61, 128);
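
Before involving Spark, the account and privileges can be verified with a plain JDBC connection. A hedged sketch, assuming the read instance at 192.168.30.81 (the same address used as readUrl in the example below) and the mysql-connector-java jar on the classpath; the class name MySqlConnCheck is illustrative.

  package com.sequoiadb.test;

  import java.sql.Connection;
  import java.sql.DriverManager;

  public class MySqlConnCheck {
      public static void main(String[] args) throws Exception {
          // Connect with the sparktest account created in the preparation step
          try (Connection conn = DriverManager.getConnection(
                  "jdbc:mysql://192.168.30.81/sparktest", "sparktest", "sparktest")) {
              System.out.println("Connected: " + conn.getMetaData().getDatabaseProductVersion());
          }
      }
  }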

Example

  1. Write the sample code

    package com.sequoiadb.test;

    import java.util.Properties;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public final class JDBCDemo {
        public static void main(String[] args) throws Exception {
            String readUrl = "jdbc:mysql://192.168.30.81/sparktest";
            String writeUrl = "jdbc:mysql://192.168.30.82/sparktest";
            SparkSession spark = SparkSession.builder().appName("JDBCDemo").getOrCreate();
            Properties dbProperties = new Properties();
            dbProperties.setProperty("user", "sparktest");
            dbProperties.setProperty("password", "sparktest");
            System.out.println("A DataFrame loaded from the entire contents of a table over JDBC.");
            String where = "sparktest.people";
            Dataset<Row> entireDF = spark.read().jdbc(readUrl, where, dbProperties);
            entireDF.printSchema();
            entireDF.show();
            System.out.println("Filtering the table to just show the males.");
            entireDF.filter("is_male = 1").show();
            System.out.println("Alternately, pre-filter the table for males before loading over JDBC.");
            where = "(select * from sparktest.people where is_male = 1) as subset";
            Dataset<Row> malesDF = spark.read().jdbc(readUrl, where, dbProperties);
            malesDF.show();
            System.out.println("Update weights by 2 pounds (results in a new DataFrame with same column names)");
            Dataset<Row> heavyDF = entireDF.withColumn("updated_weight_lb", entireDF.col("weight_lb").plus(2));
            Dataset<Row> updatedDF = heavyDF.select("id", "name", "is_male", "height_in", "updated_weight_lb")
                    .withColumnRenamed("updated_weight_lb", "weight_lb");
            updatedDF.show();
            System.out.println("Save the updated data to a new table with JDBC");
            where = "sparktest.updated_people";
            // SaveMode "error" fails if the target table already exists;
            // "append" and "overwrite" are the other common choices.
            updatedDF.write().mode("error").jdbc(writeUrl, where, dbProperties);
            System.out.println("Load the new table into a new DataFrame to confirm that it was saved successfully.");
            Dataset<Row> retrievedDF = spark.read().jdbc(writeUrl, where, dbProperties);
            retrievedDF.show();
            spark.stop();
        }
    }
  2. Compile and submit the job

    mkdir -p target/java
    javac src/main/java/com/sequoiadb/test/JDBCDemo.java -classpath "$SPARK_HOME/jars/*" -d target/java
    cd target/java
    jar -cf ../JDBCDemo.jar *
    cd ../..
    APP_ARGS="--class com.sequoiadb.test.JDBCDemo target/JDBCDemo.jar"
    # Local submission
    $SPARK_HOME/bin/spark-submit --driver-class-path lib/mysql-connector-java-5.1.38.jar $APP_ARGS
    # Cluster submission
    $SPARK_HOME/bin/spark-submit --master spark://ip:7077 $APP_ARGS

    The output is as follows:

    A DataFrame loaded from the entire contents of a table over JDBC.
    root
     |-- id: integer (nullable = true)
     |-- name: string (nullable = true)
     |-- is_male: boolean (nullable = true)
     |-- height_in: integer (nullable = true)
     |-- weight_lb: integer (nullable = true)
    +---+-------+-------+---------+---------+
    | id|   name|is_male|height_in|weight_lb|
    +---+-------+-------+---------+---------+
    |  1|  Alice|  false|       60|      125|
    |  2|  Brian|   true|       64|      131|
    |  3|Charlie|   true|       74|      183|
    |  4|  Doris|  false|       58|      102|
    |  5|  Ellen|  false|       66|      140|
    |  6|  Frank|   true|       66|      151|
    |  7| Gerard|   true|       68|      190|
    |  8| Harold|   true|       61|      128|
    +---+-------+-------+---------+---------+
    Filtering the table to just show the males.
    +---+-------+-------+---------+---------+
    | id|   name|is_male|height_in|weight_lb|
    +---+-------+-------+---------+---------+
    |  2|  Brian|   true|       64|      131|
    |  3|Charlie|   true|       74|      183|
    |  6|  Frank|   true|       66|      151|
    |  7| Gerard|   true|       68|      190|
    |  8| Harold|   true|       61|      128|
    +---+-------+-------+---------+---------+
    Alternately, pre-filter the table for males before loading over JDBC.
    +---+-------+-------+---------+---------+
    | id|   name|is_male|height_in|weight_lb|
    +---+-------+-------+---------+---------+
    |  2|  Brian|   true|       64|      131|
    |  3|Charlie|   true|       74|      183|
    |  6|  Frank|   true|       66|      151|
    |  7| Gerard|   true|       68|      190|
    |  8| Harold|   true|       61|      128|
    +---+-------+-------+---------+---------+
    Update weights by 2 pounds (results in a new DataFrame with same column names)
    +---+-------+-------+---------+---------+
    | id|   name|is_male|height_in|weight_lb|
    +---+-------+-------+---------+---------+
    |  1|  Alice|  false|       60|      127|
    |  2|  Brian|   true|       64|      133|
    |  3|Charlie|   true|       74|      185|
    |  4|  Doris|  false|       58|      104|
    |  5|  Ellen|  false|       66|      142|
    |  6|  Frank|   true|       66|      153|
    |  7| Gerard|   true|       68|      192|
    |  8| Harold|   true|       61|      130|
    +---+-------+-------+---------+---------+
    Save the updated data to a new table with JDBC
    Load the new table into a new DataFrame to confirm that it was saved successfully.
    +---+-------+-------+---------+---------+
    | id|   name|is_male|height_in|weight_lb|
    +---+-------+-------+---------+---------+
    |  1|  Alice|  false|       60|      127|
    |  2|  Brian|   true|       64|      133|
    |  3|Charlie|   true|       74|      185|
    |  4|  Doris|  false|       58|      104|
    |  5|  Ellen|  false|       66|      142|
    |  6|  Frank|   true|       66|      153|
    |  7| Gerard|   true|       68|      192|
    |  8| Harold|   true|       61|      130|
    +---+-------+-------+---------+---------+