自定义开发组件

客户端

客户端操作事件对象的源头,并且把他们发送给flume节点。client最典型的就是处理应用进程产生的数据。目前flume支持Avro,log4j,syslog,http等数据源类型。另外,也有一种execsource类型可以消费本地进程输出的信息。

退出选项不够充分这种情况是很重要的,在这种情况下,如何创建一种自定义的机制发送数据。有两种方式:

  • 第一种就是创建自定义的客户端与flume的source进行交互,比如AvroSource和SyslogTcpSource。这样client需要把数据转换成flume理解的格式。
  • 第二种,是自定义中flume source基于IPC或者RPC协议与本地进程进行沟通,然后把数据传给flume节点。

注意,所有的数据都是存储在flume节点的channel中。

client SDK

尽管flume包含了一系列的数据产生机制,但是很多场景下还是需要与自定义的应用进行交互。Flume SDK可以帮助开发者使用RPC协议与应用进行连接。

RPC参考1:浅出

RPC参考2:深入

RPC 客户端接口

实现RpcClient接口可以把Flume封装起来。用户仅需要调用Flume的API方法,append以及appendBatch即可发送数据,不需要关心详细的消息信息。用户可以使用提供的Event对象,或者使用EventBuilder方法重写withBody()方法。

RPC 客户端 - Avro 以及 Thrift

Avro是默认的RPC协议,NettyAvroRpcClient以及ThriftRpcClient都实现了RpcClient接口。客户端需要创建目标Agent的主机名以及端口号,并且使用RpcClient发送数据。下面就是一个数据产生的应用:

  1. import org.apache.flume.Event;
  2. import org.apache.flume.EventDeliveryException;
  3. import org.apache.flume.api.RpcClient;
  4. import org.apache.flume.api.RpcClientFactory;
  5. import org.apache.flume.event.EventBuilder;
  6. import java.nio.charset.Charset;
  7. public class MyApp {
  8. public static void main(String[] args) {
  9. MyRpcClientFacade client = new MyRpcClientFacade();
  10. // 初始化主机名以及端口号
  11. client.init("host.example.org", 41414);
  12. // 发送给远程节点10条数据
  13. String sampleData = "Hello Flume!";
  14. for (int i = 0; i < 10; i++) {
  15. client.sendDataToFlume(sampleData);
  16. }
  17. client.cleanUp();
  18. }
  19. }
  20. class MyRpcClientFacade {
  21. private RpcClient client;
  22. private String hostname;
  23. private int port;
  24. public void init(String hostname, int port) {
  25. // 初始化RPC客户端
  26. this.hostname = hostname;
  27. this.port = port;
  28. this.client = RpcClientFactory.getDefaultInstance(hostname, port);
  29. // 使用下面的方法创建thrift的客户端
  30. // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  31. }
  32. public void sendDataToFlume(String data) {
  33. // 创建事件对象
  34. Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
  35. try {// 发送事件
  36. client.append(event);
  37. } catch (EventDeliveryException e) {
  38. // 清除信息,重建Client
  39. client.close();
  40. client = null;
  41. client = RpcClientFactory.getDefaultInstance(hostname, port);
  42. }
  43. }
  44. public void cleanUp() {
  45. // 关闭RPC连接
  46. client.close();
  47. }
  48. }

远程的Flume节点需要有一个AvroSource的source,来监听某个端口。下面就是配置的例子:

  1. a1.channels = c1
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels.c1.type = memory
  5. a1.sources.r1.channels = c1
  6. a1.sources.r1.type = avro
  7. # For using a thrift source set the following instead of the above line.
  8. # a1.source.r1.type = thrift
  9. a1.sources.r1.bind = 0.0.0.0
  10. a1.sources.r1.port = 41414
  11. a1.sinks.k1.channel = c1
  12. a1.sinks.k1.type = logger

如果想要更灵活,可以使用默认的flume客户端实现方式(NettyAvroRpcClient以及ThriftRpcClient)可以参考下面的配置:

  1. client.type = default (for avro) or thrift (for thrift)
  2. hosts = h1 # default client accepts only 1 host
  3. # (additional hosts will be ignored)
  4. hosts.h1 = host1.example.org:41414 # host and port must both be specified
  5. # (neither has a default)
  6. batch-size = 100 # Must be >=1 (default: 100)
  7. connect-timeout = 20000 # Must be >=1000 (default: 20000)
  8. request-timeout = 20000 # Must be >=1000 (default: 20000)