What is RPC?

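RPC (Remote Procedure Call Protocol): a protocol that lets a program call a procedure in another process, usually on another machine, as if it were a local call.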
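To make the idea concrete, here is a minimal in-process sketch (not Flume code): a `java.lang.reflect.Proxy` plays the client stub, turning a method call into a request message, and a function plays the transport plus server. The `Greeter` interface, the `"method:argument"` wire format and `RpcStubSketch` are hypothetical names chosen for illustration; a real RPC system would serialize the request and send it over the network.

```java
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical remote interface; the caller uses it like a local object.
interface Greeter {
    String greet(String name);
}

final class RpcStubSketch {
    // Client side: a dynamic proxy turns each call into a request string and
    // hands it to the transport. Here the transport is a plain function; a
    // real RPC client would send the request over the network instead.
    static Greeter stub(Function<String, String> transport) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                (proxy, method, args) -> {
                    if (args == null || args.length != 1) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    // Toy wire format: "methodName:argument"
                    String request = method.getName() + ":" + args[0];
                    return transport.apply(request);
                });
    }

    // Server side: decode the request and run the real procedure locally.
    static String serve(String request) {
        int sep = request.indexOf(':');
        String method = request.substring(0, sep);
        String argument = request.substring(sep + 1);
        if (method.equals("greet")) {
            return "Hello, " + argument;
        }
        throw new IllegalArgumentException("unknown procedure: " + method);
    }
}
```

Calling `RpcStubSketch.stub(RpcStubSketch::serve).greet("Flume")` looks like a local call but goes through the encode, transport and decode steps.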

Secure RPC client: Thrift

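As of Flume 1.6, the Thrift source and sink support Kerberos authentication. A client obtains a SecureThriftRpcClient by calling the getThriftInstance method of SecureRpcClientFactory. SecureThriftRpcClient extends ThriftRpcClient, which implements the RpcClient interface. SecureRpcClientFactory depends on the flume-ng-auth module, so that module must be on the classpath. The client principal and client keytab are passed in as properties; they are the credentials the client uses to authenticate against the Kerberos KDC. The principal of the destination server (the Thrift source the client connects to) must also be provided. Here is an example that uses SecureRpcClientFactory: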

  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.api.RpcClient;
  import org.apache.flume.api.RpcClientConfigurationConstants;
  import org.apache.flume.api.SecureRpcClientFactory;
  import org.apache.flume.event.EventBuilder;

  import java.nio.charset.Charset;
  import java.util.Properties;

  public class MyApp {
    public static void main(String[] args) {
      MySecureRpcClientFacade client = new MySecureRpcClientFacade();

      // Initialize client with the remote Flume agent's host and port
      // (the agent running the Kerberos-enabled ThriftSource)
      Properties props = new Properties();
      props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
      props.setProperty("hosts", "h1");
      props.setProperty("hosts.h1", "server.example.org" + ":" + String.valueOf(41414));

      // Initialize client with the Kerberos authentication related properties
      props.setProperty("kerberos", "true");
      props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
      props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
      props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
      client.init(props);

      // Send 10 events to the remote Flume agent. That agent should be
      // configured to listen with a Kerberos-enabled ThriftSource.
      String sampleData = "Hello Flume!";
      for (int i = 0; i < 10; i++) {
        client.sendDataToFlume(sampleData);
      }

      client.cleanUp();
    }
  }

  class MySecureRpcClientFacade {
    private RpcClient client;
    private Properties properties;

    public void init(Properties properties) {
      // Set up the RPC connection
      this.properties = properties;
      // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
      this.client = SecureRpcClientFactory.getThriftInstance(properties);
    }

    public void sendDataToFlume(String data) {
      // Create a Flume Event object that encapsulates the sample data
      Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

      // Send the event
      try {
        client.append(event);
      } catch (EventDeliveryException e) {
        // Clean up and recreate the client; the failed event is not resent
        client.close();
        client = null;
        client = SecureRpcClientFactory.getThriftInstance(properties);
      }
    }

    public void cleanUp() {
      // Close the RPC connection
      client.close();
    }
  }
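The example mixes connection and Kerberos settings in one Properties object. A small helper that assembles those same keys and rejects a missing Kerberos setting before any connection attempt can make misconfiguration easier to spot. `SecureThriftProps` is a hypothetical helper, and the key `"client.type"` is assumed to be the value of `RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE` (the failover example later in this article uses that literal key).

```java
import java.util.Properties;

final class SecureThriftProps {
    // Builds the same properties the MyApp example passes to
    // SecureRpcClientFactory.getThriftInstance(). Key names are copied from
    // that example; "client.type" is assumed to equal CONFIG_CLIENT_TYPE.
    static Properties build(String hostAndPort, String clientPrincipal,
                            String clientKeytab, String serverPrincipal) {
        Properties props = new Properties();
        props.setProperty("client.type", "thrift");
        props.setProperty("hosts", "h1");
        props.setProperty("hosts.h1", require(hostAndPort, "hosts.h1"));
        props.setProperty("kerberos", "true");
        props.setProperty("client-principal", require(clientPrincipal, "client-principal"));
        props.setProperty("client-keytab", require(clientKeytab, "client-keytab"));
        props.setProperty("server-principal", require(serverPrincipal, "server-principal"));
        return props;
    }

    // Fail fast on a missing setting instead of at connect time.
    private static String require(String value, String key) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(key + " must be set when kerberos=true");
        }
        return value;
    }
}
```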

On the receiving agent, the ThriftSource must in turn be configured in Kerberos mode:

  a1.channels = c1
  a1.sources = r1
  a1.sinks = k1
  a1.channels.c1.type = memory
  a1.sources.r1.channels = c1
  a1.sources.r1.type = thrift
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 41414
  a1.sources.r1.kerberos = true
  a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
  a1.sources.r1.agent-keytab = /tmp/flume.keytab
  a1.sinks.k1.channel = c1
  a1.sinks.k1.type = logger
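The client's server-principal has to name the principal the ThriftSource logs in with (agent-principal); in this article both are flume/server.example.org@EXAMPLE.ORG. Kerberos principals follow the primary/instance@REALM form, so a quick check can compare them part by part. `PrincipalCheck` is a hypothetical helper for illustration, not part of Flume.

```java
import java.util.Arrays;

final class PrincipalCheck {
    // Splits "primary/instance@REALM" into {primary, instance, realm}.
    // The instance is optional (e.g. "alice@EXAMPLE.ORG"); it is "" then.
    static String[] parse(String principal) {
        int at = principal.lastIndexOf('@');
        if (at <= 0 || at == principal.length() - 1) {
            throw new IllegalArgumentException("missing realm: " + principal);
        }
        String name = principal.substring(0, at);
        String realm = principal.substring(at + 1);
        int slash = name.indexOf('/');
        String primary = slash < 0 ? name : name.substring(0, slash);
        String instance = slash < 0 ? "" : name.substring(slash + 1);
        return new String[] {primary, instance, realm};
    }

    // True when the client's server-principal names the same principal as
    // the ThriftSource's agent-principal.
    static boolean matches(String serverPrincipal, String agentPrincipal) {
        return Arrays.equals(parse(serverPrincipal), parse(agentPrincipal));
    }
}
```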

Failover

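The failover client wraps the default Avro RPC client. It takes a list of <host>:<port> entries that make up a failover group. The failover RPC client currently does not support Thrift. If communication with the currently selected host (agent) fails, the client automatically fails over to the next host in the list. For example: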

  import java.util.Properties;
  import org.apache.flume.api.RpcClient;
  import org.apache.flume.api.RpcClientFactory;

  // Set up properties for the failover
  Properties props = new Properties();
  props.put("client.type", "default_failover");

  // List of hosts (space-separated list of user-chosen host aliases)
  props.put("hosts", "h1 h2 h3");

  // host/port pair for each host alias
  String host1 = "host1.example.org:41414";
  String host2 = "host2.example.org:41414";
  String host3 = "host3.example.org:41414";
  props.put("hosts.h1", host1);
  props.put("hosts.h2", host2);
  props.put("hosts.h3", host3);

  // Create the client with failover properties
  RpcClient client = RpcClientFactory.getInstance(props);
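The "hosts" property holds aliases, and each alias is resolved through a matching "hosts.<alias>" key. A minimal sketch of that lookup, assuming whitespace-separated aliases as the comment in the example says; `HostAliases` is a hypothetical helper, not Flume's own parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class HostAliases {
    // Resolves the whitespace-separated "hosts" alias list into the
    // host:port values stored under "hosts.<alias>", keeping list order.
    static List<String> resolve(Properties props) {
        String aliases = props.getProperty("hosts", "").trim();
        List<String> result = new ArrayList<>();
        if (aliases.isEmpty()) {
            return result;
        }
        for (String alias : aliases.split("\\s+")) {
            String hostAndPort = props.getProperty("hosts." + alias);
            if (hostAndPort == null) {
                throw new IllegalArgumentException("no address for host alias " + alias);
            }
            result.add(hostAndPort);
        }
        return result;
    }
}
```

The list order matters for failover, since the client moves to the next entry after a failure.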

For more flexibility, the failover Flume client implementation (FailoverRpcClient) can be configured with the following properties:

  client.type = default_failover

  hosts = h1 h2 h3                     # at least one is required, but 2 or
                                       # more makes better sense

  hosts.h1 = host1.example.org:41414
  hosts.h2 = host2.example.org:41414
  hosts.h3 = host3.example.org:41414

  max-attempts = 3                     # Must be >=0 (default: number of hosts
                                       # specified, 3 in this case). A '0'
                                       # value doesn't make much sense because
                                       # it will just cause an append call to
                                       # immediately fail. A '1' value means
                                       # that the failover client will try only
                                       # once to send the Event, and if it
                                       # fails then there will be no failover
                                       # to a second client, so this value
                                       # causes the failover client to
                                       # degenerate into just a default client.
                                       # It makes sense to set this value to at
                                       # least the number of hosts that you
                                       # specified.

  batch-size = 100                     # Must be >=1 (default: 100)

  connect-timeout = 20000              # Must be >=1000 (default: 20000)

  request-timeout = 20000              # Must be >=1000 (default: 20000)
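The max-attempts semantics described above (0 fails immediately, 1 means no failover) can be seen in a small simulation. This is a sketch, not FailoverRpcClient's code: `FailoverSketch` is hypothetical, `trySend` stands in for a real `append()`, and wrapping around to the first host when attempts exceed the host count is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class FailoverSketch {
    private final List<String> hosts;
    private final int maxAttempts;
    private int current = 0; // index of the currently selected host

    FailoverSketch(List<String> hosts, int maxAttempts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        if (maxAttempts < 0) {
            throw new IllegalArgumentException("max-attempts must be >= 0");
        }
        this.hosts = new ArrayList<>(hosts);
        this.maxAttempts = maxAttempts;
    }

    // Tries the current host first and fails over to the next one in the
    // list after each failure, for at most maxAttempts tries in total.
    // trySend returns true when the (simulated) send succeeds.
    String send(Predicate<String> trySend) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String host = hosts.get(current);
            if (trySend.test(host)) {
                return host;
            }
            current = (current + 1) % hosts.size(); // fail over to the next host
        }
        throw new IllegalStateException("send failed after " + maxAttempts + " attempts");
    }
}
```

Because `current` is kept between calls, once the client has failed over to h2 it keeps using h2 until h2 itself fails.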

Load balancing

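The Flume client SDK also supports an RpcClient that load-balances among multiple hosts. This client takes a list of <host>:<port> entries that make up a load-balancing group. It can be configured with a load-balancing strategy that either selects one of the configured hosts at random or selects hosts in round-robin order. You can also supply your own class implementing the LoadBalancingRpcClient$HostSelector interface to get a custom selection order; in that case, set the host-selector property to the FQCN of that class. The load-balancing RPC client currently does not support Thrift.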
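The two built-in strategies differ only in the order in which hosts are offered. The sketch below models both behind a hypothetical `SimpleHostSelector` interface; it is a simplified stand-in for illustration and does not reproduce the methods of the real LoadBalancingRpcClient$HostSelector interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified selector: returns the order in which hosts
// should be tried for one append.
interface SimpleHostSelector {
    List<String> order(List<String> hosts);
}

final class RoundRobinSelector implements SimpleHostSelector {
    private int next = 0;

    @Override
    public List<String> order(List<String> hosts) {
        List<String> result = new ArrayList<>(hosts.size());
        if (hosts.isEmpty()) {
            return result;
        }
        for (int i = 0; i < hosts.size(); i++) {
            result.add(hosts.get((next + i) % hosts.size()));
        }
        next = (next + 1) % hosts.size(); // start one host later next time
        return result;
    }
}

final class RandomSelector implements SimpleHostSelector {
    private final Random random;

    RandomSelector(Random random) {
        this.random = random;
    }

    @Override
    public List<String> order(List<String> hosts) {
        List<String> result = new ArrayList<>(hosts);
        Collections.shuffle(result, random); // uniformly random order
        return result;
    }
}
```

Returning a full order rather than a single host lets the caller fall back to the next host in the same order if the first one fails.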

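If backoff is enabled, the client temporarily blacklists a host that fails, excluding it from selection until a timeout expires. When the timeout elapses, if the host is still unresponsive, this counts as a sequential failure and the timeout is increased exponentially, so the client avoids getting stuck in long waits on unresponsive hosts.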

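The maximum backoff time is configured with maxBackoff (in milliseconds). The default is 30 seconds (specified in OrderSelector). The backoff timeout grows exponentially with each sequential failure, up to maxBackoff; the largest possible backoff is 65536 seconds (about 18.2 hours).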
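A sketch of the blacklist-and-backoff bookkeeping, assuming a timeout that starts at 1 s and doubles per sequential failure (2^16 s matches the 65536 s limit above), capped by maxBackoff or by 30 s when maxBackoff is 0. `BackoffSketch` is hypothetical and simplifies what counts as "sequential": here every failure before the next success does. Time is passed in explicitly so the behavior is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

final class BackoffSketch {
    static final long DEFAULT_MAX_BACKOFF_MS = 30_000L; // used when maxBackoff is 0
    static final int MAX_EXPONENT = 16;                 // 2^16 s = 65536 s, about 18.2 h

    private final long maxBackoffMs;
    // Sequential failures per host; a success resets the count.
    private final Map<String, Integer> failures = new HashMap<>();
    // Time (ms) until which each failed host stays blacklisted.
    private final Map<String, Long> blacklistedUntil = new HashMap<>();

    BackoffSketch(long maxBackoffMs) {
        this.maxBackoffMs = maxBackoffMs;
    }

    // Assumed schedule: 1 s * 2^n for the n-th sequential failure,
    // capped by maxBackoff (30 s when it is 0) and by 2^16 s.
    static long backoffMillis(int sequentialFailures, long maxBackoffMs) {
        long cap = maxBackoffMs > 0 ? maxBackoffMs : DEFAULT_MAX_BACKOFF_MS;
        int exponent = Math.min(Math.max(sequentialFailures, 0), MAX_EXPONENT);
        return Math.min(1000L << exponent, cap);
    }

    // Records a failure and blacklists the host until its backoff expires.
    void recordFailure(String host, long nowMs) {
        int n = failures.merge(host, 1, Integer::sum);
        blacklistedUntil.put(host, nowMs + backoffMillis(n, maxBackoffMs));
    }

    // A successful send clears the host's failure history.
    void recordSuccess(String host) {
        failures.remove(host);
        blacklistedUntil.remove(host);
    }

    // Blacklisted hosts are skipped until their timeout has elapsed.
    boolean isAvailable(String host, long nowMs) {
        Long until = blacklistedUntil.get(host);
        return until == null || nowMs >= until;
    }
}
```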

  import java.util.Properties;
  import org.apache.flume.api.RpcClient;
  import org.apache.flume.api.RpcClientFactory;

  // Set up properties for the load balancing
  Properties props = new Properties();
  props.put("client.type", "default_loadbalance");

  // List of hosts (space-separated list of user-chosen host aliases)
  props.put("hosts", "h1 h2 h3");

  // host/port pair for each host alias
  String host1 = "host1.example.org:41414";
  String host2 = "host2.example.org:41414";
  String host3 = "host3.example.org:41414";
  props.put("hosts.h1", host1);
  props.put("hosts.h2", host2);
  props.put("hosts.h3", host3);

  props.put("host-selector", "random"); // For random host selection
  // props.put("host-selector", "round_robin"); // For round-robin host selection
  props.put("backoff", "true"); // Disabled by default.

  props.put("maxBackoff", "10000"); // Defaults to 0, which effectively
                                    // becomes 30000 ms

  // Create the client with load balancing properties
  RpcClient client = RpcClientFactory.getInstance(props);

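Alternatively, the load-balancing client (LoadBalancingRpcClient) can be configured with the following properties: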

  client.type = default_loadbalance

  hosts = h1 h2 h3                     # At least 2 hosts are required

  hosts.h1 = host1.example.org:41414
  hosts.h2 = host2.example.org:41414
  hosts.h3 = host3.example.org:41414

  backoff = false                      # Specifies whether the client should
                                       # back-off from (i.e. temporarily
                                       # blacklist) a failed host
                                       # (default: false).

  maxBackoff = 0                       # Max timeout in millis that a host will
                                       # remain inactive due to a previous
                                       # failure with that host (default: 0,
                                       # which effectively becomes 30000)

  host-selector = round_robin          # The host selection strategy used
                                       # when load-balancing among hosts
                                       # (default: round_robin).
                                       # Other values include "random"
                                       # or the FQCN of a custom class
                                       # that implements
                                       # LoadBalancingRpcClient$HostSelector

  batch-size = 100                     # Must be >=1 (default: 100)

  connect-timeout = 20000              # Must be >=1000 (default: 20000)

  request-timeout = 20000              # Must be >=1000 (default: 20000)
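The comments above state several constraints (at least 2 hosts, batch-size >= 1, timeouts >= 1000). A sketch of a pre-flight check that encodes only those documented rules; `LoadBalanceConfigCheck` is hypothetical, and Flume performs its own validation when the client is created. Custom host-selector values are treated as FQCNs and not checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class LoadBalanceConfigCheck {
    // Returns the problems found; an empty list means the documented
    // constraints are satisfied.
    static List<String> problems(Properties p) {
        List<String> problems = new ArrayList<>();
        String[] aliases = p.getProperty("hosts", "").trim().split("\\s+");
        if (aliases.length < 2) {
            problems.add("at least 2 hosts are required");
        }
        for (String alias : aliases) {
            if (!alias.isEmpty() && p.getProperty("hosts." + alias) == null) {
                problems.add("missing hosts." + alias);
            }
        }
        String backoff = p.getProperty("backoff", "false");
        if (!backoff.equals("true") && !backoff.equals("false")) {
            problems.add("backoff must be true or false");
        }
        checkMin(p, "maxBackoff", 0, 0, problems);
        checkMin(p, "batch-size", 100, 1, problems);
        checkMin(p, "connect-timeout", 20000, 1000, problems);
        checkMin(p, "request-timeout", 20000, 1000, problems);
        return problems;
    }

    // Parses a numeric property (or uses its default) and enforces a minimum.
    private static void checkMin(Properties p, String key, long def, long min, List<String> problems) {
        String raw = p.getProperty(key);
        long value;
        try {
            value = raw == null ? def : Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            problems.add(key + " is not a number: " + raw);
            return;
        }
        if (value < min) {
            problems.add(key + " must be >= " + min);
        }
    }
}
```

Note that java.util.Properties only treats # as a comment at the start of a line, so the inline comments shown in the listing above would become part of the values if such a file were loaded with Properties.load() as-is.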

Embedded agent

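Flume provides an embedded agent API that lets an application embed an agent inside itself. The embedded agent is meant to be lightweight, so not every source, sink and channel is allowed: the source is a special embedded source, and events are handed to it through the put and putAll methods of the EmbeddedAgent object. Only File Channel and Memory Channel are supported as channels, and AvroSink is the only supported sink.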

Configuring an embedded agent is similar to configuring a full agent. The embedded agent accepts the following properties:

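Property name            Default    Description
source.type              embedded   The only available source is the embedded source
channel.type             -          Either memory or file, corresponding to MemoryChannel and FileChannel
channel.*                -          Configuration options for the selected channel
sinks                    -          List of sink names
sink.type                -          Must be avro; "sink" stands for a name from the sinks list (e.g. sink1.type)
sink.*                   -          Configuration options for the sink (e.g. sink1.hostname, sink1.port)
processor.type           -          Either failover or load_balance, corresponding to FailoverSinksProcessor and LoadBalancingSinkProcessor
processor.*              -          Configuration options for the selected sink processor
source.interceptors      -          List of interceptor names
source.interceptors.*    -          Configuration options for each interceptor (e.g. source.interceptors.i1.type)

For example, the following configures two Avro sinks behind a load-balancing sink processor, plus a static interceptor: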
  import com.google.common.collect.Lists;
  import java.nio.charset.StandardCharsets;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import org.apache.flume.Event;
  import org.apache.flume.agent.embedded.EmbeddedAgent;
  import org.apache.flume.event.EventBuilder;

  Map<String, String> properties = new HashMap<String, String>();
  properties.put("channel.type", "memory");
  properties.put("channel.capacity", "200");
  properties.put("sinks", "sink1 sink2");
  properties.put("sink1.type", "avro");
  properties.put("sink2.type", "avro");
  properties.put("sink1.hostname", "collector1.apache.org");
  properties.put("sink1.port", "5564");
  properties.put("sink2.hostname", "collector2.apache.org");
  properties.put("sink2.port", "5565");
  properties.put("processor.type", "load_balance");
  properties.put("source.interceptors", "i1");
  properties.put("source.interceptors.i1.type", "static");
  properties.put("source.interceptors.i1.key", "key1");
  properties.put("source.interceptors.i1.value", "value1");

  EmbeddedAgent agent = new EmbeddedAgent("myagent");
  agent.configure(properties);
  agent.start();

  // Illustrative event; in practice events come from the application
  Event event = EventBuilder.withBody("Hello Flume!", StandardCharsets.UTF_8);
  List<Event> events = Lists.newArrayList();
  events.add(event);
  events.add(event);
  events.add(event);
  events.add(event);

  // Hand the batch to the embedded source; delivery failures are
  // reported through EventDeliveryException
  agent.putAll(events);

  ...

  agent.stop();
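Because the embedded agent only accepts a narrow set of components, it can help to check a property map against the table before calling configure(). The sketch below encodes just the table's rules (memory or file channel, avro sinks, failover or load_balance processor, embedded source); `EmbeddedConfigCheck` is hypothetical, treats processor.type as optional, and is not a substitute for Flume's own validation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class EmbeddedConfigCheck {
    // Returns human-readable problems; an empty list means the map follows
    // the embedded-agent rules listed in the table.
    static List<String> problems(Map<String, String> props) {
        List<String> problems = new ArrayList<>();
        String channel = props.get("channel.type");
        if (!"memory".equals(channel) && !"file".equals(channel)) {
            problems.add("channel.type must be memory or file");
        }
        String sinks = props.getOrDefault("sinks", "").trim();
        if (sinks.isEmpty()) {
            problems.add("sinks must list at least one sink");
        } else {
            for (String sink : sinks.split("\\s+")) {
                // Each listed sink needs "<name>.type = avro"
                if (!"avro".equals(props.get(sink + ".type"))) {
                    problems.add(sink + ".type must be avro");
                }
            }
        }
        String processor = props.get("processor.type");
        if (processor != null && !processor.equals("failover") && !processor.equals("load_balance")) {
            problems.add("processor.type must be failover or load_balance");
        }
        String source = props.get("source.type");
        if (source != null && !source.equals("embedded")) {
            problems.add("source.type can only be embedded");
        }
        return problems;
    }
}
```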