扇出流

如前面章节所述,Flume支持流的扇出形式配置,就是一个source连接多个channel。有两种扇出模式,复制多路复用 。在复制模式下,source中的Event会被发送到与source连接的所有channel上。在多路复用模式下,Event仅被发送到部分channel上。为了分散流量,需要指定好source的所有channel和Event分发的策略。这是通过增加一个复制或多路复用的选择器来实现的,如果是多路复用选择器,还要进一步指定Event分发的规则。如果没有配置选择器,默认就是复制选择器。

  1. # 列出这个Agent的source、sink和channel,注意这里有1个source、2个channel和2个sink
  2. <Agent>.sources = <Source1>
  3. <Agent>.sinks = <Sink1> <Sink2>
  4. <Agent>.channels = <Channel1> <Channel2>
  5.  
  6. # 指定与source1连接的channel,这里配置了两个channel
  7. <Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
  8.  
  9. # 将两个sink分别与两个channel相连接
  10. <Agent>.sinks.<Sink1>.channel = <Channel1>
  11. <Agent>.sinks.<Sink2>.channel = <Channel2>
  12.  
  13. # 指定source1的channel选择器类型是复制选择器(按照上段介绍,不显示配置这个选择器的话,默认也是复制)
  14. <Agent>.sources.<Source1>.selector.type = replicating

多路复用选择器具有另外一组属性可以配置来分发数据流。这需要指定Event属性到channel的映射,选择器检查Event header中每一个配置中指定的属性值,如果与配置的规则相匹配,则该Event将被发送到规则设定的channel上。如果没有匹配的规则,则Event会被发送到默认的channel上,具体看下面配置:

  1. # 多路复用选择器的完整配置如下
  2. <Agent>.sources.<Source1>.selector.type = multiplexing # 选择器类型是多路复用
  3. <Agent>.sources.<Source1>.selector.header = <someHeader> # 假如这个<someHeader>值是abc,则选择器会读取Event header中的abc属性来作为分发的依据
  4. <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> # 加入这里Value1配置的是3,则Event header中abc属性的值等于3的Event会被发送到channel1上
  5. <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> # 同上,Event header中abc属性等于Value2的Event会被发送到channel1和channel2上
  6. <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> # 同上规则,Event header中abc属性等于Value3的Event会被发送到channel2上
  7. #...
  8.  
  9. <Agent>.sources.<Source1>.selector.default = <Channel2> # Event header读取到的abc属性值不属于上面配置的任何一个的话,默认就会发送到这个channel2上

映射的配置允许为每个值配置重复的channel

下面的例子中,一个数据流被分发到了两个路径上。这个叫agent_foo的Agent有一个Avro Source和两个channel,这两个channel分别连接到了两个sink上:

  1. # 列出了Agent的所有source、 sink 和 channel
  2. agent_foo.sources = avro-AppSrv-source1
  3. agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
  4. agent_foo.channels = mem-channel-1 file-channel-2
  5.  
  6. # 让source与两个channel相连接
  7. agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
  8.  
  9. # 分别设定两个sink对应的channel
  10. agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
  11. agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
  12.  
  13. # source的channel选择器配置
  14. agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing # 选择器类型是多路复用,非复制
  15. agent_foo.sources.avro-AppSrv-source1.selector.header = State # 读取Event header中名字叫做State的属性值,以这个值作为分发的映射依据
  16. agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1 # State=CA时,Event发送到mem-channel-1上
  17. agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2 # State=AZ时,Event发送到file-channel-2上
  18. agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2 # State=NY时,Event发送到mem-channel-1和file-channel-2上
  19. agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1 # 如果State不等于上面配置的任何一个值,则Event会发送到mem-channel-1上

上面配置中,选择器检查每个Event中名为“State”的Event header。 如果该值为“CA”,则将其发送到mem-channel-1,如果其为“AZ”,则将其发送到file-channel-2,或者如果其为“NY”则发送到两个channel上。如果Event header中没有“State”或者与前面三个中任何一个都不匹配,则Event被发送到被设置为default的mem-channel-1上。

多路复用选择器还支持一个 optional 属性,看下面的例子:

  1. # 以下是一个channel选择器的配置
  2. agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
  3. agent_foo.sources.avro-AppSrv-source1.selector.header = State
  4. agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1 # CA被第一次映射到mem-channel-1
  5. agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
  6. agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
  7. agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2 # 关键看这行,State=CA的映射在上面本来已经指定到mem-channel-1了,这里又另外配置了两个channel
  8. agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
  9. agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

提示

“必需channel”的意思就是被选择器配置里精确匹配到的channel,上面例子里面除了 optional 那一行,剩下的四行映射里面全都是“必需channel”;“可选channel”就是通过 optional 参数配置的映射。

通常选择器会尝试将匹配到的Event写入指定的所有channel中,如果任何一个channel发生了写入失败的情况,就会导致整个事务的的失败,然后会在所有的channel上重试(不管某一个channel之前成功与否,只有所有channel都成功了才认为事务成功了)。一旦所有channel写入成功,选择器还会继续将Event写入与之匹配的“可选channel”上,但是“可选channel”如果发生写入失败,选择器会忽略它。

如果“可选channel”与“必需channel”的channel有重叠(上面关于CA的两行配置就有相同的mem-channel-1),则认为该channel是必需的,这个mem-channel-1发生失败时会导致重试所有“必需channel”。上面例子中的mem-channel-1发生失败的话就会导致evnet在所有channel重试。

提示

这里注意一下,CA这个例子中,“必需channel”失败会导致Event在选择器为它配置的所有通道上重试,是因为第一段中说过“ 一旦所有channel写入成功,选择器还会继续将Event写入与之匹配的“可选channel”上 ”,依据这个原则,再看CA的例子必需的mem-channel-1失败后,重试且成功了,然后再把“可选channel”重试一遍,也就是mem-channel-1和file-channel-2

如果一个Event的header没有找到匹配的“必需channel”,则它会被发送到默认的channel,并且会尝试发送到与这个Event对应的“可选channel”上。无必需,会发送到默认和可选;无必需无默认,还是会发送到可选,这种情况下所有失败都会被忽略。