Advanced Connect and Custom Dialer in Go

The Go NATS client features a CustomDialer option which allows you to customize the connection logic against the NATS server without having to modify the internals of the client. For example, let’s say that you want the client to use the context package so that it dials with DialContext and can cancel connecting to NATS altogether, or give up after a deadline. You could then define a Dialer implementation as follows:

  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "net"
  6. "time"
  7. "github.com/nats-io/nats.go"
  8. )
  9. type customDialer struct {
  10. ctx context.Context
  11. nc *nats.Conn
  12. connectTimeout time.Duration
  13. connectTimeWait time.Duration
  14. }
  15. func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
  16. ctx, cancel := context.WithTimeout(cd.ctx, cd.connectTimeout)
  17. defer cancel()
  18. for {
  19. log.Println("Attempting to connect to", address)
  20. if ctx.Err() != nil {
  21. return nil, ctx.Err()
  22. }
  23. select {
  24. case <-cd.ctx.Done():
  25. return nil, cd.ctx.Err()
  26. default:
  27. d := &net.Dialer{}
  28. if conn, err := d.DialContext(ctx, network, address); err == nil {
  29. log.Println("Connected to NATS successfully")
  30. return conn, nil
  31. } else {
  32. time.Sleep(cd.connectTimeWait)
  33. }
  34. }
  35. }
  36. }

With the dialer implementation above, the NATS client will keep retrying the connection to the NATS server, pausing between attempts, until it succeeds or the context is no longer valid (it has been canceled or its deadline has passed):

  1. func main() {
  2. // Parent context cancels connecting/reconnecting altogether.
  3. ctx, cancel := context.WithCancel(context.Background())
  4. defer cancel()
  5. var err error
  6. var nc *nats.Conn
  7. cd := &customDialer{
  8. ctx: ctx,
  9. connectTimeout: 10 * time.Second,
  10. connectTimeWait: 1 * time.Second,
  11. }
  12. opts := []nats.Option{
  13. nats.SetCustomDialer(cd),
  14. nats.ReconnectWait(2 * time.Second),
  15. nats.ReconnectHandler(func(c *nats.Conn) {
  16. log.Println("Reconnected to", c.ConnectedUrl())
  17. }),
  18. nats.DisconnectHandler(func(c *nats.Conn) {
  19. log.Println("Disconnected from NATS")
  20. }),
  21. nats.ClosedHandler(func(c *nats.Conn) {
  22. log.Println("NATS connection is closed.")
  23. }),
  24. nats.NoReconnect(),
  25. }
  26. go func() {
  27. nc, err = nats.Connect("127.0.0.1:4222", opts...)
  28. }()
  29. WaitForEstablishedConnection:
  30. for {
  31. if err != nil {
  32. log.Fatal(err)
  33. }
  34. // Wait for context to be canceled either by timeout
  35. // or because of establishing a connection...
  36. select {
  37. case <-ctx.Done():
  38. break WaitForEstablishedConnection
  39. default:
  40. }
  41. if nc == nil || !nc.IsConnected() {
  42. log.Println("Connection not ready")
  43. time.Sleep(200 * time.Millisecond)
  44. continue
  45. }
  46. break WaitForEstablishedConnection
  47. }
  48. if ctx.Err() != nil {
  49. log.Fatal(ctx.Err())
  50. }
  51. for {
  52. if nc.IsClosed() {
  53. break
  54. }
  55. if err := nc.Publish("hello", []byte("world")); err != nil {
  56. log.Println(err)
  57. time.Sleep(1 * time.Second)
  58. continue
  59. }
  60. log.Println("Published message")
  61. time.Sleep(1 * time.Second)
  62. }
  63. // Disconnect and flush pending messages
  64. if err := nc.Drain(); err != nil {
  65. log.Println(err)
  66. }
  67. log.Println("Disconnected")
  68. }