Sqoop Engine usage documentation

This article mainly introduces the configuration, deployment and use of the Sqoop engine in Linkis 1.X.

1. Sqoop engine Linkis system parameter configuration

The Sqoop engine mainly depends on the Hadoop basic environment: any node that deploys the Sqoop engine must also have the Hadoop client environment deployed.

Before executing Sqoop tasks through Linkis, it is strongly recommended to run a test task with native Sqoop on the node to verify that the node environment is working.

| Environment Variable Name | Environment Variable Content | Remark |
| --- | --- | --- |
| JAVA_HOME | JDK installation path | Required |
| HADOOP_HOME | Hadoop installation path | Required |
| HADOOP_CONF_DIR | Hadoop configuration path | Required |
| SQOOP_HOME | Sqoop installation path | Not required |
| SQOOP_CONF_DIR | Sqoop configuration path | Not required |
| HCAT_HOME | HCatalog configuration path | Not required |
| HBASE_HOME | HBase configuration path | Not required |

Table 1-1 Environment configuration list
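As an illustration of the table above, the required variables can be set in the deployment user's shell environment (for example in `.bashrc`). The paths below are placeholders, not the defaults of any particular distribution, and must be adapted to your installation:

```shell
# Illustrative environment setup for a Sqoop engine node; adjust paths to your installation.
export JAVA_HOME=/usr/local/jdk1.8.0          # required
export HADOOP_HOME=/usr/local/hadoop          # required
export HADOOP_CONF_DIR=/etc/hadoop/conf       # required
# Optional, only needed if a local Sqoop client is installed:
export SQOOP_HOME=/usr/local/sqoop
export SQOOP_CONF_DIR=/usr/local/sqoop/conf
```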

| Linkis Parameter Name | Parameter Content | Remark |
| --- | --- | --- |
| wds.linkis.hadoop.site.xml | Location of the Hadoop parameter files that Sqoop loads | Required. Reference example: "/etc/hadoop/conf/core-site.xml;/etc/hadoop/conf/hdfs-site.xml;/etc/hadoop/conf/yarn-site.xml;/etc/hadoop/conf/mapred-site.xml" |
| sqoop.fetch.status.interval | Interval for polling the Sqoop execution status | Not required; the default value is 5s |
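For concreteness, the two Linkis parameters above could look as follows in Java-properties form. This is an illustration only; where exactly they are set (properties file or task parameters) depends on your deployment, and the file paths are examples:

```properties
wds.linkis.hadoop.site.xml=/etc/hadoop/conf/core-site.xml;/etc/hadoop/conf/hdfs-site.xml;/etc/hadoop/conf/yarn-site.xml;/etc/hadoop/conf/mapred-site.xml
sqoop.fetch.status.interval=5s
```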

2. Sqoop engine configuration and deployment

2.1 Sqoop Version selection and compilation

Linkis 1.1.2 and above supports the mainstream Sqoop versions 1.4.6 and 1.4.7; later Sqoop versions may require modifying some code and recompiling.

2.2 Sqoop EngineConn deployment and loading

Note: Before compiling the Sqoop engine, the Linkis project must be fully compiled.

Compile the Sqoop engine separately:

```shell
cd ${linkis_code_dir}/linkis-engineconn-plugins/sqoop/
mvn clean install
```

After compilation, the engine package is located at:

```
${linkis_code_dir}/linkis-engineconn-plugins/sqoop/target/sqoop-engineconn.zip
```

Deploy it to:

```
${LINKIS_HOME}/lib/linkis-engineplugins
```

and then restart linkis-engineplugin:

```shell
cd ${LINKIS_HOME}/sbin
sh linkis-daemon.sh restart cg-engineplugin
```

More details on engine plugin installation can be found in the following article:
https://linkis.apache.org/zh-CN/docs/1.1.1/deployment/engine-conn-plugin-installation

3. Sqoop engine usage

3.1 OnceEngineConn

OnceEngineConn is used by calling the createEngineConn interface of LinkisManager through LinkisManagerClient and sending the code to the created Sqoop engine, which then starts executing. This method can be called by other systems, such as Exchangis. Using the client is simple: create a new Maven project, or add the following dependency to your project.

```xml
<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
</dependency>
```

Test Case:

```scala
package com.webank.wedatasphere.exchangis.job.server.log.client

import java.util.concurrent.TimeUnit
import java.util

import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
import org.apache.linkis.computation.client.utils.LabelKeyUtils

import scala.collection.JavaConverters._

object SqoopOnceJobTest extends App {
  LinkisJobBuilder.setDefaultServerUrl("http://127.0.0.1:9001")
  val logPath = "C:\\Users\\resources\\log4j.properties"
  System.setProperty("log4j.configurationFile", logPath)
  val startUpMap = new util.HashMap[String, Any]
  startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")
  val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
    .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
    .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "Client")
    .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
    .setStartupParams(startUpMap)
    .setMaxSubmitTime(30000)
    .addExecuteUser("freeuser")
  val onceJob = importJob(builder)
  val time = System.currentTimeMillis()
  onceJob.submit()
  println(onceJob.getId)
  val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
  println(onceJob.getECMServiceInstance)
  logOperator.setFromLine(0)
  logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
  logOperator.setEngineConnType("sqoop")
  logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
  var progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
  var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]
  var end = false
  var rowBefore = 1
  // Poll logs, metrics and progress until the job completes and no more log rows remain
  while (!end || rowBefore > 0) {
    if (onceJob.isCompleted) {
      end = true
      metricOperator = null
    }
    logOperator.setPageSize(100)
    Utils.tryQuietly {
      val logs = logOperator.apply()
      logs.logs.asScala.foreach(log => println(log))
      rowBefore = logs.logs.size
    }
    Thread.sleep(3000)
    Option(metricOperator).foreach(operator => {
      if (!onceJob.isCompleted) {
        println(s"Metric Monitor: ${operator.apply()}")
        println(s"Progress: ${progressOperator.apply()}")
      }
    })
  }
  onceJob.waitForCompleted()
  println(onceJob.getStatus)
  println(TimeUnit.SECONDS.convert(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS) + "s")
  System.exit(0)

  def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    jobBuilder
      .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_10")
      .addJobContent("sqoop.mode", "import")
      .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
      .addJobContent("sqoop.args.username", "free")
      .addJobContent("sqoop.args.password", "testpwd")
      .addJobContent("sqoop.args.query", "select id as order_number, sno as time from" +
        " exchangis where sno =1 and $CONDITIONS")
      .addJobContent("sqoop.args.hcatalog.database", "freedb")
      .addJobContent("sqoop.args.hcatalog.table", "zy_test")
      .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
      .addJobContent("sqoop.args.hcatalog.partition.values", "3")
      .addJobContent("sqoop.args.num.mappers", "1")
      .build()
  }

  def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    jobBuilder
      .addJobContent("sqoop.env.mapreduce.job.queuename", "queue1")
      .addJobContent("sqoop.mode", "export")
      .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
      .addJobContent("sqoop.args.query", "select id as order, sno as great_time from" +
        " exchangis_table where sno =1 and $CONDITIONS")
      .addJobContent("sqoop.args.hcatalog.database", "hadoop")
      .addJobContent("sqoop.args.hcatalog.table", "partition_33")
      .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
      .addJobContent("sqoop.args.hcatalog.partition.values", "4")
      .addJobContent("sqoop.args.num.mappers", "1")
      .build()
  }
}
```

Parameter comparison table (Linkis job-content keys vs. native Sqoop parameters):

```
sqoop.env.mapreduce.job.queuename <=> -Dmapreduce.job.queuename
sqoop.args.connection.manager <=> --connection-manager
sqoop.args.connection.param.file <=> --connection-param-file
sqoop.args.driver <=> --driver
sqoop.args.hadoop.home <=> --hadoop-home
sqoop.args.hadoop.mapred.home <=> --hadoop-mapred-home
sqoop.args.help <=> help
sqoop.args.password <=> --password
sqoop.args.password.alias <=> --password-alias
sqoop.args.password.file <=> --password-file
sqoop.args.relaxed.isolation <=> --relaxed-isolation
sqoop.args.skip.dist.cache <=> --skip-dist-cache
sqoop.args.username <=> --username
sqoop.args.verbose <=> --verbose
sqoop.args.append <=> --append
sqoop.args.as.avrodatafile <=> --as-avrodatafile
sqoop.args.as.parquetfile <=> --as-parquetfile
sqoop.args.as.sequencefile <=> --as-sequencefile
sqoop.args.as.textfile <=> --as-textfile
sqoop.args.autoreset.to.one.mapper <=> --autoreset-to-one-mapper
sqoop.args.boundary.query <=> --boundary-query
sqoop.args.case.insensitive <=> --case-insensitive
sqoop.args.columns <=> --columns
sqoop.args.compression.codec <=> --compression-codec
sqoop.args.delete.target.dir <=> --delete-target-dir
sqoop.args.direct <=> --direct
sqoop.args.direct.split.size <=> --direct-split-size
sqoop.args.query <=> --query
sqoop.args.fetch.size <=> --fetch-size
sqoop.args.inline.lob.limit <=> --inline-lob-limit
sqoop.args.num.mappers <=> --num-mappers
sqoop.args.mapreduce.job.name <=> --mapreduce-job-name
sqoop.args.merge.key <=> --merge-key
sqoop.args.split.by <=> --split-by
sqoop.args.table <=> --table
sqoop.args.target.dir <=> --target-dir
sqoop.args.validate <=> --validate
sqoop.args.validation.failurehandler <=> --validation-failurehandler
sqoop.args.validation.threshold <=> --validation-threshold
sqoop.args.validator <=> --validator
sqoop.args.warehouse.dir <=> --warehouse-dir
sqoop.args.where <=> --where
sqoop.args.compress <=> --compress
sqoop.args.check.column <=> --check-column
sqoop.args.incremental <=> --incremental
sqoop.args.last.value <=> --last-value
sqoop.args.enclosed.by <=> --enclosed-by
sqoop.args.escaped.by <=> --escaped-by
sqoop.args.fields.terminated.by <=> --fields-terminated-by
sqoop.args.lines.terminated.by <=> --lines-terminated-by
sqoop.args.mysql.delimiters <=> --mysql-delimiters
sqoop.args.optionally.enclosed.by <=> --optionally-enclosed-by
sqoop.args.input.enclosed.by <=> --input-enclosed-by
sqoop.args.input.escaped.by <=> --input-escaped-by
sqoop.args.input.fields.terminated.by <=> --input-fields-terminated-by
sqoop.args.input.lines.terminated.by <=> --input-lines-terminated-by
sqoop.args.input.optionally.enclosed.by <=> --input-optionally-enclosed-by
sqoop.args.create.hive.table <=> --create-hive-table
sqoop.args.hive.delims.replacement <=> --hive-delims-replacement
sqoop.args.hive.database <=> --hive-database
sqoop.args.hive.drop.import.delims <=> --hive-drop-import-delims
sqoop.args.hive.home <=> --hive-home
sqoop.args.hive.import <=> --hive-import
sqoop.args.hive.overwrite <=> --hive-overwrite
sqoop.args.hive.partition.value <=> --hive-partition-value
sqoop.args.hive.table <=> --hive-table
sqoop.args.column.family <=> --column-family
sqoop.args.hbase.bulkload <=> --hbase-bulkload
sqoop.args.hbase.create.table <=> --hbase-create-table
sqoop.args.hbase.row.key <=> --hbase-row-key
sqoop.args.hbase.table <=> --hbase-table
sqoop.args.hcatalog.database <=> --hcatalog-database
sqoop.args.hcatalog.home <=> --hcatalog-home
sqoop.args.hcatalog.partition.keys <=> --hcatalog-partition-keys
sqoop.args.hcatalog.partition.values <=> --hcatalog-partition-values
sqoop.args.hcatalog.table <=> --hcatalog-table
sqoop.args.hive.partition.key <=> --hive-partition-key
sqoop.args.map.column.hive <=> --map-column-hive
sqoop.args.create.hcatalog.table <=> --create-hcatalog-table
sqoop.args.hcatalog.storage.stanza <=> --hcatalog-storage-stanza
sqoop.args.accumulo.batch.size <=> --accumulo-batch-size
sqoop.args.accumulo.column.family <=> --accumulo-column-family
sqoop.args.accumulo.create.table <=> --accumulo-create-table
sqoop.args.accumulo.instance <=> --accumulo-instance
sqoop.args.accumulo.max.latency <=> --accumulo-max-latency
sqoop.args.accumulo.password <=> --accumulo-password
sqoop.args.accumulo.row.key <=> --accumulo-row-key
sqoop.args.accumulo.table <=> --accumulo-table
sqoop.args.accumulo.user <=> --accumulo-user
sqoop.args.accumulo.visibility <=> --accumulo-visibility
sqoop.args.accumulo.zookeepers <=> --accumulo-zookeepers
sqoop.args.bindir <=> --bindir
sqoop.args.class.name <=> --class-name
sqoop.args.input.null.non.string <=> --input-null-non-string
sqoop.args.input.null.string <=> --input-null-string
sqoop.args.jar.file <=> --jar-file
sqoop.args.map.column.java <=> --map-column-java
sqoop.args.null.non.string <=> --null-non-string
sqoop.args.null.string <=> --null-string
sqoop.args.outdir <=> --outdir
sqoop.args.package.name <=> --package-name
sqoop.args.conf <=> -conf
sqoop.args.D <=> -D
sqoop.args.fs <=> -fs
sqoop.args.jt <=> -jt
sqoop.args.files <=> -files
sqoop.args.libjars <=> -libjars
sqoop.args.archives <=> -archives
sqoop.args.update.key <=> --update-key
sqoop.args.update.mode <=> --update-mode
sqoop.args.export.dir <=> --export-dir
```
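As the table suggests, the `--`-style flags follow a mechanical naming convention: strip the leading dashes, replace the remaining hyphens with dots, and prefix `sqoop.args.` (Hadoop `-D` properties instead use the `sqoop.env.` prefix, and single-dash generic options like `-conf` map to `sqoop.args.conf`). A minimal sketch of that convention for double-dash flags, as an illustration only and not code shipped with Linkis:

```python
def to_linkis_key(native_flag: str) -> str:
    """Convert a native double-dash Sqoop flag (e.g. '--num-mappers')
    to the corresponding Linkis job-content key from the table above."""
    # Strip leading dashes, then turn hyphens into dots and add the prefix.
    return "sqoop.args." + native_flag.lstrip("-").replace("-", ".")

print(to_linkis_key("--num-mappers"))              # sqoop.args.num.mappers
print(to_linkis_key("--hcatalog-partition-keys"))  # sqoop.args.hcatalog.partition.keys
```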