Batch Delete

Currently, Doris supports multiple import methods such as broker load, routine load, and stream load, but data can only be deleted through the DELETE statement. Each execution of DELETE generates a new data version, so frequent deletion seriously degrades query performance. Moreover, DELETE is implemented by generating an empty rowset that records the deletion conditions; every read must filter rows against these conditions, so performance degrades further as conditions accumulate. Compared with other systems, the implementation in Greenplum is more like a traditional database product, while Snowflake implements deletion-on-load through the MERGE syntax.

For scenarios such as importing CDC data, inserts and deletes are generally interspersed in the data stream. Our current import methods are not sufficient here: even if inserts and deletes could be separated into different loads, that would only solve the import side, not the deletion side. The batch delete function addresses these scenarios. There are three ways to merge imported data:

  1. APPEND: all data is appended to the existing data
  2. DELETE: all rows whose key columns match the imported data are deleted
  3. MERGE: APPEND or DELETE is chosen per row according to the DELETE ON condition
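As an illustration only, the decision each merge type makes can be sketched in Python. This is a simplified model of the semantics on a unique-key table, not Doris code; apply_import and delete_on are invented names standing in for the import pipeline and the DELETE ON expression.

```python
def apply_import(table, rows, merge_type, delete_on=None):
    """table: dict key -> value (unique-key model); rows: list of (key, value)."""
    for key, value in rows:
        if merge_type == "APPEND":
            table[key] = value              # upsert; nothing is deleted
        elif merge_type == "DELETE":
            table.pop(key, None)            # every imported key is a delete
        elif merge_type == "MERGE":
            if delete_on((key, value)):     # DELETE ON decides per row
                table.pop(key, None)
            else:
                table[key] = value
    return table

t = {1: "jim", 4: "bush"}
apply_import(t, [(2, "grace"), (1, "jim")], "MERGE",
             delete_on=lambda row: row[0] == 1)
# t → {4: "bush", 2: "grace"}: grace was appended, jim was deleted
```

Note that in this model DELETE ignores everything but the key, which mirrors the behavior described below: a delete matches on key columns only.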

Principle

Batch delete is implemented by adding a hidden column, __DELETE_SIGN__. Since batch delete only works on the unique model, it is enough to add a hidden column of type bool whose aggregation function is REPLACE. In BE, the various aggregation and write processes treat it the same as a normal column. There are two read schemes:

  1. When FE encounters expansions such as *, it removes __DELETE_SIGN__ and adds the condition __DELETE_SIGN__ != true by default.
  2. When BE reads, the column is added for the judgment, and the condition is used to determine whether a row is deleted.
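A minimal sketch of this hidden-column scheme, assuming a simplified key-to-row map in place of real rowsets (write, read, and the tuple layout are invented for illustration): the delete sign is just another REPLACE-aggregated column, so the last write for a key wins, and reads filter on it.

```python
def write(rows, batch):
    """rows: key -> (value, delete_sign); REPLACE keeps the newest version."""
    for key, value, delete_sign in batch:
        rows[key] = (value, delete_sign)
    return rows

def read(rows):
    # what the planner arranges for SELECT *: hide the column,
    # keep only rows where the delete sign is not set
    return {k: v for k, (v, sign) in rows.items() if not sign}

rows = write({}, [(3, "tom", False), (4, "bush", False)])
rows = write(rows, [(3, "tom", True)])   # a DELETE import marks key 3
# read(rows) → {4: "bush"}: key 3 is still stored but filtered out
```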

Import

During import, FE sets the value of the hidden column to the value of the DELETE ON expression while parsing. All other aggregation behavior is the same as for a REPLACE aggregation column.

Read

When reading, the condition __DELETE_SIGN__ != true is added to every OlapScanNode on a table with the hidden column; BE does not perceive this process and executes normally.

Cumulative Compaction

In cumulative compaction, hidden columns are treated as normal columns, and the compaction logic remains unchanged.

Base Compaction

In base compaction, rows marked for deletion are removed, reducing the space occupied by the data.
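Continuing the simplified model above (an assumed illustration, not BE code): once a delete-marked row reaches the base, no older version remains below it, so base compaction can physically drop the row instead of carrying the marker forward.

```python
def base_compact(rows):
    """rows: key -> (value, delete_sign); returns the compacted base."""
    # delete-marked rows are dropped for good; live rows are kept as-is
    return {k: (v, sign) for k, (v, sign) in rows.items() if not sign}

base = {3: ("tom", True), 4: ("bush", False)}
# base_compact(base) → {4: ("bush", False)}
```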

Syntax

The import syntax design mainly adds a column mapping that specifies the delete-mark column; this column must be present in the imported data. The way to set it for each import method is as follows.

stream load

Stream load adds a field for the delete-mark column to the columns field in the header. Example:

  -H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"

broker load

The delete-mark column is specified in the load statement with the DELETE ON clause:

  LOAD LABEL db1.label1
  (
      [MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
      INTO TABLE tbl1
      COLUMNS TERMINATED BY ","
      (tmp_c1, tmp_c2, label_c3)
      SET
      (
          id=tmp_c2,
          name=tmp_c1
      )
      [DELETE ON label_c3=true]
  )
  WITH BROKER 'broker'
  (
      "username"="user",
      "password"="pass"
  )
  PROPERTIES
  (
      "timeout" = "3600"
  );

routine load

Routine load adds a mapping to the COLUMNS field. The mapping method is the same as above. An example is as follows:

  CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  [WITH MERGE|APPEND|DELETE]
  COLUMNS(k1, k2, k3, v1, v2, label),
  WHERE k1 > 100 and k2 like "%doris%"
  [DELETE ON label=true]
  PROPERTIES
  (
      "desired_concurrent_number"="3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "false"
  )
  FROM KAFKA
  (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "kafka_partitions" = "0,1,2,3",
      "kafka_offsets" = "101,0,0,200"
  );

Enable batch delete support

There are two ways of enabling batch delete support:

  1. By adding enable_batch_delete_by_default=true in the FE configuration file: all tables created after FE is restarted will support batch delete. This option defaults to false.

  2. For existing tables created without the above FE configuration, batch delete can be enabled with the following statement: ALTER TABLE tablename ENABLE FEATURE "BATCH_DELETE".

To determine whether a table supports batch delete, set a session variable to display hidden columns: SET show_hidden_columns=true, then run desc tablename. If a __DELETE_SIGN__ column appears in the output, batch delete is supported; otherwise it is not.

Note

  1. Since import operations other than stream load may be executed out of order inside Doris, imports using the MERGE method that are not stream load must be used together with load sequence. For the specific syntax, please refer to the sequence column documentation.
  2. The DELETE ON condition can only be used together with the MERGE method.
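As a hedged illustration of why out-of-order execution matters (a simplified model, not Doris internals; apply_versioned is an invented name): with plain REPLACE, whichever write is applied last wins, so a stale insert replayed after a newer delete could resurrect the row. A sequence column makes the comparison explicit: the row with the larger sequence wins regardless of arrival order.

```python
def apply_versioned(rows, key, value, delete_sign, seq):
    """rows: key -> (value, delete_sign, seq); higher sequence wins."""
    old = rows.get(key)
    if old is None or seq >= old[2]:   # only newer versions may overwrite
        rows[key] = (value, delete_sign, seq)
    return rows

rows = {}
# the delete (seq 2) happens to be applied before the older insert (seq 1):
apply_versioned(rows, 1, "jim", True, 2)
apply_versioned(rows, 1, "jim", False, 1)   # stale insert, ignored
# rows[1] → ("jim", True, 2): the delete still wins despite the ordering
```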

Usage example

Let's take stream load as an example to show how to use it.

  1. Import data normally:

  curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: APPEND" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

The APPEND condition can be omitted, which has the same effect as the following statement:

  curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

  2. Delete all data with the same key columns as the imported data:

  curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: DELETE" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

Before load:

  +--------+----------+----------+------+
  | siteid | citycode | username | pv   |
  +--------+----------+----------+------+
  |      3 |        2 | tom      |    2 |
  |      4 |        3 | bush     |    3 |
  |      5 |        3 | helen    |    3 |
  +--------+----------+----------+------+

Load data:

  3,2,tom,0

After load:

  +--------+----------+----------+------+
  | siteid | citycode | username | pv   |
  +--------+----------+----------+------+
  |      4 |        3 | bush     |    3 |
  |      5 |        3 | helen    |    3 |
  +--------+----------+----------+------+
  3. Import with MERGE: rows whose siteid=1 delete the existing rows with the same key columns, and the remaining rows are appended:

  curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: MERGE" -H "delete: siteid=1" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

Before load:

  +--------+----------+----------+------+
  | siteid | citycode | username | pv   |
  +--------+----------+----------+------+
  |      4 |        3 | bush     |    3 |
  |      5 |        3 | helen    |    3 |
  |      1 |        1 | jim      |    2 |
  +--------+----------+----------+------+

Load data:

  2,1,grace,2
  3,2,tom,2
  1,1,jim,2

After load:

  +--------+----------+----------+------+
  | siteid | citycode | username | pv   |
  +--------+----------+----------+------+
  |      4 |        3 | bush     |    3 |
  |      2 |        1 | grace    |    2 |
  |      3 |        2 | tom      |    2 |
  |      5 |        3 | helen    |    3 |
  +--------+----------+----------+------+