Sqoop Engine

This article describes how to configure, deploy, and use the Sqoop engine (supported in Linkis 1.1.2 and later).

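The Sqoop engine depends on a basic Hadoop environment. Any node on which the Sqoop engine is deployed must also have the Hadoop client environment deployed.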

Before running Sqoop tasks through Linkis, we strongly recommend running a test task with native Sqoop on the node to check that its environment works.

    # Verify that the Sqoop environment is usable. Example: export the HDFS data
    # under /user/hive/warehouse/hadoop/test_linkis_sqoop to the MySQL table test_sqoop
    sqoop export \
      --connect jdbc:mysql://10.10.10.10/test \
      --username test \
      --password test123 \
      --table test_sqoop \
      --columns user_id,user_code,user_name,email,status \
      --export-dir /user/hive/warehouse/hadoop/test_linkis_sqoop \
      --update-mode allowinsert \
      --verbose ;
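
| Environment variable | Value | Remarks |
|---|---|---|
| JAVA_HOME | JDK installation path | Required |
| HADOOP_HOME | Hadoop installation path | Required |
| HADOOP_CONF_DIR | Hadoop configuration path | Required |
| SQOOP_HOME | Sqoop installation path | Required |
| SQOOP_CONF_DIR | Sqoop configuration path | Optional |
| HCAT_HOME | HCatalog configuration path | Optional |
| HBASE_HOME | HBase configuration path | Optional |

Table 1-1 Environment configuration checklist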
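
The required entries in Table 1-1 can be checked on a node before the engine is deployed there. The following is a minimal sketch for a POSIX shell. The function name `check_sqoop_env` is hypothetical and is not part of Linkis or Sqoop. It only tests that each variable is non-empty, not that the path it points to is valid.

```shell
# Report which required environment variables from Table 1-1 are unset or empty.
# Prints each missing name on its own line and returns 1 if any are missing.
check_sqoop_env() {
    missing=0
    for name in JAVA_HOME HADOOP_HOME HADOOP_CONF_DIR SQOOP_HOME; do
        # Indirect lookup of the variable whose name is stored in $name.
        eval "value=\${$name:-}"
        if [ -z "$value" ]; then
            echo "$name"
            missing=1
        fi
    done
    return "$missing"
}
```

A possible use is `check_sqoop_env || echo "set the variables listed above first"`, run in the same shell session that will later run the native Sqoop test.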

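| Linkis system parameter | Description | Remarks |
|---|---|---|
| wds.linkis.hadoop.site.xml | Location of the Hadoop configuration files that Sqoop loads | Usually does not need to be set separately. Default: "core-site.xml;hdfs-site.xml;yarn-site.xml;mapred-site.xml" |
| sqoop.fetch.status.interval | Interval at which the Sqoop execution status is fetched | Usually does not need to be set separately. Default: 5s |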

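Linkis 1.1.2 and later support the mainstream Sqoop versions 1.4.6 and 1.4.7. Later Sqoop versions may require modifying some of the code and recompiling.

Note: a full build of the Linkis project is required before compiling the Sqoop engine.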

    # Compile the Sqoop engine on its own
    cd ${linkis_code_dir}/linkis-engineconn-plugins/sqoop/
    mvn clean install

To install the engine, take the compiled engine package, located at

    ${linkis_code_dir}/linkis-engineconn-plugins/engineconn-plugins/sqoop/target/sqoop-engineconn.zip

upload it to the following directory on the Linkis server

    ${LINKIS_HOME}/lib/linkis-engineplugins

and restart linkis-engineplugin:

    cd ${LINKIS_HOME}/sbin
    sh linkis-daemon.sh restart cg-engineplugin
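
The upload step can be scripted when the build and the Linkis installation are on the same machine. The following is a minimal sketch under that assumption. The function name `install_engine_package` is hypothetical. It only copies the package into the plugin directory. Any further steps your Linkis version needs, such as unpacking the archive, are not shown and should be taken from the engineplugin installation article.

```shell
# Copy a built engine package into the Linkis engine plugin directory.
# $1: path to the engine package (for example sqoop-engineconn.zip)
# $2: Linkis installation directory (the value of LINKIS_HOME)
install_engine_package() {
    package=$1
    plugin_dir=$2/lib/linkis-engineplugins
    if [ ! -f "$package" ]; then
        echo "engine package not found: $package" >&2
        return 1
    fi
    if [ ! -d "$plugin_dir" ]; then
        # Refuse to create the directory: a missing one suggests a wrong LINKIS_HOME.
        echo "plugin directory not found: $plugin_dir" >&2
        return 1
    fi
    cp "$package" "$plugin_dir/"
}
```

The function fails early instead of creating missing directories, so a mistyped installation path is reported rather than silently populated.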

For a more detailed introduction to engineplugin, see the following article.

https://linkis.apache.org/zh-CN/docs/latest/deployment/install-engineconn

Exporting HDFS files to MySQL

    sh linkis-cli-sqoop export \
      -D mapreduce.job.queuename=ide \
      --connect jdbc:mysql://10.10.10.10:9600/testdb \
      --username password@123 \
      --password password@123 \
      --table test_sqoop_01_copy \
      --columns user_id,user_code,user_name,email,status \
      --export-dir /user/hive/warehouse/hadoop/test_linkis_sqoop_2 \
      --update-mode allowinsert --verbose ;

Importing MySQL data into a Hive database

    # Import from MySQL into the Hive table linkis_test_ind.test_import_sqoop_1.
    # The table test_import_sqoop_1 does not exist, so add the --create-hive-table parameter.
    sh linkis-cli-sqoop import -D mapreduce.job.queuename=dws \
      --connect jdbc:mysql://10.10.10.10:3306/casion_test \
      --username hadoop \
      --password password@123 \
      --table test_sqoop_01 \
      --columns user_id,user_code,user_name,email,status \
      --fields-terminated-by ',' \
      --hive-import --create-hive-table \
      --hive-database casionxia_ind \
      --hive-table test_import_sqoop_1 \
      --hive-drop-import-delims \
      --delete-target-dir \
      --input-null-non-string '\\N' \
      --input-null-string '\\N' \
      --verbose ;

    # Import from MySQL into the Hive table linkis_test_ind.test_import_sqoop_1.
    # The table test_import_sqoop_1 exists, so remove the --create-hive-table parameter.
    sh linkis-cli-sqoop import -D mapreduce.job.queuename=dws \
      --connect jdbc:mysql://10.10.10.10:9600/testdb \
      --username testdb \
      --password password@123 \
      --table test_sqoop_01 \
      --columns user_id,user_code,user_name,email,status \
      --fields-terminated-by ',' \
      --hive-import \
      --hive-database linkis_test_ind \
      --hive-table test_import_sqoop_1 \
      --hive-overwrite \
      --hive-drop-import-delims \
      --delete-target-dir \
      --input-null-non-string '\\N' \
      --input-null-string '\\N' \
      --verbose ;
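
The two import commands differ only in the Hive flags chosen for a missing or an existing target table. That choice can be sketched as a small helper. The function name `hive_table_flags` and its "yes" argument are hypothetical. The caller decides how to find out whether the table exists, which is not shown here.

```shell
# Print the Hive-related flags for a Sqoop import, depending on whether the
# target Hive table already exists.
# $1: "yes" if the table exists, anything else if it does not
hive_table_flags() {
    if [ "$1" = "yes" ]; then
        # Existing table: omit --create-hive-table and overwrite the current data.
        echo "--hive-import --hive-overwrite"
    else
        # Missing table: ask Sqoop to create it during the import.
        echo "--hive-import --create-hive-table"
    fi
}
```

Passing `--create-hive-table` for a table that already exists makes the job fail, so omitting it for existing tables is the safer default in the helper.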

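OnceEngineConn is used by calling the createEngineConn interface of LinkisManager through LinkisManagerClient and sending the code to the newly created Sqoop engine, which then starts executing it. Other systems, such as Exchangis, can call the engine in this way. Using the client is simple: create a new Maven project, or add the following dependency to your existing project.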

    <dependency>
        <groupId>org.apache.linkis</groupId>
        <artifactId>linkis-computation-client</artifactId>
        <version>${linkis.version}</version>
    </dependency>

Test case:

    import java.util
    import java.util.concurrent.TimeUnit

    import org.apache.linkis.common.utils.Utils
    import org.apache.linkis.computation.client.LinkisJobBuilder
    import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
    import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
    import org.apache.linkis.computation.client.utils.LabelKeyUtils

    import scala.collection.JavaConverters._

    object SqoopOnceJobTest extends App {
      // Linkis server URL used by the client
      LinkisJobBuilder.setDefaultServerUrl("http://127.0.0.1:9001")
      val logPath = "C:\\Users\\resources\\log4j.properties"
      System.setProperty("log4j.configurationFile", logPath)

      // Startup parameters for the engine
      val startUpMap = new util.HashMap[String, Any]
      startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")
      val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
        .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
        .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "Client")
        .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
        .setStartupParams(startUpMap)
        .setMaxSubmitTime(30000)
        .addExecuteUser("freeuser")
      val onceJob = importJob(builder)
      val time = System.currentTimeMillis()
      onceJob.submit()
      println(onceJob.getId)

      // Operator for reading the engine log
      val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
      println(onceJob.getECMServiceInstance)
      logOperator.setFromLine(0)
      logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
      logOperator.setEngineConnType("sqoop")
      logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
      val progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
      var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]

      // Poll until the job has completed and no more log lines are returned
      var end = false
      var rowBefore = 1
      while (!end || rowBefore > 0) {
        if (onceJob.isCompleted) {
          end = true
          metricOperator = null
        }
        logOperator.setPageSize(100)
        Utils.tryQuietly {
          val logs = logOperator.apply()
          logs.logs.asScala.foreach(log => println(log))
          rowBefore = logs.logs.size
        }
        Thread.sleep(3000)
        Option(metricOperator).foreach { operator =>
          if (!onceJob.isCompleted) {
            println(s"Metrics: ${operator.apply()}")
            println(s"Progress: ${progressOperator.apply()}")
          }
        }
      }
      onceJob.waitForCompleted()
      println(onceJob.getStatus)
      println(TimeUnit.SECONDS.convert(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS) + "s")
      System.exit(0)

      // Import job: query MySQL and write the result to an HCatalog table partition
      def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
        jobBuilder
          .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_10")
          .addJobContent("sqoop.mode", "import")
          .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
          .addJobContent("sqoop.args.username", "free")
          .addJobContent("sqoop.args.password", "testpwd")
          .addJobContent("sqoop.args.query", "select id as order_number, sno as time from" +
            " exchangis where sno =1 and $CONDITIONS")
          .addJobContent("sqoop.args.hcatalog.database", "freedb")
          .addJobContent("sqoop.args.hcatalog.table", "zy_test")
          .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
          .addJobContent("sqoop.args.hcatalog.partition.values", "3")
          .addJobContent("sqoop.args.num.mappers", "1")
          .build()
      }

      // Second sample job definition (sqoop.mode is "import" here as well)
      def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
        jobBuilder
          .addJobContent("sqoop.env.mapreduce.job.queuename", "queue1")
          .addJobContent("sqoop.mode", "import")
          .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
          .addJobContent("sqoop.args.query", "select id as order, sno as great_time from" +
            " exchangis_table where sno =1 and $CONDITIONS")
          .addJobContent("sqoop.args.hcatalog.database", "hadoop")
          .addJobContent("sqoop.args.hcatalog.table", "partition_33")
          .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
          .addJobContent("sqoop.args.hcatalog.partition.values", "4")
          .addJobContent("sqoop.args.num.mappers", "1")
          .build()
      }
    }

| Parameter | Key | Description |
|---|---|---|
| | sqoop.mode | import/export/… |
| -Dmapreduce.job.queuename | sqoop.env.mapreduce.job.queuename | |
| --connect <jdbc-uri> | sqoop.args.connect | Specify JDBC connect string |
| --connection-manager <class-name> | sqoop.args.connection.manager | Specify connection manager class name |
| --connection-param-file <properties-file> | sqoop.args.connection.param.file | Specify connection parameters file |
| --driver <class-name> | sqoop.args.driver | Manually specify JDBC driver class to use |
| --hadoop-home <hdir> | sqoop.args.hadoop.home | Override $HADOOP_MAPRED_HOME_ARG |
| --hadoop-mapred-home <dir> | sqoop.args.hadoop.mapred.home | Override $HADOOP_MAPRED_HOME_ARG |
| --help | sqoop.args.help | Print usage instructions |
| -P | | Read password from console |
| --password <password> | sqoop.args.password | Set authentication password |
| --password-alias <password-alias> | sqoop.args.password.alias | Credential provider password alias |
| --password-file <password-file> | sqoop.args.password.file | Set authentication password file path |
| --relaxed-isolation | sqoop.args.relaxed.isolation | Use read-uncommitted isolation for imports |
| --skip-dist-cache | sqoop.args.skip.dist.cache | Skip copying jars to distributed cache |
| --username <username> | sqoop.args.username | Set authentication username |
| --verbose | sqoop.args.verbose | Print more information while working |

| Parameter | Key | Description |
|---|---|---|
| --batch | sqoop.args.batch | Indicates underlying statements to be executed in batch mode |
| --call <arg> | sqoop.args.call | Populate the table using this stored procedure (one call per row) |
| --clear-staging-table | sqoop.args.clear.staging.table | Indicates that any data in staging table can be deleted |
| --columns <col,col,col…> | sqoop.args.columns | Columns to export to table |
| --direct | sqoop.args.direct | Use direct export fast path |
| --export-dir <dir> | sqoop.args.export.dir | HDFS source path for the export |
| -m,--num-mappers <n> | sqoop.args.num.mappers | Use 'n' map tasks to export in parallel |
| --mapreduce-job-name <name> | sqoop.args.mapreduce.job.name | Set name for generated mapreduce job |
| --staging-table <table-name> | sqoop.args.staging.table | Intermediate staging table |
| --table <table-name> | sqoop.args.table | Table to populate |
| --update-key <key> | sqoop.args.update.key | Update records by specified key column |
| --update-mode <mode> | sqoop.args.update.mode | Specifies how updates are performed when new rows are found with non-matching keys in database |
| --validate | sqoop.args.validate | Validate the copy using the configured validator |
| --validation-failurehandler <validation-failurehandler> | sqoop.args.validation.failurehandler | Fully qualified class name for ValidationFailureHandler |
| --validation-threshold <validation-threshold> | sqoop.args.validation.threshold | Fully qualified class name for ValidationThreshold |
| --validator <validator> | sqoop.args.validator | Fully qualified class name for the Validator |

| Parameter | Key | Description |
|---|---|---|
| --append | sqoop.args.append | Imports data in append mode |
| --as-avrodatafile | sqoop.args.as.avrodatafile | Imports data to Avro data files |
| --as-parquetfile | sqoop.args.as.parquetfile | Imports data to Parquet files |
| --as-sequencefile | sqoop.args.as.sequencefile | Imports data to SequenceFiles |
| --as-textfile | sqoop.args.as.textfile | Imports data as plain text (default) |
| --autoreset-to-one-mapper | sqoop.args.autoreset.to.one.mapper | Reset the number of mappers to one mapper if no split key available |
| --boundary-query <statement> | sqoop.args.boundary.query | Set boundary query for retrieving max and min value of the primary key |
| --case-insensitive | sqoop.args.case.insensitive | Database is case insensitive; split WHERE conditions are transformed to lower case |
| --columns <col,col,col…> | sqoop.args.columns | Columns to import from table |
| --compression-codec <codec> | sqoop.args.compression.codec | Compression codec to use for import |
| --delete-target-dir | sqoop.args.delete.target.dir | Imports data in delete mode |
| --direct | sqoop.args.direct | Use direct import fast path |
| --direct-split-size <n> | sqoop.args.direct.split.size | Split the input stream every 'n' bytes when importing in direct mode |
| -e,--query <statement> | sqoop.args.query | Import results of SQL 'statement' |
| --fetch-size <n> | sqoop.args.fetch.size | Set number 'n' of rows to fetch from the database when more rows are needed |
| --inline-lob-limit <n> | sqoop.args.inline.lob.limit | Set the maximum size for an inline LOB |
| -m,--num-mappers <n> | sqoop.args.num.mappers | Use 'n' map tasks to import in parallel |
| --mapreduce-job-name <name> | sqoop.args.mapreduce.job.name | Set name for generated mapreduce job |
| --merge-key <column> | sqoop.args.merge.key | Key column to use to join results |
| --split-by <column-name> | sqoop.args.split.by | Column of the table used to split work units |
| --table <table-name> | sqoop.args.table | Table to read |
| --target-dir <dir> | sqoop.args.target.dir | HDFS plain table destination |
| --validate | sqoop.args.validate | Validate the copy using the configured validator |
| --validation-failurehandler <validation-failurehandler> | sqoop.args.validation.failurehandler | Fully qualified class name for ValidationFailureHandler |
| --validation-threshold <validation-threshold> | sqoop.args.validation.threshold | Fully qualified class name for ValidationThreshold |
| --validator <validator> | sqoop.args.validator | Fully qualified class name for the Validator |
| --warehouse-dir <dir> | sqoop.args.warehouse.dir | HDFS parent for table destination |
| --where <where clause> | sqoop.args.where | WHERE clause to use during import |
| -z,--compress | sqoop.args.compress | Enable compression |

| Parameter | Key | Description |
|---|---|---|
| --check-column <column> | sqoop.args.check.column | Source column to check for incremental change |
| --incremental <import-type> | sqoop.args.incremental | Define an incremental import of type 'append' or 'lastmodified' |
| --last-value <value> | sqoop.args.last.value | Last imported value in the incremental check column |

| Parameter | Key | Description |
|---|---|---|
| --enclosed-by <char> | sqoop.args.enclosed.by | Sets a required field enclosing character |
| --escaped-by <char> | sqoop.args.escaped.by | Sets the escape character |
| --fields-terminated-by <char> | sqoop.args.fields.terminated.by | Sets the field separator character |
| --lines-terminated-by <char> | sqoop.args.lines.terminated.by | Sets the end-of-line character |
| --mysql-delimiters | sqoop.args.mysql.delimiters | Uses MySQL's default delimiter set: fields: , lines: \n escaped-by: \ optionally-enclosed-by: ' |
| --optionally-enclosed-by <char> | sqoop.args.optionally.enclosed.by | Sets a field enclosing character |

| Parameter | Key | Description |
|---|---|---|
| --input-enclosed-by <char> | sqoop.args.input.enclosed.by | Sets a required field encloser |
| --input-escaped-by <char> | sqoop.args.input.escaped.by | Sets the input escape character |
| --input-fields-terminated-by <char> | sqoop.args.input.fields.terminated.by | Sets the input field separator |
| --input-lines-terminated-by <char> | sqoop.args.input.lines.terminated.by | Sets the input end-of-line char |
| --input-optionally-enclosed-by <char> | sqoop.args.input.optionally.enclosed.by | Sets a field enclosing character |

| Parameter | Key | Description |
|---|---|---|
| --create-hive-table | sqoop.args.create.hive.table | Fail if the target hive table exists |
| --hive-database <database-name> | sqoop.args.hive.database | Sets the database name to use when importing to hive |
| --hive-delims-replacement <arg> | sqoop.args.hive.delims.replacement | Replace Hive record \0x01 and row delimiters (\n\r) from imported string fields with user-defined string |
| --hive-drop-import-delims | sqoop.args.hive.drop.import.delims | Drop Hive record \0x01 and row delimiters (\n\r) from imported string fields |
| --hive-home <dir> | sqoop.args.hive.home | Override $HIVE_HOME |
| --hive-import | sqoop.args.hive.import | Import tables into Hive (uses Hive's default delimiters if none are set) |
| --hive-overwrite | sqoop.args.hive.overwrite | Overwrite existing data in the Hive table |
| --hive-partition-key <partition-key> | sqoop.args.hive.partition.key | Sets the partition key to use when importing to hive |
| --hive-partition-value <partition-value> | sqoop.args.hive.partition.value | Sets the partition value to use when importing to hive |
| --hive-table <table-name> | sqoop.args.hive.table | Sets the table name to use when importing to hive |
| --map-column-hive <arg> | sqoop.args.map.column.hive | Override mapping for specific column to hive types |

| Parameter | Key | Description |
|---|---|---|
| --column-family <family> | sqoop.args.column.family | Sets the target column family for the import |
| --hbase-bulkload | sqoop.args.hbase.bulkload | Enables HBase bulk loading |
| --hbase-create-table | sqoop.args.hbase.create.table | If specified, create missing HBase tables |
| --hbase-row-key <col> | sqoop.args.hbase.row.key | Specifies which input column to use as the row key |
| --hbase-table <table> | sqoop.args.hbase.table | Import to <table> in HBase |

| Parameter | Key | Description |
|---|---|---|
| --hcatalog-database <arg> | sqoop.args.hcatalog.database | HCatalog database name |
| --hcatalog-home <hdir> | sqoop.args.hcatalog.home | Override $HCAT_HOME |
| --hcatalog-partition-keys <partition-key> | sqoop.args.hcatalog.partition.keys | Sets the partition keys to use when importing to hive |
| --hcatalog-partition-values <partition-value> | sqoop.args.hcatalog.partition.values | Sets the partition values to use when importing to hive |
| --hcatalog-table <arg> | sqoop.args.hcatalog.table | HCatalog table name |
| --hive-home <dir> | sqoop.args.hive.home | Override $HIVE_HOME |
| --hive-partition-key <partition-key> | sqoop.args.hive.partition.key | Sets the partition key to use when importing to hive |
| --hive-partition-value <partition-value> | sqoop.args.hive.partition.value | Sets the partition value to use when importing to hive |
| --map-column-hive <arg> | sqoop.args.map.column.hive | Override mapping for specific column to hive types |

HCatalog import specific options:

| Parameter | Key | Description |
|---|---|---|
| --create-hcatalog-table | sqoop.args.create.hcatalog.table | Create HCatalog before import |
| --hcatalog-storage-stanza <arg> | sqoop.args.hcatalog.storage.stanza | HCatalog storage stanza for table creation |

| Parameter | Key | Description |
|---|---|---|
| --accumulo-batch-size <size> | sqoop.args.accumulo.batch.size | Batch size in bytes |
| --accumulo-column-family <family> | sqoop.args.accumulo.column.family | Sets the target column family for the import |
| --accumulo-create-table | sqoop.args.accumulo.create.table | If specified, create missing Accumulo tables |
| --accumulo-instance <instance> | sqoop.args.accumulo.instance | Accumulo instance name |
| --accumulo-max-latency <latency> | sqoop.args.accumulo.max.latency | Max write latency in milliseconds |
| --accumulo-password <password> | sqoop.args.accumulo.password | Accumulo password |
| --accumulo-row-key <col> | sqoop.args.accumulo.row.key | Specifies which input column to use as the row key |
| --accumulo-table <table> | sqoop.args.accumulo.table | Import to <table> in Accumulo |
| --accumulo-user <user> | sqoop.args.accumulo.user | Accumulo user name |
| --accumulo-visibility <vis> | sqoop.args.accumulo.visibility | Visibility token to be applied to all rows imported |
| --accumulo-zookeepers <zookeepers> | sqoop.args.accumulo.zookeepers | Comma-separated list of zookeepers (host:port) |

| Parameter | Key | Description |
|---|---|---|
| --bindir <dir> | sqoop.args.bindir | Output directory for compiled objects |
| --class-name <name> | sqoop.args.class.name | Sets the generated class name. This overrides --package-name. When combined with --jar-file, sets the input class. |
| --input-null-non-string <null-str> | sqoop.args.input.null.non.string | Input null non-string representation |
| --input-null-string <null-str> | sqoop.args.input.null.string | Input null string representation |
| --jar-file <file> | sqoop.args.jar.file | Disable code generation; use specified jar |
| --map-column-java <arg> | sqoop.args.map.column.java | Override mapping for specific columns to java types |
| --null-non-string <null-str> | sqoop.args.null.non.string | Null non-string representation |
| --null-string <null-str> | sqoop.args.null.string | Null string representation |
| --outdir <dir> | sqoop.args.outdir | Output directory for generated code |
| --package-name <name> | sqoop.args.package.name | Put auto-generated classes in this package |

Generic options must precede any tool-specific arguments. The supported generic options are:

| Parameter | Key | Description |
|---|---|---|
| -conf <configuration file> | sqoop.args.conf | Specify an application configuration file |
| -D <property=value> | sqoop.args.D | Use value for given property |
| -fs <local\|namenode:port> | sqoop.args.fs | |
| -jt <local\|resourcemanager:port> | sqoop.args.jt | |
| -files <comma separated list of files> | sqoop.args.files | Specify comma separated files to be copied to the map reduce cluster |
| -libjars <comma separated list of jars> | sqoop.args.libjars | Specify comma separated jar files to include in the classpath |
| -archives <comma separated list of archives> | sqoop.args.archives | Specify comma separated archives to be unarchived on the compute machines |
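
The long-form flags in the mapping tables follow a regular naming pattern: drop the leading dashes, replace each remaining hyphen with a dot, and prefix the result with `sqoop.args.`. The following is a minimal sketch of that observed pattern, not the Linkis implementation. The function name `sqoop_flag_to_key` is hypothetical. Short flags such as `-m` and `-D` properties (which the first table maps to the `sqoop.env.` prefix) are deliberately rejected rather than guessed.

```shell
# Convert a long-form Sqoop flag (for example --hive-partition-key) to the
# job-content key that follows the pattern seen in the mapping tables
# (for example sqoop.args.hive.partition.key).
sqoop_flag_to_key() {
    flag=$1
    case "$flag" in
        --?*) ;;    # accept long-form flags only
        *) echo "not a long-form flag: $flag" >&2; return 1 ;;
    esac
    name=${flag#--}    # drop the leading "--"
    # Replace every "-" with "." and add the sqoop.args. prefix.
    echo "sqoop.args.$(printf '%s\n' "$name" | sed 's/-/./g')"
}
```

For example, `sqoop_flag_to_key --export-dir` prints `sqoop.args.export.dir`, which matches the corresponding table row. Always check the result against the tables, since they are the authoritative mapping.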