Flume Sinks

HDFS Sink

这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop,以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本

以下是支持的转义符:


转义符

解释

%{host}

Event header中key为host的值。这个host可以是任意的key,只要header中有就能读取,比如%{aabc}将读取header中key为aabc的值

%t

毫秒值的时间戳(同 System.currentTimeMillis() 方法)

%a

星期的缩写(Mon、Tue等)

%A

星期的全拼(Monday、 Tuesday等)

%b

月份的缩写(Jan、 Feb等)

%B

月份的全拼(January、February等)

%c

日期和时间(Thu Feb 14 23:05:25 2019)

%d

月份中的天(00到31)

%e

月份中的天(1到31)

%D

日期,与%m/%d/%y相同 ,例如:02/09/19

%H

小时(00到23)

%I

小时(01到12)

%j

年中的天数(001到366)

%k

小时(0到23),注意跟 %H的区别

%m

月份(01到12)

%n

月份(1到12)

%M

分钟(00到59)

%p

am或者pm

%s

unix时间戳,是秒值。比如2019/2/14 18:15:49的unix时间戳是:1550139349

%S

秒(00到59)

%y

一年中的最后两位数(00到99),比如1998年的%y就是98

%Y

年(2010这种格式)

%z

数字时区(比如:-0400)

%[localhost]

Agent实例所在主机的hostname

%[IP]

Agent实例所在主机的IP

%[FQDN]

Agent实例所在主机的规范hostname

注意,%[localhost], %[IP] 和 %[FQDN]这三个转义符实际上都是用java的API来获取的,在一些网络环境下可能会获取失败。

正在打开的文件会在名称末尾加上“.tmp”的后缀。文件关闭后,会自动删除此扩展名。这样容易排除目录中的那些已完成的文件。必需的参数已用 粗体 标明。

注解

对于所有与时间相关的转义字符,Event header中必须存在带有“timestamp”键的属性(除非 hdfs.useLocalTimeStamp 设置为 true )。快速自动添加此时间戳的一种方法是使用 时间戳添加拦截器


属性名

默认值

解释

channel



与 Sink 连接的 channel

type



组件类型,这个是: hdfs

hdfs.path



HDFS目录路径(例如:hdfs://namenode/flume/webdata/)

hdfs.filePrefix

FlumeData

Flume在HDFS文件夹下创建新文件的固定前缀

hdfs.fileSuffix



Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)

hdfs.inUsePrefix



Flume正在写入的临时文件前缀,默认没有

hdfs.inUseSuffix

.tmp

Flume正在写入的临时文件后缀

hdfs.rollInterval

30

当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒

hdfs.rollSize

1024

当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节

hdfs.rollCount

10

当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)

hdfs.idleTimeout

0

关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒

hdfs.batchSize

100

向 HDFS 写入内容时每次批量操作的 Event 数量

hdfs.codeC



压缩算法。可选值:gzipbzip2lzolzop` 、 ``snappy

hdfs.fileType

SequenceFile

文件格式,目前支持: SequenceFileDataStreamCompressedStream 。1. DataStream 不会压缩文件,不需要设置hdfs.codeC2. CompressedStream 必须设置hdfs.codeC参数

hdfs.maxOpenFiles

5000

允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭

hdfs.minBlockReplicas



指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。

hdfs.writeFormat

Writable

文件写入格式。可选值: TextWritable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。

hdfs.callTimeout

10000

允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)

hdfs.threadsPoolSize

10

每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)

hdfs.rollTimerPoolSize

1

每个HDFS Sink实例调度定时文件滚动的线程数

hdfs.kerberosPrincipal



用于安全访问 HDFS 的 Kerberos 用户主体

hdfs.kerberosKeytab



用于安全访问 HDFS 的 Kerberos keytab 文件

hdfs.proxyUser

代理名

hdfs.round

false

是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)

hdfs.roundValue

1

向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit )例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute如果roundValue=5,则时间戳会取为:18:30如果roundValue=7,则时间戳会取为:18:28如果roundValue=10,则时间戳会取为:18:30

hdfs.roundUnit

second

向下舍入的单位,可选值: secondminutehour

hdfs.timeZone

Local Time

解析存储目录路径时候所使用的时区名,例如:America/LosAngeles、Asia/Shanghai

hdfs.useLocalTimeStamp

false

使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)

hdfs.closeTries

0

开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。

如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;

如果设置为0,Sink会一直尝试重命名文件直到成功为止;

关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。

hdfs.retryInterval

180

连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。

serializer

TEXT

Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。

serializer.*

根据上面 _serializer 配置的类型来根据需要添加序列化器的参数

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = hdfs
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
  6. a1.sinks.k1.hdfs.filePrefix = events-
  7. a1.sinks.k1.hdfs.round = true
  8. a1.sinks.k1.hdfs.roundValue = 10
  9. a1.sinks.k1.hdfs.roundUnit = minute

上面的例子中时间戳会向前一个整10分钟取整。比如,一个 Event 的 header 中带的时间戳是11:54:34 AM, June 12, 2012,它会保存的 HDFS 路径就是/flume/events/2012-06-12/1150/00。

Hive Sink

此Sink将包含分隔文本或JSON数据的 Event 直接流式传输到 Hive表或分区上。 Event 使用 Hive事务进行写入, 一旦将一组 Event 提交给Hive,它们就会立即显示给Hive查询。即将写入的目标分区既可以预先自己创建,也可以选择让 Flume 创建它们,如果没有的话。 写入的 Event 数据中的字段将映射到 Hive表中的相应列。


属性

默认值

解释

channel



与 Sink 连接的 channel

type



组件类型,这个是: hive

hive.metastore



Hive metastore URI (eg thrift://a.b.com:9083 )

hive.database



Hive 数据库名

hive.table



Hive表名

hive.partition



逗号分隔的要写入的分区信息。比如hive表的分区是(continent: string, country :string, time : string),那么“Asia,India,2014-02-26-01-21”就表示数据会写入到continent=Asia,country=India,time=2014-02-26-01-21这个分区。

hive.txnsPerBatchAsk

100

Hive从Flume等客户端接收数据流会使用多次事务来操作,而不是只开启一个事务。这个参数指定处理每次请求所开启的事务数量。来自同一个批次中所有事务中的数据最终都在一个文件中。Flume会向每个事务中写入 batchSize 个 Event,这个参数和 batchSize 一起控制着每个文件的大小,请注意,Hive最终会将这些文件压缩成一个更大的文件。

heartBeatInterval

240

发送到 Hive 的连续心跳检测间隔(秒),以防止未使用的事务过期。设置为0表示禁用心跳。

autoCreatePartitions

true

Flume 会自动创建必要的 Hive分区以进行流式传输

batchSize

15000

写入一个 Hive事务中最大的 Event 数量

maxOpenConnections

500

允许打开的最大连接数。如果超过此数量,则关闭最近最少使用的连接。

callTimeout

10000

Hive、HDFS I/O操作的超时时间(毫秒),比如:开启事务、写数据、提交事务、取消事务。

serializer

序列化器负责解析 Event 中的字段并把它们映射到 Hive表中的列,选择哪种序列化器取决于 Event 中的数据格式,支持的序列化器有:DELIMITEDJSON

round

false

是否启用时间戳舍入机制

roundUnit

minute

舍入值的单位,可选值:secondminutehour

roundValue

1

舍入到小于当前时间的最高倍数(使用 roundUnit 配置的单位)例子1:roundUnit=second,roundValue=10,则14:31:18这个时间戳会被舍入到14:31:10;例子2:roundUnit=second,roundValue=30,则14:31:18这个时间戳会被舍入到14:31:00,14:31:42这个时间戳会被舍入到14:31:30;

timeZone

Local Time

应用于解析分区中转义序列的时区名称,比如:America/Los_Angeles、Asia/Shanghai、Asia/Tokyo等

useLocalTimeStamp

false

替换转义序列时是否使用本地时间戳(否则使用Event header中的timestamp )

下面介绍Hive Sink的两个序列化器:

JSON :处理UTF8编码的 Json 格式(严格语法)Event,不需要配置。 JSON中的对象名称直接映射到Hive表中具有相同名称的列。 内部使用 org.apache.hive.hcatalog.data.JsonSerDe ,但独立于 Hive表的 Serde 。此序列化程序需要安装 HCatalog。

DELIMITED: 处理简单的分隔文本 Event。 内部使用 LazySimpleSerde,但独立于 Hive表的 Serde。


属性

默认值

解释

serializer.delimiter

,

(类型:字符串)传入数据中的字段分隔符。 要使用特殊字符,请用双引号括起来,例如“\t”

serializer.fieldnames



从输入字段到Hive表中的列的映射。 指定为Hive表列名称的逗号分隔列表(无空格),按顺序标识输入字段。要跳过字段,请保留未指定的列名称。 例如, ‘time,,ip,message’表示输入映射到hive表中的 time,ip 和 message 列的第1,第3和第4个字段。

serializer.serdeSeparator

Ctrl-A

(类型:字符)自定义底层序列化器的分隔符。如果 serializer.fieldnames 中的字段与 Hive表列的顺序相同,则 serializer.delimiterserializer.serdeSeparator 相同,并且 serializer.fieldnames 中的字段数小于或等于表的字段数量,可以提高效率,因为传入 Event 正文中的字段不需要重新排序以匹配 Hive表列的顺序。对于’\t’这样的特殊字符使用单引号,要确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将本参数也设置为相同的字符。

以下是支持的转义符:


转义符

解释

%{host}

Event header中 key 为 host 的值。这个 host 可以是任意的 key,只要 header 中有就能读取,比如%{aabc}将读取 header 中 key 为 aabc 的值

%t

毫秒值的时间戳(同 System.currentTimeMillis() 方法)

%a

星期的缩写(Mon、Tue等)

%A

星期的全拼(Monday、 Tuesday等)

%b

月份的缩写(Jan、 Feb等)

%B

月份的全拼(January、February等)

%c

日期和时间(Thu Feb 14 23:05:25 2019)

%d

月份中的天(00到31)

%D

日期,与%m/%d/%y相同 ,例如:02/09/19

%H

小时(00到23)

%I

小时(01到12)

%j

年中的天数(001到366)

%k

小时(0到23),注意跟 %H 的区别

%m

月份(01到12)

%M

分钟(00到59)

%p

am 或者 pm

%s

unix时间戳,是秒值。比如:2019/4/1 15:12:47 的unix时间戳是:1554102767

%S

秒(00到59)

%y

一年中的最后两位数(00到99),比如1998年的%y就是98

%Y

年(2010这种格式)

%z

数字时区(比如:-0400)

注解

对于所有与时间相关的转义字符,Event header 中必须存在带有“timestamp”键的属性(除非 useLocalTimeStamp 设置为 true )。快速添加此时间戳的一种方法是使用 时间戳添加拦截器 ( TimestampInterceptor)。

假设Hive表如下:

  1. create table weblogs ( id int , msg string )
  2. partitioned by (continent string, country string, time string)
  3. clustered by (id) into 5 buckets
  4. stored as orc;

配置范例:

  1. a1.channels = c1
  2. a1.channels.c1.type = memory
  3. a1.sinks = k1
  4. a1.sinks.k1.type = hive
  5. a1.sinks.k1.channel = c1
  6. a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
  7. a1.sinks.k1.hive.database = logsdb
  8. a1.sinks.k1.hive.table = weblogs
  9. a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
  10. a1.sinks.k1.useLocalTimeStamp = false
  11. a1.sinks.k1.round = true
  12. a1.sinks.k1.roundValue = 10
  13. a1.sinks.k1.roundUnit = minute
  14. a1.sinks.k1.serializer = DELIMITED
  15. a1.sinks.k1.serializer.delimiter = "\t"
  16. a1.sinks.k1.serializer.serdeSeparator = '\t'
  17. a1.sinks.k1.serializer.fieldnames =id,,msg

以上配置会将时间戳向下舍入到最后10分钟。 例如,将时间戳标头设置为2019年4月1日下午15:21:34且“country”标头设置为“india”的Event将评估为分区(continent =’asia’,country =’india’,time =’2019-04-01-15-20’。序列化程序配置为接收包含三个字段的制表符分隔的输入并跳过第二个字段。

Logger Sink

使用INFO级别把Event内容输出到日志中,一般用来测试、调试使用。这个 Sink 是唯一一个不需要额外配置就能把 Event 的原始内容输出的Sink,参照 输出原始数据到日志

提示

输出原始数据到日志 一节中说过,通常在Flume的运行日志里面输出数据流中的原始的数据内容是非常不可取的,所以 Flume 的组件默认都不会这么做。但是总有特殊的情况想要把 Event 内容打印出来,就可以借助这个Logger Sink了。

必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: logger

maxBytesToLog

16

Event body 输出到日志的最大字节数,超出的部分会被丢弃

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = logger
  4. a1.sinks.k1.channel = c1

Avro Sink

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为Avro Event发送到指定的主机/端口上。Event 从 channel 中批量获取,数量根据配置的 batch-size 而定。必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: avro.

hostname



监听的服务器名(hostname)或者 IP

port



监听的端口

batch-size

100

每次批量发送的 Event 数

connect-timeout

20000

第一次连接请求(握手)的超时时间,单位:毫秒

request-timeout

20000

请求超时时间,单位:毫秒

reset-connection-interval

none

重置连接到下一跳之前的时间量(秒)。 这将强制 Avro Sink 重新连接到下一跳。 这将允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。

compression-type

none

压缩类型。可选值: nonedeflate 。压缩类型必须与上一级Avro Source 配置的一致

compression-level

6

Event的压缩级别0:不压缩,1-9:进行压缩,数字越大,压缩率越高

ssl

false

设置为 true 表示Sink开启 SSL下面的 truststoretruststore-passwordtruststore-type 就是开启SSL后使用的参数,并且可以指定是否信任所有证书( trust-all-certs

trust-all-certs

false

如果设置为true, 不会检查远程服务器(Avro Source)的SSL服务器证书。不要在生产环境开启这个配置,因为它使攻击者更容易执行中间人攻击并在加密的连接上进行“监听”。

truststore



自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为Oracle JRE中的“jssecacerts”或“cacerts”)。

truststore-password



上面配置的信任库的密码

truststore-type

JKS

Java 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型

exclude-protocols

SSLv3

要排除的以空格分隔的 SSL/TLS 协议列表。 SSLv3 协议不管是否配置都会被排除掉。

maxIoWorkers

2 * 机器上可用的处理器核心数量

I/O工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的。

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = avro
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.hostname = 10.10.10.10
  6. a1.sinks.k1.port = 4545

Thrift Sink

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为 Thrift Event 发送到指定的主机/端口上。Event 从 channel 中获取批量获取,数量根据配置的 batch-size 而定。可以通过启用 kerberos 身份验证将 Thrift Sink 以安全模式启动。如果想以安全模式与 Thrift Source 通信,那么 Thrift Sink 也必须以安全模式运行。 client-principalclient-keytab是 Thrift Sink 用于向 kerberos KDC 进行身份验证的配置参数。 server-principal 表示此Sink将要以安全模式连接的 Thrift Source 的主体,必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: thrift.

hostname



远程 Thrift 服务的主机名或 IP

port



远程 Thrift 的端口

batch-size

100

一起批量发送 Event 数量

connect-timeout

20000

第一次连接请求(握手)的超时时间,单位:毫秒

request-timeout

20000

请求超时时间,单位:毫秒

reset-connection-interval

none

重置连接到下一跳之前的时间量(秒)。 这将强制 Thrift Sink 重新连接到下一跳。 允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。

ssl

false

设置为 true 表示Sink开启 SSL。下面的 truststoretruststore-passwordtruststore-type 就是开启 SSL 后使用的参数

truststore



自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为 Oracle JRE 中的“jssecacerts”或“cacerts”)。

truststore-password



上面配置的信任库的密码

truststore-type

JKS

Java 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型

exclude-protocols

SSLv3

要排除的以空格分隔的 SSL/TLS 协议列表

kerberos

false

设置为 true 开启 kerberos 身份验证。在 kerberos 模式下,需要 client-principalclient-keytabserver-principal 才能成功进行身份验证并与启用了 kerberos 的 Thrift Source 进行通信。

client-principal

—-

Thrift Sink 用来向 kerberos KDC 进行身份验证的 kerberos 主体。

client-keytab

—-

Thrift Sink 与 client-principal 结合使用的 keytab 文件路径,用于对 kerberos KDC 进行身份验证。

server-principal



Thrift Sink 将要连接到的 Thrift Source 的 kerberos 主体。

提示

官方英文文档 connection-reset-interval 这个参数是错误的,在源码里面是 reset-connection-interval ,本文档已经纠正。

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = thrift
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.hostname = 10.10.10.10
  6. a1.sinks.k1.port = 4545

IRC Sink

IRC sink 从连接的 channel 获取消息然后将这些消息中继到配置的 IRC 目标上。必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: irc

hostname



要连接的服务器名(hostname )或 IP

port

6667

要连接的远程服务器端口

nick



昵称

user



用户名

password



密码

chan



频道

name

真实姓名

splitlines

false

是否分割消息后进行发送

splitchars

\n

行分隔符如果上面 splitlines 设置为 true ,会使用这个分隔符把消息体先进行分割再逐个发送,如果你要在配置文件中配置默认值,那么你需要一个转义符,像这样:“ \n”

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = irc
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.hostname = irc.yourdomain.com
  6. a1.sinks.k1.nick = flume
  7. a1.sinks.k1.chan = #flume

File Roll Sink

把 Event 存储到本地文件系统。必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: fileroll.

sink.directory



Event 将要保存的目录

sink.pathManager

DEFAULT

配置使用哪个路径管理器,这个管理器的作用是按照规则生成新的存储文件名称,可选值有: defaultrolltime

default规则:prefix+当前毫秒值+“-”+文件序号+“.”+extension;

rolltime规则:prefix+yyyyMMddHHmmss+“-”+文件序号+“.”+extension;

注:prefix 和 extension 如果没有配置则不会附带

sink.pathManager.extension



如果上面的 _pathManager 使用默认的话,可以用这个属性配置存储文件的扩展名

sink.pathManager.prefix



如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的文件名的固定前缀

sink.rollInterval

30

表示每隔30秒创建一个新文件进行存储。如果设置为0,表示所有 Event 都会写到一个文件中。

sink.serializer

TEXT

配置 Event 序列化器,可选值有:textheader_and_textavro_event 或者自定义实现了 EventSerializer.Builder 接口的序列化器的全限定类名.。text 只会把 Event 的 body 的文本内容序列化;header_and_text 会把 header 和 body 内容都序列化。

batchSize

100

每次请求批处理的 Event 数

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = file_roll
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.sink.directory = /var/log/flume

Null Sink

丢弃所有从 channel 读取到的 Event。必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: null.

batchSize

100

每次批处理的 Event 数量

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = null
  4. a1.sinks.k1.channel = c1

HBaseSinks

HBaseSink

此Sink将数据写入 HBase。 Hbase 配置是从classpath中遇到的第一个 hbase-site.xml 中获取的。 配置指定的 HbaseEventSerializer 接口的实现类用于将 Event 转换为 HBase put 或 increments。然后将这些 put 和 increments 写入 HBase。 该Sink提供与 HBase 相同的一致性保证,HBase 是当前行的原子性。 如果 Hbase 无法写入某些 Event,则Sink将重试该事务中的所有 Event。

这个Sink支持以安全的方式把数据写入到 HBase。为了使用安全写入模式,运行 Flume 实例的用户必须有写入 HBase 目标表的写入权限。可以在配置中指定用于对 KDC 进行身份验证的主体和密钥表。Flume 的 classpath 中的 hbase-site.xml 必须将身份验证设置为 kerberos(有关如何执行此操作的详细信息,请参阅HBase文档)。

Flume提供了两个序列化器。第一个序列化器是 SimpleHbaseEventSerializer ( org.apache.flume.sink.hbase.SimpleHbaseEventSerializer ) ,它把 Event body 原样写入到HBase,并可选增加HBase列,这个实现主要就是提供个例子。第二个序列化器是 RegexHbaseEventSerializer ( org.apache.flume.sink.hbase.RegexHbaseEventSerializer ) ,它把 Event body 按照给定的正则进行分割然后写入到不同的列中。

必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: hbase

table



要写入的 Hbase 表名

columnFamily



要写入的 Hbase 列族

zookeeperQuorum



Zookeeper 节点(host:port格式,多个用逗号分隔),hbase-site.xml 中属性 hbase.zookeeper.quorum 的值

znodeParent

/hbase

ZooKeeper 中 HBase 的 Root ZNode 路径,hbase-site.xml中 zookeeper.znode.parent 的值。

batchSize

100

每个事务写入的 Event 数量

coalesceIncrements

false

每次提交时,Sink是否合并多个 increment 到一个 cell。如果有限数量的 cell 有多个 increment ,这样可能会提供更好的性能。

serializer

org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

指定序列化器。默认的increment column = “iCol”, payload column = “pCol”。

serializer.*



序列化器的属性

kerberosPrincipal



以安全方式访问 HBase 的 Kerberos 用户主体

kerberosKeytab



以安全方式访问 HBase 的 Kerberos keytab 文件目录

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = hbase
  4. a1.sinks.k1.table = foo_table
  5. a1.sinks.k1.columnFamily = bar_cf
  6. a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
  7. a1.sinks.k1.channel = c1
AsyncHBaseSink

这个Sink使用异步模型将数据写入 HBase。这个Sink使用 AsyncHbaseEventSerializer 这个序列化器来转换 Event 为 HBase 的 put 和 increment,然后写入到 HBase。此Sink使用 Asynchbase API 来写入 HBase。该Sink提供与 HBase 相同的一致性保证,HBase 是当前行的原子性。 如果 Hbase 无法写入某些 Event,则Sink将重试该事务中的所有 Event。必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: asynchbase

table



要写入的 Hbase 表名

zookeeperQuorum



Zookeeper 节点(host:port格式,多个用逗号分隔),hbase-site.xml 中属性 hbase.zookeeper.quorum 的值

znodeParent

/hbase

ZooKeeper 中 HBase 的 Root ZNode 路径,hbase-site.xml 中 zookeeper.znode.parent 的值。

columnFamily



要写入的 Hbase 列族

batchSize

100

每个事务写入的 Event 数量

coalesceIncrements

false

每次提交时,Sink是否合并多个 increment 到一个cell。如果有限数量的 cell 有多个 increment ,这样可能会提供更好的性能。

timeout

60000

Sink为事务中所有 Event 等待来自 HBase 响应的超时时间(毫秒)

serializer

org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

序列化器

serializer.*



序列化器的一些属性

如果配置文件中没有提供这些参数配置,Sink就会从 classpath 中第一个 hbase-site.xml 中读取这些需要的配置信息。

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = asynchbase
  4. a1.sinks.k1.table = foo_table
  5. a1.sinks.k1.columnFamily = bar_cf
  6. a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
  7. a1.sinks.k1.channel = c1

MorphlineSolrSink

此Sink从 Flume的 Event 中提取数据,对其进行转换,并将其近乎实时地加载到 Apache Solr 服务器中,后者又向最终用户或搜索应用程序提供查询服务。

此Sink非常适合将原始数据流式传输到 HDFS(通过HDFS Sink)并同时提取、转换并将相同数据加载到 Solr(通过MorphlineSolrSink)的使用场景。特别是,此Sink可以处理来自不同数据源的任意异构原始数据,并将其转换为对搜索应用程序有用的数据模型。

ETL 功能可使用 morphline 的配置文件进行自定义,该文件定义了一系列转换命令,用于将 Event 从一个命令传递到另一个命令。

Morphlines 可以看作是 Unix 管道的演变,其中数据模型被推广为使用通用记录流,包括任意二进制有效载荷。 morphline 命令有点像 Flume 拦截器。 Morphlines 可以嵌入到 Flume 等 Hadoop 组件中。

用于解析和转换一组标准数据格式(如日志文件,Avro,CSV,文本,HTML,XML,PDF,Word,Excel等)的命令是开箱即用的,还有其他自定义命令和解析器用于其他数据格式可以作为插件添加到 morphline。可以索引任何类型的数据格式,并且可以生成任何类型的 Solr 模式的任何 Solr 文档,也可以注册和执行任何自定义 ETL 逻辑。

Morphlines 操纵连续的数据流。数据模型可以描述如下:数据记录是一组命名字段,其中每个字段具有一个或多个值的有序列表。值可以是任何Java对象。也就是说,数据记录本质上是一个哈希表,其中每个哈希表条目包含一个 String 键和一个 Java 对象列表作为值。 (该实现使用 Guava 的 ArrayListMultimap,它是一个 ListMultimap)。请注意,字段可以具有多个值,并且任何两个记录都不需要使用公共字段名称。

此Sink将 Flume Event 的 body 填充到 morphline 记录的 _attachment_body 字段中,并将 Flume Event 的 header 复制到同名的记录字段中。然后命令可以对此数据执行操作。

支持路由到 SolrCloud 集群以提高可伸缩性。索引负载可以分布在大量 MorphlineSolrSinks 上,以提高可伸缩性。可以跨多个 MorphlineSolrSinks 复制索引负载以实现高可用性,例如使用 Flume的负载均衡特性。 MorphlineInterceptor 还可以帮助实现到多个 Solr 集合的动态路由(例如,用于多租户)。

老规矩,morphline 和 solr 的 jar 包需要放在 Flume 的 lib 目录中。

必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: org.apache.flume.sink.solr.morphline.MorphlineSolrSink

morphlineFile



morphline 配置文件的相对或者绝对路径,例如:/etc/flume-ng/conf/morphline.conf

morphlineId

null

如果 morphline 文件里配置了多个 morphline 实例,可以用这个参数来标识 morphline 作为一个可选名字

batchSize

1000

单个事务操作的最大 Event 数量

batchDurationMillis

1000

事务的最大超时时间(毫秒)。达到这个时间或者达到 batchSize 都会触发提交事物。

handlerClass

org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl

实现了 org.apache.flume.sink.solr.morphline.MorphlineHandler 接口的实现类的全限定类名

isProductionMode

false

重要的任务和大规模的生产系统应该启用这个模式,这些系统需要在发生不可恢复的异常时不停机来获取信息。未知的 Solr 架构字段相关的错误、损坏或格式错误的解析器输入数据、解析器错误等都会产生不可恢复的异常。

recoverableExceptionClasses

org.apache.solr.client.solrj.SolrServerException

以逗号分隔的可恢复异常列表,这些异常往往是暂时的,在这种情况下,可以进行相应地重试。 比如:网络连接错误,超时等。当 isProductionMode 标志设置为 true 时,使用此参数配置的可恢复异常将不会被忽略,并且会进行重试。

isIgnoringRecoverableExceptions

false

如果不可恢复的异常被意外错误分类为可恢复,则应启用这个标志。 这使得Sink能够取得进展并避免永远重试一个 Event。

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
  6. # a1.sinks.k1.morphlineId = morphline1
  7. # a1.sinks.k1.batchSize = 1000
  8. # a1.sinks.k1.batchDurationMillis = 1000

ElasticSearchSink

这个Sink把数据写入到 elasticsearch 集群,就像 logstash 一样把 Event 写入以便 Kibana 图形接口可以查询并展示。

必须将环境所需的 elasticsearch 和 lucene-core jar 放在 Flume 安装的 lib 目录中。 Elasticsearch 要求客户端 JAR 的主要版本与服务器的主要版本匹配,并且两者都运行相同的 JVM 次要版本。如果版本不正确,会报 SerializationExceptions 异常。要选择所需的版本,请首先确定 elasticsearch 的版本以及目标群集正在运行的 JVM 版本。然后选择与主要版本匹配的 elasticsearch 客户端库。 0.19.x客户端可以与0.19.x群集通信; 0.20.x可以与0.20.x对话,0.90.x可以与0.90.x对话。确定 elasticsearch 版本后,读取 pom.xml 文件以确定要使用的正确 lucene-core JAR 版本。运行 ElasticSearchSink 的 Flume 实例程序也应该与目标集群运行的次要版本的 JVM 相匹配。

所有的 Event 每天会被写入到新的索引,名称是<indexName>-yyyy-MM-dd的格式,其中<indexName>可以自定义配置。Sink将在午夜 UTC 开始写入新索引。

默认情况下,Event 会被 ElasticSearchLogStashEventSerializer 序列化器进行序列化。可以通过 serializer 参数配置来更改序和自定义列化器。这个参数可以配置 org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializerorg.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory 接口的实现类,ElasticSearchEventSerializer 现在已经不建议使用了,推荐使用更强大的后者。

必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: org.apache.flume.sink.elasticsearch.ElasticSearchSink

hostNames



逗号分隔的hostname:port列表,如果端口不存在,则使用默认的9300端口

indexName

flume

指定索引名称的前缀。比如:默认是“flume”,使用的索引名称就是 flume-yyyy-MM-dd 这种格式。也支持 header 属性替换的方式,比如%{lyf}就会用 Event header 中的属性名为 lyf 的值。

indexType

logs

文档的索引类型。默认为 log,也支持 header 属性替换的方式,比如%{lyf}就会用 Event header 中的属性名为 lyf 的值。

clusterName

elasticsearch

要连接的 ElasticSearch 集群名称

batchSize

100

每个事务写入的 Event 数量

ttl



TTL 以天为单位,设置了会导致过期文档自动删除,如果没有设置,文档将永远不会被自动删除。 TTL 仅以较早的整数形式被接受,例如 a1.sinks.k1.ttl = 5并且还具有限定符 ms (毫秒), s (秒), m (分钟), h (小时), d (天)和 w (星期)。示例a1.sinks.k1.ttl = 5d 表示将TTL设置为5天。 点击 http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ 了解更多信息。

serializer

org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

序列化器必须实现 ElasticSearchEventSerializerElasticSearchIndexRequestBuilderFactory 接口,推荐使用后者。

serializer.*



序列化器的一些属性配置

注解

使用 header 替换可以方便地通过 header 中的值来动态地决定存储 Event 时要时候用的 indexName 和 indexType。使用此功能时应谨慎,因为 Event 提交者可以控制 indexName 和 indexType。此外,如果使用 elasticsearch REST 客户端,则 Event 提交者可以控制所使用的URL路径。

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = elasticsearch
  4. a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
  5. a1.sinks.k1.indexName = foo_index
  6. a1.sinks.k1.indexType = bar_type
  7. a1.sinks.k1.clusterName = foobar_cluster
  8. a1.sinks.k1.batchSize = 500
  9. a1.sinks.k1.ttl = 5d
  10. a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
  11. a1.sinks.k1.channel = c1

Kite Dataset Sink

这是一个将 Event 写入到 Kite 的实验性的Sink。这个Sink会反序列化每一个 Event body,并将结果存储到 Kite Dataset。它通过按URI加载数据集来确定目标数据集。

唯一支持的序列化方式是 avro,并且必须在在 Event header 中传递数据的结构,使用 flume.avro.schema.literal 加 json 格式的结构信息表示,或者用 flume.avro.schema.url 加一个能够获取到结构信息的URL(比如hdfs:/…这种)。这与使用deserializer.schemaType = LITERAL的 Log4jAppender 和 Spooling Directory Source 的 avro 反序列化器兼容。

注解

1、flume.avro.schema.hash 这个 header 不支持;2、在某些情况下,在超过滚动间隔后会略微发生文件滚动,但是这个延迟不会超过5秒钟,大多数情况下这个延迟是可以忽略的。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: org.apache.flume.sink.kite.DatasetSink

kite.dataset.uri



要打开的数据集的 URI

kite.repo.uri



要打开的存储库的 URI( 不建议使用 ,请改用 kite.dataset.uri

kite.dataset.namespace



将写入记录的数据集命名空间( 不建议使用 ,请改用 kite.dataset.uri

kite.dataset.name



将写入记录的数据集名称( 不建议使用 ,请改用 kite.dataset.uri

kite.batchSize

100

每批中要处理的记录数

kite.rollInterval

30

释放数据文件之前的最长等待时间(秒)

kite.flushable.commitOnBatch

true

如果为 true,Flume 在每次批量操作 kite.batchSize 数据后提交事务并刷新 writer。 此设置仅适用于可刷新数据集。 如果为 true,则可以将具有提交数据的临时文件保留在数据集目录中。需要手动恢复这些文件,以使数据对 DatasetReaders 可见。

kite.syncable.syncOnBatch

true

Sink在提交事务时是否也将同步数据。 此设置仅适用于可同步数据集。 同步操作能保证数据将写入远程系统上的可靠存储上,同时保证数据已经离开Flume客户端的缓冲区(也就是 channel)。当 thekite.flushable.commitOnBatch 属性设置为 false 时,此属性也必须设置为 false

kite.entityParser

avro

将 Flume Event 转换为 kite 实体的转换器。取值可以是 avro 或者 EntityParser.Builder 接口实现类的全限定类名

kite.failurePolicy

retry

发生不可恢复的异常时采取的策略。例如 Event header 中缺少结构信息。默认采取重试的策略。其他可选的值有: save ,这样会把 Event 原始内容写入到 kite.error.dataset.uri 这个数据集。还可以填自定义的处理策略类的全限定类名(需实现 FailurePolicy.Builder 接口)

kite.error.dataset.uri



保存失败的 Event 存储的数据集。当上面的参数 kite.failurePolicy 设置为 save 时,此参数必须进行配置。

auth.kerberosPrincipal



用于 HDFS 安全身份验证的 Kerberos 用户主体

auth.kerberosKeytab



Kerberos 安全验证主体的 keytab 本地文件系统路径

auth.proxyUser



HDFS 操作的用户,如果与 kerberos 主体不同的话

Kafka Sink

这个 Sink 可以把数据发送到 Kafka topic上。目的就是将 Flume 与 Kafka 集成,以便基于拉的处理系统可以处理来自各种 Flume Source 的数据。目前支持 Kafka 0.9.x 发行版。

Flume1.8 不再支持 Kafka 0.9.x(不包括0.9.x)以前的版本。

必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: org.apache.flume.sink.kafka.KafkaSink

kafka.bootstrap.servers



Kafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔

kafka.topic

default-flume-topic

用于发布消息的 Kafka topic 名称 。如果这个参数配置了值,消息就会被发布到这个 topic 上。如果Event header中包含叫做“topic”的属性,Event 就会被发布到 header 中指定的 topic 上,而不会发布到 kafka.topic 指定的 topic 上。支持任意的 header 属性动态替换,比如%{lyf}就会被 Event header 中叫做“lyf”的属性值替换(如果使用了这种动态替换,建议将 Kafka 的 auto.create.topics.enable 属性设置为 true )。

flumeBatchSize

100

一批中要处理的消息数。设置较大的值可以提高吞吐量,但是会增加延迟。

kafka.producer.acks

1

在考虑成功写入之前,要有多少个副本必须确认消息。可选值, 0 :(从不等待确认); 1 :只等待leader确认; -1 :等待所有副本确认。设置为-1可以避免某些情况 leader 实例失败的情况下丢失数据。

useFlumeEventFormat

false

默认情况下,会直接将 Event body 的字节数组作为消息内容直接发送到 Kafka topic 。如果设置为true,会以 Flume Avro 二进制格式进行读取。与 Kafka Source 上的同名参数或者 Kafka channel 的 parseAsFlumeEvent 参数相关联,这样以对象的形式处理能使生成端发送过来的 Event header 信息得以保留。

defaultPartitionId



指定所有 Event 将要发送到的 Kafka 分区ID,除非被 partitionIdHeader 参数的配置覆盖。默认情况下,如果没有设置此参数,Event 会被 Kafka 生产者的分发程序分发,包括 key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发

partitionIdHeader



设置后,Sink将使用 Event header 中使用此属性的值命名的字段的值,并将消息发送到 topic 的指定分区。 如果该值表示无效分区,则将抛出 EventDeliveryException。如果存在标头值,则此设置将覆盖 defaultPartitionId 。假如这个参数设置为“lyf”,这个 Sink 就会读取 Event header 中的 lyf 属性的值,用该值作为分区ID

allowTopicOverride

true

如果设置为 true,会读取 Event header 中的名为 topicHeader 的的属性值,用它作为目标 topic。

topicHeader

topic

与上面的 allowTopicOverride 一起使用,allowTopicOverride 会用当前参数配置的名字从 Event header 获取该属性的值,来作为目标 topic 名称

kafka.producer.security.protocol

PLAINTEXT

设置使用哪种安全协议写入 Kafka。可选值:SASLPLAINTEXTSASL_SSLSSL, 有关安全设置的其他信息,请参见下文。

_more producer security props

如果使用了 SASLPLAINTEXTSASL_SSLSSL 等安全协议,参考 Kafka security 来为生产者增加安全相关的参数配置

Other Kafka Producer Properties



其他一些 Kafka 生产者配置参数。任何 Kafka 支持的生产者参数都可以使用。唯一的要求是使用“kafka.producer.”这个前缀来配置参数,比如:_kafka.producer.linger.ms

注解

Kafka Sink使用 Event header 中的 topic 和其他关键属性将 Event 发送到 Kafka。 如果 header 中存在 topic,则会将Event发送到该特定 topic,从而覆盖为Sink配置的 topic。如果 header 中存在指定分区相关的参数,则Kafka将使用相关参数发送到指定分区。 header中特定参数相同的 Event 将被发送到同一分区。 如果为空,则将 Event 会被发送到随机分区。Kafka Sink 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

弃用的一些参数:


属性

默认值

解释

brokerList



改用 kafka.bootstrap.servers

topic

default-flume-topic

改用 kafka.topic

batchSize

100

改用 kafka.flumeBatchSize

requiredAcks

1

改用 kafka.producer.acks

下面给出 Kafka Sink 的配置示例。Kafka 生产者的属性都是以 kafka.producer 为前缀。Kafka 生产者的属性不限于下面示例的几个。此外,可以在此处包含您的自定义属性,并通过作为方法参数传入的Flume Context对象在预处理器中访问它们。

  1. a1.sinks.k1.channel = c1
  2. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  3. a1.sinks.k1.kafka.topic = mytopic
  4. a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
  5. a1.sinks.k1.kafka.flumeBatchSize = 20
  6. a1.sinks.k1.kafka.producer.acks = 1
  7. a1.sinks.k1.kafka.producer.linger.ms = 1
  8. a1.sinks.k1.kafka.producer.compression.type = snappy

安全与加密

Flume 和 Kafka 之间通信渠道是支持安全认证和数据加密的。对于身份安全验证,可以使用 Kafka 0.9.0版本中的 SASL、GSSAPI (Kerberos V5) 或 SSL (虽然名字是SSL,实际是TLS实现)。

截至目前,数据加密仅由SSL / TLS提供。

Setting kafka.producer.security.protocol to any of the following value means:

当你把 kafka.producer.security.protocol 设置下面任何一个值的时候意味着:

  • SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证

  • SASL_SSL - 有数据加密的 Kerberos 或明文认证

  • SSL - 基于TLS的加密,可选的身份验证

警告

启用 SSL 时性能会下降,影响大小取决于 CPU 和 JVM 实现。参考 Kafka security overviewKAFKA-2561

使用TLS

请阅读 Configuring Kafka Clients SSL 中描述的步骤来了解用于微调的其他配置设置,例如下面的几个例子:启用安全策略、密码套件、启用协议、信任库或秘钥库类型。

服务端认证和数据加密的一个配置实例:

  1. a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
  2. a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sinks.sink1.kafka.topic = mytopic
  4. a1.sinks.sink1.kafka.producer.security.protocol = SSL
  5. a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
  6. a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

注意,默认情况下 ssl.endpoint.identification.algorithm 这个参数没有被定义,因此不会执行主机名验证。如果要启用主机名验证,请加入以下配置:

  1. a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

开启后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):

如果还需要客户端身份验证,则还应在 Flume 配置中添加以下内容。 每个Flume 实例都必须拥有其客户证书,来被Kafka 实例单独或通过其签名链来信任。 常见示例是由 Kafka 信任的单个根CA签署每个客户端证书。

  1. a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
  2. a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则 ssl.key.password 属性将为生产者密钥库提供所需的额外密码:

  1. a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos安全配置:

  • 要将Kafka Sink 与使用 Kerberos 保护的Kafka群集一起使用,请为生产者设置上面提到的 producer.security.protocol 属性。 与 Kafka 实例一起使用的 Kerberos keytab 和主体在 JAAS 文件的“KafkaClient”部分中指定。
  • “客户端”部分描述了 Zookeeper 连接信息(如果需要)。 有关 JAAS 文件内容的信息,请参阅 Kafka doc。可以通过 flume-env.sh 中的 JAVA_OPTS 指定此 JAAS 文件的位置以及系统范围的 kerberos 配置:
  1. JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
  2. JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用 SASL_PLAINTEXT 的示例安全配置:

  1. a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
  2. a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sinks.sink1.kafka.topic = mytopic
  4. a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
  5. a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
  6. a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

使用 SASL_SSL 的安全配置范例:

  1. a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
  2. a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sinks.sink1.kafka.topic = mytopic
  4. a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
  5. a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
  6. a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
  7. a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
  8. a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

JAAS 文件配置示例。有关其内容的参考,请参阅Kafka文档 SASL configuration 中关于所需认证机制(GSSAPI/PLAIN)的客户端配置部分。与 Kafka Source 和 Kafka Channel 不同,“Client”部分并不是必须的,除非其他组件需要它,否则不必要这样做。 另外,请确保 Flume 进程的操作系统用户对 JAAS 和 keytab 文件具有读权限。

  1. KafkaClient {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. storeKey=true
  5. keyTab="/path/to/keytabs/flume.keytab"
  6. principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  7. };

HTTP Sink

HTTP Sink 从 channel 中获取 Event,然后再向远程 HTTP 接口 POST 发送请求,Event 内容作为 POST 的正文发送。

错误处理取决于目标服务器返回的HTTP响应代码。 Sink的 退避就绪 状态是可配置的,事务提交/回滚结果以及Event是否发送成功在内部指标计数器中也是可配置的。

状态代码不可读的服务器返回的任何格式错误的 HTTP 响应都将产生 退避 信号,并且不会从 channel 中消耗该Event。

必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channel

type



组件类型,这个是: http.

endpoint



将要 POST 提交数据接口的绝对地址

connectTimeout

5000

连接超时(毫秒)

requestTimeout

5000

一次请求操作的最大超时时间(毫秒)

contentTypeHeader

text/plain

HTTP请求的Content-Type请求头

acceptHeader

text/plain

HTTP请求的Accept 请求头

defaultBackoff

true

是否默认启用退避机制,如果配置的 backoff.CODE 没有匹配到某个 http 状态码,默认就会使用这个参数值来决定是否退避

defaultRollback

true

是否默认启用回滚机制,如果配置的 rollback.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否回滚

defaultIncrementMetrics

false

是否默认进行统计计数,如果配置的 incrementMetrics.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否参与计数

backoff.CODE



配置某个 http 状态码是否启用退避机制(支持200这种精确匹配和2XX一组状态码匹配模式)

rollback.CODE



配置某个 http 状态码是否启用回滚机制(支持200这种精确匹配和2XX一组状态码匹配模式)

incrementMetrics.CODE



配置某个 http 状态码是否参与计数(支持200这种精确匹配和2XX一组状态码匹配模式)

注意 backoff, rollback 和 incrementMetrics 的 code 配置通常都是用具体的HTTP状态码,如果2xx和200这两种配置同时存在,则200的状态码会被精确匹配,其余200~299(除了200以外)之间的状态码会被2xx匹配。

提示

Flume里面好多组件都有这个退避机制,其实就是下一级目标没有按照预期执行的时候,会执行一个延迟操作。比如向HTTP接口提交数据发生了错误触发了退避机制生效,系统等待30秒再执行后续的提交操作,如果再次发生错误则等待的时间会翻倍,直到达到系统设置的最大等待上限。通常在重试成功后退避就会被重置,下次遇到错误重新开始计算等待的时间。

任何空的或者为 null 的 Event 不会被提交到HTTP接口上。

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = http
  4. a1.sinks.k1.channel = c1
  5. a1.sinks.k1.endpoint = http://localhost:8080/someuri
  6. a1.sinks.k1.connectTimeout = 2000
  7. a1.sinks.k1.requestTimeout = 2000
  8. a1.sinks.k1.acceptHeader = application/json
  9. a1.sinks.k1.contentTypeHeader = application/json
  10. a1.sinks.k1.defaultBackoff = true
  11. a1.sinks.k1.defaultRollback = true
  12. a1.sinks.k1.defaultIncrementMetrics = false
  13. a1.sinks.k1.backoff.4XX = false
  14. a1.sinks.k1.rollback.4XX = false
  15. a1.sinks.k1.incrementMetrics.4XX = true
  16. a1.sinks.k1.backoff.200 = false
  17. a1.sinks.k1.rollback.200 = false
  18. a1.sinks.k1.incrementMetrics.200 = true

Custom Sink

你可以自己写一个 Sink 接口的实现类。启动 Flume 时候必须把你自定义 Sink 所依赖的其他类配置进 classpath 内。custom source 在写配置文件的 type 时候填你的全限定类名。必需的参数已用 粗体 标明。


属性

默认值

解释

channel



与 Sink 绑定的 channe

type



组件类型,这个填你自定义class的全限定类名

配置范例:

  1. a1.channels = c1
  2. a1.sinks = k1
  3. a1.sinks.k1.type = org.example.MySink
  4. a1.sinks.k1.channel = c1