DataSource

This document describes how to use the data source feature introduced in Linkis 1.1.0.

1. Data source function introduction

1.1 Concept

  • Data source: Database services that provide data storage, such as MySQL, Hive, and Kafka, are referred to here as databases. A data source defines the configuration needed to connect to an actual database, mainly the connection address, user authentication information, and connection parameters. This configuration is stored in the linkis_ps_dm_datasource_* tables of the Linkis database.
  • Metadata: Here, metadata refers only to database metadata: the data that describes the structure of the data and of the database's objects, such as database names, table names, column names, field lengths, and types.

1.2 Three main modules

linkis-datasource-client: Client module. It provides DataSourceRemoteClient for basic management of user data sources and MetaDataRemoteClient for metadata queries.

linkis-datasource-manager-server: Data source management module, service name ps-data-source-manager. It handles basic management of data sources and exposes HTTP interfaces for adding, querying, modifying, and connection-testing external data sources. Internally it provides an RPC service that the metadata management module calls to obtain the information needed to establish a connection to a database.

  • http interface documentation
  • http interface class org.apache.linkis.datasourcemanager.core.restful
  • rpc interface class org.apache.linkis.datasourcemanager.core.receivers

linkis-metadata-manager-server: Metadata management module, service name ps-metadatamanager. It provides basic queries over database metadata, exposes HTTP interfaces externally, and provides an RPC service internally that the data source management module calls to run connection tests for a data source.

  • http interface documentation
  • http interface class org.apache.linkis.metadatamanager.server.restful
  • rpc interface class org.apache.linkis.metadatamanager.server.receiver

1.3 Processing logic

1.3.1 LinkisDataSourceRemoteClient

The functional structure diagram is as follows:

[Figure: datasource]

  • The LinkisDataSourceRemoteClient client assembles an HTTP request from the request parameters.
  • The HTTP request is sent to linkis-ps-data-source-manager.
  • linkis-ps-data-source-manager performs basic parameter validation. Some interfaces can only be called by users with the administrator role.
  • linkis-ps-data-source-manager performs the basic data operations against the database.
  • The connection test interface of linkis-ps-data-source-manager internally calls ps-metadatamanager over RPC to run the connection test.
  • The result of the HTTP request is mapped from the result set to an entity class through the DWSHttpMessageResult annotation.

LinkisDataSourceRemoteClient interface

  • GetAllDataSourceTypesResult getAllDataSourceTypes(GetAllDataSourceTypesAction): query all data source types
  • QueryDataSourceEnvResult queryDataSourceEnv(QueryDataSourceEnvAction): query the cluster configuration information available to data sources
  • GetInfoByDataSourceIdResult getInfoByDataSourceId(GetInfoByDataSourceIdAction): query data source information by data source id
  • QueryDataSourceResult queryDataSource(QueryDataSourceAction): query data source information
  • GetConnectParamsByDataSourceIdResult getConnectParams(GetConnectParamsByDataSourceIdAction): get the connection configuration parameters
  • CreateDataSourceResult createDataSource(CreateDataSourceAction): create a data source
  • DataSourceTestConnectResult getDataSourceTestConnect(DataSourceTestConnectAction): test whether the data source can be connected
  • DeleteDataSourceResult deleteDataSource(DeleteDataSourceAction): delete a data source
  • ExpireDataSourceResult expireDataSource(ExpireDataSourceAction): set a data source to the expired state
  • GetDataSourceVersionsResult getDataSourceVersions(GetDataSourceVersionsAction): query the list of configuration versions of a data source
  • PublishDataSourceVersionResult publishDataSourceVersion(PublishDataSourceVersionAction): publish a data source configuration version
  • UpdateDataSourceResult updateDataSource(UpdateDataSourceAction): update a data source
  • UpdateDataSourceParameterResult updateDataSourceParameter(UpdateDataSourceParameterAction): update the configuration parameters of a data source
  • GetKeyTypeDatasourceResult getKeyDefinitionsByType(GetKeyTypeDatasourceAction): query the configuration properties required by a data source type

1.3.2 LinkisMetaDataRemoteClient

The functional structure diagram is as follows:

[Figure: metadata]

  • The LinkisMetaDataRemoteClient client assembles an HTTP request from the request parameters.
  • The HTTP request is sent to ps-metadatamanager.
  • ps-metadatamanager performs basic parameter validation.
  • ps-metadatamanager sends an RPC request to linkis-ps-data-source-manager, keyed by the datasourceId parameter, to obtain the data source type and the connection parameters such as the username and password.
  • With the connection information in hand, it loads the lib package from the directory that corresponds to the data source type and calls the matching method through reflection to query the metadata.
  • The result of the HTTP request is mapped from the result set to an entity class through the DWSHttpMessageResult annotation.
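The step that picks a library directory by data source type and then calls a method through reflection can be sketched in plain Scala. This is a minimal illustration under stated assumptions, not the ps-metadatamanager implementation: the object MetadataServiceLoaderSketch and its method names are hypothetical, the one-subdirectory-per-type layout is taken from the installation package directory structure, and only JDK classes are used.

```scala
import java.io.File
import java.net.{URL, URLClassLoader}

// Hypothetical helper; none of these names come from Linkis.
object MetadataServiceLoaderSketch {

  // Directory holding the jars for one data source type, e.g. <serviceLibDir>/mysql.
  def libDirFor(serviceLibDir: String, dataSourceType: String): File =
    new File(serviceLibDir, dataSourceType.toLowerCase)

  // Jar files directly under `dir` as URLs; empty if the directory does not exist.
  def jarUrls(dir: File): Array[URL] =
    Option(dir.listFiles())
      .getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.endsWith(".jar"))
      .map(_.toURI.toURL)

  // Load `className` with a class loader scoped to the type's lib directory,
  // create an instance through its no-argument constructor, and call a
  // no-argument method on it by name.
  def invokeNoArg(
      serviceLibDir: String,
      dataSourceType: String,
      className: String,
      methodName: String): AnyRef = {
    val urls = jarUrls(libDirFor(serviceLibDir, dataSourceType))
    val loader = new URLClassLoader(urls, getClass.getClassLoader)
    val instance = loader
      .loadClass(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[AnyRef]
    instance.getClass.getMethod(methodName).invoke(instance)
  }
}
```

A call such as invokeNoArg(serviceLibDir, "mysql", className, methodName) would look for jars under <serviceLibDir>/mysql before loading the class. The class and method names that Linkis actually loads are not given in this document, so none are shown here.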

LinkisMetaDataRemoteClient interface

  • MetadataGetDatabasesResult getDatabases(MetadataGetDatabasesAction): query the database list
  • MetadataGetTablesResult getTables(MetadataGetTablesAction): query the tables of a database
  • MetadataGetTablePropsResult getTableProps(MetadataGetTablePropsAction): query table properties
  • MetadataGetPartitionsResult getPartitions(MetadataGetPartitionsAction): query the partitions of a table
  • MetadataGetColumnsResult getColumns(MetadataGetColumnsAction): query the fields of a table

1.4 Source module directory structure

    linkis-public-enhancements/linkis-datasource
    ├── linkis-datasource-client      // client code
    ├── linkis-datasource-manager     // data source management module
    │   ├── common                    // data source management common module
    │   └── server                    // data source management service module
    ├── linkis-metadata               // module from the old version, retained
    ├── linkis-metadata-manager       // metadata management module
    │   ├── common                    // metadata management common module
    │   ├── server                    // metadata management service module
    │   └── service                   // supported data sources
    │       ├── elasticsearch
    │       ├── hive
    │       ├── kafka
    │       └── mysql

1.5 Installation package directory structure

    /lib/linkis-public-enhancements/
    ├── linkis-ps-data-source-manager
    ├── linkis-ps-metadatamanager
    │   └── service
    │       ├── elasticsearch
    │       ├── hive
    │       ├── kafka
    │       └── mysql

wds.linkis.server.mdm.service.lib.dir controls the classpath that is loaded for reflection calls. Its default value is /lib/linkis-public-enhancements/linkis-ps-metadatamanager/service.

1.6 Configuration Parameters

See the data source configuration parameters in Tuning and Troubleshooting > Parameter List.
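As a small illustration of how the wds.linkis.server.mdm.service.lib.dir setting and its default interact, the sketch below reads the key from properties-format text and falls back to the documented default. The object ServiceLibDirSketch and its method are hypothetical, and where Linkis itself reads this key from is not covered here; only the key name and the default path come from this document.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper; only the key and the default value are taken from this document.
object ServiceLibDirSketch {
  val Key = "wds.linkis.server.mdm.service.lib.dir"
  val Default = "/lib/linkis-public-enhancements/linkis-ps-metadatamanager/service"

  // Parse properties-format text and return the configured directory,
  // or the documented default when the key is absent.
  def serviceLibDir(propertiesText: String): String = {
    val props = new Properties()
    props.load(new StringReader(propertiesText))
    props.getProperty(Key, Default).trim
  }
}
```

Passing text that contains wds.linkis.server.mdm.service.lib.dir=/opt/custom/service returns that path, while empty text returns the default.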

2. Enable data source function

The linkis startup script does not start the two data source services (ps-data-source-manager and ps-metadatamanager) by default. To use the data source service, set export ENABLE_METADATA_MANAGER=true in $LINKIS_CONF_DIR/linkis-env.sh. The data source services are then started and stopped together with the other services by linkis-start-all.sh and linkis-stop-all.sh.

Check on the Eureka page whether the services have started normally.

[Figure: datasource eureka]

Note
    1. The Linkis management console (web) must be upgraded to version 1.1.0 before the data source management page can be used.
    2. The data source module currently ships jar packages for mysql/hive/kafka/elasticsearch, but the kafka and elasticsearch data sources have not been strictly tested, and full functionality is not guaranteed.

3. Use of data sources

Using a data source takes three steps:

  • Step 1. Create the data source and configure its connection parameters.
  • Step 2. Publish the data source, selecting the connection configuration version to use.
  • Step 3. Use the data source to query metadata. For hive/kafka/elasticsearch, the configuration is associated with the corresponding cluster environment configuration.
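The relationship between the three steps can be modeled with a small in-memory sketch: each parameter update produces a new configuration version, and only a published version is used for connections. This is an assumption-laden illustration, not Linkis code. The names DataSourceLifecycleSketch, Record, and Registry are hypothetical, and numbering versions from 1 is a choice made for the sketch.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of create -> configure -> publish -> use.
object DataSourceLifecycleSketch {

  // One data source with its configuration versions (version n is versions(n - 1)).
  final case class Record(
      name: String,
      versions: Vector[Map[String, String]] = Vector.empty,
      published: Option[Long] = None)

  final class Registry {
    private val records = mutable.Map.empty[Long, Record]
    private var nextId = 0L

    // Step 1a: create the data source; it has no connection parameters yet.
    def create(name: String): Long = {
      nextId += 1
      records(nextId) = Record(name)
      nextId
    }

    // Step 1b: store a new set of connection parameters and return its version number.
    def updateParameters(id: Long, connectParams: Map[String, String]): Long = {
      val updated = records(id).copy(versions = records(id).versions :+ connectParams)
      records(id) = updated
      updated.versions.size.toLong
    }

    // Step 2: choose which stored version is used for connections.
    def publish(id: Long, version: Long): Unit = {
      val record = records(id)
      require(version >= 1 && version <= record.versions.size, s"unknown version $version")
      records(id) = record.copy(published = Some(version))
    }

    // Step 3: only a published version yields usable connection parameters.
    def publishedParams(id: Long): Option[Map[String, String]] =
      records.get(id).flatMap(r => r.published.map(v => r.versions((v - 1).toInt)))
  }
}
```

In this model, publishedParams returns None until publish has been called, which mirrors the rule that a data source must be published before it can be used.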

3.1 MySQL data source

3.1.1 Created through the management console

The management console can only create and configure data sources and test whether they can be connected. It cannot query metadata directly.

Data Source Management > New Data Source > Select MySQL Type

Enter the relevant configuration information.

[Figure: create mysql]

After the data source is saved, run the connection test to verify that a connection can be established.

Note
  • A data source created through the management console belongs to the system Linkis.
  • After creation, the data source must be published before it can be used. Select the configuration parameter version to use when publishing.

Publishing a configuration makes it the configuration used to connect to the data source.

Click the version, then select the appropriate configuration on the page that pops up and publish it.

[Figure: publish]

3.1.2 Using the client

Scala code example:

    package org.apache.linkis.datasource.client

    import java.util
    import java.util.concurrent.TimeUnit

    import org.apache.linkis.common.utils.JsonUtils
    import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
    import org.apache.linkis.datasource.client.request._
    import org.apache.linkis.datasource.client.response._
    import org.apache.linkis.datasourcemanager.common.domain.DataSource
    import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
    import org.junit.jupiter.api.{Disabled, Test}

    // Declared as a class so that JUnit 5 can instantiate it. The test remains @Disabled;
    // running it requires a Linkis gateway at gatewayUrl and a reachable MySQL instance.
    class TestMysqlClient {

      val gatewayUrl = "http://127.0.0.1:9001"

      // Both clients use identical gateway settings, so one builder is shared.
      val clientConfig = DWSClientConfigBuilder.newBuilder
        .addServerUrl(gatewayUrl)
        .connectionTimeout(30000)
        .discoveryEnabled(false)
        .discoveryFrequency(1, TimeUnit.MINUTES)
        .loadbalancerEnabled(true)
        .maxConnectionSize(1)
        .retryEnabled(false)
        .readTimeout(30000)
        .setAuthenticationStrategy(new StaticAuthenticationStrategy)
        .setAuthTokenKey("hadoop")
        .setAuthTokenValue("xxxxx")
        .setDWSVersion("v1")

      val dataSourceClient = new LinkisDataSourceRemoteClient(clientConfig.build())
      val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig.build())

      @Test
      @Disabled
      def testCreateDataSourceMysql(): Unit = {
        val user = "hadoop"
        val system = "Linkis"

        // Step 1a: create the data source (type id 1 is used for MySQL in this example)
        val dataSource = new DataSource()
        dataSource.setDataSourceName("for-mysql-test")
        dataSource.setDataSourceDesc("this is for mysql test")
        dataSource.setCreateSystem(system)
        dataSource.setDataSourceTypeId(1L)

        // Convert the entity to a map so it can be sent as the request payload
        val map = JsonUtils.jackson.readValue(
          JsonUtils.jackson.writeValueAsString(dataSource),
          classOf[util.HashMap[String, Any]])
        val createDataSourceAction: CreateDataSourceAction = CreateDataSourceAction.builder()
          .setUser(user)
          .addRequestPayloads(map)
          .build()
        val createDataSourceResult: CreateDataSourceResult = dataSourceClient.createDataSource(createDataSourceAction)
        val dataSourceId = createDataSourceResult.getInsertId

        // Step 1b: set the connection parameters; this creates a new configuration version
        val params = new util.HashMap[String, Any]
        val connectParams = new util.HashMap[String, Any]
        connectParams.put("host", "127.0.0.1")
        connectParams.put("port", "36000")
        connectParams.put("username", "db username")
        connectParams.put("password", "db password")
        params.put("connectParams", connectParams)
        params.put("comment", "init")
        val updateParameterAction: UpdateDataSourceParameterAction = UpdateDataSourceParameterAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .addRequestPayloads(params)
          .build()
        val updateParameterResult: UpdateDataSourceParameterResult = dataSourceClient.updateDataSourceParameter(updateParameterAction)
        val version: Long = updateParameterResult.getVersion

        // Step 2: publish the configuration version
        dataSourceClient.publishDataSourceVersion(
          PublishDataSourceVersionAction.builder()
            .setDataSourceId(dataSourceId)
            .setUser(user)
            .setVersion(version)
            .build())

        // Step 3: query metadata through the published data source
        val metadataGetDatabasesAction: MetadataGetDatabasesAction = MetadataGetDatabasesAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .setSystem(system)
          .build()
        val metadataGetDatabasesResult: MetadataGetDatabasesResult = metaDataClient.getDatabases(metadataGetDatabasesAction)

        val metadataGetTablesAction: MetadataGetTablesAction = MetadataGetTablesAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .setDatabase("linkis")
          .setSystem(system)
          .build()
        val metadataGetTablesResult: MetadataGetTablesResult = metaDataClient.getTables(metadataGetTablesAction)

        val metadataGetColumnsAction = MetadataGetColumnsAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .setDatabase("linkis")
          .setSystem(system)
          .setTable("linkis_datasource")
          .build()
        val metadataGetColumnsResult: MetadataGetColumnsResult = metaDataClient.getColumns(metadataGetColumnsAction)
      }
    }

3.2 Hive data source

3.2.1 Created through the management console

The management console can only create and configure data sources and test whether they can be connected. It cannot query metadata directly.

First, configure the cluster environment information in the table linkis_ps_dm_datasource_env:

    INSERT INTO `linkis_ps_dm_datasource_env`
    (`env_name`, `env_desc`, `datasource_type_id`, `parameter`, `create_user`, `modify_user`)
    VALUES
    ('testEnv', 'TestEnv', 4, '{\r\n "keytab": "4dd408ad-a2f9-4501-83b3-139290977ca2",\r\n "uris": "thrift://clustername:9083",\r\n "principle":"hadoop@WEBANK.COM"\r\n}', 'user','user');

The primary key id of the inserted row is used as the envId. The envId parameter must be passed when establishing a connection so that the cluster configuration can be looked up. The configuration fields are:

    {
      "keytab": "bml resource id", // resourceId of the keytab stored in the material library (BML); it currently has to be uploaded manually through the http interface
      "uris": "thrift://clustername:9083",
      "principle": "hadoop@WEBANK.COM" // principal used for authentication
    }
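One way to picture the role of envId is as a lookup key: the data source stores only the envId, and the cluster settings are fetched from the environment row when a connection is built. The sketch below models that lookup with plain maps. It is an illustration under assumptions rather than the Linkis implementation: the object EnvLookupSketch is hypothetical, and letting parameters set directly on the data source override the environment's values is a choice made for the sketch.

```scala
// Hypothetical model of resolving an envId into cluster connection settings.
object EnvLookupSketch {

  // Merge the environment referenced by "envId" into the data source's own
  // connection parameters. Parameters set on the data source win on conflict
  // (an assumption of this sketch).
  def resolve(
      connectParams: Map[String, String],
      envs: Map[Long, Map[String, String]]): Either[String, Map[String, String]] =
    connectParams.get("envId") match {
      case None => Right(connectParams)
      case Some(raw) =>
        scala.util.Try(raw.trim.toLong).toOption.flatMap(envs.get) match {
          case Some(envParams) => Right(envParams ++ (connectParams - "envId"))
          case None => Left(s"no environment found for envId=$raw")
        }
    }
}
```

With an environment stored under id 3, resolve(Map("envId" -> "3"), envs) yields that environment's keytab, uris, and principle entries, while an unknown or non-numeric envId yields a Left with an error message.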

Create the data source in the web console:

[Figure: create_hive]

3.2.2 Using the client

    package org.apache.linkis.datasource.client

    import java.util
    import java.util.concurrent.TimeUnit

    import org.apache.linkis.common.utils.JsonUtils
    import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
    import org.apache.linkis.datasource.client.request._
    import org.apache.linkis.datasource.client.response._
    import org.apache.linkis.datasourcemanager.common.domain.DataSource
    import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
    import org.junit.jupiter.api.{Disabled, Test}

    // Declared as a class so that JUnit 5 can instantiate it. The test remains @Disabled;
    // running it requires a Linkis gateway at gatewayUrl and a reachable Hive cluster.
    class TestHiveClient {

      val gatewayUrl = "http://127.0.0.1:9001"

      // Both clients use identical gateway settings, so one builder is shared.
      val clientConfig = DWSClientConfigBuilder.newBuilder
        .addServerUrl(gatewayUrl)
        .connectionTimeout(30000)
        .discoveryEnabled(false)
        .discoveryFrequency(1, TimeUnit.MINUTES)
        .loadbalancerEnabled(true)
        .maxConnectionSize(1)
        .retryEnabled(false)
        .readTimeout(30000)
        .setAuthenticationStrategy(new StaticAuthenticationStrategy)
        .setAuthTokenKey("hadoop")
        .setAuthTokenValue("xxxxx")
        .setDWSVersion("v1")

      val dataSourceClient = new LinkisDataSourceRemoteClient(clientConfig.build())
      val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig.build())

      @Test
      @Disabled
      def testCreateDataSourceHive(): Unit = {
        val user = "hadoop"
        val system = "Linkis"

        // Step 1a: create the data source (type id 4 is used for Hive in this example)
        val dataSource = new DataSource()
        dataSource.setDataSourceName("for-hive-test")
        dataSource.setDataSourceDesc("this is for hive test")
        dataSource.setCreateSystem(system)
        dataSource.setDataSourceTypeId(4L)

        // Convert the entity to a map so it can be sent as the request payload
        val map = JsonUtils.jackson.readValue(
          JsonUtils.jackson.writeValueAsString(dataSource),
          classOf[util.HashMap[String, Any]])
        val createDataSourceAction: CreateDataSourceAction = CreateDataSourceAction.builder()
          .setUser(user)
          .addRequestPayloads(map)
          .build()
        val createDataSourceResult: CreateDataSourceResult = dataSourceClient.createDataSource(createDataSourceAction)
        val dataSourceId = createDataSourceResult.getInsertId

        // Step 1b: set the connection parameters; envId is the primary key of the
        // row in linkis_ps_dm_datasource_env that holds the cluster configuration
        val params = new util.HashMap[String, Any]
        val connectParams = new util.HashMap[String, Any]
        connectParams.put("envId", "3")
        params.put("connectParams", connectParams)
        params.put("comment", "init")
        val updateParameterAction: UpdateDataSourceParameterAction = UpdateDataSourceParameterAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .addRequestPayloads(params)
          .build()
        val updateParameterResult: UpdateDataSourceParameterResult = dataSourceClient.updateDataSourceParameter(updateParameterAction)
        val version: Long = updateParameterResult.getVersion

        // Step 2: publish the configuration version
        dataSourceClient.publishDataSourceVersion(
          PublishDataSourceVersionAction.builder()
            .setDataSourceId(dataSourceId)
            .setUser(user)
            .setVersion(version)
            .build())

        // Step 3: query metadata through the published data source
        val metadataGetDatabasesAction: MetadataGetDatabasesAction = MetadataGetDatabasesAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .setSystem(system)
          .build()
        val metadataGetDatabasesResult: MetadataGetDatabasesResult = metaDataClient.getDatabases(metadataGetDatabasesAction)

        val metadataGetTablesAction: MetadataGetTablesAction = MetadataGetTablesAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .setDatabase("linkis_test_ind")
          .setSystem(system)
          .build()
        val metadataGetTablesResult: MetadataGetTablesResult = metaDataClient.getTables(metadataGetTablesAction)

        val metadataGetColumnsAction = MetadataGetColumnsAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .setDatabase("linkis_test_ind")
          .setSystem(system)
          .setTable("test")
          .build()
        val metadataGetColumnsResult: MetadataGetColumnsResult = metaDataClient.getColumns(metadataGetColumnsAction)
      }
    }