Spring Cloud Stream

Spring Cloud Stream是一个构建消息驱动的微服务框架。Spring Cloud Stream构建在Spring Boot之上用以创建DevOps友好的微服务,并且Spring Integration提供了和消息代理的连接。Spring Cloud Stream提供消息代理的自用配置,引入发布订阅的语义概念,引入不同的中间件厂商通用的的消费组和分区,这些自用配置提供了创建流处理应用的基础。

添加@EnableBinding注解在你的程序中,被@StreamListener修饰的方法可以立即连接到消息代理,你将收到流处理事件。

For full documentation visit spring cloud stream.

Quick Start

项目中使用spring-cloud-stream推荐基于一个依赖管理系统 — 下面的代码段可以被复制和粘贴到您的构建。需要帮助吗?看看我们基于MavenGradle构建的入门指南。

  1. <dependencyManagement>
  2. <dependencies>
  3. <dependency>
  4. <groupId>org.springframework.cloud</groupId>
  5. <artifactId>spring-cloud-stream-dependencies</artifactId>
  6. <version>1.0.2.RELEASE</version>
  7. <type>pom</type>
  8. <scope>import</scope>
  9. </dependency>
  10. </dependencies>
  11. </dependencyManagement>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.springframework.cloud</groupId>
  15. <artifactId>spring-cloud-stream</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework.cloud</groupId>
  19. <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  20. </dependency>
  21. </dependencies>

只要classpath中包含 Spring Cloud Stream和Spring Cloud Stream binder,并且被@EnableBinding修饰,应用将通过总线绑定一个外部代理(Rabbit MQ或Kafka,取决于你的选择)。示例应用:

  1. @SpringBootApplication
  2. @EnableBinding(Source.class)
  3. public class StreamdemoApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(StreamdemoApplication.class, args);
  6. }
  7. @Bean
  8. @InboundChannelAdapter(value = Source.OUTPUT)
  9. public MessageSource<String> timerMessageSource() {
  10. return () -> new GenericMessage<>(new SimpleDateFormat().format(new Date()));
  11. }
  12. }

确定应用运行的时候Kafka同时运行,你可以看kafka-console-consumer.shkafka提供的实用工具,用来监控消息发送。

Sample Projects

Source

Sink

Transformer

Multi-binder

RxJava Processor

Spring Cloud Stream Applications

Spring Cloud Data Flow

Spring XD