sequence列

sequence列目前只支持Uniq模型,Uniq模型主要针对需要唯一主键的场景,可以保证主键唯一性约束,但是由于使用REPLACE聚合方式,在同一批次中导入的数据,替换顺序不做保证,详细介绍可以参考这里。替换顺序无法保证则无法确定最终导入到表中的具体数据,存在了不确定性。

为了解决这个问题,Doris支持了sequence列,通过用户在导入时指定sequence列,相同key列下,REPLACE聚合类型的列将按照sequence列的值进行替换,较大值可以替换较小值,反之则无法替换。该方法将顺序的确定交给了用户,由用户控制替换顺序。

原理

通过增加一个隐藏列__DORIS_SEQUENCE_COL__实现,该列的类型由用户在建表时指定,在导入时确定该列具体值,并依据该值对REPLACE列进行替换。

建表

创建Uniq表时,将按照用户指定类型自动添加一个隐藏列__DORIS_SEQUENCE_COL__

导入

导入时,fe在解析的过程中将隐藏列的值设置成 order by 表达式的值(broker load和routine load),或者function_column.sequence_col表达式的值(stream load), value列将按照该值进行替换。隐藏列__DORIS_SEQUENCE_COL__的值既可以设置为数据源中一列,也可以是表结构中的一列。

读取

请求包含value列时需要需要额外读取__DORIS_SEQUENCE_COL__列,该列用于在相同key列下,REPLACE聚合函数替换顺序的依据,较大值可以替换较小值,反之则不能替换。

Cumulative Compaction

Cumulative Compaction 时和读取过程原理相同

Base Compaction

Base Compaction 时读取过程原理相同

语法

建表时语法方面在property中增加了一个属性,用来标识__DORIS_SEQUENCE_COL__的类型 导入的语法设计方面主要是增加一个从sequence列的到其他column的映射,各个导入方式设置的将在下面介绍

建表

创建Uniq表时,可以指定sequence列类型

  1. PROPERTIES (
  2. "function_column.sequence_type" = 'Date',
  3. );

sequence_type用来指定sequence列的类型,可以为整型和时间类型

stream load

stream load 的写法是在header中的function_column.sequence_col字段添加隐藏列对应的source_sequence的映射, 示例

  1. curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load

broker load

ORDER BY 处设置隐藏列映射的source_sequence字段

  1. LOAD LABEL db1.label1
  2. (
  3. DATA INFILE("hdfs://host:port/user/data/*/test.txt")
  4. INTO TABLE `tbl1`
  5. COLUMNS TERMINATED BY ","
  6. (k1,k2,source_sequence,v1,v2)
  7. ORDER BY source_sequence
  8. )
  9. WITH BROKER 'broker'
  10. (
  11. "username"="user",
  12. "password"="pass"
  13. )
  14. PROPERTIES
  15. (
  16. "timeout" = "3600"
  17. );

routine load

映射方式同上,示例如下

  1. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  2. [WITH MERGE|APPEND|DELETE]
  3. COLUMNS(k1, k2, source_sequence, v1, v2),
  4. WHERE k1 > 100 and k2 like "%doris%"
  5. [ORDER BY source_sequence]
  6. PROPERTIES
  7. (
  8. "desired_concurrent_number"="3",
  9. "max_batch_interval" = "20",
  10. "max_batch_rows" = "300000",
  11. "max_batch_size" = "209715200",
  12. "strict_mode" = "false"
  13. )
  14. FROM KAFKA
  15. (
  16. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  17. "kafka_topic" = "my_topic",
  18. "kafka_partitions" = "0,1,2,3",
  19. "kafka_offsets" = "101,0,0,200"
  20. );

启用sequence column支持

在新建表时如果设置了function_column.sequence_type ,则新建表将支持sequence column。 对于一个不支持sequence column的表,如果想要使用该功能,可以使用如下语句: ALTER TABLE example_db.my_table ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "Date") 来启用。 如果确定一个表是否支持sequence column,可以通过设置一个session variable来显示隐藏列 SET show_hidden_columns=true ,之后使用desc tablename,如果输出中有__DORIS_SEQUENCE_COL__ 列则支持,如果没有则不支持

使用示例

下面以stream load 为例 展示下使用方式

  1. 创建支持sequence column的表

表结构如下:

  1. MySQL > desc test_table;
  2. +-------------+--------------+------+-------+---------+---------+
  3. | Field | Type | Null | Key | Default | Extra |
  4. +-------------+--------------+------+-------+---------+---------+
  5. | user_id | BIGINT | No | true | NULL | |
  6. | date | DATE | No | true | NULL | |
  7. | group_id | BIGINT | No | true | NULL | |
  8. | modify_date | DATE | No | false | NULL | REPLACE |
  9. | keyword | VARCHAR(128) | No | false | NULL | REPLACE |
  10. +-------------+--------------+------+-------+---------+---------+
  1. 正常导入数据:

导入如下数据

  1. 1 2020-02-22 1 2020-02-22 a
  2. 1 2020-02-22 1 2020-02-22 b
  3. 1 2020-02-22 1 2020-03-05 c
  4. 1 2020-02-22 1 2020-02-26 d
  5. 1 2020-02-22 1 2020-02-22 e
  6. 1 2020-02-22 1 2020-02-22 b

此处以stream load为例, 将sequence column映射为modify_date列

  1. curl --location-trusted -u root: -H "function_column.sequence_col: modify_date" -T testData http://host:port/api/test/test_table/_stream_load

结果为

  1. MySQL > select * from test_table;
  2. +---------+------------+----------+-------------+---------+
  3. | user_id | date | group_id | modify_date | keyword |
  4. +---------+------------+----------+-------------+---------+
  5. | 1 | 2020-02-22 | 1 | 2020-03-05 | c |
  6. +---------+------------+----------+-------------+---------+

在这次导入中,因sequence column的值(也就是modify_date中的值)中’2020-03-05’为最大值,所以keyword列中最终保留了c。

  1. 替换顺序的保证

上述步骤完成后,接着导入如下数据

  1. 1 2020-02-22 1 2020-02-22 a
  2. 1 2020-02-22 1 2020-02-23 b

查询数据

  1. MySQL [test]> select * from test_table;
  2. +---------+------------+----------+-------------+---------+
  3. | user_id | date | group_id | modify_date | keyword |
  4. +---------+------------+----------+-------------+---------+
  5. | 1 | 2020-02-22 | 1 | 2020-03-05 | c |
  6. +---------+------------+----------+-------------+---------+

由于新导入的数据的sequence column都小于表中已有的值,无法替换 再尝试导入如下数据

  1. 1 2020-02-22 1 2020-02-22 a
  2. 1 2020-02-22 1 2020-03-23 w

查询数据

  1. MySQL [test]> select * from test_table;
  2. +---------+------------+----------+-------------+---------+
  3. | user_id | date | group_id | modify_date | keyword |
  4. +---------+------------+----------+-------------+---------+
  5. | 1 | 2020-02-22 | 1 | 2020-03-23 | w |
  6. +---------+------------+----------+-------------+---------+

此时就可以替换表中原有的数据