Message

Message is one of the core parts of Watermill. Messages are emitted by Publishers and received by Subscribers. When a message has been processed, you should send an Ack(), or a Nack() if the processing failed.

Acks and Nacks are handled by Subscribers (in the default implementations, the subscriber waits for an Ack or a Nack).

Full source: github.com/ThreeDotsLabs/watermill/message/message.go

    // ...
    type Message struct {
        // UUID is an unique identifier of message.
        //
        // It is only used by Watermill for debugging.
        // UUID can be empty.
        UUID string

        // Metadata contains the message metadata.
        //
        // Can be used to store data which doesn't require unmarshaling entire payload.
        // It is something similar to HTTP request's headers.
        //
        // Metadata is marshaled and will be saved to PubSub.
        Metadata Metadata

        // Payload is message's payload.
        Payload Payload

        // ack is closed, when acknowledge is received.
        ack chan struct{}
        // noACk is closed, when negative acknowledge is received.
        noAck chan struct{}

        ackMutex    sync.Mutex
        ackSentType ackType

        ctx context.Context
    }
    // ...

Ack

Sending Ack

Full source: github.com/ThreeDotsLabs/watermill/message/message.go

    // ...
    // Ack sends message's acknowledgement.
    //
    // Ack is not blocking.
    // Ack is idempotent.
    // False is returned, if Nack is already sent.
    func (m *Message) Ack() bool {
    // ...

Nack

Full source: github.com/ThreeDotsLabs/watermill/message/message.go

    // ...
    // Nack sends message's negative acknowledgement.
    //
    // Nack is not blocking.
    // Nack is idempotent.
    // False is returned, if Ack is already sent.
    func (m *Message) Nack() bool {
    // ...
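The documented contract (non-blocking, idempotent, and mutually exclusive) can be pictured with a small model. The `toyMessage` type below is a simplified stand-in written for this page, not Watermill's implementation; it only assumes what the comments above state, namely that a channel is closed when an acknowledgement is sent and that the first acknowledgement wins.

```go
package main

import (
	"fmt"
	"sync"
)

// toyMessage is a simplified stand-in, not Watermill's Message.
// It only models the documented Ack/Nack contract.
type toyMessage struct {
	mu    sync.Mutex
	sent  string        // "", "ack" or "nack"
	ack   chan struct{} // closed when Ack is sent
	noAck chan struct{} // closed when Nack is sent
}

func newToyMessage() *toyMessage {
	return &toyMessage{ack: make(chan struct{}), noAck: make(chan struct{})}
}

// Ack never blocks, may be called repeatedly, and returns false
// if Nack was already sent.
func (m *toyMessage) Ack() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	switch m.sent {
	case "nack":
		return false
	case "ack":
		return true
	}
	m.sent = "ack"
	close(m.ack)
	return true
}

// Nack mirrors Ack: it returns false if Ack was already sent.
func (m *toyMessage) Nack() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	switch m.sent {
	case "ack":
		return false
	case "nack":
		return true
	}
	m.sent = "nack"
	close(m.noAck)
	return true
}

func main() {
	m := newToyMessage()
	fmt.Println(m.Ack())  // true: first acknowledgement
	fmt.Println(m.Ack())  // true: repeating Ack is harmless
	fmt.Println(m.Nack()) // false: Ack was already sent
}
```

Closing a channel rather than sending on it is what keeps the call non-blocking: no receiver has to be ready, and every later receive on the closed channel returns immediately.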

Receiving Ack/Nack

Full source: github.com/ThreeDotsLabs/watermill/docs/content/docs/message/receiving-ack.go

    // ...
    select {
    case <-msg.Acked():
        log.Print("ack received")
    case <-msg.Nacked():
        log.Print("nack received")
    }
    // ...

Context

Message contains the standard library context, just like an HTTP request.

Full source: github.com/ThreeDotsLabs/watermill/message/message.go

    // ...
    // Context returns the message's context. To change the context, use
    // SetContext.
    //
    // The returned context is always non-nil; it defaults to the
    // background context.
    func (m *Message) Context() context.Context {
        if m.ctx != nil {
            return m.ctx
        }
        return context.Background()
    }

    // SetContext sets provided context to the message.
    func (m *Message) SetContext(ctx context.Context) {
        m.ctx = ctx
    }
    // ...
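One possible use of the message context is to carry request-scoped values, such as a trace ID, alongside the message while it is being processed. The sketch below is only an illustration: `ctxMessage` is a stand-in that copies the two methods from the listing above, and `contextCarrier`, `traceIDKey`, `withTraceID` and `traceID` are hypothetical names that are not part of Watermill.

```go
package main

import (
	"context"
	"fmt"
)

// contextCarrier is a hypothetical interface matching the two methods
// shown in the listing above.
type contextCarrier interface {
	Context() context.Context
	SetContext(ctx context.Context)
}

// ctxMessage is a stand-in that mirrors Context and SetContext from the listing.
type ctxMessage struct{ ctx context.Context }

func (m *ctxMessage) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

func (m *ctxMessage) SetContext(ctx context.Context) { m.ctx = ctx }

// traceIDKey is an unexported key type, which avoids collisions
// with context keys defined in other packages.
type traceIDKey struct{}

// withTraceID derives a new context from the current one and stores it on the message.
func withTraceID(m contextCarrier, id string) {
	m.SetContext(context.WithValue(m.Context(), traceIDKey{}, id))
}

// traceID returns the stored trace ID, or "" if none was set.
func traceID(m contextCarrier) string {
	id, _ := m.Context().Value(traceIDKey{}).(string)
	return id
}

func main() {
	msg := &ctxMessage{}
	fmt.Println(traceID(msg) == "") // true: the default background context holds no values

	withTraceID(msg, "abc-123")
	fmt.Println(traceID(msg)) // abc-123
}
```

Because `Context()` falls back to the background context, callers can derive from it without a nil check, as `withTraceID` does here.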