1.0 SDK的使用

Linkis 提供了方便的Java和Scala调用的接口,只需要引入linkis-computation-client的模块就可以进行使用。1.0后新增支持带Label提交的方式,下面将分别对兼容0.X的方式和1.0新增的方式进行介绍。

1 引入依赖模块

  1. <dependency>
  2. <groupId>com.webank.wedatasphere.linkis</groupId>
  3. <artifactId>linkis-computation-client</artifactId>
  4. <version>${linkis.version}</version>
  5. </dependency>
  6. 如:
  7. <dependency>
  8. <groupId>com.webank.wedatasphere.linkis</groupId>
  9. <artifactId>linkis-computation-client</artifactId>
  10. <version>1.0.0-RC1</version>
  11. </dependency>

2 兼容0.X的Execute方法提交

2.1 Java测试代码

建立Java的测试类LinkisClientTest,具体接口含义可以见注释:

  1. package com.webank.wedatasphere.linkis.client.test;
  2. import com.webank.wedatasphere.linkis.common.utils.Utils;
  3. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
  4. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
  5. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
  6. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
  7. import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
  8. import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
  9. import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction;
  10. import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;
  11. import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
  12. import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;
  13. import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;
  14. import org.apache.commons.io.IOUtils;
  15. import java.util.HashMap;
  16. import java.util.Map;
  17. import java.util.concurrent.TimeUnit;
  18. public class LinkisClientTest {
  19. public static void main(String[] args){
  20. String user = "hadoop";
  21. String executeCode = "show databases;";
  22. // 1. 配置DWSClientBuilder,通过DWSClientBuilder获取一个DWSClientConfig
  23. DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
  24. .addServerUrl("http://${ip}:${port}") //指定ServerUrl,linkis服务器端网关的地址,如http://{ip}:{port}
  25. .connectionTimeout(30000) //connectionTimeOut 客户端连接超时时间
  26. .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //是否启用注册发现,如果启用,会自动发现新启动的Gateway
  27. .loadbalancerEnabled(true) // 是否启用负载均衡,如果不启用注册发现,则负载均衡没有意义
  28. .maxConnectionSize(5) //指定最大连接数,即最大并发数
  29. .retryEnabled(false).readTimeout(30000) //执行失败,是否允许重试
  30. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis认证方式
  31. .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //认证key,一般为用户名; 认证value,一般为用户名对应的密码
  32. .setDWSVersion("v1").build(); //linkis后台协议的版本,当前版本为v1
  33. // 2. 通过DWSClientConfig获取一个UJESClient
  34. UJESClient client = new UJESClientImpl(clientConfig);
  35. try {
  36. // 3. 开始执行代码
  37. System.out.println("user : " + user + ", code : [" + executeCode + "]");
  38. Map<String, Object> startupMap = new HashMap<String, Object>();
  39. startupMap.put("wds.linkis.yarnqueue", "default"); // 在startupMap可以存放多种启动参数,参见linkis管理台配置
  40. JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder()
  41. .setCreator("linkisClient-Test") //creator,请求linkis的客户端的系统名,用于做系统级隔离
  42. .addExecuteCode(executeCode) //ExecutionCode 请求执行的代码
  43. .setEngineType((JobExecuteAction.EngineType) JobExecuteAction.EngineType$.MODULE$.HIVE()) // 希望请求的linkis的执行引擎类型,如Spark hive等
  44. .setUser(user) //User,请求用户;用于做用户级多租户隔离
  45. .setStartupParams(startupMap)
  46. .build());
  47. System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
  48. // 4. 获取脚本的执行状态
  49. JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
  50. int sleepTimeMills = 1000;
  51. while(!jobInfoResult.isCompleted()) {
  52. // 5. 获取脚本的执行进度
  53. JobProgressResult progress = client.progress(jobExecuteResult);
  54. Utils.sleepQuietly(sleepTimeMills);
  55. jobInfoResult = client.getJobInfo(jobExecuteResult);
  56. }
  57. // 6. 获取脚本的Job信息
  58. JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
  59. // 7. 获取结果集列表(如果用户一次提交多个SQL,会产生多个结果集)
  60. String resultSet = jobInfo.getResultSetList(client)[0];
  61. // 8. 通过一个结果集信息,获取具体的结果集
  62. Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();
  63. System.out.println("fileContents: " + fileContents);
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. IOUtils.closeQuietly(client);
  67. }
  68. IOUtils.closeQuietly(client);
  69. }
  70. }

运行上述代码即可与Linkis进行交互

2.2 Scala测试代码

  1. package com.webank.wedatasphere.linkis.client.test
  2. import java.util.concurrent.TimeUnit
  3. import com.webank.wedatasphere.linkis.common.utils.Utils
  4. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
  5. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
  6. import com.webank.wedatasphere.linkis.ujes.client.UJESClient
  7. import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.EngineType
  8. import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
  9. import org.apache.commons.io.IOUtils
  10. object LinkisClientImplTest extends App {
  11. var executeCode = "show databases;"
  12. var user = "hadoop"
  13. // 1. 配置DWSClientBuilder,通过DWSClientBuilder获取一个DWSClientConfig
  14. val clientConfig = DWSClientConfigBuilder.newBuilder()
  15. .addServerUrl("http://${ip}:${port}") //指定ServerUrl,Linkis服务器端网关的地址,如http://{ip}:{port}
  16. .connectionTimeout(30000) //connectionTimeOut 客户端连接超时时间
  17. .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //是否启用注册发现,如果启用,会自动发现新启动的Gateway
  18. .loadbalancerEnabled(true) // 是否启用负载均衡,如果不启用注册发现,则负载均衡没有意义
  19. .maxConnectionSize(5) //指定最大连接数,即最大并发数
  20. .retryEnabled(false).readTimeout(30000) //执行失败,是否允许重试
  21. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis认证方式
  22. .setAuthTokenKey("${username}").setAuthTokenValue("${password}") //认证key,一般为用户名; 认证value,一般为用户名对应的密码
  23. .setDWSVersion("v1").build() //Linkis后台协议的版本,当前版本为v1
  24. // 2. 通过DWSClientConfig获取一个UJESClient
  25. val client = UJESClient(clientConfig)
  26. try {
  27. // 3. 开始执行代码
  28. println("user : " + user + ", code : [" + executeCode + "]")
  29. val startupMap = new java.util.HashMap[String, Any]()
  30. startupMap.put("wds.linkis.yarnqueue", "default") //启动参数配置
  31. val jobExecuteResult = client.execute(JobExecuteAction.builder()
  32. .setCreator("LinkisClient-Test") //creator,请求Linkis的客户端的系统名,用于做系统级隔离
  33. .addExecuteCode(executeCode) //ExecutionCode 请求执行的代码
  34. .setEngineType(EngineType.SPARK) // 希望请求的Linkis的执行引擎类型,如Spark hive等
  35. .setStartupParams(startupMap)
  36. .setUser(user).build()) //User,请求用户;用于做用户级多租户隔离
  37. println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
  38. // 4. 获取脚本的执行状态
  39. var jobInfoResult = client.getJobInfo(jobExecuteResult)
  40. val sleepTimeMills : Int = 1000
  41. while (!jobInfoResult.isCompleted) {
  42. // 5. 获取脚本的执行进度
  43. val progress = client.progress(jobExecuteResult)
  44. val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
  45. println("progress: " + progress.getProgress + ", progressInfo: " + progressInfo)
  46. Utils.sleepQuietly(sleepTimeMills)
  47. jobInfoResult = client.getJobInfo(jobExecuteResult)
  48. }
  49. if (!jobInfoResult.isSucceed) {
  50. println("Failed to execute job: " + jobInfoResult.getMessage)
  51. throw new Exception(jobInfoResult.getMessage)
  52. }
  53. // 6. 获取脚本的Job信息
  54. val jobInfo = client.getJobInfo(jobExecuteResult)
  55. // 7. 获取结果集列表(如果用户一次提交多个SQL,会产生多个结果集)
  56. val resultSetList = jobInfoResult.getResultSetList(client)
  57. println("All result set list:")
  58. resultSetList.foreach(println)
  59. val oneResultSet = jobInfo.getResultSetList(client).head
  60. // 8. 通过一个结果集信息,获取具体的结果集
  61. val fileContents = client.resultSet(ResultSetAction.builder().setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
  62. println("First fileContents: ")
  63. println(fileContents)
  64. } catch {
  65. case e: Exception => {
  66. e.printStackTrace()
  67. }
  68. }
  69. IOUtils.closeQuietly(client)
  70. }

3 1.0新增的Submit方式

1.0 新增了client.submit方法,用于对接1.0新的任务执行接口,支持传入Label等参数

3.1 Java测试类

  1. package com.webank.wedatasphere.linkis.client.test;
  2. import com.webank.wedatasphere.linkis.common.utils.Utils;
  3. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
  4. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
  5. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
  6. import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant;
  7. import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant;
  8. import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
  9. import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
  10. import com.webank.wedatasphere.linkis.ujes.client.request.JobSubmitAction;
  11. import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;
  12. import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
  13. import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;
  14. import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;
  15. import org.apache.commons.io.IOUtils;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.concurrent.TimeUnit;
  19. public class JavaClientTest {
  20. public static void main(String[] args){
  21. String user = "hadoop";
  22. String executeCode = "show tables";
  23. // 1. 配置ClientBuilder,获取ClientConfig
  24. DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
  25. .addServerUrl("http://${ip}:${port}") //指定ServerUrl,linkis服务器端网关的地址,如http://{ip}:{port}
  26. .connectionTimeout(30000) //connectionTimeOut 客户端连接超时时间
  27. .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //是否启用注册发现,如果启用,会自动发现新启动的Gateway
  28. .loadbalancerEnabled(true) // 是否启用负载均衡,如果不启用注册发现,则负载均衡没有意义
  29. .maxConnectionSize(5) //指定最大连接数,即最大并发数
  30. .retryEnabled(false).readTimeout(30000) //执行失败,是否允许重试
  31. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis认证方式
  32. .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //认证key,一般为用户名; 认证value,一般为用户名对应的密码
  33. .setDWSVersion("v1").build(); //linkis后台协议的版本,当前版本为v1
  34. // 2. 通过DWSClientConfig获取一个UJESClient
  35. UJESClient client = new UJESClientImpl(clientConfig);
  36. try {
  37. // 3. 开始执行代码
  38. System.out.println("user : " + user + ", code : [" + executeCode + "]");
  39. Map<String, Object> startupMap = new HashMap<String, Object>();
  40. // 在startupMap可以存放多种启动参数,参见linkis管理台配置
  41. startupMap.put("wds.linkis.yarnqueue", "q02");
  42. //指定Label
  43. Map<String, Object> labels = new HashMap<String, Object>();
  44. //添加本次执行所依赖的标签:EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel
  45. labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1");
  46. labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE");
  47. labels.put(LabelKeyConstant.ENGINE_RUN_TYPE_KEY, "hql");
  48. //指定source
  49. Map<String, Object> source = new HashMap<String, Object>();
  50. source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test");
  51. JobExecuteResult jobExecuteResult = client.submit( JobSubmitAction.builder()
  52. .addExecuteCode(executeCode)
  53. .setStartupParams(startupMap)
  54. .setUser(user)//Job提交用户
  55. .addExecuteUser(user)//实际执行用户
  56. .setLabels(labels)
  57. .setSource(source)
  58. .build()
  59. );
  60. System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
  61. // 4. 获取脚本的执行状态
  62. JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
  63. int sleepTimeMills = 1000;
  64. while(!jobInfoResult.isCompleted()) {
  65. // 5. 获取脚本的执行进度
  66. JobProgressResult progress = client.progress(jobExecuteResult);
  67. Utils.sleepQuietly(sleepTimeMills);
  68. jobInfoResult = client.getJobInfo(jobExecuteResult);
  69. }
  70. // 6. 获取脚本的Job信息
  71. JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
  72. // 7. 获取结果集列表(如果用户一次提交多个SQL,会产生多个结果集)
  73. String resultSet = jobInfo.getResultSetList(client)[0];
  74. // 8. 通过一个结果集信息,获取具体的结果集
  75. Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();
  76. System.out.println("fileContents: " + fileContents);
  77. } catch (Exception e) {
  78. e.printStackTrace();
  79. IOUtils.closeQuietly(client);
  80. }
  81. IOUtils.closeQuietly(client);
  82. }
  83. }

3.2 Scala 测试类

  1. package com.webank.wedatasphere.linkis.client.test
  2. import java.util
  3. import java.util.concurrent.TimeUnit
  4. import com.webank.wedatasphere.linkis.common.utils.Utils
  5. import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
  6. import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
  7. import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant
  8. import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
  9. import com.webank.wedatasphere.linkis.ujes.client.UJESClient
  10. import com.webank.wedatasphere.linkis.ujes.client.request.{JobSubmitAction, ResultSetAction}
  11. import org.apache.commons.io.IOUtils
  12. object ScalaClientTest {
  13. def main(args: Array[String]): Unit = {
  14. val executeCode = "show tables"
  15. val user = "hadoop"
  16. // 1. 配置DWSClientBuilder,通过DWSClientBuilder获取一个DWSClientConfig
  17. val clientConfig = DWSClientConfigBuilder.newBuilder()
  18. .addServerUrl("http://${ip}:${port}") //指定ServerUrl,Linkis服务器端网关的地址,如http://{ip}:{port}
  19. .connectionTimeout(30000) //connectionTimeOut 客户端连接超时时间
  20. .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //是否启用注册发现,如果启用,会自动发现新启动的Gateway
  21. .loadbalancerEnabled(true) // 是否启用负载均衡,如果不启用注册发现,则负载均衡没有意义
  22. .maxConnectionSize(5) //指定最大连接数,即最大并发数
  23. .retryEnabled(false).readTimeout(30000) //执行失败,是否允许重试
  24. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis认证方式
  25. .setAuthTokenKey("${username}").setAuthTokenValue("${password}") //认证key,一般为用户名; 认证value,一般为用户名对应的密码
  26. .setDWSVersion("v1").build() //Linkis后台协议的版本,当前版本为v1
  27. // 2. 通过DWSClientConfig获取一个UJESClient
  28. val client = UJESClient(clientConfig)
  29. try {
  30. // 3. 开始执行代码
  31. println("user : " + user + ", code : [" + executeCode + "]")
  32. val startupMap = new java.util.HashMap[String, Any]()
  33. startupMap.put("wds.linkis.yarnqueue", "q02") //启动参数配置
  34. //指定Label
  35. val labels: util.Map[String, Any] = new util.HashMap[String, Any]
  36. //添加本次执行所依赖的标签,如engineLabel
  37. labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-1.2.1")
  38. labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE")
  39. labels.put(LabelKeyConstant.ENGINE_RUN_TYPE_KEY, "hql")
  40. //指定source
  41. val source: util.Map[String, Any] = new util.HashMap[String, Any]
  42. source.put(TaskConstant.SCRIPTPATH, "LinkisClient-test")
  43. val jobExecuteResult = client.submit(JobSubmitAction.builder
  44. .addExecuteCode(executeCode)
  45. .setStartupParams(startupMap)
  46. .setUser(user) //Job提交用户
  47. .addExecuteUser(user) //实际执行用户
  48. .setLabels(labels)
  49. .setSource(source)
  50. .build) //User,请求用户;用于做用户级多租户隔离
  51. println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
  52. // 4. 获取脚本的执行状态
  53. var jobInfoResult = client.getJobInfo(jobExecuteResult)
  54. val sleepTimeMills : Int = 1000
  55. while (!jobInfoResult.isCompleted) {
  56. // 5. 获取脚本的执行进度
  57. val progress = client.progress(jobExecuteResult)
  58. val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
  59. println("progress: " + progress.getProgress + ", progressInfo: " + progressInfo)
  60. Utils.sleepQuietly(sleepTimeMills)
  61. jobInfoResult = client.getJobInfo(jobExecuteResult)
  62. }
  63. if (!jobInfoResult.isSucceed) {
  64. println("Failed to execute job: " + jobInfoResult.getMessage)
  65. throw new Exception(jobInfoResult.getMessage)
  66. }
  67. // 6. 获取脚本的Job信息
  68. val jobInfo = client.getJobInfo(jobExecuteResult)
  69. // 7. 获取结果集列表(如果用户一次提交多个SQL,会产生多个结果集)
  70. val resultSetList = jobInfoResult.getResultSetList(client)
  71. println("All result set list:")
  72. resultSetList.foreach(println)
  73. val oneResultSet = jobInfo.getResultSetList(client).head
  74. // 8. 通过一个结果集信息,获取具体的结果集
  75. val fileContents = client.resultSet(ResultSetAction.builder().setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
  76. println("First fileContents: ")
  77. println(fileContents)
  78. } catch {
  79. case e: Exception => {
  80. e.printStackTrace()
  81. }
  82. }
  83. IOUtils.closeQuietly(client)
  84. }
  85. }