Spark Load

Spark Load uses Spark to preprocess the data being loaded, which improves performance when loading large volumes of data into Doris and saves computing resources on the Doris cluster. It is mainly intended for initial migration and for loading large volumes of data into Doris.

Spark Load is an asynchronous load method. Users create a Spark-type load job over the MySQL protocol and check the result with SHOW LOAD.

Applicable scenarios

  • The source data is in a file storage system that Spark can access, such as HDFS.

  • The data volume ranges from tens of GB to TB.

Explanation of terms

  1. Frontend (FE): the metadata and scheduling node of the Doris system. In the load process, it is mainly responsible for scheduling load jobs.

  2. Backend (BE): the computing and storage node of the Doris system. In the load process, it is mainly responsible for writing and storing data.

  3. Spark ETL: in the load process, it is mainly responsible for the ETL of data, including global dictionary construction (for bitmap columns), partitioning, sorting, and aggregation.

  4. Broker: an independent, stateless process. It encapsulates the file system interface and gives Doris the ability to read files in remote storage systems.

  5. Global dictionary: a data structure that maps original values to encoded values. The original value can be of any data type, while the encoded value is an integer. The global dictionary is mainly used for precomputing exact deduplication.

Basic principles

Basic process

The user submits a Spark-type load job through a MySQL client. FE records the metadata and returns a message that the submission succeeded.

A Spark Load task runs in the following five stages.

  1. FE schedules and submits the ETL task to the Spark cluster for execution.

  2. The Spark cluster runs the ETL task to preprocess the load data, including global dictionary construction (for bitmap columns), partitioning, sorting, and aggregation.

  3. After the ETL task completes, FE obtains the path of the preprocessed data for each partition and schedules the relevant BEs to execute push tasks.

  4. BE reads the data through the broker and converts it into the Doris underlying storage format.

  5. FE schedules the version to take effect and completes the load job.

                   +
                   | 0. User create spark load job
              +----v----+
              |   FE    |---------------------------------+
              +----+----+                                 |
                   | 3. FE send push tasks                |
                   | 5. FE publish version                |
      +------------+------------+                         |
      |            |            |                         |
  +---v---+    +---v---+    +---v---+                     |
  |  BE   |    |  BE   |    |  BE   |                     |1. FE submit Spark ETL job
  +---^---+    +---^---+    +---^---+                     |
      |4. BE push with broker   |                         |
  +---+---+    +---+---+    +---+---+                     |
  |Broker |    |Broker |    |Broker |                     |
  +---^---+    +---^---+    +---^---+                     |
      |            |            |                         |
  +---+------------+------------+---+ 2.ETL +-------------v---------------+
  |              HDFS               +------->        Spark cluster        |
  |                                 <-------+                             |
  +---------------------------------+       +-----------------------------+

Global dictionary

Applicable scenarios

At present, the bitmap column in Doris is implemented with the RoaringBitmap library, which accepts only integer input. Therefore, to precompute a bitmap column during the load process, the input data must first be converted to integers.

In the existing Doris load process, the global dictionary is implemented as a Hive table that stores the mapping from original values to encoded values.

Build process

  1. Read the data from the upstream data source and generate a temporary Hive table, referred to as hive_table.

  2. Extract the distinct values of the columns to be deduplicated from hive_table and generate a new Hive table, referred to as distinct_value_table.

  3. Create a new global dictionary table named dict_table; one column holds the original value and the other holds the encoded value.

  4. Left join distinct_value_table with dict_table to find the set of new distinct values, then encode this set with a window function. Each new original value now has a corresponding encoded value. Finally, write these two columns back to dict_table.

  5. Join dict_table with hive_table to replace the original values in hive_table with the integer encoded values.

  6. hive_table is then read by the subsequent data preprocessing step and loaded into Doris after calculation.
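The build process above can be illustrated with a small in-memory sketch. This is not the Hive and Spark implementation used by Doris; plain Python dicts and lists stand in for dict_table and hive_table, the function names are hypothetical, and sorting the new values before numbering them is an assumption made only to keep the result deterministic.

```python
def update_global_dict(dict_table, raw_values):
    """Return a copy of dict_table extended with codes for unseen values."""
    # Step 2: distinct values of the column to be deduplicated.
    distinct_values = sorted(set(raw_values))
    # Step 4: the left join keeps only values that have no code yet.
    new_values = [v for v in distinct_values if v not in dict_table]
    # Continue numbering after the largest existing code, in the way a
    # row-numbering window function plus an offset would.
    next_code = max(dict_table.values(), default=0) + 1
    updated = dict(dict_table)
    for offset, value in enumerate(new_values):
        updated[value] = next_code + offset
    return updated


def encode_column(dict_table, raw_values):
    """Step 5: replace each original value with its integer code."""
    return [dict_table[v] for v in raw_values]


# Hypothetical data: two values already have codes, two are new.
existing = {"u-001": 1, "u-002": 2}
rows = ["u-002", "u-003", "u-001", "u-004", "u-003"]

dict_table = update_global_dict(existing, rows)
encoded = encode_column(dict_table, rows)
print(dict_table)
print(encoded)
```

Existing codes are never reassigned, so values encoded in earlier loads stay valid when new values are appended to the dictionary.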

Data preprocessing (DPP)

Basic process

  1. Read data from the data source. The upstream data source can be an HDFS file or a Hive table.

  2. Map the columns of the data read, evaluate the expressions, and generate the bucket field bucket_id according to the partition information.

  3. Generate a rollup tree according to the rollup metadata of the Doris table.

  4. Traverse the rollup tree to perform hierarchical aggregation. The rollup at the next level can be calculated from the rollup at the previous level.

  5. After each aggregation, the data is divided into buckets according to bucket_id and then written to HDFS.

  6. The brokers subsequently pull the files from HDFS and load them into the Doris BEs.
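As a rough illustration of steps 3 to 5, the sketch below aggregates a hypothetical rollup tree level by level and then splits the result into buckets. It is a simplification rather than the DPP code: the only aggregation is SUM over a field named value, the column names and tree layout are made up, CRC32 is an assumed stand-in for whatever hash Doris actually uses, and bucket_id is computed after aggregation instead of during the mapping step.

```python
import zlib
from collections import defaultdict


def bucket_id(key_values, num_buckets):
    """Map distribution-key values to a bucket. CRC32 is an assumed hash."""
    raw = "|".join(str(v) for v in key_values).encode("utf-8")
    return zlib.crc32(raw) % num_buckets


def aggregate(rows, key_columns):
    """SUM the 'value' field of rows grouped by key_columns."""
    totals = defaultdict(int)
    for row in rows:
        totals[tuple(row[c] for c in key_columns)] += row["value"]
    return [
        dict(zip(key_columns, key), value=total)
        for key, total in sorted(totals.items())
    ]


def build_rollups(node, parent_rows, results):
    """Depth-first walk: each rollup is aggregated from its parent's output."""
    rows = aggregate(parent_rows, node["keys"])
    results[node["name"]] = rows
    for child in node["children"]:
        build_rollups(child, rows, results)
    return results


def split_into_buckets(rows, distribution_keys, num_buckets):
    """Group aggregated rows by bucket_id before they are written out."""
    buckets = defaultdict(list)
    for row in rows:
        key = [row[c] for c in distribution_keys]
        buckets[bucket_id(key, num_buckets)].append(row)
    return dict(buckets)


# Hypothetical rollup tree: base index, then (city, day), then (city).
rollup_tree = {
    "name": "base",
    "keys": ["city", "day", "user"],
    "children": [{
        "name": "rollup_city_day",
        "keys": ["city", "day"],
        "children": [
            {"name": "rollup_city", "keys": ["city"], "children": []},
        ],
    }],
}

source_rows = [
    {"city": "bj", "day": 1, "user": "a", "value": 2},
    {"city": "bj", "day": 1, "user": "b", "value": 3},
    {"city": "bj", "day": 2, "user": "a", "value": 5},
    {"city": "sh", "day": 1, "user": "a", "value": 7},
    {"city": "bj", "day": 1, "user": "a", "value": 1},
]

rollups = build_rollups(rollup_tree, source_rows, {})
buckets = split_into_buckets(rollups["base"], ["city"], num_buckets=3)
for name, rows in rollups.items():
    print(name, rows)
```

Computing each child rollup from its parent's already aggregated rows, rather than from the source rows, is what makes the traversal hierarchical: deeper levels process fewer rows.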

Basic operation

Configure ETL cluster

Spark is used in Doris as an external computing resource to do ETL work. Other external resources may be added to Doris in the future, such as Spark or GPU for queries, HDFS or S3 for external storage, and MapReduce for ETL. Resource management is therefore introduced to manage the external resources that Doris uses.

Before submitting a Spark Load task, you need to configure the Spark cluster that performs the ETL task.

Syntax:

  -- create spark resource
  CREATE EXTERNAL RESOURCE resource_name
  PROPERTIES
  (
    type = spark,
    spark_conf_key = spark_conf_value,
    working_dir = path,
    broker = broker_name,
    broker.property_key = property_value
  )

  -- drop spark resource
  DROP RESOURCE resource_name

  -- show resources
  SHOW RESOURCES
  SHOW PROC "/resources"

  -- privileges
  GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
  GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name
  REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
  REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name

Create resource

resource_name is the name of the Spark resource configured in Doris.

PROPERTIES holds the parameters of the Spark resource, as follows:

  • type: the resource type. Required. Currently, only spark is supported.

  • Spark-related parameters:

    • spark.master: required. Currently yarn and spark://host:port are supported.

    • spark.submit.deployMode: the deployment mode of the Spark program. Required. Both cluster and client are supported.

    • spark.hadoop.yarn.resourcemanager.address: required when master is yarn.

    • spark.hadoop.fs.defaultFS: required when master is yarn.

    • Other parameters are optional; refer to http://spark.apache.org/docs/latest/configuration.html

  • working_dir: the directory used by ETL. Required when Spark is used as an ETL resource. For example: hdfs://host:port/tmp/doris.

  • broker: the name of the broker. Required when Spark is used as an ETL resource. The broker must be configured in advance with the ALTER SYSTEM ADD BROKER command.

  • broker.property_key: the authentication information that the broker needs when reading the intermediate files generated by ETL.

Example:

  -- yarn cluster mode
  CREATE EXTERNAL RESOURCE "spark0"
  PROPERTIES
  (
    "type" = "spark",
    "spark.master" = "yarn",
    "spark.submit.deployMode" = "cluster",
    "spark.jars" = "xxx.jar,yyy.jar",
    "spark.files" = "/tmp/aaa,/tmp/bbb",
    "spark.executor.memory" = "1g",
    "spark.yarn.queue" = "queue0",
    "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
    "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
    "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
    "broker" = "broker0",
    "broker.username" = "user0",
    "broker.password" = "password0"
  );

  -- spark standalone client mode
  CREATE EXTERNAL RESOURCE "spark1"
  PROPERTIES
  (
    "type" = "spark",
    "spark.master" = "spark://127.0.0.1:7777",
    "spark.submit.deployMode" = "client",
    "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
    "broker" = "broker1"
  );
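Before issuing CREATE EXTERNAL RESOURCE, it can help to check a property set against the requirements listed under "Create resource". The helper below is a hypothetical client-side sketch, not part of Doris, and it assumes Spark is being used as an ETL resource, so working_dir and broker are treated as required.

```python
def missing_spark_resource_properties(props):
    """Return the required property keys that are absent or empty in props."""
    required = [
        "type",
        "spark.master",
        "spark.submit.deployMode",
        "working_dir",  # required when Spark is used as an ETL resource
        "broker",       # required when Spark is used as an ETL resource
    ]
    # Two more keys are required only when the master is yarn.
    if props.get("spark.master") == "yarn":
        required += [
            "spark.hadoop.yarn.resourcemanager.address",
            "spark.hadoop.fs.defaultFS",
        ]
    return [key for key in required if not props.get(key)]


# Property sets taken from the spark0 and spark1 examples (abridged).
spark0 = {
    "type": "spark",
    "spark.master": "yarn",
    "spark.submit.deployMode": "cluster",
    "spark.hadoop.yarn.resourcemanager.address": "127.0.0.1:9999",
    "spark.hadoop.fs.defaultFS": "hdfs://127.0.0.1:10000",
    "working_dir": "hdfs://127.0.0.1:10000/tmp/doris",
    "broker": "broker0",
}
spark1 = {
    "type": "spark",
    "spark.master": "spark://127.0.0.1:7777",
    "spark.submit.deployMode": "client",
    "working_dir": "hdfs://127.0.0.1:10000/tmp/doris",
    "broker": "broker1",
}

print(missing_spark_resource_properties(spark0))
print(missing_spark_resource_properties(spark1))
```

Both example property sets pass the check; removing, for instance, spark.hadoop.fs.defaultFS from spark0 would make that key appear in the returned list.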

Show resources

Ordinary accounts can only see the resources that they have USAGE_PRIV to use.

The root and admin accounts can see all the resources.

Resource privilege

Resource permissions are managed with GRANT and REVOKE. Currently, only the USAGE_PRIV permission is supported.

USAGE_PRIV can be granted to a user or to a role. Roles are used in the same way as before.

  -- Grant permission to the spark0 resource to user user0
  GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";

  -- Grant permission to the spark0 resource to role role0
  GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";

  -- Grant permission to all resources to user user0
  GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";

  -- Grant permission to all resources to role role0
  GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";

  -- Revoke the spark0 resource permission of user user0
  REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

Configure spark client

FE submits Spark tasks by executing the spark-submit command, so a Spark client must be configured for FE. The official Spark 2 release, version 2.4.3 or later, is recommended. After downloading it, complete the following configuration.

Configure SPARK_HOME environment variable

Place the Spark client on the same machine as FE and set spark_home_default_dir in fe.conf. This configuration item defaults to the fe/lib/spark2x path and cannot be empty.

Configure spark dependencies

Package all the jar files in the jars folder under the Spark client root path into a zip file, and set spark_resource_path in fe.conf to the path of this zip file.
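One way to do the packaging step is sketched below with Python's standard zipfile module. The function name is hypothetical, and placing the jars at the root of the archive (without a jars/ prefix) is an assumption; verify the layout your Doris version expects before relying on it. The demo at the bottom runs against a throwaway directory with fake jar files.

```python
import tempfile
import zipfile
from pathlib import Path


def package_spark_jars(spark_home, zip_path):
    """Zip every *.jar under <spark_home>/jars and return the archived names."""
    jars_dir = Path(spark_home) / "jars"
    jar_files = sorted(jars_dir.glob("*.jar"))
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for jar in jar_files:
            # Assumption: jars sit at the archive root, not under jars/.
            archive.write(jar, arcname=jar.name)
    return [jar.name for jar in jar_files]


# Demo with placeholder files standing in for a real Spark client.
with tempfile.TemporaryDirectory() as tmp:
    jars = Path(tmp) / "spark" / "jars"
    jars.mkdir(parents=True)
    for name in ("a.jar", "b.jar", "notes.txt"):
        (jars / name).write_bytes(b"demo")
    zip_file = Path(tmp) / "spark-2x.zip"
    packaged = package_spark_jars(Path(tmp) / "spark", zip_file)
    with zipfile.ZipFile(zip_file) as archive:
        archived = sorted(archive.namelist())

print(packaged)
print(archived)
```

The resulting zip path is the value to put in spark_resource_path.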

When a Spark Load task is submitted, this zip file is uploaded to the remote repository. By default, the repository is a directory named __spark_repository__{resource_name} under working_dir/{cluster_id}, so each resource in the cluster has its own remote repository. The directory structure of the remote repository is as follows:

  __spark_repository__spark0/
      |-__archive_1.0.0/
      |        |-__lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp-1.0.0-jar-with-dependencies.jar
      |        |-__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip
      |-__archive_1.1.0/
      |        |-__lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp-1.1.0-jar-with-dependencies.jar
      |        |-__lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip
      |-__archive_1.2.0/
      |        |-...

In addition to the Spark dependencies (named spark-2x.zip by default), FE also uploads the DPP dependency package to the remote repository. If all the dependency files needed by a Spark Load submission already exist in the remote repository, nothing is uploaded again, which saves the time of repeatedly uploading a large number of files.
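The skip-if-present behavior can be sketched as a name lookup. The 32 hexadecimal characters in the file names above look like an MD5 digest of the file, but that is an assumption here, as are the function names; the real check lives in FE and may differ.

```python
import hashlib


def remote_lib_name(content, file_name):
    """Build a repository file name of the form __lib_{digest}_{file_name}."""
    # Assumption: the digest embedded in the name is the MD5 of the content.
    digest = hashlib.md5(content).hexdigest()
    return f"__lib_{digest}_{file_name}"


def needs_upload(content, file_name, remote_listing):
    """Upload only if no file with the same digest and name exists remotely."""
    return remote_lib_name(content, file_name) not in remote_listing


# Hypothetical archive directory listing and local file contents.
local_zip = b"placeholder bytes for spark-2x.zip"
listing = {remote_lib_name(local_zip, "spark-2x.zip")}

print(needs_upload(local_zip, "spark-2x.zip", listing))
print(needs_upload(local_zip + b"changed", "spark-2x.zip", listing))
```

Embedding a content digest in the name means a changed dependency gets a new name and is uploaded, while an unchanged one is found by name and skipped.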

Configure yarn client

FE obtains the status of the running application and kills the application by executing the yarn command, so a yarn client must be configured for FE. The official Hadoop release, version 2.5.2 or later, is recommended. After downloading it, complete the following configuration.

Configure the yarn client path

Place the downloaded yarn client on the same machine as FE, and set yarn_client_path in fe.conf to the yarn executable file. The default is fe/lib/yarn-client/hadoop/bin/yarn.

(Optional) When FE obtains the application status or kills the application through the yarn client, the configuration files required to execute the yarn command are generated by default in the lib/yarn-config path under the FE root directory. This path can be changed by setting yarn_config_dir in fe.conf. The generated configuration files currently include core-site.xml and yarn-site.xml.

Create load

Syntax:

  LOAD LABEL load_label
      (data_desc, ...)
      WITH RESOURCE resource_name resource_properties
      [PROPERTIES (key1=value1, ... )]

  * load_label:
      db_name.label_name

  * data_desc:
      DATA INFILE ('file_path', ...)
      [NEGATIVE]
      INTO TABLE tbl_name
      [PARTITION (p1, p2)]
      [COLUMNS TERMINATED BY separator ]
      [(col1, ...)]
      [SET (k1=f1(xx), k2=f2(xx))]
      [WHERE predicate]

  * resource_properties:
      (key2=value2, ...)

Example 1: the upstream data source is an HDFS file

  LOAD LABEL db1.label1
  (
      DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
      INTO TABLE tbl1
      COLUMNS TERMINATED BY ","
      (tmp_c1,tmp_c2)
      SET
      (
          id=tmp_c2,
          name=tmp_c1
      ),
      DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
      INTO TABLE tbl2
      COLUMNS TERMINATED BY ","
      (col1, col2)
      where col1 > 1
  )
  WITH RESOURCE 'spark0'
  (
      "spark.executor.memory" = "2g",
      "spark.shuffle.compress" = "true"
  )
  PROPERTIES
  (
      "timeout" = "3600"
  );

Example 2: the upstream data source is a Hive table

  -- step 1: create hive external table
  CREATE EXTERNAL TABLE hive_t1
  (
      k1 INT,
      k2 SMALLINT,
      k3 varchar(50),
      uuid varchar(100)
  )
  ENGINE=hive
  properties
  (
      "database" = "tmp",
      "table" = "t1",
      "hive.metastore.uris" = "thrift://0.0.0.0:8080"
  );

  -- step 2: submit load command
  LOAD LABEL db1.label1
  (
      DATA FROM TABLE hive_t1
      INTO TABLE tbl1
      (k1,k2,k3)
      SET
      (
          uuid=bitmap_dict(uuid)
      )
  )
  WITH RESOURCE 'spark0'
  (
      "spark.executor.memory" = "2g",
      "spark.shuffle.compress" = "true"
  )
  PROPERTIES
  (
      "timeout" = "3600"
  );

Example 3: the upstream data source is a Hive table with a binary type column

  -- step 1: create hive external table
  CREATE EXTERNAL TABLE hive_t1
  (
      k1 INT,
      k2 SMALLINT,
      k3 varchar(50),
      uuid varchar(100)
  )
  ENGINE=hive
  properties
  (
      "database" = "tmp",
      "table" = "t1",
      "hive.metastore.uris" = "thrift://0.0.0.0:8080"
  );

  -- step 2: submit load command
  LOAD LABEL db1.label1
  (
      DATA FROM TABLE hive_t1
      INTO TABLE tbl1
      (k1,k2,k3)
      SET
      (
          uuid=binary_bitmap(uuid)
      )
  )
  WITH RESOURCE 'spark0'
  (
      "spark.executor.memory" = "2g",
      "spark.shuffle.compress" = "true"
  )
  PROPERTIES
  (
      "timeout" = "3600"
  );

You can view the detailed syntax for creating a load by running help spark load. This section mainly describes the meaning of the parameters in the Spark Load creation syntax and the points to note.

Label

Identifies the load task. Each load task has a label that is unique within a single database. The specific rules are the same as for Broker Load.

Data description parameters

Currently, the supported data sources are CSV and hive table. Other rules are consistent with broker load.

Load job parameters

Load job parameters mainly refer to the opt_properties in the Spark Load statement. They apply to the entire load job. The rules are the same as for Broker Load.

Spark resource parameters

Spark resources need to be configured in the Doris system in advance, and users must be granted USAGE_PRIV on a resource before they can use it for Spark Load.

When users have temporary requirements, such as adding resources for a task or modifying Spark configs, they can set them here. The settings take effect only for this task and do not affect the existing configuration in the Doris cluster.

  WITH RESOURCE 'spark0'
  (
      "spark.driver.memory" = "1g",
      "spark.executor.memory" = "3g"
  )

Load when data source is hive table

At present, to use a Hive table as a data source in the load process, you first need to create an external table of type hive. You can then specify the name of that external table when submitting the load command.

Load process to build global dictionary

Global dictionary construction applies when the aggregate column of the Doris table is of type bitmap.

In the load command, you can specify the field for which to build a global dictionary. The format is: doris field name=bitmap_dict(hive table field name)

Note that building a global dictionary is supported only when the upstream data source is a Hive table.

Load when data source is hive binary type table

This applies when the aggregate column of the Doris table is of type bitmap and the corresponding column in the source Hive table is of type binary (serialized by the org.apache.doris.load.loadv2.dpp.BitmapValue class in FE spark-dpp).

There is no need to build a global dictionary; just specify the corresponding field in the load command. The format is: doris field name=binary_bitmap(hive table field name)

Similarly, loading binary (bitmap) data is currently supported only when the upstream data source is a Hive table.

Show load

Like Broker Load, Spark Load is asynchronous, so users need to record the label used when creating the load job and use it in the SHOW LOAD command to view the result. The SHOW LOAD command is common to all load types. The specific syntax can be viewed by executing help show load.

Example:

  mysql> show load order by createtime desc limit 1\G
  *************************** 1. row ***************************
  JobId: 76391
  Label: label1
  State: FINISHED
  Progress: ETL:100%; LOAD:100%
  Type: SPARK
  EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
  TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
  ErrorMsg: N/A
  CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
  EtlFinishTime: 2019-07-27 11:49:44
  LoadStartTime: 2019-07-27 11:49:44
  LoadFinishTime: 2019-07-27 11:50:16
  URL: http://1.1.1.1:8089/proxy/application_1586619723848_0035/
  JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}

Refer to broker load for the meaning of parameters in the returned result set. The differences are as follows:

  • State

The current phase of the load job. After the job is submitted, the state is PENDING. After the Spark ETL task is submitted, the state changes to ETL. After ETL completes, FE schedules BE to execute the push operation, and the state changes to FINISHED after the push completes and the version takes effect.

A load job has two final states: CANCELLED and FINISHED. When the load job is in either of these states, the load is complete. CANCELLED means the load failed, and FINISHED means the load succeeded.

  • Progress

Progress description of the load job. There are two kinds of progress: ETL and load, corresponding to the two stages of the load process, ETL and loading.

The progress range of load is 0 ~ 100%.

Load progress = the number of tables that have completed all replica imports / the total number of tables in this import task * 100%

When all tables have been loaded, the load progress is 99% and the load enters the final stage, in which the version takes effect. After the whole load is completed, the load progress changes to 100%.

The load progress is not linear. Therefore, if the progress does not change over a period of time, it does not mean that the load is not in execution.

  • Type

The type of the load job. For Spark Load, it is SPARK.

  • CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime

These values represent the creation time of the load, the start time of the ETL phase, the completion time of the ETL phase, the start time of the loading phase, and the completion time of the entire load job.

  • JobDetails

Shows the detailed running status of the job, which is updated when ETL ends. It includes the number of loaded files, their total size in bytes, the number of subtasks, the number of raw rows processed, and so on.

{"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}

  • URL

Copy this URL into a browser to open the web interface of the corresponding application.
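The State and Progress rules described above can be condensed into a short sketch. The function names are hypothetical, and rounding down to a whole percent is an assumption; only the formula, the 99% hold, and the two final states come from the text.

```python
FINAL_STATES = {"FINISHED", "CANCELLED"}


def is_final(state):
    """A load job is complete once it is FINISHED or CANCELLED."""
    return state.upper() in FINAL_STATES


def load_progress(tables_done, tables_total, version_effective):
    """Return the LOAD progress as a whole percentage."""
    if version_effective:
        return 100
    if tables_total == 0:
        return 0
    percent = tables_done * 100 // tables_total
    # Hold at 99% until the version takes effect.
    return min(percent, 99)


print(load_progress(1, 4, False))
print(load_progress(4, 4, False))
print(load_progress(4, 4, True))
print(is_final("ETL"), is_final("FINISHED"))
```

Because progress only moves when a whole table finishes, a polling client should wait for a final state rather than treat an unchanged percentage as a stall.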

View spark launcher commit log

Sometimes users need to view the detailed logs generated during the Spark submission process. The logs are saved in log/spark_launcher_log under the FE root directory, in files named spark_launcher_{load_job_id}_{label}.log. A log is kept in this directory for a period of time and is cleaned up when the corresponding load information in FE metadata is cleaned up. By default, logs are kept for 3 days.
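To locate the log for a given job, the naming pattern above can be filled in with the JobId and Label shown by SHOW LOAD. The helper name and the FE root path /opt/doris/fe below are placeholders, not values from Doris.

```python
from pathlib import PurePosixPath


def spark_launcher_log_path(fe_root, load_job_id, label,
                            log_dir="log/spark_launcher_log"):
    """Build the expected launcher log path for one load job."""
    file_name = f"spark_launcher_{load_job_id}_{label}.log"
    return PurePosixPath(fe_root) / log_dir / file_name


# JobId and Label taken from the SHOW LOAD example; the root is hypothetical.
log_path = spark_launcher_log_path("/opt/doris/fe", 76391, "label1")
print(log_path)
```

If spark_launcher_log_dir has been changed in fe.conf, pass the configured directory as log_dir instead of relying on the default.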

Cancel load

When the spark load job status is not cancelled or finished, it can be manually cancelled by the user. When canceling, you need to specify the label to cancel the load job. The syntax of the cancel load command can be viewed by executing help cancel load.

FE configuration

The following configuration items are system-level settings for Spark Load; that is, they apply to all Spark Load tasks. Their values are changed mainly by modifying fe.conf.

  • enable_spark_load

Enables Spark Load and resource creation. The default value is false, which means the feature is turned off.

  • spark_load_default_timeout_second

The default timeout for tasks is 259200 seconds (3 days).

  • spark_home_default_dir

The Spark client path (fe/lib/spark2x by default).

  • spark_resource_path

The path of the packaged Spark dependency file (empty by default).

  • spark_launcher_log_dir

The directory where the Spark client's submission logs are stored (fe/log/spark_launcher_log by default).

  • yarn_client_path

The path of the yarn executable file (fe/lib/yarn-client/hadoop/bin/yarn by default).

  • yarn_config_dir

The path where the yarn configuration files are generated (fe/lib/yarn-config by default).

Best practices

Application scenarios

Spark Load is best suited to cases where the raw data is in a file system (HDFS) and the data volume ranges from tens of GB to TB. For small data volumes, Stream Load or Broker Load is recommended.

FAQ

  • When using Spark Load, the HADOOP_CONF_DIR environment variable is not set in spark-env.sh.

If the HADOOP_CONF_DIR environment variable is not set, the error When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment is reported.

  • When using Spark Load, spark_home_default_dir is not specified correctly.

The spark-submit command is used when submitting a Spark job. If spark_home_default_dir is set incorrectly, the error Cannot run program 'xxx/bin/spark_submit', error = 2, no such file or directory is reported.

  • When using Spark Load, spark_resource_path does not point to the packaged zip file.

If spark_resource_path is not set correctly, the error file XXX/jars/spark-2x.zip does not exist is reported.

  • When using Spark Load, yarn_client_path does not point to an executable yarn file.

If yarn_client_path is not set correctly, the error yarn client does not exist in path: XXX/yarn-client/hadoop/bin/yarn is reported.
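The four problems above can be caught before a job is submitted with a simple preflight check. This is a hypothetical helper, not a Doris tool: it takes the configured paths and the text of spark-env.sh as arguments, and its test for HADOOP_CONF_DIR is a crude substring match that would also accept a commented-out line.

```python
import os
from pathlib import Path


def preflight_problems(spark_home, spark_resource_path, yarn_client_path,
                       spark_env_text):
    """Return a list of human-readable configuration problems."""
    problems = []
    # Crude check: either variable name appears somewhere in spark-env.sh.
    if ("HADOOP_CONF_DIR" not in spark_env_text
            and "YARN_CONF_DIR" not in spark_env_text):
        problems.append("HADOOP_CONF_DIR is not set in spark-env.sh")
    if not (Path(spark_home) / "bin" / "spark-submit").is_file():
        problems.append("spark_home_default_dir has no bin/spark-submit")
    resource = Path(spark_resource_path)
    if not (resource.is_file() and resource.suffix == ".zip"):
        problems.append("spark_resource_path is not a packaged zip file")
    yarn = Path(yarn_client_path)
    if not (yarn.is_file() and os.access(yarn, os.X_OK)):
        problems.append("yarn_client_path is not an executable file")
    return problems


# Placeholder paths that do not exist, so every check fails.
problems = preflight_problems(
    "/nonexistent/spark2x",
    "/nonexistent/spark-2x.zip",
    "/nonexistent/yarn",
    spark_env_text="",
)
for problem in problems:
    print(problem)
```

An empty list means none of the four misconfigurations was detected; it does not guarantee that the job will run.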