Storm 与 Kestrel

本文说明了如何使用 Storm 从 Kestrel 集群中消费数据。

前言

Storm

本教程中使用了 storm-kestrel 项目和 storm-starter 项目中的例子。建议读者将这几个项目 clone 到本地,并动手运行其中的例子。

Kestrel

本文假定读者可以如此项目所述在本地运行一个 Kestrel 集群。

Kestrel 服务器与队列

Kestrel 服务中包含有一组消息队列。Kestrel 队列是一种非常简单的消息队列,可以运行于 JVM 上,并使用 memcache 协议(以及一些扩展)与客户端交互。详情可以参考 storm-kestrel 项目中的 KestrelThriftClient 类的实现。

每个队列均严格遵循先入先出的规则。为了提高服务性能,数据都是缓存在系统内存中的;不过,只有开头的 128MB 是保存在内存中的。在服务停止的时候,队列的状态会保存到一个日志文件中。

请参阅此文了解更多详细信息。

Kestrel 具有 快速 小巧 持久 可靠 等特点。

例如,Twitter 就使用 Kestrel 作为消息系统的核心环节,此文中介绍了相关信息。

** 向 Kestrel 中添加数据

首先,我们需要一个可以向 Kestrel 的队列添加数据的程序。下述方法使用了 storm-kestrel 项目中的 KestrelClient 的实现。该方法从一个包含 5 个句子的数组中随机选择一个句子添加到 Kestrel 的队列中。

  1. private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
  2. throws ParseError, IOException {
  3. String[] sentences = new String[] {
  4. "the cow jumped over the moon",
  5. "an apple a day keeps the doctor away",
  6. "four score and seven years ago",
  7. "snow white and the seven dwarfs",
  8. "i am at two with nature"};
  9. Random _rand = new Random();
  10. for(int i=1; i<=10; i++){
  11. String sentence = sentences[_rand.nextInt(sentences.length)];
  12. String val = "ID " + i + " " + sentence;
  13. boolean queueSucess = kestrelClient.queue(queueName, val);
  14. System.out.println("queueSucess=" +queueSucess+ " [" + val +"]");
  15. }
  16. }

从 Kestrel 中移除数据

此方法从一个队列中取出一个数据,但并不把该数据从队列中删除:

  1. private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){
  2. Item item = kestrelClient.dequeue(queueName);
  3. if(item==null){
  4. System.out.println("The queue (" + queueName + ") contains no items.");
  5. }
  6. else
  7. {
  8. byte[] data = item._data;
  9. String receivedVal = new String(data);
  10. System.out.println("receivedItem=" + receivedVal);
  11. }
  12. }

此方法会从队列中取出并移除数据:

  1. private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
  2. throws IOException, ParseError
  3. {
  4. for(int i=1; i<=12; i++){
  5. Item item = kestrelClient.dequeue(queueName);
  6. if(item==null){
  7. System.out.println("The queue (" + queueName + ") contains no items.");
  8. }
  9. else
  10. {
  11. int itemID = item._id;
  12. byte[] data = item._data;
  13. String receivedVal = new String(data);
  14. kestrelClient.ack(queueName, itemID);
  15. System.out.println("receivedItem=" + receivedVal);
  16. }
  17. }
  18. }

向 Kestrel 中连续添加数据

下面的程序可以向本地 Kestrel 服务的一个 sentence_queue 队列中连续添加句子,这也是我们的最后一个程序。

可以在命令行窗口中输入一个右中括号 ] 并回车来停止程序。

  1. import java.io.IOException;
  2. import java.io.InputStream;
  3. import java.util.Random;
  4. import backtype.storm.spout.KestrelClient;
  5. import backtype.storm.spout.KestrelClient.Item;
  6. import backtype.storm.spout.KestrelClient.ParseError;
  7. public class AddSentenceItemsToKestrel {
  8. /**
  9. * @param args
  10. */
  11. public static void main(String[] args) {
  12. InputStream is = System.in;
  13. char closing_bracket = ']';
  14. int val = closing_bracket;
  15. boolean aux = true;
  16. try {
  17. KestrelClient kestrelClient = null;
  18. String queueName = "sentence_queue";
  19. while(aux){
  20. kestrelClient = new KestrelClient("localhost",22133);
  21. queueSentenceItems(kestrelClient, queueName);
  22. kestrelClient.close();
  23. Thread.sleep(1000);
  24. if(is.available()>0){
  25. if(val==is.read())
  26. aux=false;
  27. }
  28. }
  29. } catch (IOException e) {
  30. // TODO Auto-generated catch block
  31. e.printStackTrace();
  32. }
  33. catch (ParseError e) {
  34. // TODO Auto-generated catch block
  35. e.printStackTrace();
  36. } catch (InterruptedException e) {
  37. // TODO Auto-generated catch block
  38. e.printStackTrace();
  39. }
  40. System.out.println("end");
  41. }
  42. }

使用 KestrelSpout

下面的拓扑使用 KestrelSpout 从一个 Kestrel 队列中读取句子,并将句子分割成若干个单词(Bolt:SplitSentence),然后输出每个单词出现的次数(Bolt:WordCount)。数据处理的细节可以参考消息的可靠性保证一文。

  1. TopologyBuilder builder = new TopologyBuilder();
  2. builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
  3. builder.setBolt("split", new SplitSentence(), 10)
  4. .shuffleGrouping("sentences");
  5. builder.setBolt("count", new WordCount(), 20)
  6. .fieldsGrouping("split", new Fields("word"));

运行

首先,以生产模式或者开发者模式启动你的本地 Kestrel 服务。

然后,等待大约 5 秒钟以防出现网络连接异常。

现在可以运行向队列中添加数据的程序,并启动 Storm 拓扑。程序启动的顺序并不重要。

如果你以 TOPOLOGY_DEBUG 模式运行拓扑你会观察到拓扑中 tuple 发送的细节信息。