Usage of the Data Source Client SDK

Linkis DataSource provides convenient Java and Scala client interfaces. You only need to add the linkis-datasource-client module as a dependency to use them:

```xml
<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-datasource-client</artifactId>
    <version>${linkis.version}</version>
</dependency>
```

For example:

```xml
<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-datasource-client</artifactId>
    <version>1.1.0</version>
</dependency>
```
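If you use the `${linkis.version}` placeholder, define it in the `<properties>` section of your pom and keep it consistent with the version of your deployed Linkis, for example:

```xml
<properties>
    <!-- keep this consistent with the deployed Linkis version -->
    <linkis.version>1.1.0</linkis.version>
</properties>
```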

Create a Scala test class, LinkisDataSourceClientTest. See the comments for what each interface does:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.linkis.common.utils.JsonUtils
import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
import org.apache.linkis.datasource.client.request.{CreateDataSourceAction, GetAllDataSourceTypesAction, MetadataGetDatabasesAction, UpdateDataSourceParameterAction}
import org.apache.linkis.datasourcemanager.common.domain.{DataSource, DataSourceType}
import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}

import java.io.StringWriter
import java.util
import java.util.concurrent.TimeUnit

object LinkisDataSourceClientTest {

  def main(args: Array[String]): Unit = {
    val clientConfig = DWSClientConfigBuilder.newBuilder
      .addServerUrl("http://127.0.0.1:9001")   // set the linkis-mg-gateway url: http://{ip}:{port}
      .connectionTimeout(30000)                // connection timeout
      .discoveryEnabled(false)                 // disable service discovery
      .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
      .loadbalancerEnabled(true)               // enable load balancing
      .maxConnectionSize(5)                    // max number of connections
      .retryEnabled(false)                     // disable retry
      .readTimeout(30000)                      // set read timeout
      .setAuthenticationStrategy(new StaticAuthenticationStrategy) // authentication strategy: Linkis supports static and token authentication
      .setAuthTokenKey("hadoop")               // set the submit user
      .setAuthTokenValue("xxx")                // set the password or token
      .setDWSVersion("v1")                     // Linkis REST version, v1
      .build

    // init the data source remote client
    val dataSourceClient = new LinkisDataSourceRemoteClient(clientConfig)
    // init the metadata remote client
    val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig)

    // get all data source types
    testGetAllDataSourceTypes(dataSourceClient)
    // create a kafka data source
    testCreateDataSourceForKafka(dataSourceClient)
    // create an es data source
    testCreateDataSourceForEs(dataSourceClient)
    // update the kafka data source parameters
    testUpdateDataSourceParameterForKafka(dataSourceClient)
    // update the es data source parameters
    testUpdateDataSourceParameterForEs(dataSourceClient)
    // get the hive metadata database list
    testMetadataGetDatabases(metaDataClient)
  }

  def testGetAllDataSourceTypes(client: LinkisDataSourceRemoteClient): Unit = {
    val getAllDataSourceTypesResult = client.getAllDataSourceTypes(GetAllDataSourceTypesAction.builder().setUser("hadoop").build()).getAllDataSourceType
    System.out.println(getAllDataSourceTypesResult)
  }

  def testCreateDataSourceForKafka(client: LinkisDataSourceRemoteClient): Unit = {
    val dataSource = new DataSource()
    val dataSourceType = new DataSourceType
    dataSourceType.setName("kafka")
    dataSourceType.setId("2")
    dataSourceType.setLayers(2)
    dataSourceType.setClassifier("消息队列") // classifier: message queue
    dataSourceType.setDescription("kafka")
    dataSource.setDataSourceType(dataSourceType)
    dataSource.setDataSourceName("kafka-test")
    dataSource.setCreateSystem("client")
    dataSource.setDataSourceTypeId(2L)

    // serialize the DataSource object into a Map payload
    val dsJsonWriter = new StringWriter
    val mapper = new ObjectMapper
    JsonUtils.jackson.writeValue(dsJsonWriter, dataSource)
    val map = mapper.readValue(dsJsonWriter.toString, new util.HashMap[String, Any]().getClass)
    val id = client.createDataSource(CreateDataSourceAction.builder().setUser("hadoop").addRequestPayloads(map).build()).getInsert_id
    System.out.println(id)
  }

  def testCreateDataSourceForEs(client: LinkisDataSourceRemoteClient): Unit = {
    val dataSource = new DataSource()
    dataSource.setDataSourceName("es-test")
    dataSource.setCreateSystem("client")
    dataSource.setDataSourceTypeId(7L)

    // serialize the DataSource object into a Map payload
    val dsJsonWriter = new StringWriter
    val mapper = new ObjectMapper
    JsonUtils.jackson.writeValue(dsJsonWriter, dataSource)
    val map = mapper.readValue(dsJsonWriter.toString, new util.HashMap[String, Any]().getClass)
    val id = client.createDataSource(CreateDataSourceAction.builder().setUser("hadoop").addRequestPayloads(map).build()).getInsert_id
    System.out.println(id)
  }

  def testUpdateDataSourceParameterForKafka(client: LinkisDataSourceRemoteClient): Unit = {
    val params = new util.HashMap[String, Any]()
    val connParams = new util.HashMap[String, Any]()
    connParams.put("brokers", "172.24.2.232:9092")
    params.put("connectParams", connParams)
    params.put("comment", "kafka data source")
    client.updateDataSourceParameter(UpdateDataSourceParameterAction.builder().setUser("hadoop").setDataSourceId("7").addRequestPayloads(params).build())
  }

  def testUpdateDataSourceParameterForEs(client: LinkisDataSourceRemoteClient): Unit = {
    val params = new util.HashMap[String, Any]()
    val connParams = new util.HashMap[String, Any]()
    val elasticUrls = new util.ArrayList[String]()
    elasticUrls.add("http://172.24.2.231:9200")
    connParams.put("elasticUrls", elasticUrls)
    params.put("connectParams", connParams)
    params.put("comment", "es data source")
    client.updateDataSourceParameter(UpdateDataSourceParameterAction.builder().setUser("hadoop").setDataSourceId("8").addRequestPayloads(params).build())
  }

  def testMetadataGetDatabases(client: LinkisMetaDataRemoteClient): Unit = {
    val dbs = client.getDatabases(MetadataGetDatabasesAction.builder().setUser("hadoop").setDataSourceId(9L).setSystem("client").build()).getDbs
    System.out.println(dbs)
  }
}
```
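Beyond listing databases, the metadata client can also be used to drill down into a single database. The sketch below is a minimal example of listing the tables behind one data source; the action and getter names (MetadataGetTablesAction, getTables) are assumed to follow the same naming pattern as MetadataGetDatabasesAction and getDatabases used above, and the data source id and database name are placeholders, so verify them against your client version before use:

```scala
import org.apache.linkis.datasource.client.impl.LinkisMetaDataRemoteClient
import org.apache.linkis.datasource.client.request.MetadataGetTablesAction

// Sketch: list the tables of one database behind a data source.
// Assumptions: MetadataGetTablesAction/getTables mirror the
// MetadataGetDatabasesAction/getDatabases API shown above;
// the data source id (9) and database name ("default") are placeholders.
def testMetadataGetTables(client: LinkisMetaDataRemoteClient): Unit = {
  val tables = client.getTables(MetadataGetTablesAction.builder()
    .setUser("hadoop")      // the user who queries the metadata
    .setDataSourceId(9L)    // placeholder data source id
    .setDatabase("default") // placeholder database name
    .setSystem("client")    // the system that issues the request
    .build()).getTables
  System.out.println(tables)
}
```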