Transaction interface

The Transaction interface is the foundation of Flume's reliability. All the main components (sources, sinks and channels) must use transactions.

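Transactions are implemented inside a channel implementation. Every source and sink connected to a channel must obtain a Transaction object from it. Sources manage their transactions through a ChannelProcessor, which uses a ChannelSelector to pick the target channels. Sinks manage them explicitly through their configured channel. Both staging an event (putting it into a channel) and consuming one (taking it out of a channel) happen inside an active transaction. For example: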

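  import java.nio.charset.StandardCharsets;
  import org.apache.flume.Channel;
  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.Transaction;
  import org.apache.flume.channel.MemoryChannel;
  import org.apache.flume.conf.Configurables;
  import org.apache.flume.event.EventBuilder;

  Channel ch = new MemoryChannel();
  // Outside a running agent, configure and start the channel yourself
  // (inside an agent the framework does this from the configuration file).
  Configurables.configure(ch, new Context());
  ch.start();

  Transaction txn = ch.getTransaction();
  txn.begin();
  try {
    // This try clause includes whatever Channel operations you want to do
    Event eventToStage = EventBuilder.withBody("Hello Flume!",
        StandardCharsets.UTF_8);
    ch.put(eventToStage);
    // Event takenEvent = ch.take();
    // ...
    txn.commit();
  } catch (Throwable t) {
    txn.rollback();
    // Log exception, handle individual exceptions as needed

    // re-throw all Errors
    if (t instanceof Error) {
      throw (Error)t;
    }
  } finally {
    txn.close();
  }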

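This example shows only the put (store) side. Once begin() returns, the transaction is open. The event is then put into the channel, and if the put succeeds the transaction is committed. If anything throws, the transaction is rolled back instead. In both cases close() is always called in the finally block.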
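The following is a minimal, self-contained Java model that makes the commit/rollback semantics concrete. ToyChannel and ToyTransaction are hypothetical classes written for illustration and are not Flume's MemoryChannel. For brevity, put() and take() live on the transaction object here rather than on the channel. Puts are staged and become visible only on commit(). A rolled-back take() returns the event to the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a transactional channel; NOT Flume's MemoryChannel.
class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>(); // committed events only

    ToyTransaction getTransaction() {
        return new ToyTransaction();
    }

    int size() {
        return queue.size();
    }

    class ToyTransaction {
        private final List<String> putList = new ArrayList<>();  // puts staged in this txn
        private final List<String> takeList = new ArrayList<>(); // events taken in this txn
        private boolean open;

        void begin() {
            open = true;
        }

        void put(String event) {
            check();
            putList.add(event); // invisible to takers until commit()
        }

        String take() {
            check();
            String event = queue.pollFirst(); // null if the channel is empty
            if (event != null) {
                takeList.add(event);
            }
            return event;
        }

        void commit() {
            check();
            queue.addAll(putList); // publish the staged puts
            putList.clear();
            takeList.clear();      // taken events are now gone for good
        }

        void rollback() {
            check();
            putList.clear();       // discard the staged puts
            // Put taken events back at the head, preserving their original order.
            for (int i = takeList.size() - 1; i >= 0; i--) {
                queue.addFirst(takeList.get(i));
            }
            takeList.clear();
        }

        void close() {
            open = false;
        }

        private void check() {
            if (!open) {
                throw new IllegalStateException("transaction is not open");
            }
        }
    }
}
```

A rolled-back take leaves the event in the channel for a later retry. This is what allows redelivery after a failure, at the cost that an event may be delivered more than once.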

Sink

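A sink takes events out of a channel and either forwards them to the next Flume agent in the flow or stores them in an external repository. Each sink is associated with exactly one channel, as set in the Flume configuration file. Every configured sink is managed by its own SinkRunner instance. When the Flume framework calls SinkRunner.start(), a new thread is created to drive the sink, with SinkRunner.PollingRunner as the thread's Runnable. This thread manages the sink's lifecycle.

A sink must implement the start() and stop() methods of the LifecycleAware interface. Sink.start() initializes the sink and brings it to a state where it can forward events. Sink.process() does the core work of taking events out of the channel and forwarding them. Sink.stop() does any necessary cleanup, such as releasing resources. A sink can also implement the Configurable interface to process its own configuration settings.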

  import org.apache.flume.Channel;
  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.Transaction;
  import org.apache.flume.conf.Configurable;
  import org.apache.flume.sink.AbstractSink;

  public class MySink extends AbstractSink implements Configurable {
    private String myProp;

    @Override
    public void configure(Context context) {
      String myProp = context.getString("myProp", "defaultValue");
      // Process the myProp value (e.g. validation)
      // Store myProp for later retrieval by process() method
      this.myProp = myProp;
    }

    @Override
    public void start() {
      // Initialize the connection to the external repository (e.g. HDFS) that
      // this Sink will forward Events to ..
      super.start(); // let AbstractSink record the lifecycle state
    }

    @Override
    public void stop() {
      // Disconnect from the external repository and do any
      // additional cleanup (e.g. releasing resources or nulling-out
      // field values) ..
      super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
      Status status = null;

      // Start transaction
      Channel ch = getChannel();
      Transaction txn = ch.getTransaction();
      txn.begin();
      try {
        // This try clause includes whatever Channel operations you want to do
        Event event = ch.take();
        if (event == null) {
          // take() returns null when the channel is empty; ask the runner to back off
          status = Status.BACKOFF;
        } else {
          // Send the Event to the external repository.
          // storeSomeData(event);
          status = Status.READY;
        }
        txn.commit();
      } catch (Throwable t) {
        txn.rollback();
        // Log exception, handle individual exceptions as needed
        status = Status.BACKOFF;

        // re-throw all Errors
        if (t instanceof Error) {
          throw (Error)t;
        }
      } finally {
        txn.close();
      }
      return status;
    }
  }
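The SinkRunner thread described above essentially calls process() in a loop and backs off whenever process() returns BACKOFF. The following is a simplified, hypothetical model of that loop. ToyPollingRunner, ToyStatus and the two backoff parameters are not Flume classes or settings. The model assumes a linear backoff capped at a maximum, and it injects the sleep function so the loop can be exercised without real waiting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Stand-in for the READY/BACKOFF status that process() returns.
enum ToyStatus { READY, BACKOFF }

// Hypothetical, simplified polling loop; NOT Flume's SinkRunner.PollingRunner.
class ToyPollingRunner {
    private final long backoffIncrementMs; // hypothetical: extra sleep per consecutive backoff
    private final long maxBackoffMs;       // hypothetical: cap on a single sleep
    private final LongConsumer sleeper;    // injected so tests need not really sleep

    ToyPollingRunner(long backoffIncrementMs, long maxBackoffMs, LongConsumer sleeper) {
        this.backoffIncrementMs = backoffIncrementMs;
        this.maxBackoffMs = maxBackoffMs;
        this.sleeper = sleeper;
    }

    // Calls process() maxIterations times; a real runner loops until stop() is requested.
    void run(Supplier<ToyStatus> process, int maxIterations) {
        int consecutiveBackoffs = 0;
        for (int i = 0; i < maxIterations; i++) {
            if (process.get() == ToyStatus.BACKOFF) {
                consecutiveBackoffs++;
                // Linear, capped backoff: increment, 2 * increment, ..., maxBackoffMs
                sleeper.accept(Math.min(consecutiveBackoffs * backoffIncrementMs, maxBackoffMs));
            } else {
                consecutiveBackoffs = 0; // READY: poll again immediately
            }
        }
    }
}
```

In a real runner, the sleeper would call Thread.sleep(), and the loop would end when the component is stopped rather than after a fixed number of iterations.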

Source

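A source receives data from an external client and stores it, as events, into its configured channel(s). A source obtains its own ChannelProcessor through getChannelProcessor() and hands each event to it. The ChannelProcessor uses a ChannelSelector to decide which channels the event goes to, then puts the event into each of them in turn, each inside a transaction local to that channel. Transactions therefore also guarantee reliability between the source and its channels. If a required channel throws, that channel's transaction is rolled back and the exception propagates, but the event stays committed in any channel that already accepted it.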
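The interplay between the ChannelProcessor, the ChannelSelector and per-channel transactions can be sketched with the hypothetical classes below. BoundedToyChannel and ToyChannelProcessor are illustrations, not Flume's API. The selector is modeled as a plain Function that returns the target channels, and each channel gets its own transaction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical bounded channel: staged puts are published only on commit().
class BoundedToyChannel {
    private final int capacity;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> staged = new ArrayList<>();

    BoundedToyChannel(int capacity) {
        this.capacity = capacity;
    }

    void begin() { staged.clear(); }

    void put(String event) {
        if (queue.size() + staged.size() >= capacity) {
            throw new IllegalStateException("channel is full");
        }
        staged.add(event);
    }

    void commit() { queue.addAll(staged); staged.clear(); }

    void rollback() { staged.clear(); }

    int size() { return queue.size(); }
}

// Hypothetical, simplified model of ChannelProcessor.processEvent(); NOT Flume's code.
class ToyChannelProcessor {
    // Plays the ChannelSelector role: maps an event to its target channels.
    private final Function<String, List<BoundedToyChannel>> selector;

    ToyChannelProcessor(Function<String, List<BoundedToyChannel>> selector) {
        this.selector = selector;
    }

    void processEvent(String event) {
        for (BoundedToyChannel ch : selector.apply(event)) {
            ch.begin();             // one transaction per selected channel, in turn
            try {
                ch.put(event);
                ch.commit();
            } catch (RuntimeException ex) {
                ch.rollback();      // undo only this channel's staged put
                throw ex;           // the source sees the failure and can back off
            }
        }
    }
}
```

Suppose a selector replicates every event to two channels and the second one is full. In this model, the event is rolled back in the second channel but stays committed in the first, so a source that retries it will duplicate it there.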

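A source has a counterpart to SinkRunner.PollingRunner: a PollingRunner Runnable that executes on a thread created when the Flume framework calls PollableSourceRunner.start(). Each configured PollableSource is driven by its own such thread.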

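Note that there are actually two kinds of source. One is PollableSource, described above. The other is EventDrivenSource. It is not driven by a polling thread of its own, so it must provide its own callback mechanism that captures new data and stores it into the channel. The example below is a PollableSource.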
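The difference between the two kinds of source is who drives the work. The sketch below uses hypothetical classes that are not part of Flume. ToyPollableSource exposes a process() method that a runner thread would call repeatedly, and it returns false (the model's BACKOFF) when there is nothing to read. ToyEventDrivenSource only hands out a callback that an external client would invoke whenever data arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Pull style (hypothetical): the framework's runner thread keeps calling process().
class ToyPollableSource {
    private final Deque<String> external; // stands in for the external system being polled
    private final List<String> delivered; // stands in for the channel(s)

    ToyPollableSource(Deque<String> external, List<String> delivered) {
        this.external = external;
        this.delivered = delivered;
    }

    // true models READY, false models BACKOFF
    boolean process() {
        String event = external.poll();
        if (event == null) {
            return false; // nothing to read: let the runner back off
        }
        delivered.add(event);
        return true;
    }
}

// Push style (hypothetical): no thread polls; the client calls back when data arrives.
class ToyEventDrivenSource {
    private final List<String> delivered;

    ToyEventDrivenSource(List<String> delivered) {
        this.delivered = delivered;
    }

    // Hypothetical hook that a client library would register and invoke.
    Consumer<String> callback() {
        return delivered::add;
    }
}
```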

  import java.nio.charset.StandardCharsets;
  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.PollableSource;
  import org.apache.flume.conf.Configurable;
  import org.apache.flume.event.EventBuilder;
  import org.apache.flume.source.AbstractSource;

  public class MySource extends AbstractSource implements Configurable, PollableSource {
    private String myProp;

    @Override
    public void configure(Context context) {
      String myProp = context.getString("myProp", "defaultValue");
      // Process the myProp value (e.g. validation, convert to another type, ...)
      // Store myProp for later retrieval by process() method
      this.myProp = myProp;
    }

    @Override
    public void start() {
      // Initialize the connection to the external client
      super.start();
    }

    @Override
    public void stop() {
      // Disconnect from external client and do any additional cleanup
      // (e.g. releasing resources or nulling-out field values) ..
      super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
      Status status = null;
      try {
        // Receive new data
        Event e = getSomeData();
        // Store the Event into this Source's associated Channel(s).
        // AbstractSource has no getChannel(): the ChannelProcessor begins,
        // commits or rolls back a transaction on each selected channel itself.
        getChannelProcessor().processEvent(e);
        status = Status.READY;
      } catch (Throwable t) {
        // Log exception, handle individual exceptions as needed
        status = Status.BACKOFF;

        // re-throw all Errors
        if (t instanceof Error) {
          throw (Error)t;
        }
      }
      return status;
    }

    // Declared by PollableSource in newer Flume releases (1.7+); no @Override so the
    // class also compiles against older versions. The values are illustrative only.
    public long getBackOffSleepIncrement() {
      return 1000L;
    }

    public long getMaxBackOffSleepInterval() {
      return 5000L;
    }

    // Hypothetical placeholder: replace with code that reads from your external client.
    private Event getSomeData() {
      return EventBuilder.withBody("some data", StandardCharsets.UTF_8);
    }
  }
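Both skeletons read myProp in configure() with a default value and keep it in a field for later use. The following is a minimal model of that configure-then-use pattern. ToyContext is a hypothetical stand-in for Flume's Context that only offers a getString(key, default) lookup over this component's properties. The blank-value check is an illustrative validation rule, not something Flume requires.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flume's Context: this component's properties as a flat map.
class ToyContext {
    private final Map<String, String> props;

    ToyContext(Map<String, String> props) {
        this.props = new HashMap<>(props);
    }

    // Returns the property value, or defaultValue when the key is absent.
    String getString(String key, String defaultValue) {
        String value = props.get(key);
        return value != null ? value : defaultValue;
    }
}

// configure() reads and validates once; other methods use the stored field afterwards.
class ToyConfigurableSink {
    private String myProp;

    void configure(ToyContext context) {
        String value = context.getString("myProp", "defaultValue");
        if (value.trim().isEmpty()) { // illustrative validation rule (hypothetical)
            throw new IllegalArgumentException("myProp must not be blank");
        }
        this.myProp = value;
    }

    String getMyProp() {
        return myProp;
    }
}
```

Failing fast in configure() means a bad setting is reported when the agent loads its configuration, rather than surfacing later in process().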