Fan out flow

As discussed in the previous section, Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan out, replicating and multiplexing. In the replicating flow, the event is sent to all the configured channels. In the multiplexing flow, the event is sent to only a subset of qualifying channels. To fan out the flow, one needs to specify a list of channels for a source and the policy for fanning it out. This is done by adding a channel “selector” that can be replicating or multiplexing, and then specifying the selection rules if it’s a multiplexer. If you don’t specify a selector, then by default it’s replicating:

  # List the sources, sinks and channels for the agent
  <Agent>.sources = <Source1>
  <Agent>.sinks = <Sink1> <Sink2>
  <Agent>.channels = <Channel1> <Channel2>

  # set list of channels for source (separated by space)
  <Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

  # set channel for sinks
  <Agent>.sinks.<Sink1>.channel = <Channel1>
  <Agent>.sinks.<Sink2>.channel = <Channel2>

  <Agent>.sources.<Source1>.selector.type = replicating

The multiplexing selector has a further set of properties to bifurcate the flow. This requires specifying a mapping of an event attribute to a set of channels. The selector checks for each configured attribute in the event header. If it matches the specified value, then that event is sent to all the channels mapped to that value. If there’s no match, then the event is sent to the set of channels configured as default:

  # Mapping for multiplexing selector
  <Agent>.sources.<Source1>.selector.type = multiplexing
  <Agent>.sources.<Source1>.selector.header = <someHeader>
  <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
  <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
  <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
  #...

  <Agent>.sources.<Source1>.selector.default = <Channel2>

The mapping allows channels to overlap: the same channel can be mapped to more than one value.

The following example has a single flow that is multiplexed to two paths. The agent named agent_foo has a single avro source and two channels linked to two sinks:

  # list the sources, sinks and channels in the agent
  agent_foo.sources = avro-AppSrv-source1
  agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
  agent_foo.channels = mem-channel-1 file-channel-2

  # set channels for source
  agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

  # set channel for sinks
  agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
  agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

  # channel selector configuration
  agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
  agent_foo.sources.avro-AppSrv-source1.selector.header = State
  agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
  agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
  agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
  agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector checks for a header called “State”. If the value is “CA” then the event is sent to mem-channel-1, if it’s “AZ” then it goes to file-channel-2, and if it’s “NY” then it goes to both. If the “State” header is not set or doesn’t match any of the three, then it goes to mem-channel-1, which is designated as ‘default’.
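The routing rules of this example can be checked with a small model. The following Python sketch is illustrative only and is not Flume's implementation: `parse_selector` and `route` are hypothetical helpers, and the parser assumes simple `key = value` lines with space-separated channel names.

```python
# Illustrative model of the multiplexing rules; not Flume's implementation.
CONFIG = """
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
"""

PREFIX = "agent_foo.sources.avro-AppSrv-source1.selector."


def parse_selector(text, prefix=PREFIX):
    """Collect the selector properties into a dict (hypothetical helper)."""
    selector = {"mapping": {}, "default": []}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments and malformed lines
        key, value = (part.strip() for part in line.split("=", 1))
        if not key.startswith(prefix):
            continue
        name = key[len(prefix):]
        if name.startswith("mapping."):
            selector["mapping"][name[len("mapping."):]] = value.split()
        elif name == "default":
            selector["default"] = value.split()
        else:
            selector[name] = value  # e.g. "type" and "header"
    return selector


def route(headers, selector):
    """Return the channels for an event carrying the given headers."""
    value = headers.get(selector["header"])  # None when the header is unset
    return selector["mapping"].get(value, selector["default"])


selector = parse_selector(CONFIG)
for headers in ({"State": "CA"}, {"State": "AZ"}, {"State": "NY"},
                {"State": "TX"}, {}):
    print(headers, "->", route(headers, selector))
```

The value “TX” is an arbitrary unmapped state chosen for illustration; both it and the event with no headers fall through to the default channel.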

The selector also supports optional channels. To specify optional channels for a header, the config parameter ‘optional’ is used in the following way:

  # channel selector configuration
  agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
  agent_foo.sources.avro-AppSrv-source1.selector.header = State
  agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
  agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
  agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
  agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
  agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector will attempt to write to the required channels first and will fail the transaction if even one of these channels fails to consume the events. The transaction is reattempted on all of the channels. Once all required channels have consumed the events, then the selector will attempt to write to the optional channels. A failure by any of the optional channels to consume the event is simply ignored and not retried.

If there is an overlap between the optional channels and required channels for a specific header, the channel is considered to be required, and a failure in the channel will cause the entire set of required channels to be retried. For instance, in the above example, for the header value “CA” mem-channel-1 is considered to be a required channel even though it is marked both as required and optional, and a failure to write to this channel will cause that event to be retried on all channels configured for the selector.
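The write order described above can be sketched as a toy model. This is not Flume code: `Channel`, `ChannelError` and `deliver` are hypothetical names, and transactions, rollback and the retry loop itself are deliberately left out. The sketch only shows which failures reach the caller and which are swallowed.

```python
class ChannelError(Exception):
    """Hypothetical error for a channel that cannot take an event."""


class Channel:
    """Toy in-memory channel; healthy=False makes every put fail."""

    def __init__(self, name, healthy=True):
        self.name = name
        self.healthy = healthy
        self.events = []

    def put(self, event):
        if not self.healthy:
            raise ChannelError(self.name)
        self.events.append(event)


def deliver(event, required, optional):
    """Write to required channels first, then best-effort to optional ones."""
    for channel in required:
        # A failure here propagates, so the caller sees the write as failed
        # and can retry it.
        channel.put(event)
    for channel in optional:
        if channel in required:
            continue  # overlap: already handled as a required channel
        try:
            channel.put(event)
        except ChannelError:
            pass  # optional failures are ignored and not retried


mem = Channel("mem-channel-1")
bad_file = Channel("file-channel-2", healthy=False)

# Optional channel fails: the event still counts as delivered.
deliver("event-1", required=[mem], optional=[mem, bad_file])
print(mem.events)

# Required channel fails: the error reaches the caller.
try:
    deliver("event-2", required=[bad_file], optional=[mem])
except ChannelError as err:
    print("required channel failed:", err)
```

In the second call the optional channel is never attempted, because the required write fails first.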

Note that if a header does not have any required channels, then the event will be written to the default channels, and the selector will also attempt to write it to the optional channels for that header. Specifying optional channels will still cause the event to be written to the default channels if no required channels are specified. If no channels are designated as default and there are no required channels, the selector will attempt to write the events to the optional channels. Any failures are simply ignored in that case.
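One way to read these rules is as a function that resolves a header value to a pair of channel lists. The sketch below is an assumption-laden model, not Flume's code: `resolve` is a hypothetical helper, the dictionaries mirror the optional-channel example above, and the “TX” entry is invented purely to show a value that has optional channels but no required ones.

```python
def resolve(value, mapping, optional, default):
    """Return (required, optional) channel lists for one header value."""
    # No required mapping for this value: fall back to the default channels.
    required = mapping.get(value) or list(default)
    # A channel listed as both required and optional counts as required only.
    extra = [c for c in optional.get(value, []) if c not in required]
    return required, extra


mapping = {
    "CA": ["mem-channel-1"],
    "AZ": ["file-channel-2"],
    "NY": ["mem-channel-1", "file-channel-2"],
}
optional = {
    "CA": ["mem-channel-1", "file-channel-2"],
    "TX": ["file-channel-2"],  # hypothetical: optional channels only
}
default = ["mem-channel-1"]

print(resolve("CA", mapping, optional, default))  # overlap resolved as required
print(resolve("TX", mapping, optional, default))  # default plus optional
print(resolve("TX", mapping, optional, []))       # no default: optional only
```

With an empty default list and no required mapping, only best-effort optional writes remain, which matches the statement that any failures are ignored in that case.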