Beats output plugin

This is an output plugin for Elastic Beats that lets Filebeat, Metricbeat, Packetbeat, Winlogbeat, Auditbeat, and Heartbeat write data to Apache Doris.

The module sends Beats events to Doris over HTTP: it talks to the Doris FE HTTP interface and imports the data through Doris Stream Load.
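
As a rough illustration of this interaction, the Python sketch below assembles (but does not send) a Stream Load request. The `/api/{db}/{table}/_stream_load` path, the `PUT` method, basic authentication, and the `label` header follow Doris's documented Stream Load interface. The helper name `build_stream_load_request` and the label value are invented for this example and are not part of the plugin.

```python
import base64


def build_stream_load_request(fe, db, table, user, password, label, extra_headers=None):
    """Describe a Stream Load request as a plain dict (hypothetical helper)."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    headers = {
        "Authorization": f"Basic {token}",
        "Expect": "100-continue",
        "label": label,  # identifies this load job
    }
    # Stream Load parameters such as column_separator travel as HTTP headers.
    headers.update(extra_headers or {})
    return {
        "method": "PUT",
        "url": f"{fe.rstrip('/')}/api/{db}/{table}/_stream_load",
        "headers": headers,
    }


request = build_stream_load_request(
    "http://localhost:8030", "example_db", "example_table",
    "root", "root", label="example_label_1",
    extra_headers={"column_separator": ","},
)
print(request["method"], request["url"])
```

A real client would also have to follow the redirect that the FE typically issues toward a BE node, keeping the credentials on the redirected request.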

Learn more about Doris Stream Load

Learn more about Doris

Compatibility

This output was developed and tested against Beats 7.3.1.

Install

Download source code

  mkdir -p $GOPATH/src/github.com/apache/
  cd $GOPATH/src/github.com/apache/
  git clone https://github.com/apache/doris
  cd doris/extension/beats

Compile

  go build -o filebeat filebeat/filebeat.go
  go build -o metricbeat metricbeat/metricbeat.go
  go build -o winlogbeat winlogbeat/winlogbeat.go
  go build -o packetbeat packetbeat/packetbeat.go
  go build -o auditbeat auditbeat/auditbeat.go
  go build -o heartbeat heartbeat/heartbeat.go

Each executable is written to its own subdirectory (for example, filebeat/filebeat).

Usage

You can use the sample config files in the ./example/ directory, or create your own configuration by following the steps below.

Configure Beat

Add the following configuration to *beat.yml:

  output.doris:
    fenodes: ["http://localhost:8030"]  # your doris fe address
    user: root  # your doris user
    password: root  # your doris password
    database: example_db  # your doris database
    table: example_table  # your doris table
    codec_format_string: "%{[message]}"  # beat-event format expression to row data
    headers:
      column_separator: ","

Start Beat

Using Filebeat as an example:

  ./filebeat/filebeat -c filebeat.yml -e

Configurations

Doris connection configuration:

| Name | Description | Default |
|---|---|---|
| fenodes | HTTP addresses of the FE nodes, e.g. ["http://fe1:8030", "http://fe2:8030"] | |
| user | User name; the user needs import permission on the Doris table | |
| password | Password | |
| database | Database name | |
| table | Table name | |
| label_prefix | Prefix of the import label; the final generated ID is {label_prefix}{db}{table}{time_stamp} | doris_beats |
| line_delimiter | Newline character(s) used in the imported data. A combination of multiple characters can be used as the newline. | \n |
| headers | Stream Load import parameters, passed through as HTTP headers | |
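
To make the role of `line_delimiter` and `column_separator` concrete, here is a small Python sketch of how already-formatted rows could be joined into one Stream Load body. It is only an illustration: `build_body` is a hypothetical name, and the plugin's actual batching code may differ.

```python
def build_body(rows, line_delimiter="\n"):
    """Join formatted rows into a single request body (hypothetical helper)."""
    return line_delimiter.join(rows)


# Each row already uses "," as its column_separator.
rows = ["1,A", "2,B", "3,C"]
body = build_body(rows)
print(repr(body))
```

If `line_delimiter` is changed, the same value has to appear between rows in the body, otherwise Doris would split the data at the wrong places.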

Beats configuration:

| Name | Description | Default |
|---|---|---|
| codec_format_string | Format expression applied to each Beats event; the formatted result is added to the HTTP body as one row | |
| codec | Beats output codec; the formatted result is added to the HTTP body as one row. codec_format_string takes priority when both are set. | |
| timeout | HTTP client connection timeout | |
| bulk_max_size | Maximum number of events processed per batch | 100000 |
| max_retries | Filebeat ignores the max_retries setting and retries indefinitely. | 3 |
| backoff.init | Number of seconds to wait before trying to reconnect after a network error | 1 |
| backoff.max | Maximum number of seconds to wait before attempting to connect after a network error | 60 |
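
The two backoff settings describe a reconnect delay that starts at `backoff.init` and never exceeds `backoff.max`. In standard Beats outputs the delay grows exponentially between those bounds. Assuming this plugin follows the same convention (which the table above does not state), the schedule can be sketched in Python as follows; `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(init=1, maximum=60, attempts=8):
    """Return the wait (in seconds) before each reconnect attempt.

    Assumes the delay doubles after every failed attempt and is capped
    at `maximum`, as in the usual Beats exponential backoff.
    """
    delays = []
    delay = init
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, maximum)
    return delays


print(backoff_delays())
```

With the defaults, the wait reaches the 60-second cap on the seventh attempt and stays there.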

Complete usage example of filebeat

Init Doris

  CREATE DATABASE example_db;
  CREATE TABLE example_db.example_table (
      id BIGINT,
      name VARCHAR(100)
  )
  UNIQUE KEY(`id`)
  DISTRIBUTED BY HASH(`id`) BUCKETS 1
  PROPERTIES (
      "replication_num"="1"
  );

Configure Filebeat

Create /tmp/beats/filebeat.yml and add the following configuration:

  filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /tmp/beats/example.log
  output.doris:
    fenodes: ["http://localhost:8030"]  # your doris fe address
    user: root  # your doris user
    password: root  # your doris password
    database: example_db  # your doris database
    table: example_table  # your doris table
    codec_format_string: "%{[message]}"
    headers:
      column_separator: ","

Start Filebeat

  ./filebeat/filebeat -c /tmp/beats/filebeat.yml -e

Validate Load Data

Append some data to /tmp/beats/example.log:

  echo -e "1,A\n2,B\n3,C\n4,D" >> /tmp/beats/example.log

Watch the Filebeat log. If no errors are printed, the import succeeded, and you can query the imported data in the example_db.example_table table.

More configuration examples

Specify columns

Create /tmp/beats/example.log with the following content:

  1,A
  2,B

Configure columns

  filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /tmp/beats/example.log
  output.doris:
    ...
    codec_format_string: "%{[message]}"
    headers:
      columns: "id,name"

Collect a JSON file

Create /tmp/beats/example.json with the following content:

  {"id": 1, "name": "A"}
  {"id": 2, "name": "B"}

Configure headers

  filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /tmp/beats/example.json
  output.doris:
    ...
    codec_format_string: "%{[message]}"
    headers:
      format: json
      read_json_by_line: true
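
With `read_json_by_line: true`, Stream Load expects every line to be one complete JSON object, so a pretty-printed object spread over several lines would not fit this configuration. The Python sketch below is an optional pre-check you could run on a file before shipping it; `invalid_json_lines` is a hypothetical helper and not part of the plugin.

```python
import json


def invalid_json_lines(text):
    """Return the 1-based numbers of lines that are not a JSON object."""
    bad = []
    for number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            value = json.loads(line)
        except json.JSONDecodeError:
            bad.append(number)
            continue
        if not isinstance(value, dict):
            bad.append(number)
    return bad


sample = '{"id": 1, "name": "A"}\n{"id": 2, "name": "B"}'
print(invalid_json_lines(sample))
```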

Codec output fields

Create /tmp/beats/example.log with the following content:

  1,A
  2,B

Configure codec_format_string

  filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /tmp/beats/example.log
  output.doris:
    ...
    codec_format_string: "%{[message]},%{[@timestamp]},%{[@metadata.type]}"
    headers:
      columns: "id,name,beat_timestamp,beat_metadata_type"
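
To show how such a format string turns one event into one row, here is a simplified Python model of the `%{[field]}` substitution. It is not the Beats implementation: `format_event` and `lookup` are hypothetical helpers, only plain field references with dotted paths are handled, and the event values are made up.

```python
import re

# Matches placeholders of the form %{[field]} or %{[parent.child]}.
_FIELD = re.compile(r"%\{\[([^\]]+)\]\}")


def lookup(event, path):
    """Resolve a dotted path such as '@metadata.type' in a nested dict."""
    value = event
    for key in path.split("."):
        value = value[key]
    return value


def format_event(fmt, event):
    """Replace every placeholder in fmt with the matching event field."""
    return _FIELD.sub(lambda m: str(lookup(event, m.group(1))), fmt)


event = {
    "message": "1,A",
    "@timestamp": "2024-01-01T00:00:00.000Z",  # made-up value
    "@metadata": {"type": "_doc"},  # made-up value
}
row = format_event("%{[message]},%{[@timestamp]},%{[@metadata.type]}", event)
print(row)
```

The resulting row has four comma-separated values, which is why the `columns` header above lists four column names.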

FAQ

How to configure the batch commit size

Add the following configuration to your beat.yml.

This sample configuration forwards events to Doris when 10000 events are available or when the oldest available event has been waiting for 5s in the memory queue:

  queue.mem:
    events: 10000
    flush.min_events: 10000
    flush.timeout: 5s
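
The flush rule described above (enough events, or the oldest event has waited long enough) can be modeled with a few lines of Python. This is a sketch of the decision only, not the Beats queue implementation, and `should_flush` is a hypothetical name.

```python
def should_flush(buffered_events, oldest_age_seconds,
                 min_events=10000, timeout_seconds=5.0):
    """Decide whether the memory queue should hand a batch to the output."""
    if buffered_events == 0:
        return False  # nothing to send
    return (buffered_events >= min_events
            or oldest_age_seconds >= timeout_seconds)


print(should_flush(10000, 0.2))  # enough events buffered
print(should_flush(42, 5.0))     # oldest event has waited 5s
print(should_flush(42, 1.0))     # keep buffering
```

Raising `flush.min_events` and `flush.timeout` produces fewer, larger Stream Load requests at the cost of higher latency.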

How to use other Beats (e.g. Metricbeat)

The Doris output supports all Beats modules; see the Install and Usage sections.

How to build a Docker image

You can package a Docker image around any of the executables produced in the Install section.