RocketMQ Streams 快速开始
RocketMQ Streams工程中运行
参考RocketMQ Streams工程rocketmq-streams-examples模块下程序可以直接运行;运行example步骤:
- 本地启动RocketMQ 5.0及以上版本;
- 使用mqAdmin创建example中数据源topic;
- 启动example中例子;
- 向RocketMQ的源topic中写入合适数据(依据示例而定);
RocketMQ Streams以SDK方式被应用依赖
环境准备
- 64bit JDK 1.8及以上
- Maven 3.2及以上
- 本地启动RocketMQ,启动文档
构建RocketMQ Streams
添加pom依赖
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-streams</artifactId><!-- 根据需要修改 --><version>1.1.0</version></dependency></dependencies>
编写流计算程序
public class WordCount {public static void main(String[] args) {StreamBuilder builder = new StreamBuilder("wordCount");builder.source("sourceTopic", total -> {String value = new String(total, StandardCharsets.UTF_8);return new Pair<>(null, value);}).flatMap((ValueMapperAction<String, List<String>>) value -> {String[] splits = value.toLowerCase().split("\\W+");return Arrays.asList(splits);}).keyBy(value -> value).count().toRStream().print();TopologyBuilder topologyBuilder = builder.build();Properties properties = new Properties();properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {@Overridepublic void run() {rocketMQStream.stop();latch.countDown();}});try {rocketMQStream.start();latch.await();} catch (final Throwable e) {System.exit(1);}System.exit(0);}}
向RocketMQ sourceTopic中写入数据并观察结果
如果向sourceTopic中写入的数据如下:每行数据作为一个消息发送;
"To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation",
统计单词出现频率,计算结果如下:
(key=to, value=1)(key=be, value=1)(key=or, value=1)(key=not, value=1)(key=to, value=2)(key=be, value=2)(key=that, value=1)(key=is, value=1)(key=the, value=1)(key=whether, value=1)(key=tis, value=1)(key=nobler, value=1)(key=mind, value=1)(key=against, value=1)(key=troubles, value=1)(key=slings, value=1)(key=die, value=1)(key=natural, value=1)(key=flesh, value=1)(key=sea, value=1)(key=fortune, value=1)(key=shocks, value=1)(key=consummation, value=1)(key=to, value=3)(key=to, value=4)(key=to, value=5)(key=say, value=1)(key=end, value=1)(key=end, value=2)(key=to, value=6)(key=to, value=7)(key=to, value=8)(key=or, value=2)(key=them, value=1)(key=take, value=1)(key=arms, value=1)(key=of, value=1)(key=and, value=1)(key=of, value=2)(key=and, value=2)(key=by, value=1)(key=sleep, value=1)(key=and, value=3)(key=by, value=2)(key=sleep, value=2)(key=and, value=4)(key=that, value=2)(key=arrows, value=1)(key=heir, value=1)(key=question, value=1)(key=is, value=2)(key=the, value=2)(key=suffer, value=1)(key=a, value=1)(key=the, value=3)(key=no, value=1)(key=a, value=2)(key=opposing, value=1)(key=the, value=4)(key=the, value=5)(key=a, value=3)(key=in, value=1)(key=more, value=1)(key=heartache, value=1)(key=outrageous, value=1)(key=we, value=1)(key=thousand, value=1)(key=tis, value=2)