Data transformation, column mapping, and filtering during import

Supported import methods

  • BROKER LOAD

    LOAD LABEL example_db.label1
    (
        DATA INFILE("bos://bucket/input/file")
        INTO TABLE `my_table`
        (k1, k2, tmpk3)
        PRECEDING FILTER k1 = 1
        SET (
            k3 = tmpk3 + 1
        )
        WHERE k1 > k2
    )
    WITH BROKER bos
    (
        ...
    );
  • STREAM LOAD

    curl \
        --location-trusted \
        -u user:passwd \
        -H "columns: k1, k2, tmpk3, k3 = tmpk3 + 1" \
        -H "where: k1 > k2" \
        -T file.txt \
        http://host:port/api/testDb/testTbl/_stream_load
  • ROUTINE LOAD

    CREATE ROUTINE LOAD example_db.label1 ON my_table
    COLUMNS(k1, k2, tmpk3, k3 = tmpk3 + 1),
    PRECEDING FILTER k1 = 1,
    WHERE k1 > k2
    ...

The above import methods all support column mapping, transformation and filtering operations on the source data:

  • Pre-filtering: Filter the raw data once, as it is read and before any transformation. As in the example above:

    PRECEDING FILTER k1 = 1
  • Mapping: Name the columns in the source data. If a name matches a column in the table, the source column is mapped directly to that table column. If it does not, the named column can still be used in subsequent transformations. As in the example above:

    (k1, k2, tmpk3)
  • Conversion: Transform the columns mapped in the previous step and assign the results to the corresponding columns in the table. Built-in expressions, built-in functions, and user-defined functions can be used. As in the example above:

    k3 = tmpk3 + 1
  • Post-filtering: Filter the mapped and transformed columns with an expression. Rows that do not satisfy the expression are not imported. As in the example above:

    WHERE k1 > k2
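
The four stages above can be modeled with a short Python sketch. This is only a toy model for building intuition, not how Doris implements imports: the function load_rows, its parameters, and the sample rows are all hypothetical, and the sketch assumes that the pre-filter sees the source columns under the names given in the mapping.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]


def load_rows(
    raw_rows: List[list],
    mapping: List[str],
    table_columns: List[str],
    preceding_filter: Optional[Callable[[Row], bool]] = None,
    transforms: Optional[Dict[str, Callable[[Row], object]]] = None,
    where: Optional[Callable[[Row], bool]] = None,
) -> List[Row]:
    """Toy model of pre-filter, mapping, conversion and post-filter."""
    loaded = []
    for raw in raw_rows:
        # Mapping: name the source columns by position.
        row = dict(zip(mapping, raw))
        # Pre-filtering: applied to source values, before any conversion.
        if preceding_filter is not None and not preceding_filter(row):
            continue
        # Conversion: derive table columns from the mapped columns.
        for target, fn in (transforms or {}).items():
            row[target] = fn(row)
        # Post-filtering: applied to the final, converted values.
        if where is not None and not where(row):
            continue
        # Keep only the columns that exist in the table.
        loaded.append({col: row.get(col) for col in table_columns})
    return loaded


# Made-up rows, using the clauses from the statements above.
rows = load_rows(
    raw_rows=[[1, 0, 10], [1, 5, 20], [2, 0, 30]],
    mapping=["k1", "k2", "tmpk3"],
    table_columns=["k1", "k2", "k3"],
    preceding_filter=lambda r: r["k1"] == 1,     # PRECEDING FILTER k1 = 1
    transforms={"k3": lambda r: r["tmpk3"] + 1},  # k3 = tmpk3 + 1
    where=lambda r: r["k1"] > r["k2"],            # WHERE k1 > k2
)
print(rows)
```

Only the first made-up row survives: the third row fails the pre-filter and the second fails the WHERE condition.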

Column mapping

Column mapping describes each column in the import file; in effect, it assigns a name to every column in the source data. By describing this mapping, we can import source files whose column order or column count differs from the table's into Doris. The following example illustrates this:

Assuming that the source file has 4 columns, the contents are as follows (the header column names are only for convenience, and there is no header actually):

Column 1    Column 2    Column 3     Column 4
1           100         beijing      1.1
2           200         shanghai     1.2
3           300         guangzhou    1.3
4           \N          chongqing    1.4

Note: \N means null in the source file.

  1. Adjust the mapping order

    Suppose there are 4 columns k1,k2,k3,k4 in the table. The import mapping relationship we want is as follows:

    1. column 1 -> k1
    2. column 2 -> k3
    3. column 3 -> k2
    4. column 4 -> k4

    Then the column mapping should be written in the following order:

    (k1, k3, k2, k4)
  2. There are more columns in the source file than in the table

    Suppose there are 3 columns k1,k2,k3 in the table. The import mapping relationship we want is as follows:

    1. column 1 -> k1
    2. column 2 -> k3
    3. column 3 -> k2

    Then the column mapping should be written in the following order:

    (k1, k3, k2, tmpk4)

    Here tmpk4 is a custom column name that does not exist in the table. Doris ignores columns whose names do not exist in the table.

  3. There are fewer columns in the source file than in the table; fill the rest with default values

    Suppose there are 5 columns k1,k2,k3,k4,k5 in the table. The import mapping relationship we want is as follows:

    1. column 1 -> k1
    2. column 2 -> k3
    3. column 3 -> k2

    Here we use only the first 3 columns from the source file. We want the two columns k4 and k5 to be filled with default values.

    Then the column mapping should be written in the following order:

    (k1, k3, k2)

    If the k4 and k5 columns have default values, those defaults are used. Otherwise, if a column is nullable, it is filled with null. If it is neither, the import job reports an error.

Column pre-filtering

Pre-filtering filters the raw data once, as it is read. Currently only BROKER LOAD and ROUTINE LOAD support it.

Pre-filtering has the following application scenarios:

  1. Filter before conversion

    Use pre-filtering when you want to discard unwanted rows before column mapping and transformation take place.

  2. The filter column does not exist in the table and serves only as a filter identifier

    For example, the source data holds rows for multiple tables (or rows for multiple tables are written to the same Kafka message queue). Each row has a column that identifies which table the row belongs to. With a pre-filter condition, users can select only the rows of the target table for import.
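
The second scenario can be pictured with a small Python sketch. The message contents, the column name tmp_table, and the helper prefilter_and_map are hypothetical; the point is only that the identifier column is used for filtering and never reaches the table.

```python
# Made-up messages from a shared queue; the first field names the target table.
messages = [
    ["orders", 1, 100],
    ["users", 2, 200],
    ["orders", 3, 300],
]

mapping = ["tmp_table", "k1", "k2"]  # tmp_table does not exist in the table
table_columns = ["k1", "k2"]


def prefilter_and_map(rows, mapping, table_columns, predicate):
    """Keep rows that pass the pre-filter, then drop non-table columns."""
    kept = []
    for raw in rows:
        named = dict(zip(mapping, raw))
        if predicate(named):
            kept.append({col: named[col] for col in table_columns})
    return kept


# Comparable in spirit to: PRECEDING FILTER tmp_table = "orders"
orders_rows = prefilter_and_map(
    messages, mapping, table_columns, lambda r: r["tmp_table"] == "orders"
)
print(orders_rows)
```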

Column conversion

Column conversion lets users transform column values from the source file. Currently, Doris supports most built-in functions, as well as user-defined functions, for conversion.

Note: A user-defined function belongs to a specific database. To use one for conversion, the user needs read permission on that database.

Transformation operations are usually defined along with column mappings. That is, the columns are first mapped and then converted. Below we illustrate with an example:

Assuming that the source file has 4 columns, the contents are as follows (the header column names are only for convenience, and there is no header actually):

Column 1    Column 2    Column 3     Column 4
1           100         beijing      1.1
2           200         shanghai     1.2
3           300         guangzhou    1.3
\N          400         chongqing    1.4

  1. Convert the column values in the source file and import them into the table

    Suppose there are 4 columns k1,k2,k3,k4 in the table. Our desired import mapping and transformation relationship is as follows:

    1. column 1 -> k1
    2. column 2 * 100 -> k3
    3. column 3 -> k2
    4. column 4 -> k4

    Then the column mapping should be written in the following order:

    (k1, tmpk3, k2, k4, k3 = tmpk3 * 100)

    This is equivalent to us naming the second column in the source file tmpk3, and specifying that the value of the k3 column in the table is tmpk3 * 100. The data in the final table is as follows:

    k1      k2           k3       k4
    1       beijing      10000    1.1
    2       shanghai     20000    1.2
    3       guangzhou    30000    1.3
    null    chongqing    40000    1.4

  2. Use a case when expression to convert column values conditionally

    Suppose there are 4 columns k1,k2,k3,k4 in the table. We hope that beijing, shanghai, guangzhou, chongqing in the source data are converted to the corresponding region ids and imported:

    1. column 1 -> k1
    2. column 2 -> k2
    3. Column 3 after region id conversion -> k3
    4. column 4 -> k4

    Then the column mapping should be written in the following order:

    (k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)

    The data in the final table is as follows:

    k1      k2     k3    k4
    1       100    1     1.1
    2       200    2     1.2
    3       300    3     1.3
    null    400    4     1.4

  3. Convert the null value in the source file to 0 and import it. At the same time, the region id conversion in Example 2 is also performed.

    Suppose there are 4 columns k1,k2,k3,k4 in the table. While converting the region id, we also want to convert the null value of the k1 column in the source data to 0 and import:

    1. Column 1 is converted to 0 if it is null -> k1
    2. column 2 -> k2
    3. Column 3 after region id conversion -> k3
    4. column 4 -> k4

    Then the column mapping should be written in the following order:

    (tmpk1, k2, tmpk3, k4, k1 = ifnull(tmpk1, 0), k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)

    The data in the final table is as follows:

    k1    k2     k3    k4
    1     100    1     1.1
    2     200    2     1.2
    3     300    3     1.3
    0     400    4     1.4
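
As a cross-check of Example 3, the same conversion can be replayed in Python on the four source rows. This is only a sketch that mimics the two expressions; the helper names ifnull and convert and the REGION_IDS lookup table are illustrative stand-ins, not Doris internals.

```python
# Lookup standing in for the case ... when ... else null end expression.
REGION_IDS = {"beijing": 1, "shanghai": 2, "guangzhou": 3, "chongqing": 4}

# The source rows from the example; None stands for \N.
SOURCE_ROWS = [
    [1, 100, "beijing", 1.1],
    [2, 200, "shanghai", 1.2],
    [3, 300, "guangzhou", 1.3],
    [None, 400, "chongqing", 1.4],
]


def ifnull(value, fallback):
    """Return fallback when value is null, like ifnull(tmpk1, 0)."""
    return fallback if value is None else value


def convert(raw):
    # Mapping: (tmpk1, k2, tmpk3, k4)
    tmpk1, k2, tmpk3, k4 = raw
    return {
        "k1": ifnull(tmpk1, 0),
        "k2": k2,
        "k3": REGION_IDS.get(tmpk3),  # unknown names become None (null)
        "k4": k4,
    }


table = [convert(row) for row in SOURCE_ROWS]
for row in table:
    print(row)
```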

Column filtering

After column mapping and transformation, we can use filter conditions to exclude data that we do not want to import into Doris. The following example illustrates this:

Assuming that the source file has 4 columns, the contents are as follows (the header column names are only for convenience, and there is no header actually):

Column 1    Column 2    Column 3     Column 4
1           100         beijing      1.1
2           200         shanghai     1.2
3           300         guangzhou    1.3
\N          400         chongqing    1.4

  1. Filter directly, using the default column mapping and no transformation

    Suppose there are 4 columns k1,k2,k3,k4 in the table. We can define filter conditions directly with default column mapping and transformation. If we want to import only the data rows whose fourth column in the source file is greater than 1.2, the filter conditions are as follows:

    where k4 > 1.2

    The data in the final table is as follows:

    k1      k2     k3           k4
    3       300    guangzhou    1.3
    null    400    chongqing    1.4

    By default, Doris maps columns sequentially, so column 4 in the source file is automatically mapped to column k4 in the table.

  2. Filter the column-transformed data

    Suppose there are 4 columns k1,k2,k3,k4 in the table. In the column conversion example, we converted region names to ids. Here we want to filter out the rows whose id is 3. The conversion and filter conditions are as follows:

    (k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
    where k3 != 3

    The data in the final table is as follows:

    k1      k2     k3    k4
    1       100    1     1.1
    2       200    2     1.2
    null    400    4     1.4

    Here we see that the column value when performing the filter is the final column value after mapping and transformation, not the original data.

  3. Multi-condition filtering

    Suppose there are 4 columns k1,k2,k3,k4 in the table. We want to filter out the rows whose k1 column is null and, at the same time, the rows whose k4 column is less than 1.2. The filter conditions are as follows:

    where k1 is not null and k4 >= 1.2

    The data in the final table is as follows:

    k1    k2     k3    k4
    2     200    2     1.2
    3     300    3     1.3
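
The last two filters can be replayed on the converted rows with a few lines of Python. This is a sketch under the assumption that filtering happens on the final column values, as described above; the helper where is hypothetical. One caveat: Python compares None differently from SQL NULL, which is harmless here only because k3 is never null in this data.

```python
# Rows after mapping and region id conversion (k1 of the last row is null).
converted = [
    {"k1": 1, "k2": 100, "k3": 1, "k4": 1.1},
    {"k1": 2, "k2": 200, "k3": 2, "k4": 1.2},
    {"k1": 3, "k2": 300, "k3": 3, "k4": 1.3},
    {"k1": None, "k2": 400, "k3": 4, "k4": 1.4},
]


def where(rows, predicate):
    """Keep only the rows for which the predicate holds."""
    return [row for row in rows if predicate(row)]


# where k3 != 3
without_region_3 = where(converted, lambda r: r["k3"] != 3)

# where k1 is not null and k4 >= 1.2
multi_condition = where(
    converted, lambda r: r["k1"] is not None and r["k4"] >= 1.2
)

print([r["k1"] for r in without_region_3])
print([r["k1"] for r in multi_condition])
```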

Data Quality Issues and Filtering Thresholds

The rows of data processed in an import job can be divided into the following three types:

  1. Filtered Rows

    Rows that were filtered out because of poor data quality. This covers data format problems such as type errors, precision errors, strings that exceed the column length, and a mismatched number of file columns, as well as rows that have no corresponding partition.

  2. Unselected Rows

    Rows that were filtered out by the preceding filter or where filter conditions.

  3. Loaded Rows

    Rows that were imported successfully.

A Doris import job lets the user set a maximum error rate (max_filter_ratio). If the error rate of the imported data is below this threshold, the erroneous rows are ignored and the remaining correct rows are imported.

The error rate is calculated as:

    #Filtered Rows / (#Filtered Rows + #Loaded Rows)

In other words, Unselected Rows do not count toward the error rate.
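
A small worked calculation may make the formula concrete. The row counts below are made up, and the helper names error_rate and within_threshold are hypothetical. The sketch also assumes that a job is accepted when the rate does not exceed max_filter_ratio; the exact behavior at the boundary is not stated here.

```python
def error_rate(filtered_rows, loaded_rows):
    """#Filtered Rows / (#Filtered Rows + #Loaded Rows)."""
    total = filtered_rows + loaded_rows
    return filtered_rows / total if total else 0.0


def within_threshold(filtered_rows, unselected_rows, loaded_rows, max_filter_ratio):
    # unselected_rows is accepted but deliberately unused:
    # rows removed by PRECEDING FILTER or WHERE do not affect the rate.
    # Assumption: a rate equal to the threshold is still accepted.
    return error_rate(filtered_rows, loaded_rows) <= max_filter_ratio


# Made-up job: 5 bad rows, 900 rows removed by filters, 95 rows loaded.
rate = error_rate(filtered_rows=5, loaded_rows=95)
print(rate)
print(within_threshold(5, 900, 95, max_filter_ratio=0.1))
print(within_threshold(5, 900, 95, max_filter_ratio=0.01))
```

The 900 unselected rows leave the rate unchanged at 5 / (5 + 95), so the job passes with a threshold of 0.1 and fails with a threshold of 0.01.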