JAVA SDK 方式使用

Linkis 提供了方便的JAVA和SCALA调用的接口,只需要引入linkis-computation-client的模块就可以进行使用,1.0后新增支持带Label提交的方式,下面将对兼容0.X的方式和1.0新增的方式进行介绍

1. 引入依赖模块

  1. <dependency>
  2. <groupId>org.apache.linkis</groupId>
  3. <artifactId>linkis-computation-client</artifactId>
  4. <version>${linkis.version}</version>
  5. </dependency>
  6. 如:
  7. <dependency>
  8. <groupId>org.apache.linkis</groupId>
  9. <artifactId>linkis-computation-client</artifactId>
  10. <version>1.0.2</version>
  11. </dependency>

2. Java测试代码

建立Java的测试类JavaClientTest,具体接口含义可以见注释:

  1. package com.webank.wedatasphere.linkis.client.test;
  2. import com.webank.wedatasphere.linkis.common.utils.Utils;
  3. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
  4. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
  5. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
  6. import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant;
  7. import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant;
  8. import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
  9. import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
  10. import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction;
  11. import com.webank.wedatasphere.linkis.ujes.client.request.JobSubmitAction;
  12. import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;
  13. import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
  14. import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;
  15. import com.webank.wedatasphere.linkis.ujes.client.response.JobLogResult;
  16. import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;
  17. import org.apache.commons.io.IOUtils;
  18. import java.util.HashMap;
  19. import java.util.Map;
  20. import java.util.concurrent.TimeUnit;
  21. public class JavaClientTest {
  22. // 1. build config: linkis gateway url
  23. private static DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
  24. .addServerUrl("http://127.0.0.1:9001/") //set linkis-mg-gateway url: http://{ip}:{port}
  25. .connectionTimeout(30000) //connectionTimeOut
  26. .discoveryEnabled(false) //disable discovery
  27. .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
  28. .loadbalancerEnabled(true) // enable loadbalance
  29. .maxConnectionSize(5) // set max Connection
  30. .retryEnabled(false) // set retry
  31. .readTimeout(30000) //set read timeout
  32. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authen suppory static and Token
  33. .setAuthTokenKey("hadoop") // set submit user
  34. .setAuthTokenValue("hadoop"))) // set passwd or token (setAuthTokenValue("BML-AUTH"))
  35. .setDWSVersion("v1") //linkis rest version v1
  36. .build();
  37. // 2. new Client(Linkis Client) by clientConfig
  38. private static UJESClient client = new UJESClientImpl(clientConfig);
  39. public static void main(String[] args){
  40. String user = "hadoop"; // execute user
  41. String executeCode = "df=spark.sql(\"show tables\")\n" +
  42. "show(df)"; // code support:sql/hql/py/scala
  43. try {
  44. System.out.println("user : " + user + ", code : [" + executeCode + "]");
  45. // 3. build job and execute
  46. JobExecuteResult jobExecuteResult = toSubmit(user, executeCode);
  47. //0.x:JobExecuteResult jobExecuteResult = toExecute(user, executeCode);
  48. System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
  49. // 4. get job jonfo
  50. JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
  51. int sleepTimeMills = 1000;
  52. int logFromLen = 0;
  53. int logSize = 100;
  54. while(!jobInfoResult.isCompleted()) {
  55. // 5. get progress and log
  56. JobProgressResult progress = client.progress(jobExecuteResult);
  57. System.out.println("progress: " + progress.getProgress());
  58. JobLogResult logRes = client.log(jobExecuteResult, logFromLen, logSize);
  59. logFromLen = logRes.fromLine();
  60. // 0: info 1: warn 2: error 3: all
  61. System.out.println(logRes.log().get(3));
  62. Utils.sleepQuietly(sleepTimeMills);
  63. jobInfoResult = client.getJobInfo(jobExecuteResult);
  64. }
  65. JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
  66. // 6. Get the result set list (if the user submits multiple SQLs at a time,
  67. // multiple result sets will be generated)
  68. String resultSet = jobInfo.getResultSetList(client)[0];
  69. // 7. get resultContent
  70. Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();
  71. System.out.println("res: " + fileContents);
  72. } catch (Exception e) {
  73. e.printStackTrace();
  74. IOUtils.closeQuietly(client);
  75. }
  76. IOUtils.closeQuietly(client);
  77. }
  78. /**
  79. * Linkis 1.0 recommends the use of Submit method
  80. */
  81. private static JobExecuteResult toSubmit(String user, String code) {
  82. // 1. build params
  83. // set label map :EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
  84. Map<String, Object> labels = new HashMap<String, Object>();
  85. labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3"); // required engineType Label
  86. labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-IDE");// required execute user and creator
  87. labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py"); // required codeType
  88. // set start up map :engineConn start params
  89. Map<String, Object> startupMap = new HashMap<String, Object>(16);
  90. // Support setting engine native parameters,For example: parameters of engines such as spark/hive
  91. startupMap.put("spark.executor.instances", 2);
  92. // setting linkis params
  93. startupMap.put("wds.linkis.rm.yarnqueue", "dws");
  94. // 2. build jobSubmitAction
  95. JobSubmitAction jobSubmitAction = JobSubmitAction.builder()
  96. .addExecuteCode(code)
  97. .setStartupParams(startupMap)
  98. .setUser(user) //submit user
  99. .addExecuteUser(user) // execute user
  100. .setLabels(labels)
  101. .build();
  102. // 3. to execute
  103. return client.submit(jobSubmitAction);
  104. }
  105. /**
  106. * Compatible with 0.X execution mode
  107. */
  108. private static JobExecuteResult toExecute(String user, String code) {
  109. // 1. build params
  110. // set label map :EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
  111. Map<String, Object> labels = new HashMap<String, Object>();
  112. // labels.put(LabelKeyConstant.TENANT_KEY, "fate");
  113. // set start up map :engineConn start params
  114. Map<String, Object> startupMap = new HashMap<String, Object>(16);
  115. // Support setting engine native parameters,For example: parameters of engines such as spark/hive
  116. startupMap.put("spark.executor.instances", 2);
  117. // setting linkis params
  118. startupMap.put("wds.linkis.rm.yarnqueue", "dws");
  119. // 2. build JobExecuteAction (0.X old way of using)
  120. JobExecuteAction executionAction = JobExecuteAction.builder()
  121. .setCreator("IDE") //creator, the system name of the client requesting linkis, used for system-level isolation
  122. .addExecuteCode(code) //Execution Code
  123. .setEngineTypeStr("spark") // engineConn type
  124. .setRunTypeStr("py") // code type
  125. .setUser(user) //execute user
  126. .setStartupParams(startupMap) // start up params
  127. .build();
  128. executionAction.addRequestPayload(TaskConstant.LABELS, labels);
  129. String body = executionAction.getRequestPayload();
  130. System.out.println(body);
  131. // 3. to execute
  132. return client.execute(executionAction);
  133. }
  134. }

运行上述的代码即可以完成任务提交/执行/日志/结果集获取等

3. Scala测试代码:

  1. package com.webank.wedatasphere.linkis.client.test
  2. import java.util
  3. import java.util.concurrent.TimeUnit
  4. import com.webank.wedatasphere.linkis.common.utils.Utils
  5. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
  6. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
  7. import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant
  8. import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
  9. import com.webank.wedatasphere.linkis.ujes.client.UJESClient
  10. import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, JobSubmitAction, ResultSetAction}
  11. import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult
  12. import org.apache.commons.io.IOUtils
  13. import org.apache.commons.lang.StringUtils
  14. object ScalaClientTest {
  15. // 1. build config: linkis gateway url
  16. val clientConfig = DWSClientConfigBuilder.newBuilder()
  17. .addServerUrl("http://127.0.0.1:9001/") //set linkis-mg-gateway url: http://{ip}:{port}
  18. .connectionTimeout(30000) //connectionTimeOut
  19. .discoveryEnabled(false) //disable discovery
  20. .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
  21. .loadbalancerEnabled(true) // enable loadbalance
  22. .maxConnectionSize(5) // set max Connection
  23. .retryEnabled(false) // set retry
  24. .readTimeout(30000) //set read timeout
  25. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authen suppory static and Token
  26. .setAuthTokenKey("hadoop") // set submit user
  27. .setAuthTokenValue("hadoop") // set passwd or token (setAuthTokenValue("BML-AUTH"))
  28. .setDWSVersion("v1") //linkis rest version v1
  29. .build();
  30. // 2. new Client(Linkis Client) by clientConfig
  31. val client = UJESClient(clientConfig)
  32. def main(args: Array[String]): Unit = {
  33. val user = "hadoop" // execute user
  34. val executeCode = "df=spark.sql(\"show tables\")\n" +
  35. "show(df)"; // code support:sql/hql/py/scala
  36. try {
  37. // 3. build job and execute
  38. println("user : " + user + ", code : [" + executeCode + "]")
  39. val jobExecuteResult = toSubmit(user, executeCode)
  40. //0.X: val jobExecuteResult = toExecute(user, executeCode)
  41. println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
  42. // 4. get job jonfo
  43. var jobInfoResult = client.getJobInfo(jobExecuteResult)
  44. var logFromLen = 0
  45. val logSize = 100
  46. val sleepTimeMills : Int = 1000
  47. while (!jobInfoResult.isCompleted) {
  48. // 5. get progress and log
  49. val progress = client.progress(jobExecuteResult)
  50. println("progress: " + progress.getProgress)
  51. val logObj = client .log(jobExecuteResult, logFromLen, logSize)
  52. logFromLen = logObj.fromLine
  53. val logArray = logObj.getLog
  54. // 0: info 1: warn 2: error 3: all
  55. if (logArray != null && logArray.size >= 4 && StringUtils.isNotEmpty(logArray.get(3))) {
  56. println(s"log: ${logArray.get(3)}")
  57. }
  58. Utils.sleepQuietly(sleepTimeMills)
  59. jobInfoResult = client.getJobInfo(jobExecuteResult)
  60. }
  61. if (!jobInfoResult.isSucceed) {
  62. println("Failed to execute job: " + jobInfoResult.getMessage)
  63. throw new Exception(jobInfoResult.getMessage)
  64. }
  65. // 6. Get the result set list (if the user submits multiple SQLs at a time,
  66. // multiple result sets will be generated)
  67. val jobInfo = client.getJobInfo(jobExecuteResult)
  68. val resultSetList = jobInfoResult.getResultSetList(client)
  69. println("All result set list:")
  70. resultSetList.foreach(println)
  71. val oneResultSet = jobInfo.getResultSetList(client).head
  72. // 7. get resultContent
  73. val fileContents = client.resultSet(ResultSetAction.builder().setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
  74. println("First fileContents: ")
  75. println(fileContents)
  76. } catch {
  77. case e: Exception => {
  78. e.printStackTrace()
  79. }
  80. }
  81. IOUtils.closeQuietly(client)
  82. }
  83. /**
  84. * Linkis 1.0 recommends the use of Submit method
  85. */
  86. def toSubmit(user: String, code: String): JobExecuteResult = {
  87. // 1. build params
  88. // set label map :EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
  89. val labels: util.Map[String, Any] = new util.HashMap[String, Any]
  90. labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3"); // required engineType Label
  91. labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-IDE");// required execute user and creator
  92. labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py"); // required codeType
  93. val startupMap = new java.util.HashMap[String, Any]()
  94. // Support setting engine native parameters,For example: parameters of engines such as spark/hive
  95. startupMap.put("spark.executor.instances", 2);
  96. // setting linkis params
  97. startupMap.put("wds.linkis.rm.yarnqueue", "dws");
  98. // 2. build jobSubmitAction
  99. val jobSubmitAction = JobSubmitAction.builder
  100. .addExecuteCode(code)
  101. .setStartupParams(startupMap)
  102. .setUser(user) //submit user
  103. .addExecuteUser(user) //execute user
  104. .setLabels(labels)
  105. .build
  106. // 3. to execute
  107. client.submit(jobSubmitAction)
  108. }
  109. /**
  110. * Compatible with 0.X execution mode
  111. */
  112. def toExecute(user: String, code: String): JobExecuteResult = {
  113. // 1. build params
  114. // set label map :EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
  115. val labels = new util.HashMap[String, Any]
  116. // labels.put(LabelKeyConstant.TENANT_KEY, "fate");
  117. val startupMap = new java.util.HashMap[String, Any]()
  118. // Support setting engine native parameters,For example: parameters of engines such as spark/hive
  119. startupMap.put("spark.executor.instances", 2)
  120. // setting linkis params
  121. startupMap.put("wds.linkis.rm.yarnqueue", "dws")
  122. // 2. build JobExecuteAction (0.X old way of using)
  123. val executionAction = JobExecuteAction.builder()
  124. .setCreator("IDE") //creator, the system name of the client requesting linkis, used for system-level isolation
  125. .addExecuteCode(code) //Execution Code
  126. .setEngineTypeStr("spark") // engineConn type
  127. .setRunTypeStr("py") // code type
  128. .setUser(user) //execute user
  129. .setStartupParams(startupMap) // start up params
  130. .build();
  131. executionAction.addRequestPayload(TaskConstant.LABELS, labels);
  132. // 3. to execute
  133. client.execute(executionAction)
  134. }
  135. }