Use of 1.0 SDK

Linkis provides a convenient calling interface for JAVA and SCALA; you only need to introduce the linkis-computation-client module to use it. Since 1.0, a Label-based submission method has been added. The following introduces both the 0.X-compatible way and the new 1.0 way.

1 Introduce dependent modules

  1. <dependency>
  2. <groupId>com.webank.wedatasphere.linkis</groupId>
  3. <artifactId>linkis-computation-client</artifactId>
  4. <version>${linkis.version}</version>
  5. </dependency>
  6. For example:
  7. <dependency>
  8. <groupId>com.webank.wedatasphere.linkis</groupId>
  9. <artifactId>linkis-computation-client</artifactId>
  10. <version>1.0.0-RC1</version>
  11. </dependency>

2 Compatible with 0.X Execute method submission

2.1 Java test code

Create the Java test class LinkisClientTest; the specific meaning of each interface call can be found in the comments:

  1. package com.webank.wedatasphere.linkis.client.test
  2. import com.webank.wedatasphere.linkis.common.utils.Utils;
  3. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
  4. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
  5. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
  6. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
  7. import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
  8. import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
  9. import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction;
  10. import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;
  11. import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
  12. import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;
  13. import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;
  14. import org.apache.commons.io.IOUtils;
  15. import java.util.HashMap;
  16. import java.util.Map;
  17. import java.util.concurrent.TimeUnit;
  18. public class LinkisClientTest {
  19. public static void main(String[] args){
  20. String user = "hadoop";
  21. String executeCode = "show databases;";
  22. // 1. Configure DWSClientBuilder, get a DWSClientConfig through DWSClientBuilder
  23. DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
  24. .addServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the linkis server-side gateway, such as http://{ip}:{port}
  25. .connectionTimeout(30000) //connectionTimeOut client connection timeout
  26. .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered
  27. .loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless
  28. .maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent
  29. .retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry
  30. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method
  31. .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name
  32. .setDWSVersion("v1").build(); //Linkis background protocol version, the current version is v1
  33. // 2. Get a UJESClient through DWSClientConfig
  34. UJESClient client = new UJESClientImpl(clientConfig);
  35. try {
  36. // 3. Start code execution
  37. System.out.println("user: "+ user + ", code: [" + executeCode + "]");
  38. Map<String, Object> startupMap = new HashMap<String, Object>();
  39. startupMap.put("wds.linkis.yarnqueue", "default"); // Various startup parameters can be stored in startupMap, see linkis management console configuration
  40. JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder()
  41. .setCreator("linkisClient-Test") //creator, the system name of the client requesting linkis, used for system-level isolation
  42. .addExecuteCode(executeCode) //ExecutionCode The code to be executed
  43. .setEngineType((JobExecuteAction.EngineType) JobExecuteAction.EngineType$.MODULE$.HIVE()) // The execution engine type of the linkis that you want to request, such as Spark hive, etc.
  44. .setUser(user) //User, request user; used for user-level multi-tenant isolation
  45. .setStartupParams(startupMap)
  46. .build());
  47. System.out.println("execId: "+ jobExecuteResult.getExecID() + ", taskId:" + jobExecuteResult.taskID());
  48. // 4. Get the execution status of the script
  49. JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
  50. int sleepTimeMills = 1000;
  51. while(!jobInfoResult.isCompleted()) {
  52. // 5. Get the execution progress of the script
  53. JobProgressResult progress = client.progress(jobExecuteResult);
  54. Utils.sleepQuietly(sleepTimeMills);
  55. jobInfoResult = client.getJobInfo(jobExecuteResult);
  56. }
  57. // 6. Get the job information of the script
  58. JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
  59. // 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated)
  60. String resultSet = jobInfo.getResultSetList(client)[0];
  61. // 8. Get a specific result set through a result set information
  62. Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();
  63. System.out.println("fileContents: "+ fileContents);
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. IOUtils.closeQuietly(client);
  67. }
  68. IOUtils.closeQuietly(client);
  69. }
  70. }

Run the above code to interact with Linkis

2.2 Scala test code

  package com.webank.wedatasphere.linkis.client.test

  import java.util.concurrent.TimeUnit

  import com.webank.wedatasphere.linkis.common.utils.Utils
  import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
  import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
  import com.webank.wedatasphere.linkis.ujes.client.UJESClient
  import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.EngineType
  import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
  import org.apache.commons.io.IOUtils

  /**
   * 0.X-compatible Scala submission example: submits code through
   * UJESClient.execute, polls the job until completion and prints
   * every result set path plus the content of the first one.
   */
  object LinkisClientImplTest extends App {

    val executeCode = "show databases;"
    val user = "hadoop"

    // 1. Configure DWSClientConfigBuilder and obtain a DWSClientConfig.
    val clientConfig = DWSClientConfigBuilder.newBuilder()
      .addServerUrl("http://${ip}:${port}") // address of the Linkis server-side gateway, e.g. http://{ip}:{port}
      .connectionTimeout(30000) // client connection timeout (ms)
      .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) // whether to enable registry discovery; newly launched gateways are discovered automatically
      .loadbalancerEnabled(true) // load balancing is only meaningful when discovery is enabled
      .maxConnectionSize(5) // maximum number of connections, i.e. maximum concurrency
      .retryEnabled(false).readTimeout(30000) // whether to retry on failure; read timeout (ms)
      .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // Linkis authentication method
      .setAuthTokenKey("${username}").setAuthTokenValue("${password}") // authentication key (user name) and value (password)
      .setDWSVersion("v1").build() // Linkis backend protocol version, currently v1

    // 2. Obtain a UJESClient from the DWSClientConfig.
    val client = UJESClient(clientConfig)
    try {
      // 3. Submit the code for execution.
      println("user: "+ user + ", code: [" + executeCode + "]")
      val startupMap = new java.util.HashMap[String, Any]()
      startupMap.put("wds.linkis.yarnqueue", "default") // startup parameter configuration
      val jobExecuteResult = client.execute(JobExecuteAction.builder()
        .setCreator("LinkisClient-Test") // creator: system name of the client, used for system-level isolation
        .addExecuteCode(executeCode) // the code to be executed
        .setEngineType(EngineType.SPARK) // requested engine type, e.g. Spark or Hive
        .setStartupParams(startupMap)
        .setUser(user).build()) // request user, used for user-level multi-tenant isolation
      println("execId: "+ jobExecuteResult.getExecID + ", taskId:" + jobExecuteResult.taskID)

      // 4. Poll the execution status until the job completes.
      var jobInfoResult = client.getJobInfo(jobExecuteResult)
      val sleepTimeMills: Int = 1000
      while (!jobInfoResult.isCompleted) {
        // 5. Report the execution progress of the script.
        val progress = client.progress(jobExecuteResult)
        val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
        println("progress: "+ progress.getProgress + ", progressInfo:" + progressInfo)
        Utils.sleepQuietly(sleepTimeMills)
        jobInfoResult = client.getJobInfo(jobExecuteResult)
      }

      if (!jobInfoResult.isSucceed) {
        // NOTE: do not throw here — an exception raised inside this try block
        // would immediately be swallowed by the catch below.
        println("Failed to execute job: "+ jobInfoResult.getMessage)
      } else {
        // 6. Fetch the final job information of the script.
        val jobInfo = client.getJobInfo(jobExecuteResult)
        // 7. Get the result set list (submitting multiple SQL statements at once produces multiple result sets).
        val resultSetList = jobInfoResult.getResultSetList(client)
        println("All result set list:")
        resultSetList.foreach(println)
        val oneResultSet = jobInfo.getResultSetList(client).head
        // 8. Fetch the content of one specific result set.
        val fileContents = client.resultSet(ResultSetAction.builder()
          .setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
        println("First fileContents: ")
        println(fileContents)
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
    } finally {
      // Close the client exactly once, on both the success and failure paths.
      IOUtils.closeQuietly(client)
    }
  }

3 1.0 New Submit Method

1.0 adds the client.submit method, which interfaces with the new task execution interface of 1.0 and supports passing in Labels and other parameters.

3.1 Java Test Class

  1. package com.webank.wedatasphere.linkis.client.test;
  2. import com.webank.wedatasphere.linkis.common.utils.Utils;
  3. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
  4. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
  5. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
  6. import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant;
  7. import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant;
  8. import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
  9. import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
  10. import com.webank.wedatasphere.linkis.ujes.client.request.JobSubmitAction;
  11. import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;
  12. import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
  13. import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;
  14. import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;
  15. import org.apache.commons.io.IOUtils;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.concurrent.TimeUnit;
  19. public class JavaClientTest {
  20. public static void main(String[] args){
  21. String user = "hadoop";
  22. String executeCode = "show tables";
  23. // 1. Configure ClientBuilder and get ClientConfig
  24. DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
  25. .addServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the linkis server-side gateway, such as http://{ip}:{port}
  26. .connectionTimeout(30000) //connectionTimeOut client connection timeout
  27. .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered
  28. .loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless
  29. .maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent
  30. .retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry
  31. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method
  32. .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name
  33. .setDWSVersion("v1").build(); //Linkis background protocol version, the current version is v1
  34. // 2. Get a UJESClient through DWSClientConfig
  35. UJESClient client = new UJESClientImpl(clientConfig);
  36. try {
  37. // 3. Start code execution
  38. System.out.println("user: "+ user + ", code: [" + executeCode + "]");
  39. Map<String, Object> startupMap = new HashMap<String, Object>();
  40. // Various startup parameters can be stored in startupMap, see linkis management console configuration
  41. startupMap.put("wds.linkis.yarnqueue", "q02");
  42. //Specify Label
  43. Map<String, Object> labels = new HashMap<String, Object>();
  44. //Add the label that this execution depends on: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel
  45. labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1");
  46. labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE");
  47. labels.put(LabelKeyConstant.ENGINE_RUN_TYPE_KEY, "hql");
  48. //Specify source
  49. Map<String, Object> source = new HashMap<String, Object>();
  50. source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test");
  51. JobExecuteResult jobExecuteResult = client.submit( JobSubmitAction.builder()
  52. .addExecuteCode(executeCode)
  53. .setStartupParams(startupMap)
  54. .setUser(user)//Job submit user
  55. .addExecuteUser(user)//The actual execution user
  56. .setLabels(labels)
  57. .setSource(source)
  58. .build()
  59. );
  60. System.out.println("execId: "+ jobExecuteResult.getExecID() + ", taskId:" + jobExecuteResult.taskID());
  61. // 4. Get the execution status of the script
  62. JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
  63. int sleepTimeMills = 1000;
  64. while(!jobInfoResult.isCompleted()) {
  65. // 5. Get the execution progress of the script
  66. JobProgressResult progress = client.progress(jobExecuteResult);
  67. Utils.sleepQuietly(sleepTimeMills);
  68. jobInfoResult = client.getJobInfo(jobExecuteResult);
  69. }
  70. // 6. Get the job information of the script
  71. JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
  72. // 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated)
  73. String resultSet = jobInfo.getResultSetList(client)[0];
  74. // 8. Get a specific result set through a result set information
  75. Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();
  76. System.out.println("fileContents: "+ fileContents);
  77. } catch (Exception e) {
  78. e.printStackTrace();
  79. IOUtils.closeQuietly(client);
  80. }
  81. IOUtils.closeQuietly(client);
  82. }
  83. }

3.2 Scala Test Class

  package com.webank.wedatasphere.linkis.client.test

  import java.util
  import java.util.concurrent.TimeUnit

  import com.webank.wedatasphere.linkis.common.utils.Utils
  import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
  import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
  import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant
  import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
  import com.webank.wedatasphere.linkis.ujes.client.UJESClient
  import com.webank.wedatasphere.linkis.ujes.client.request.{JobSubmitAction, ResultSetAction}
  import org.apache.commons.io.IOUtils

  /**
   * 1.0 Scala submission example: submits code through UJESClient.submit
   * with Labels and a source map, polls the job until completion and
   * prints every result set path plus the content of the first one.
   */
  object ScalaClientTest {

    def main(args: Array[String]): Unit = {
      val executeCode = "show tables"
      val user = "hadoop"

      // 1. Configure DWSClientConfigBuilder and obtain a DWSClientConfig.
      val clientConfig = DWSClientConfigBuilder.newBuilder()
        .addServerUrl("http://${ip}:${port}") // address of the Linkis server-side gateway, e.g. http://{ip}:{port}
        .connectionTimeout(30000) // client connection timeout (ms)
        .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) // whether to enable registry discovery; newly launched gateways are discovered automatically
        .loadbalancerEnabled(true) // load balancing is only meaningful when discovery is enabled
        .maxConnectionSize(5) // maximum number of connections, i.e. maximum concurrency
        .retryEnabled(false).readTimeout(30000) // whether to retry on failure; read timeout (ms)
        .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // Linkis authentication method
        .setAuthTokenKey("${username}").setAuthTokenValue("${password}") // authentication key (user name) and value (password)
        .setDWSVersion("v1").build() // Linkis backend protocol version, currently v1

      // 2. Obtain a UJESClient from the DWSClientConfig.
      val client = UJESClient(clientConfig)
      try {
        // 3. Submit the code for execution.
        println("user: "+ user + ", code: [" + executeCode + "]")
        val startupMap = new java.util.HashMap[String, Any]()
        startupMap.put("wds.linkis.yarnqueue", "q02") // startup parameter configuration
        // Specify the Labels this execution depends on, such as the engine label.
        val labels: util.Map[String, Any] = new util.HashMap[String, Any]
        labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1")
        labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE")
        labels.put(LabelKeyConstant.ENGINE_RUN_TYPE_KEY, "hql")
        // Specify the source of the job.
        val source: util.Map[String, Any] = new util.HashMap[String, Any]
        source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test")
        val jobExecuteResult = client.submit(JobSubmitAction.builder
          .addExecuteCode(executeCode)
          .setStartupParams(startupMap)
          .setUser(user) // job submit user
          .addExecuteUser(user) // the actual execution user
          .setLabels(labels)
          .setSource(source)
          .build)
        println("execId: "+ jobExecuteResult.getExecID + ", taskId:" + jobExecuteResult.taskID)

        // 4. Poll the execution status until the job completes.
        var jobInfoResult = client.getJobInfo(jobExecuteResult)
        val sleepTimeMills: Int = 1000
        while (!jobInfoResult.isCompleted) {
          // 5. Report the execution progress of the script.
          val progress = client.progress(jobExecuteResult)
          val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
          println("progress: "+ progress.getProgress + ", progressInfo:" + progressInfo)
          Utils.sleepQuietly(sleepTimeMills)
          jobInfoResult = client.getJobInfo(jobExecuteResult)
        }

        if (!jobInfoResult.isSucceed) {
          // NOTE: do not throw here — an exception raised inside this try block
          // would immediately be swallowed by the catch below.
          println("Failed to execute job: "+ jobInfoResult.getMessage)
        } else {
          // 6. Fetch the final job information of the script.
          val jobInfo = client.getJobInfo(jobExecuteResult)
          // 7. Get the result set list (submitting multiple SQL statements at once produces multiple result sets).
          val resultSetList = jobInfoResult.getResultSetList(client)
          println("All result set list:")
          resultSetList.foreach(println)
          val oneResultSet = jobInfo.getResultSetList(client).head
          // 8. Fetch the content of one specific result set.
          val fileContents = client.resultSet(ResultSetAction.builder()
            .setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
          println("First fileContents: ")
          println(fileContents)
        }
      } catch {
        case e: Exception =>
          e.printStackTrace()
      } finally {
        // Close the client exactly once, on both the success and failure paths.
        IOUtils.closeQuietly(client)
      }
    }
  }