JAVA SDK Manual

Linkis provides a convenient JAVA and SCALA calling interface, which only requires introducing the linkis-computation-client module. Since 1.0, a Label-based submission method has been added. The following introduces both the 0.X-compatible way and the new way added in 1.0.

1. Introduce the dependency module

  <dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
  </dependency>

  Such as:

  <dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>1.0.3</version>
  </dependency>
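
Note: the client version should generally match the version of the Linkis server you connect to; mixing client and server versions may lead to interface incompatibilities.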

2. Java test code

Create the Java test class LinkisClientTest. Refer to the comments to understand the purposes of those interfaces:

  package org.apache.linkis.client.test;

  import org.apache.linkis.common.utils.Utils;
  import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
  import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
  import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
  import org.apache.linkis.manager.label.constant.LabelKeyConstant;
  import org.apache.linkis.protocol.constants.TaskConstant;
  import org.apache.linkis.ujes.client.UJESClient;
  import org.apache.linkis.ujes.client.UJESClientImpl;
  import org.apache.linkis.ujes.client.request.JobSubmitAction;
  import org.apache.linkis.ujes.client.request.JobExecuteAction;
  import org.apache.linkis.ujes.client.request.ResultSetAction;
  import org.apache.linkis.ujes.client.response.*;
  import org.apache.commons.io.IOUtils;

  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.TimeUnit;

  public class LinkisClientTest {

      // 1. build config: linkis gateway url
      private static DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
              .addServerUrl("http://127.0.0.1:9001/")   // set linkis-mg-gateway url: http://{ip}:{port}
              .connectionTimeout(30000)                 // connection timeout
              .discoveryEnabled(false)                  // disable discovery
              .discoveryFrequency(1, TimeUnit.MINUTES)  // discovery frequency
              .loadbalancerEnabled(true)                // enable load balancing
              .maxConnectionSize(5)                     // set max connections
              .retryEnabled(false)                      // set retry
              .readTimeout(30000)                       // set read timeout
              .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // Linkis authentication supports static and token strategies
              .setAuthTokenKey("hadoop")                // set submit user
              .setAuthTokenValue("hadoop")))            // set password, or token (e.g. setAuthTokenValue("BML-AUTH"))
              .setDWSVersion("v1")                      // linkis rest version v1
              .build();

      // 2. new Client (Linkis Client) by clientConfig
      private static UJESClient client = new UJESClientImpl(clientConfig);

      public static void main(String[] args) {
          String user = "hadoop"; // execute user
          String executeCode = "df=spark.sql(\"show tables\")\n" +
                  "show(df)"; // code supports: sql/hql/py/scala
          try {
              System.out.println("user : " + user + ", code : [" + executeCode + "]");
              // 3. build job and execute
              JobExecuteResult jobExecuteResult = toSubmit(user, executeCode);
              // 0.X: JobExecuteResult jobExecuteResult = toExecute(user, executeCode);
              System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
              // 4. get job info
              JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
              int sleepTimeMills = 1000;
              int logFromLen = 0;
              int logSize = 100;
              while (!jobInfoResult.isCompleted()) {
                  // 5. get progress and log
                  JobProgressResult progress = client.progress(jobExecuteResult);
                  System.out.println("progress: " + progress.getProgress());
                  JobLogResult logRes = client.log(jobExecuteResult, logFromLen, logSize);
                  logFromLen = logRes.fromLine();
                  // 0: info 1: warn 2: error 3: all
                  System.out.println(logRes.log().get(3));
                  Utils.sleepQuietly(sleepTimeMills);
                  jobInfoResult = client.getJobInfo(jobExecuteResult);
              }
              JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
              // 6. get the result set list (if the user submits multiple SQLs at a time,
              // multiple result sets will be generated)
              String resultSet = jobInfo.getResultSetList(client)[0];
              // 7. get result content
              Object fileContents = client.resultSet(ResultSetAction.builder()
                      .setPath(resultSet)
                      .setUser(jobExecuteResult.getUser())
                      .build()).getFileContent();
              System.out.println("res: " + fileContents);
          } catch (Exception e) {
              e.printStackTrace();
          } finally {
              IOUtils.closeQuietly(client);
          }
      }

      /**
       * Linkis 1.0 recommends using the submit method
       */
      private static JobExecuteResult toSubmit(String user, String code) {
          // 1. build params
          // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
          Map<String, Object> labels = new HashMap<String, Object>();
          labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3");       // required engineType label
          labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-IDE"); // required execute user and creator
          labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py");                  // required codeType
          // set startup map: engineConn start params
          Map<String, Object> startupMap = new HashMap<String, Object>(16);
          // supports setting engine native parameters, e.g. for engines such as spark/hive
          startupMap.put("spark.executor.instances", 2);
          // setting linkis params
          startupMap.put("wds.linkis.rm.yarnqueue", "dws");
          // 2. build jobSubmitAction
          JobSubmitAction jobSubmitAction = JobSubmitAction.builder()
                  .addExecuteCode(code)
                  .setStartupParams(startupMap)
                  .setUser(user)        // submit user
                  .addExecuteUser(user) // execute user
                  .setLabels(labels)
                  .build();
          // 3. to execute
          return client.submit(jobSubmitAction);
      }

      /**
       * Compatible with the 0.X execution mode
       */
      private static JobExecuteResult toExecute(String user, String code) {
          // 1. build params
          // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
          Map<String, Object> labels = new HashMap<String, Object>();
          // labels.put(LabelKeyConstant.TENANT_KEY, "fate");
          // set startup map: engineConn start params
          Map<String, Object> startupMap = new HashMap<String, Object>(16);
          // supports setting engine native parameters, e.g. for engines such as spark/hive
          startupMap.put("spark.executor.instances", 2);
          // setting linkis params
          startupMap.put("wds.linkis.rm.yarnqueue", "dws");
          // 2. build JobExecuteAction (the old 0.X way)
          JobExecuteAction executionAction = JobExecuteAction.builder()
                  .setCreator("IDE")            // creator: the system name of the client requesting linkis, used for system-level isolation
                  .addExecuteCode(code)         // execution code
                  .setEngineTypeStr("spark")    // engineConn type
                  .setRunTypeStr("py")          // code type
                  .setUser(user)                // execute user
                  .setStartupParams(startupMap) // startup params
                  .build();
          executionAction.addRequestPayload(TaskConstant.LABELS, labels);
          String body = executionAction.getRequestPayload();
          System.out.println(body);
          // 3. to execute
          return client.execute(executionAction);
      }
  }

Run the above code to interact with Linkis.
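
The example authenticates with StaticAuthenticationStrategy (username and password). As the comments note, Linkis also supports token authentication. Below is a minimal sketch of a token-based client config, assuming TokenAuthenticationStrategy from the same package; the token value "BML-AUTH" is a placeholder that must match a token configured on the linkis-mg-gateway side:

  import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
  import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
  import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
  import org.apache.linkis.ujes.client.UJESClient;
  import org.apache.linkis.ujes.client.UJESClientImpl;

  public class LinkisTokenClientSketch {
      // Same builder chain as the static example, but with token authentication.
      private static DWSClientConfig tokenConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
              .addServerUrl("http://127.0.0.1:9001/")
              .connectionTimeout(30000)
              .discoveryEnabled(false)
              .loadbalancerEnabled(true)
              .maxConnectionSize(5)
              .retryEnabled(false)
              .readTimeout(30000)
              .setAuthenticationStrategy(new TokenAuthenticationStrategy()) // token instead of static auth
              .setAuthTokenKey("hadoop")       // the user to submit as
              .setAuthTokenValue("BML-AUTH"))) // the token value (placeholder; configure in linkis-mg-gateway)
              .setDWSVersion("v1")
              .build();

      private static UJESClient client = new UJESClientImpl(tokenConfig);
  }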

3. Scala test code

Create the Scala test class LinkisClientTest. Refer to the comments to understand the purposes of those interfaces:

  package org.apache.linkis.client.test

  import java.util
  import java.util.concurrent.TimeUnit

  import org.apache.linkis.common.utils.Utils
  import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
  import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
  import org.apache.linkis.manager.label.constant.LabelKeyConstant
  import org.apache.linkis.protocol.constants.TaskConstant
  import org.apache.linkis.ujes.client.UJESClient
  import org.apache.linkis.ujes.client.request._
  import org.apache.linkis.ujes.client.response._
  import org.apache.commons.io.IOUtils
  import org.apache.commons.lang.StringUtils

  object LinkisClientTest {

    // 1. build config: linkis gateway url
    val clientConfig = DWSClientConfigBuilder.newBuilder()
      .addServerUrl("http://127.0.0.1:9001/")  // set linkis-mg-gateway url: http://{ip}:{port}
      .connectionTimeout(30000)                // connection timeout
      .discoveryEnabled(false)                 // disable discovery
      .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
      .loadbalancerEnabled(true)               // enable load balancing
      .maxConnectionSize(5)                    // set max connections
      .retryEnabled(false)                     // set retry
      .readTimeout(30000)                      // set read timeout
      .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // Linkis authentication supports static and token strategies
      .setAuthTokenKey("hadoop")               // set submit user
      .setAuthTokenValue("hadoop")             // set password, or token (e.g. setAuthTokenValue("BML-AUTH"))
      .setDWSVersion("v1")                     // linkis rest version v1
      .build()

    // 2. new Client (Linkis Client) by clientConfig
    val client = UJESClient(clientConfig)

    def main(args: Array[String]): Unit = {
      val user = "hadoop" // execute user
      val executeCode = "df=spark.sql(\"show tables\")\n" +
        "show(df)" // code supports: sql/hql/py/scala
      try {
        // 3. build job and execute
        println("user : " + user + ", code : [" + executeCode + "]")
        val jobExecuteResult = toSubmit(user, executeCode)
        // 0.X: val jobExecuteResult = toExecute(user, executeCode)
        println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
        // 4. get job info
        var jobInfoResult = client.getJobInfo(jobExecuteResult)
        var logFromLen = 0
        val logSize = 100
        val sleepTimeMills: Int = 1000
        while (!jobInfoResult.isCompleted) {
          // 5. get progress and log
          val progress = client.progress(jobExecuteResult)
          println("progress: " + progress.getProgress)
          val logObj = client.log(jobExecuteResult, logFromLen, logSize)
          logFromLen = logObj.fromLine
          val logArray = logObj.getLog
          // 0: info 1: warn 2: error 3: all
          if (logArray != null && logArray.size >= 4 && StringUtils.isNotEmpty(logArray.get(3))) {
            println(s"log: ${logArray.get(3)}")
          }
          Utils.sleepQuietly(sleepTimeMills)
          jobInfoResult = client.getJobInfo(jobExecuteResult)
        }
        if (!jobInfoResult.isSucceed) {
          println("Failed to execute job: " + jobInfoResult.getMessage)
          throw new Exception(jobInfoResult.getMessage)
        }
        // 6. get the result set list (if the user submits multiple SQLs at a time,
        // multiple result sets will be generated)
        val jobInfo = client.getJobInfo(jobExecuteResult)
        val resultSetList = jobInfoResult.getResultSetList(client)
        println("All result set list:")
        resultSetList.foreach(println)
        val oneResultSet = jobInfo.getResultSetList(client).head
        // 7. get result content
        val fileContents = client.resultSet(ResultSetAction.builder()
          .setPath(oneResultSet)
          .setUser(jobExecuteResult.getUser)
          .build()).getFileContent
        println("First fileContents: ")
        println(fileContents)
      } catch {
        case e: Exception =>
          e.printStackTrace()
      }
      IOUtils.closeQuietly(client)
    }

    /**
     * Linkis 1.0 recommends using the submit method
     */
    def toSubmit(user: String, code: String): JobExecuteResult = {
      // 1. build params
      // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
      val labels: util.Map[String, Any] = new util.HashMap[String, Any]
      labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3")       // required engineType label
      labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-IDE") // required execute user and creator
      labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py")                  // required codeType
      val startupMap = new java.util.HashMap[String, Any]()
      // supports setting engine native parameters, e.g. for engines such as spark/hive
      startupMap.put("spark.executor.instances", 2)
      // setting linkis params
      startupMap.put("wds.linkis.rm.yarnqueue", "dws")
      // 2. build jobSubmitAction
      val jobSubmitAction = JobSubmitAction.builder
        .addExecuteCode(code)
        .setStartupParams(startupMap)
        .setUser(user)        // submit user
        .addExecuteUser(user) // execute user
        .setLabels(labels)
        .build
      // 3. to execute
      client.submit(jobSubmitAction)
    }

    /**
     * Compatible with the 0.X execution mode
     */
    def toExecute(user: String, code: String): JobExecuteResult = {
      // 1. build params
      // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
      val labels = new util.HashMap[String, Any]
      // labels.put(LabelKeyConstant.TENANT_KEY, "fate")
      val startupMap = new java.util.HashMap[String, Any]()
      // supports setting engine native parameters, e.g. for engines such as spark/hive
      startupMap.put("spark.executor.instances", 2)
      // setting linkis params
      startupMap.put("wds.linkis.rm.yarnqueue", "dws")
      // 2. build JobExecuteAction (the old 0.X way)
      val executionAction = JobExecuteAction.builder()
        .setCreator("IDE")            // creator: the system name of the client requesting linkis, used for system-level isolation
        .addExecuteCode(code)         // execution code
        .setEngineTypeStr("spark")    // engineConn type
        .setRunTypeStr("py")          // code type
        .setUser(user)                // execute user
        .setStartupParams(startupMap) // startup params
        .build()
      executionAction.addRequestPayload(TaskConstant.LABELS, labels)
      // 3. to execute
      client.execute(executionAction)
    }
  }
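
Both examples poll getJobInfo until the job completes. To cancel a job that is still running, UJESClient also exposes a kill action. A minimal Java sketch, reusing the client and jobExecuteResult objects from the Java example above:

  // Sketch: cancel a task that has not finished yet. "client" and
  // "jobExecuteResult" are the objects built in the Java example above.
  JobInfoResult info = client.getJobInfo(jobExecuteResult);
  if (!info.isCompleted()) {
      client.kill(jobExecuteResult); // asks Linkis to cancel the running task
  }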