Doris On ES

Doris On ES combines the distributed query planning capability of Doris with the full-text search capability of ES (Elasticsearch) to provide a more complete OLAP solution:

  1. Multi-index Distributed Join Query in ES
  2. Joint Query of Tables in Doris and ES, More Complex Full-Text Retrieval and Filtering

This document describes how this feature works and how to use it.

Glossary

Terms in Doris

  • FE: Frontend, the front-end node of Doris. Responsible for metadata management and request access.
  • BE: Backend, Doris’s back-end node. Responsible for query execution and data storage.

Terms in ES

  • DataNode: The data storage and computing node of ES.
  • MasterNode: The Master node of ES, which manages metadata, nodes, data distribution, etc.
  • scroll: The built-in data set cursor feature of ES for streaming scanning and filtering of data.
  • _source: contains the original JSON document body that was passed at index time.
  • doc_values: stores the same values as _source, but in a column-oriented fashion.
  • keyword: a string datatype in ES whose content is not processed by an analyzer.
  • text: a string datatype in ES whose content is processed by an analyzer.

How To Use

Create ES Index

    PUT test
    {
      "settings": {
        "index": {
          "number_of_shards": "1",
          "number_of_replicas": "0"
        }
      },
      "mappings": {
        "doc": { // From ES 7.x on, no type is specified when creating an index; the one and only type is `_doc`
          "properties": {
            "k1": {
              "type": "long"
            },
            "k2": {
              "type": "date"
            },
            "k3": {
              "type": "keyword"
            },
            "k4": {
              "type": "text",
              "analyzer": "standard"
            },
            "k5": {
              "type": "float"
            }
          }
        }
      }
    }

Add JSON documents to ES index

    POST /_bulk
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Elasticsearch", "k4": "Trying out Elasticsearch", "k5": 10.0}
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Doris", "k4": "Trying out Doris", "k5": 10.0}
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "Doris On ES", "k4": "Doris On ES", "k5": 10.0}
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "Doris", "k4": "Doris", "k5": 10.0}
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "ES", "k4": "ES", "k5": 10.0}

Create external ES table

    CREATE EXTERNAL TABLE `test` (
      `k1` bigint(20) COMMENT "",
      `k2` datetime COMMENT "",
      `k3` varchar(20) COMMENT "",
      `k4` varchar(100) COMMENT "",
      `k5` float COMMENT ""
    ) ENGINE=ELASTICSEARCH -- ENGINE must be Elasticsearch
    PROPERTIES (
      "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
      "index" = "test",
      "type" = "doc",
      "user" = "root",
      "password" = "root"
    );

The following parameters are accepted by ES table:

| Parameter | Description |
|-----------|-------------|
| hosts | ES cluster connection address: one or more nodes, or a load balancer address |
| index | Name of the ES index. Aliases are supported, but if you use doc_value you must use the real index name |
| type | Type of the index. If not specified, _doc is used |
| user | User name for ES |
| password | Password for the user |

  • For clusters before 7.x, make sure to choose the correct type when creating the table.
  • Only HTTP Basic authentication is supported. Make sure the user has access to paths such as /_cluster/state/ and _nodes/http, and has read permission on the index. If security authentication is not enabled on the cluster, the user name and password do not need to be set.
  • The column names in the Doris table must exactly match the field names in ES, and the field types should be as consistent as possible.
  • ENGINE must be: Elasticsearch

Filter push-down

An important capability of Doris On ES is the push-down of filter conditions. The filter conditions are pushed down to ES so that only the data that actually satisfies them is returned, which can significantly improve query performance and reduce the CPU, memory, and IO usage of both Doris and ES.

The following SQL operators are converted to the corresponding ES queries:

| SQL syntax | ES 5.x+ syntax |
|------------|----------------|
| = | term query |
| in | terms query |
| > , < , >= , <= | range query |
| and | bool.filter |
| or | bool.should |
| not | bool.must_not |
| not in | bool.must_not + terms query |
| is_not_null | exists query |
| is_null | bool.must_not + exists query |
| esquery | QueryDSL in ES native json form |
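
The mapping in the table above can be illustrated with a small Python sketch. This is not the Doris implementation: the predicate representation (nested tuples) and the names `RANGE_OPS` and `to_es_query` are assumptions made for illustration only. The sketch only shows how each supported operator lines up with an ES query clause.

```python
# Hypothetical sketch: predicates are nested tuples such as (">", "k1", 1000)
# or ("and", pred1, pred2). Doris performs this translation internally.
RANGE_OPS = {">": "gt", "<": "lt", ">=": "gte", "<=": "lte"}


def to_es_query(pred: tuple) -> dict:
    """Translate a small predicate tree into ES query DSL, following the table."""
    op = pred[0]
    if op == "=":
        return {"term": {pred[1]: pred[2]}}
    if op == "in":
        return {"terms": {pred[1]: list(pred[2])}}
    if op in RANGE_OPS:
        return {"range": {pred[1]: {RANGE_OPS[op]: pred[2]}}}
    if op == "and":
        return {"bool": {"filter": [to_es_query(p) for p in pred[1:]]}}
    if op == "or":
        return {"bool": {"should": [to_es_query(p) for p in pred[1:]]}}
    if op == "not":
        return {"bool": {"must_not": [to_es_query(pred[1])]}}
    if op == "not in":
        return {"bool": {"must_not": [{"terms": {pred[1]: list(pred[2])}}]}}
    if op == "is_not_null":
        return {"exists": {"field": pred[1]}}
    if op == "is_null":
        return {"bool": {"must_not": [{"exists": {"field": pred[1]}}]}}
    raise ValueError(f"operator is not pushed down: {op}")


# k1 > 1000 and k3 = 'term'
query = to_es_query(("and", (">", "k1", 1000), ("=", "k3", "term")))
```

Operators outside the table raise an error in this sketch; in a real system such conditions would instead be evaluated after the data has been fetched.
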
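
Data type mapping

| Doris\ES | byte | short | integer | long | float | double | keyword | text | date |
|----------|------|-------|---------|------|-------|--------|---------|------|------|
| tinyint | | | | | | | | | |
| smallint | | | | | | | | | |
| int | | | | | | | | | |
| bigint | | | | | | | | | |
| float | | | | | | | | | |
| double | | | | | | | | | |
| char | | | | | | | | | |
| varchar | | | | | | | | | |
| date | | | | | | | | | |
| datetime | | | | | | | | | |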

Enable column scan to optimize query speed (enable_docvalue_scan=true)

    CREATE EXTERNAL TABLE `test` (
      `k1` bigint(20) COMMENT "",
      `k2` datetime COMMENT "",
      `k3` varchar(20) COMMENT "",
      `k4` varchar(100) COMMENT "",
      `k5` float COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
      "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
      "index" = "test",
      "type" = "doc",
      "user" = "root",
      "password" = "root",
      "enable_docvalue_scan" = "true"
    );

Parameter Description:

| Parameter | Description |
|-----------|-------------|
| enable_docvalue_scan | Whether to read the values of the queried fields from ES/Lucene column storage. The default is false |

Doris follows two principles when reading data from ES:

  • Best effort: Doris automatically detects whether each column to be read has column storage enabled (doc_value: true). If all requested fields have column storage, Doris reads the values of all fields from column storage (doc_values).
  • Automatic downgrade: if one or more of the requested fields do not have doc_value, the values of all fields are parsed from the row store, _source.
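
The two principles above amount to an all-or-nothing decision per query. The following Python sketch illustrates that decision under stated assumptions: `choose_fetch_source` is a hypothetical name, and its first argument is assumed to be the `properties` section of the ES index mapping. It relies on the ES behavior that `doc_values` is enabled by default for supported types and is not available for `text` fields.

```python
def choose_fetch_source(mapping_properties: dict, columns: list) -> str:
    """Return "docvalue" only if every requested column is column-stored.

    Hypothetical helper: a single column without doc_values downgrades
    the whole read to the row store (_source).
    """
    for col in columns:
        field = mapping_properties.get(col)
        if field is None:
            return "_source"  # unknown field: be conservative
        if field.get("type") == "text":
            return "_source"  # text fields have no doc_values
        if field.get("doc_values", True) is False:
            return "_source"  # doc_values explicitly disabled
    return "docvalue"


# Mapping of the `test` index created earlier in this document.
test_mapping = {
    "k1": {"type": "long"},
    "k2": {"type": "date"},
    "k3": {"type": "keyword"},
    "k4": {"type": "text", "analyzer": "standard"},
    "k5": {"type": "float"},
}

numeric_and_keyword = choose_fetch_source(test_mapping, ["k1", "k3"])
with_text_column = choose_fetch_source(test_mapping, ["k1", "k4"])
```

Here the first call can use column storage, while the second is downgraded because k4 is a text field.
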

Advantage:

By default, Doris On ES reads all required columns from the row store, _source, which holds the original JSON document. Batch reads from _source are slower than batch reads from column storage, and the gap is most obvious when only a few columns are needed: in that case, reading from docvalue is about ten times faster than reading from _source.

Tip

  1. Fields of type text are not column-stored in ES, so if any of the requested fields is of type text, the read is automatically downgraded to _source.
  2. When many fields are requested (>= 25), reading field values from docvalue performs about the same as reading them from _source.

Detect keyword type field (enable_keyword_sniff=true)

    CREATE EXTERNAL TABLE `test` (
      `k1` bigint(20) COMMENT "",
      `k2` datetime COMMENT "",
      `k3` varchar(20) COMMENT "",
      `k4` varchar(100) COMMENT "",
      `k5` float COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
      "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
      "index" = "test",
      "type" = "doc",
      "user" = "root",
      "password" = "root",
      "enable_keyword_sniff" = "true"
    );

Parameter Description:

| Parameter | Description |
|-----------|-------------|
| enable_keyword_sniff | Whether to inspect string (text) fields in ES to find an additional non-analyzed (keyword) sub-field (multi-fields mechanism) |

You can import data directly without creating an index first. ES then creates the index automatically and, for each string field, creates both a field of type text and a sub-field of type keyword. This is the multi-fields feature of ES. The mapping is as follows:

    "k4": {
      "type": "text",
      "fields": {
        "keyword": {
          "type": "keyword",
          "ignore_above": 256
        }
      }
    }

When a filter such as = is applied to k4, Doris On ES converts the query to an ES TermQuery.

SQL filter:

    k4 = "Doris On ES"

The query DSL converted into ES is:

    "term" : {
      "k4": "Doris On ES"
    }

Because the primary type of k4 is text, its content is tokenized at import time by the analyzer configured for k4 (the standard analyzer if none is set), which yields the three terms doris, on, and es. The ES analyze API shows this:

    POST /_analyze
    {
      "analyzer": "standard",
      "text": "Doris On ES"
    }

The analysis result is:

    {
      "tokens": [
        {
          "token": "doris",
          "start_offset": 0,
          "end_offset": 5,
          "type": "<ALPHANUM>",
          "position": 0
        },
        {
          "token": "on",
          "start_offset": 6,
          "end_offset": 8,
          "type": "<ALPHANUM>",
          "position": 1
        },
        {
          "token": "es",
          "start_offset": 9,
          "end_offset": 11,
          "type": "<ALPHANUM>",
          "position": 2
        }
      ]
    }

The query uses:

    "term" : {
      "k4": "Doris On ES"
    }

This term does not match any term in the dictionary, so no results are returned. With enable_keyword_sniff set to true, k4 = "Doris On ES" is automatically converted to k4.keyword = "Doris On ES", which matches the SQL semantics exactly. The converted ES query DSL is:

    "term" : {
      "k4.keyword": "Doris On ES"
    }

The type of k4.keyword is keyword, so the value is written to ES as a single complete term and can be matched.
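
The rewrite described above can be sketched in Python as follows. This is an illustration only: `resolve_term_field` and `term_query` are hypothetical names, and the mapping argument is assumed to be the `properties` section of the index mapping. The sketch picks the first keyword sub-field of a text field when sniffing is enabled.

```python
def resolve_term_field(mapping_properties: dict, column: str) -> str:
    """Return the keyword sub-field of a text column if one exists."""
    field = mapping_properties.get(column, {})
    if field.get("type") != "text":
        return column
    for sub_name, sub_field in field.get("fields", {}).items():
        if sub_field.get("type") == "keyword":
            return f"{column}.{sub_name}"
    return column  # no keyword sub-field: keep the original field


def term_query(mapping_properties: dict, column: str, value,
               enable_keyword_sniff: bool) -> dict:
    """Build the term query for `column = value` (hypothetical helper)."""
    field = column
    if enable_keyword_sniff:
        field = resolve_term_field(mapping_properties, column)
    return {"term": {field: value}}


# Auto-generated multi-fields mapping for k4, as shown above.
auto_mapping = {
    "k4": {
        "type": "text",
        "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
    }
}

without_sniff = term_query(auto_mapping, "k4", "Doris On ES", False)
with_sniff = term_query(auto_mapping, "k4", "Doris On ES", True)
```

With sniffing disabled the term query targets the analyzed field k4; with sniffing enabled it targets k4.keyword.
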

Query usage

After creating the ES external table in Doris, you can query it like any other Doris table, except that the Doris data model features (rollup, pre-aggregation, materialized view, etc.) are not available.

Basic usage

    select * from es_table where k1 > 1000 and k3 = 'term' or k4 like 'fu*z_'

Extended esquery(field, QueryDSL)

The esquery(field, QueryDSL) function pushes down to ES queries that cannot be expressed in SQL, such as match_phrase and geo_shape. The first parameter, a column name, is used to associate the index. The second parameter is the JSON expression of the ES Query DSL, enclosed in curly braces {}. The JSON must have exactly one root key, such as match_phrase, geo_shape, or bool.

Match query:

    select * from es_table where esquery(k4, '{
      "match": {
        "k4": "doris on es"
      }
    }');

Geo related queries:

    select * from es_table where esquery(k4, '{
      "geo_shape": {
        "location": {
          "shape": {
            "type": "envelope",
            "coordinates": [
              [13, 53],
              [14, 52]
            ]
          },
          "relation": "within"
        }
      }
    }');

Bool query:

    select * from es_table where esquery(k4, '{
      "bool": {
        "must": [
          {
            "terms": {
              "k1": [11, 12]
            }
          },
          {
            "terms": {
              "k2": [100]
            }
          }
        ]
      }
    }');
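
Since the DSL passed to esquery must be a JSON object with exactly one root key, it can be useful to check a query string before embedding it in SQL. The following Python sketch does that on the client side; `esquery_root_key` is a hypothetical helper and not part of Doris.

```python
import json


def esquery_root_key(dsl: str) -> str:
    """Return the single root key of an esquery DSL string.

    Raises ValueError if the string is not valid JSON, is not a JSON
    object, or does not have exactly one root key.
    """
    parsed = json.loads(dsl)  # json.JSONDecodeError is a ValueError
    if not isinstance(parsed, dict) or len(parsed) != 1:
        raise ValueError("esquery DSL must be a JSON object with exactly one root key")
    return next(iter(parsed))


match_root = esquery_root_key('{"match": {"k4": "doris on es"}}')
```

A string with two root keys, for example both match and bool at the top level, is rejected by this check.
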

Principle

    +----------------------------------------------+
    |                                              |
    | Doris      +------------------+              |
    |            |       FE         +--------------+-------+
    |            |                  |  Request Shard Location
    |            +--+-------------+-+              |       |
    |               ^             ^                |       |
    |               |             |                |       |
    |  +-------------------+ +------------------+  |       |
    |  |            |      | |    |             |  |       |
    |  | +----------+----+ | | +--+-----------+ |  |       |
    |  | |      BE       | | | |      BE      | |  |       |
    |  | +---------------+ | | +--------------+ |  |       |
    +----------------------------------------------+       |
       |        |          | |        |         |          |
       |        |          | |        |         |          |
       |    HTTP SCROLL    | |    HTTP SCROLL   |          |
    +-----------+---------------------+------------+       |
    |  |        v          | |        v         |  |       |
    |  | +------+--------+ | | +------+-------+ |  |       |
    |  | |               | | | |              | |  |       |
    |  | |   DataNode    | | | |   DataNode   +<-----------+
    |  | |               | | | |              | |  |       |
    |  | |               +<--------------------------------+
    |  | +---------------+ | | +--------------+ |  |       |
    |  +-------------------+ +------------------+  |       |
    |   Same Physical Node                         |       |
    |                                              |       |
    |           +-----------------------+          |       |
    |           |                       |          |       |
    |           |      MasterNode       +<-----------------+
    | ES        |                       |          |
    |           +-----------------------+          |
    +----------------------------------------------+

  1. FE requests the hosts specified for the table to obtain each node's HTTP port and the shard locations of the index. If a request fails, FE traverses the host list sequentially until a request succeeds or all hosts have failed.

  2. At query time, FE generates the query plan from the node information and the index metadata it obtained, and sends it to the corresponding BE nodes.

  3. Following the proximity principle, each BE node prefers to request the ES node deployed on the same machine. BE receives data concurrently from each shard of the ES index via HTTP Scroll.

  4. After computing the result, Doris returns it to the client.
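
The host traversal in step 1 can be sketched as a simple failover loop. This Python sketch is illustrative only: `fetch_with_failover` is a hypothetical name, and the `fetch` callable stands in for the HTTP request FE would make to a host, so no real ES endpoint or client library is assumed.

```python
from typing import Callable, Iterable


def fetch_with_failover(hosts: Iterable[str], fetch: Callable[[str], dict]) -> dict:
    """Try each host in order; return the first successful response.

    `fetch` is a caller-supplied function (an assumption of this sketch)
    that requests node and shard information from one host and raises
    an exception on failure.
    """
    errors = []
    for host in hosts:
        try:
            return fetch(host)
        except Exception as exc:  # any failure moves on to the next host
            errors.append(f"{host}: {exc}")
    raise ConnectionError("all ES hosts failed: " + "; ".join(errors))


# Simulated cluster: the first host is down, the second one answers.
def fake_fetch(host: str) -> dict:
    if host == "http://192.168.0.1:8200":
        raise TimeoutError("connection timed out")
    return {"answered_by": host}


result = fetch_with_failover(
    ["http://192.168.0.1:8200", "http://192.168.0.2:8200"], fake_fetch
)
```

If every host fails, the sketch raises a single error that lists the failure for each host, which corresponds to the "fails completely" case in step 1.
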

Best Practices

Suggestions for using Date type fields

Date type fields in ES are very flexible, but in Doris On ES, if the format of a Date type field is not set properly, filter conditions on it cannot be pushed down.

When creating an index, set the format of Date type fields for maximum compatibility:

    "dt": {
      "type": "date",
      "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
    }

When creating this field in Doris, it is recommended to set it to date or datetime; it can also be set to varchar. With the following SQL statements, the filter condition is pushed down to ES directly:

    select * from doe where k2 > '2020-06-21';
    select * from doe where k2 < '2020-06-21 12:00:00';
    select * from doe where k2 < 1593497011;
    select * from doe where k2 < now();
    select * from doe where k2 < date_format(now(), '%Y-%m-%d');

Notice:

  • If no format is set for a Date type field in ES, the default format is:

        strict_date_optional_time||epoch_millis

  • If the date field indexed into ES is a Unix timestamp, it must be converted to milliseconds, because ES processes timestamps internally in milliseconds. Otherwise Doris On ES will display wrong column data.
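
The effect of the unit mismatch can be seen with a short Python sketch. The helper names `to_epoch_millis` and `millis_to_utc` are hypothetical; the sketch converts a second-based Unix timestamp to milliseconds before indexing and shows how a value is decoded when it is interpreted as epoch_millis.

```python
from datetime import datetime, timezone


def to_epoch_millis(epoch_seconds: int) -> int:
    """Convert a Unix timestamp in seconds to milliseconds for indexing."""
    return epoch_seconds * 1000


def millis_to_utc(epoch_millis: int) -> datetime:
    """Decode a value the way an epoch_millis date field interprets it."""
    return datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)


seconds = 1593497011

# Correct: convert to milliseconds first, the date lands in 2020.
correct = millis_to_utc(to_epoch_millis(seconds))

# Wrong: the raw seconds value read as milliseconds lands in January 1970.
wrong = millis_to_utc(seconds)
```

Indexing the raw seconds value therefore produces dates close to the Unix epoch, which is the wrong column data mentioned above.
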

Fetch ES metadata field _id

When documents are indexed without specifying _id, ES assigns a globally unique _id to each document. Users can also specify an _id that carries some business meaning when indexing. If needed, Doris On ES can read the value of this field: add an _id column of type varchar when creating the ES external table.

    CREATE EXTERNAL TABLE `doe` (
      `_id` varchar COMMENT "",
      `city` varchar COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
      "hosts" = "http://127.0.0.1:8200",
      "user" = "root",
      "password" = "root",
      "index" = "doe",
      "type" = "doc"
    );

Notice:

  1. The filtering condition of the _id field only supports two types: = and in
  2. The _id field can only be of type varchar

Q&A

  1. ES Version Requirements

    ES 5.x or later is required. ES scans data differently before 2.x and after 5.x; at present, only the scanning mode used after 5.x is supported.

  2. Does ES Cluster Support X-Pack Authentication

    All ES clusters that use HTTP Basic authentication are supported.

  3. Some queries are much slower than requesting ES directly

    Yes. For example, for _count-style queries, ES reads the number of matching documents directly from its internal metadata, without filtering the actual data.

  4. Whether the aggregation operation can be pushed down

    At present, Doris On ES does not support pushing down operations such as sum, avg, and min/max. All documents that satisfy the conditions are fetched from ES in batches as a stream, and the aggregation is then computed in Doris.