Sink组逻辑处理器

你可以把多个sink分成一个组, 这时候 Sink组逻辑处理器 可以对这同一个组里的几个sink进行负载均衡或者其中一个sink发生故障后将输出Event的任务转移到其他的sink上。

提示

说的直白一些,这N个sink本来是要将Event输出到对应的N个目的地的,通过 Sink组逻辑处理器 就可以把这N个sink配置成负载均衡或者故障转移的工作方式(暂时还不支持自定义的)。负载均衡就方式是把channel里面的Event按照配置的负载机制(比如轮询)分别发送到sink各自对应的目的地;故障转移就是这N个sink同一时间只有一个在工作,其余的作为备用,工作的sink挂掉之后备用的sink顶上。

必需的参数已用 粗体 标明。


属性

默认值

解释

sinks



这一组的所有sink名,多个用空格分开

processor.type

default

这个sink组的逻辑处理器类型,可选值 default (默认一对一的) 、 failover (故障转移) 、 load_balance (负载均衡)

配置范例:

  1. a1.sinkgroups = g1
  2. a1.sinkgroups.g1.sinks = k1 k2
  3. a1.sinkgroups.g1.processor.type = load_balance

默认

默认的组逻辑处理器就是只有一个sink的情况(准确说这根本不算一个组),所以这种情况就没必要配置sink组了。本文档前面的例子都是 source - channel - sink这种一对一,单个sink的。

故障转移

故障转移组逻辑处理器维护了一个发送Event失败的sink的列表,保证有一个sink是可用的来发送Event。

故障转移机制的工作原理是将故障sink降级到一个池中,在池中为它们分配冷却期(超时时间),在重试之前随顺序故障而增加。 Sink成功发送事件后,它将恢复到实时池。sink具有与之相关的优先级,数值越大,优先级越高。如果在发送Event时Sink发生故障,会继续尝试下一个具有最高优先级的sink。 例如,在优先级为80的sink之前激活优先级为100的sink。如果未指定优先级,则根据配置中的顺序来选取。

要使用故障转移选择器,不仅要设置sink组的选择器为failover,还有为每一个sink设置一个唯一的优先级数值。 可以使用 maxpenalty 属性设置故障转移时间的上限(毫秒)。

必需的参数已用 粗体 标明。


属性

默认值

解释

sinks



这一组的所有sink名,多个用空格分开

processor.type

default

组件类型,这个是: failover

processor.priority.<sinkName>



组内sink的权重值,<sinkName>必须是当前组关联的sink之一。数值(绝对值)越高越早被激活

processor.maxpenalty

30000

发生异常的sink最大故障转移时间(毫秒)

配置范例:

  1. a1.sinkgroups = g1
  2. a1.sinkgroups.g1.sinks = k1 k2
  3. a1.sinkgroups.g1.processor.type = failover
  4. a1.sinkgroups.g1.processor.priority.k1 = 5
  5. a1.sinkgroups.g1.processor.priority.k2 = 10
  6. a1.sinkgroups.g1.processor.maxpenalty = 10000

负载均衡

负载均衡Sink 选择器提供了在多个sink上进行负载均衡流量的功能。 它维护一个活动sink列表的索引来实现负载的分配。 默认支持了轮询(roundrobin)和随机(random)两种选择机制分配负载。默认是轮询,可以通过配置来更改。也可以从 _AbstractSinkSelector 继承写一个自定义的选择器。

  • 工作时,此选择器使用其配置的选择机制选择下一个sink并调用它。 如果所选sink无法正常工作,则处理器通过其配置的选择机制选择下一个可用sink。 此实现不会将失败的Sink列入黑名单,而是继续乐观地尝试每个可用的Sink。
  • 如果所有sink调用都失败了,选择器会将故障抛给sink的运行器。

如果backoff设置为true则启用了退避机制,失败的sink会被放入黑名单,达到一定的超时时间后会自动从黑名单移除。 如从黑名单出来后sink仍然失败,则再次进入黑名单而且超时时间会翻倍,以避免在无响应的sink上浪费过长时间。如果没有启用退避机制,在禁用此功能的情况下,发生sink传输失败后,会将本次负载传给下一个sink继续尝试,因此这种情况下是不均衡的。

必需的参数已用 粗体 标明。


属性

默认值

解释

processor.sinks



这一组的所有sink名,多个用空格分开

processor.type

default

组件类型,这个是: loadbalance

processor.backoff

false

失败的sink是否成倍地增加退避它的时间。如果设置为false,负载均衡在某一个sink发生异常后,下一次选择sink的时候仍然会将失败的这个sink加入候选队列;如果设置为true,某个sink连续发生异常时会成倍地增加它的退避时间,在退避的时间内是无法参与负载均衡竞争的。退避机制只统计1个小时发生的异常,超过1个小时没有发生异常就会重新计算

processor.selector

round_robin

负载均衡机制,可选值:round_robin (轮询)、 random (随机选择)、「自定义选择器的全限定类名」:自定义的负载器要继承 _AbstractSinkSelector

processor.selector.maxTimeOut

30000

发生异常的sink最长退避时间(毫秒)如果设置了processor.backoff=true,某一个sink发生异常的时候就会触发自动退避它一段时间,这个 maxTimeOut 就是退避一个sink的最长时间

配置范例:

  1. a1.sinkgroups = g1
  2. a1.sinkgroups.g1.sinks = k1 k2
  3. a1.sinkgroups.g1.processor.type = load_balance
  4. a1.sinkgroups.g1.processor.backoff = true
  5. a1.sinkgroups.g1.processor.selector = random

自定义

目前还不支持自定义Sink组逻辑处理器