Getting started

Watermill up and running.

What is Watermill?

Watermill is a Golang library for working efficiently with message streams. It is intended for building event-drivenapplications. It can be used for event sourcing, RPC over messages, sagas, and whatever else comes to your mind.You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog, if that fits your use case.

It comes with a set of Pub/Sub implementations and can be easily extended by your own.

Watermill also ships with standard middlewares like instrumentation, poison queue, throttling, correlation,and other tools used by every message-driven application.

Why use Watermill?

With more projects adopting the microservices pattern over recent years, we realized that synchronous communicationis not always the right choice. Asynchronous methods started to grow as a new standard way to communicate.

But while there’s a lot of existing tooling for synchronous integration patterns (e.g. HTTP), correctly setting upa message-oriented project can be a challenge. There’s a lot of different message queues and streaming systems,each with different features and client library API.

Watermill aims to be the standard messaging library for Go, hiding all that complexity behind an API that is easy tounderstand. It provides all you might need for building an application based on events or other asynchronous patterns.After looking at the examples, you should be able to quickly integrate Watermill with your project.

Install

  1. go get -u github.com/ThreeDotsLabs/watermill

One Minute Background

The basic idea behind event-driven applications stays always the same: listen for incoming messages and react to them.Watermill supports this behavior for multiple publishers and subscribers.

The core part of Watermill is the Message. It is as important as http.Requestis for the http package. Most Watermill features use this struct in some way.

Even though PubSub libraries come with complex features, for Watermill it’s enough to implement two interfaces to startworking with them: the Publisher and Subscriber.

  1. type Publisher interface {
  2. Publish(topic string, messages ...*Message) error
  3. Close() error
  4. }
  5. type Subscriber interface {
  6. Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
  7. Close() error
  8. }

Subscribing for Messages

Let’s start with subscribing. Subscribe expects a topic name and returns a channel of incoming messages.What topic exactly means depends on the PubSub implementation.

  1. messages, err := subscriber.Subscribe(ctx, "example.topic")
  2. if err != nil {
  3. panic(err)
  4. }
  5. for msg := range messages {
  6. fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
  7. msg.Ack()
  8. }

See detailed examples below for supported PubSubs.

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

  1. // ...
  2. package main
  3. import (
  4. "context"
  5. "log"
  6. "time"
  7. "github.com/ThreeDotsLabs/watermill"
  8. "github.com/ThreeDotsLabs/watermill/message"
  9. "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
  10. )
  11. func main() {
  12. pubSub := gochannel.NewGoChannel(
  13. gochannel.Config{},
  14. watermill.NewStdLogger(false, false),
  15. )
  16. messages, err := pubSub.Subscribe(context.Background(), "example.topic")
  17. if err != nil {
  18. panic(err)
  19. }
  20. go process(messages)
  21. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

  1. // ...
  2. func process(messages <-chan *message.Message) {
  3. for msg := range messages {
  4. log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
  5. // we need to Acknowledge that we received and processed the message,
  6. // otherwise, it will be resent over and over again.
  7. msg.Ack()
  8. }
  9. }

Running in Docker

The easiest way to run Watermill locally with Kafka is using Docker.

Full source: _examples/pubsubs/kafka/docker-compose.yml

  1. version: '3'
  2. services:
  3. server:
  4. image: golang:1.11
  5. restart: unless-stopped
  6. depends_on:
  7. - kafka
  8. volumes:
  9. - .:/app
  10. - $GOPATH/pkg/mod:/go/pkg/mod
  11. working_dir: /app
  12. command: go run main.go
  13. zookeeper:
  14. image: confluentinc/cp-zookeeper:latest
  15. restart: unless-stopped
  16. logging:
  17. driver: none
  18. environment:
  19. ZOOKEEPER_CLIENT_PORT: 2181
  20. kafka:
  21. image: confluentinc/cp-kafka:latest
  22. restart: unless-stopped
  23. depends_on:
  24. - zookeeper
  25. logging:
  26. driver: none
  27. environment:
  28. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  29. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
  30. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  31. KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

The source should go to main.go.

To run, execute docker-compose up command.

A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article.

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

  1. // ...
  2. package main
  3. import (
  4. "context"
  5. "log"
  6. "time"
  7. "github.com/Shopify/sarama"
  8. "github.com/ThreeDotsLabs/watermill"
  9. "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
  10. "github.com/ThreeDotsLabs/watermill/message"
  11. )
  12. func main() {
  13. saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
  14. // equivalent of auto.offset.reset: earliest
  15. saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
  16. subscriber, err := kafka.NewSubscriber(
  17. kafka.SubscriberConfig{
  18. Brokers: []string{"kafka:9092"},
  19. Unmarshaler: kafka.DefaultMarshaler{},
  20. OverwriteSaramaConfig: saramaSubscriberConfig,
  21. ConsumerGroup: "test_consumer_group",
  22. },
  23. watermill.NewStdLogger(false, false),
  24. )
  25. if err != nil {
  26. panic(err)
  27. }
  28. messages, err := subscriber.Subscribe(context.Background(), "example.topic")
  29. if err != nil {
  30. panic(err)
  31. }
  32. go process(messages)
  33. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

  1. // ...
  2. func process(messages <-chan *message.Message) {
  3. for msg := range messages {
  4. log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
  5. // we need to Acknowledge that we received and processed the message,
  6. // otherwise, it will be resent over and over again.
  7. msg.Ack()
  8. }
  9. }

Running in Docker

The easiest way to run Watermill locally with NATS is using Docker.

Full source: _examples/pubsubs/nats-streaming/docker-compose.yml

  1. version: '3'
  2. services:
  3. server:
  4. image: golang:1.11
  5. restart: unless-stopped
  6. depends_on:
  7. - nats-streaming
  8. volumes:
  9. - .:/app
  10. - $GOPATH/pkg/mod:/go/pkg/mod
  11. working_dir: /app
  12. command: go run main.go
  13. nats-streaming:
  14. image: nats-streaming:0.11.2
  15. restart: unless-stopped

The source should go to main.go.

To run execute docker-compose up command.

A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article.

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-streaming/main.go

  1. // ...
  2. package main
  3. import (
  4. "context"
  5. "log"
  6. "time"
  7. stan "github.com/nats-io/stan.go"
  8. "github.com/ThreeDotsLabs/watermill"
  9. "github.com/ThreeDotsLabs/watermill-nats/pkg/nats"
  10. "github.com/ThreeDotsLabs/watermill/message"
  11. )
  12. func main() {
  13. subscriber, err := nats.NewStreamingSubscriber(
  14. nats.StreamingSubscriberConfig{
  15. ClusterID: "test-cluster",
  16. ClientID: "example-subscriber",
  17. QueueGroup: "example",
  18. DurableName: "my-durable",
  19. SubscribersCount: 4, // how many goroutines should consume messages
  20. CloseTimeout: time.Minute,
  21. AckWaitTimeout: time.Second * 30,
  22. StanOptions: []stan.Option{
  23. stan.NatsURL("nats://nats-streaming:4222"),
  24. },
  25. Unmarshaler: nats.GobMarshaler{},
  26. },
  27. watermill.NewStdLogger(false, false),
  28. )
  29. if err != nil {
  30. panic(err)
  31. }
  32. messages, err := subscriber.Subscribe(context.Background(), "example.topic")
  33. if err != nil {
  34. panic(err)
  35. }
  36. go process(messages)
  37. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-streaming/main.go

  1. // ...
  2. func process(messages <-chan *message.Message) {
  3. for msg := range messages {
  4. log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
  5. // we need to Acknowledge that we received and processed the message,
  6. // otherwise, it will be resent over and over again.
  7. msg.Ack()
  8. }
  9. }

Running in Docker

You can run Google Cloud Pub/Sub emulator locally for development.

Full source: _examples/pubsubs/googlecloud/docker-compose.yml

  1. version: '3'
  2. services:
  3. server:
  4. image: golang:1.11
  5. restart: unless-stopped
  6. depends_on:
  7. - googlecloud
  8. volumes:
  9. - .:/app
  10. - $GOPATH/pkg/mod:/go/pkg/mod
  11. environment:
  12. # use local emulator instead of google cloud engine
  13. PUBSUB_EMULATOR_HOST: "googlecloud:8085"
  14. working_dir: /app
  15. command: go run main.go
  16. googlecloud:
  17. image: google/cloud-sdk:228.0.0
  18. entrypoint: gcloud --quiet beta emulators pubsub start --host-port=googlecloud:8085 --verbosity=debug --log-http
  19. restart: unless-stopped

The source should go to main.go.

To run, execute docker-compose up.

A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article.

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go

  1. // ...
  2. package main
  3. import (
  4. "context"
  5. "log"
  6. "time"
  7. "github.com/ThreeDotsLabs/watermill"
  8. "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
  9. "github.com/ThreeDotsLabs/watermill/message"
  10. )
  11. func main() {
  12. logger := watermill.NewStdLogger(false, false)
  13. subscriber, err := googlecloud.NewSubscriber(
  14. googlecloud.SubscriberConfig{
  15. // custom function to generate Subscription Name,
  16. // there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
  17. GenerateSubscriptionName: func(topic string) string {
  18. return "test-sub_" + topic
  19. },
  20. ProjectID: "test-project",
  21. },
  22. logger,
  23. )
  24. if err != nil {
  25. panic(err)
  26. }
  27. // Subscribe will create the subscription. Only messages that are sent after the subscription is created may be received.
  28. messages, err := subscriber.Subscribe(context.Background(), "example.topic")
  29. if err != nil {
  30. panic(err)
  31. }
  32. go process(messages)
  33. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go

  1. // ...
  2. func process(messages <-chan *message.Message) {
  3. for msg := range messages {
  4. log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
  5. // we need to Acknowledge that we received and processed the message,
  6. // otherwise, it will be resent over and over again.
  7. msg.Ack()
  8. }
  9. }

Running in Docker

Full source: _examples/pubsubs/amqp/docker-compose.yml

  1. version: '3'
  2. services:
  3. server:
  4. image: golang:1.11
  5. restart: unless-stopped
  6. depends_on:
  7. - rabbitmq
  8. volumes:
  9. - .:/app
  10. - $GOPATH/pkg/mod:/go/pkg/mod
  11. working_dir: /app
  12. command: go run main.go
  13. rabbitmq:
  14. image: rabbitmq:3.7
  15. restart: unless-stopped

The source should go to main.go.

To run, execute docker-compose up.

A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article.

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

  1. // ...
  2. package main
  3. import (
  4. "context"
  5. "log"
  6. "time"
  7. "github.com/ThreeDotsLabs/watermill"
  8. "github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp"
  9. "github.com/ThreeDotsLabs/watermill/message"
  10. )
  11. var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
  12. func main() {
  13. amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
  14. subscriber, err := amqp.NewSubscriber(
  15. // This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
  16. // It works as a simple queue.
  17. //
  18. // If you want to implement a Pub/Sub style service instead, check
  19. // https://watermill.io/docs/pub-sub-implementations/#amqp-consumer-groups
  20. amqpConfig,
  21. watermill.NewStdLogger(false, false),
  22. )
  23. if err != nil {
  24. panic(err)
  25. }
  26. messages, err := subscriber.Subscribe(context.Background(), "example.topic")
  27. if err != nil {
  28. panic(err)
  29. }
  30. go process(messages)
  31. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

  1. // ...
  2. func process(messages <-chan *message.Message) {
  3. for msg := range messages {
  4. log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
  5. // we need to Acknowledge that we received and processed the message,
  6. // otherwise, it will be resent over and over again.
  7. msg.Ack()
  8. }
  9. }

Running in Docker

Full source: _examples/pubsubs/sql/docker-compose.yml

  1. version: '3'
  2. services:
  3. server:
  4. image: golang:1.12
  5. restart: unless-stopped
  6. depends_on:
  7. - mysql
  8. volumes:
  9. - .:/app
  10. - $GOPATH/pkg/mod:/go/pkg/mod
  11. working_dir: /app
  12. command: go run main.go
  13. mysql:
  14. image: mysql:8.0
  15. restart: unless-stopped
  16. ports:
  17. - 3306:3306
  18. environment:
  19. MYSQL_DATABASE: watermill
  20. MYSQL_ALLOW_EMPTY_PASSWORD: "yes"

The source should go to main.go.

To run, execute docker-compose up.

A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article.

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

  1. // ...
  2. package main
  3. import (
  4. "context"
  5. stdSQL "database/sql"
  6. "log"
  7. "time"
  8. driver "github.com/go-sql-driver/mysql"
  9. "github.com/ThreeDotsLabs/watermill"
  10. "github.com/ThreeDotsLabs/watermill-sql/pkg/sql"
  11. "github.com/ThreeDotsLabs/watermill/message"
  12. )
  13. func main() {
  14. db := createDB()
  15. logger := watermill.NewStdLogger(false, false)
  16. subscriber, err := sql.NewSubscriber(
  17. db,
  18. sql.SubscriberConfig{
  19. SchemaAdapter: sql.DefaultSchema{},
  20. OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
  21. InitializeSchema: true,
  22. },
  23. logger,
  24. )
  25. if err != nil {
  26. panic(err)
  27. }
  28. messages, err := subscriber.Subscribe(context.Background(), "example_topic")
  29. if err != nil {
  30. panic(err)
  31. }
  32. go process(messages)
  33. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

  1. // ...
  2. func process(messages <-chan *message.Message) {
  3. for msg := range messages {
  4. log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
  5. // we need to Acknowledge that we received and processed the message,
  6. // otherwise, it will be resent over and over again.
  7. msg.Ack()
  8. }
  9. }

Creating Messages

Watermill doesn’t enforce any message format. NewMessage expects a slice of bytes as the payload. You can usestrings, JSON, protobuf, Avro, gob, or anything else that serializes to []byte.

The message UUID is optional, but recommended, as it helps with debugging.

  1. msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

Publishing Messages

Publish expects a topic and one or more Messages to be published.

  1. err := publisher.Publish("example.topic", msg)
  2. if err != nil {
  3. panic(err)
  4. }

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

  1. // ...
  2. go process(messages)
  3. publishMessages(pubSub)
  4. }
  5. func publishMessages(publisher message.Publisher) {
  6. for {
  7. msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
  8. if err := publisher.Publish("example.topic", msg); err != nil {
  9. panic(err)
  10. }
  11. time.Sleep(time.Second)
  12. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

  1. // ...
  2. go process(messages)
  3. publisher, err := kafka.NewPublisher(
  4. kafka.PublisherConfig{
  5. Brokers: []string{"kafka:9092"},
  6. Marshaler: kafka.DefaultMarshaler{},
  7. },
  8. watermill.NewStdLogger(false, false),
  9. )
  10. if err != nil {
  11. panic(err)
  12. }
  13. publishMessages(publisher)
  14. }
  15. func publishMessages(publisher message.Publisher) {
  16. for {
  17. msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
  18. if err := publisher.Publish("example.topic", msg); err != nil {
  19. panic(err)
  20. }
  21. time.Sleep(time.Second)
  22. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-streaming/main.go

  1. // ...
  2. go process(messages)
  3. publisher, err := nats.NewStreamingPublisher(
  4. nats.StreamingPublisherConfig{
  5. ClusterID: "test-cluster",
  6. ClientID: "example-publisher",
  7. StanOptions: []stan.Option{
  8. stan.NatsURL("nats://nats-streaming:4222"),
  9. },
  10. Marshaler: nats.GobMarshaler{},
  11. },
  12. watermill.NewStdLogger(false, false),
  13. )
  14. if err != nil {
  15. panic(err)
  16. }
  17. publishMessages(publisher)
  18. }
  19. func publishMessages(publisher message.Publisher) {
  20. for {
  21. msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
  22. if err := publisher.Publish("example.topic", msg); err != nil {
  23. panic(err)
  24. }
  25. time.Sleep(time.Second)
  26. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go

  1. // ...
  2. go process(messages)
  3. publisher, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{
  4. ProjectID: "test-project",
  5. }, logger)
  6. if err != nil {
  7. panic(err)
  8. }
  9. publishMessages(publisher)
  10. }
  11. func publishMessages(publisher message.Publisher) {
  12. for {
  13. msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
  14. if err := publisher.Publish("example.topic", msg); err != nil {
  15. panic(err)
  16. }
  17. time.Sleep(time.Second)
  18. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

  1. // ...
  2. go process(messages)
  3. publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
  4. if err != nil {
  5. panic(err)
  6. }
  7. publishMessages(publisher)
  8. }
  9. func publishMessages(publisher message.Publisher) {
  10. for {
  11. msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
  12. if err := publisher.Publish("example.topic", msg); err != nil {
  13. panic(err)
  14. }
  15. time.Sleep(time.Second)
  16. // ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

  1. // ...
  2. go process(messages)
  3. publisher, err := sql.NewPublisher(
  4. db,
  5. sql.PublisherConfig{
  6. SchemaAdapter: sql.DefaultSchema{},
  7. },
  8. logger,
  9. )
  10. if err != nil {
  11. panic(err)
  12. }
  13. publishMessages(publisher)
  14. }
  15. func createDB() *stdSQL.DB {
  16. conf := driver.NewConfig()
  17. conf.Net = "tcp"
  18. conf.User = "root"
  19. conf.Addr = "mysql"
  20. conf.DBName = "watermill"
  21. db, err := stdSQL.Open("mysql", conf.FormatDSN())
  22. if err != nil {
  23. panic(err)
  24. }
  25. err = db.Ping()
  26. if err != nil {
  27. panic(err)
  28. }
  29. return db
  30. }
  31. func publishMessages(publisher message.Publisher) {
  32. for {
  33. msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))
  34. if err := publisher.Publish("example_topic", msg); err != nil {
  35. panic(err)
  36. }
  37. time.Sleep(time.Second)
  38. // ...

Using Message Router

Publishers and subscribers are rather low-level parts of Watermill.In most cases, you’d usually want to use a high-level interface and features like correlation, metrics, poison queue, retrying, throttling, etc..

You might want to send an Ack only if the message was processed successfully.In other cases, you’ll Ack immediately and then worry about processing.Sometimes, you want to perform some action based on the incoming message, and publish another message in response.

To handle these requirements, there is a component named Router.

Example application of Message Router

The flow of the example application looks like this:

  • A message is produced on topic incoming_messages_topic every second.
  • struct_handler handler listens on incoming_messages_topic. When a message is received, the UUID is printed and a new message is produced on outgoing_messages_topic.
  • print_incoming_messages handler listens on incoming_messages_topic and prints the messages’ UUID, payload and metadata.
  • print_outgoing_messages handler listens on outgoing_messages_topic and prints the messages’ UUID, payload and metadata. Correlation ID should be the same as in the message on incoming_messages_topic.

Router configuration

Start with configuring the router, adding plugins and middlewares.Then set up handlers that the router will use. Each handler will independently handle messages.

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

  1. // ...
  2. package main
  3. import (
  4. "context"
  5. "fmt"
  6. "log"
  7. "time"
  8. "github.com/ThreeDotsLabs/watermill"
  9. "github.com/ThreeDotsLabs/watermill/message"
  10. "github.com/ThreeDotsLabs/watermill/message/router/middleware"
  11. "github.com/ThreeDotsLabs/watermill/message/router/plugin"
  12. "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
  13. )
  14. var (
  15. // For this example, we're using just a simple logger implementation,
  16. // You probably want to ship your own implementation of `watermill.LoggerAdapter`.
  17. logger = watermill.NewStdLogger(false, false)
  18. )
  19. func main() {
  20. router, err := message.NewRouter(message.RouterConfig{}, logger)
  21. if err != nil {
  22. panic(err)
  23. }
  24. // SignalsHandler will gracefully shutdown Router when SIGTERM is received.
  25. // You can also close the router by just calling `r.Close()`.
  26. router.AddPlugin(plugin.SignalsHandler)
  27. router.AddMiddleware(
  28. // CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
  29. middleware.CorrelationID,
  30. // The handler function is retried if it returns an error.
  31. // After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.
  32. middleware.Retry{
  33. MaxRetries: 3,
  34. InitialInterval: time.Millisecond * 100,
  35. Logger: logger,
  36. }.Middleware,
  37. // Recoverer handles panics from handlers.
  38. // In this case, it passes them as errors to the Retry middleware.
  39. middleware.Recoverer,
  40. )
  41. // For simplicity, we are using the gochannel Pub/Sub here,
  42. // You can replace it with any Pub/Sub implementation, it will work the same.
  43. pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
  44. // Producing some incoming messages in background
  45. go publishMessages(pubSub)
  46. router.AddHandler(
  47. "struct_handler", // handler name, must be unique
  48. "incoming_messages_topic", // topic from which we will read events
  49. pubSub,
  50. "outgoing_messages_topic", // topic to which we will publish events
  51. pubSub,
  52. structHandler{}.Handler,
  53. )
  54. // just for debug, we are printing all messages received on `incoming_messages_topic`
  55. router.AddNoPublisherHandler(
  56. "print_incoming_messages",
  57. "incoming_messages_topic",
  58. pubSub,
  59. printMessages,
  60. )
  61. // just for debug, we are printing all events sent to `outgoing_messages_topic`
  62. router.AddNoPublisherHandler(
  63. "print_outgoing_messages",
  64. "outgoing_messages_topic",
  65. pubSub,
  66. printMessages,
  67. )
  68. // Now that all handlers are registered, we're running the Router.
  69. // Run is blocking while the router is running.
  70. ctx := context.Background()
  71. if err := router.Run(ctx); err != nil {
  72. panic(err)
  73. }
  74. }
  75. // ...

Incoming messages

The struct_handler consumes messages from incoming_messages_topic, so we are simulating incoming traffic by calling publishMessages() in the background.Notice that we’ve added the SetCorrelationID middleware. A Correlation ID will be added to all messages produced by the router (it will be stored in metadata).

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

  1. // ...
  2. func publishMessages(publisher message.Publisher) {
  3. for {
  4. msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
  5. middleware.SetCorrelationID(watermill.NewUUID(), msg)
  6. log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
  7. if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
  8. panic(err)
  9. }
  10. time.Sleep(time.Second)
  11. }
  12. }
  13. // ...

Handlers

You may have noticed that there are two types of handler functions:

  • function func(msg message.Message) ([]message.Message, error)
  • method func (c structHandler) Handler(msg message.Message) ([]message.Message, error)If your handler is a function without any dependencies, it’s fine to use the first one.The second option is useful when your handler requires some dependencies like database handle, a logger, etc.

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

  1. // ...
  2. func printMessages(msg *message.Message) error {
  3. fmt.Printf(
  4. "\n> Received message: %s\n> %s\n> metadata: %v\n\n",
  5. msg.UUID, string(msg.Payload), msg.Metadata,
  6. )
  7. return nil
  8. }
  9. type structHandler struct {
  10. // we can add some dependencies here
  11. }
  12. func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
  13. log.Println("structHandler received message", msg.UUID)
  14. msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))
  15. return message.Messages{msg}, nil
  16. }

Done!

You can run this example by go run main.go.

You’ve just created your first application with Watermill. You can find the full source in /_examples/basic/3-router/main.go.

Logging

To see Watermill’s logs, you have to pass any logger that implements the LoggerAdapter.For experimental development, you can use NewStdLogger.

Testing

Watermill provides a set of test scenariosthat any Pub/Sub implementation can use. Each test suite needs to declare what features it supports and how to construct a new Pub/Sub.These scenarios check both basic usage and more uncommon use cases. Stress tests are also included.

Deployment

Watermill is not a framework. We don’t enforce any type of deployment and it’s totally up to you.

What’s next?

For more detailed documentation check documentation topics.

Examples

Check out the examples that will show you how to start using Watermill.

The recommended entry point is Your first Watermill application.It contains the entire environment in docker-compose.yml, including Golang and Kafka, which you can run with one command.

After that, you can see the Realtime feed example.It uses more middlewares and contains two handlers. There is also a separate application for publishing messages.

For a different subscriber implementation, namely HTTP, refer to the receiving-webhooks example. It is a very simple application that saves webhooks to Kafka.

Full list of examples can be found in the project’s README.

Support

If anything is not clear, feel free to use any of our support channels, we will be glad to help.