Image credit: Jasper Van Der Meij

Sponsor me on Patreon to support more content like this.

In the previous part of this series, we touched upon user authentication and JWT. In this episode, we'll take a quick look at go-micro's broker functionality and event brokering.

As mentioned in previous posts, go-micro is a pluggable framework, and it interfaces with lots of different commonly used technologies. If you take a look at the plugins repo, you'll see just how many it supports out of the box.

In our case, we'll be using the NATS broker plugin.

Event Driven Architecture

Event driven architecture is a very simple concept at its core. We generally consider good architecture to be decoupled: services shouldn't be coupled to, or aware of, other services. When we use protocols such as gRPC, that's only true in some cases, because we're saying, for example, 'I would like to send this request to go.srv.user-service', which uses service discovery to find the actual location of that service. Although that doesn't directly couple us to the implementation, it does couple our service to something else called go.srv.user-service, so it isn't completely decoupled, as it's talking directly to something else.

So what makes event driven architecture truly decoupled? To understand this, let's first look at the process of publishing and subscribing to an event. Service A finishes task X, and then says to the ecosystem 'X has just happened'. It doesn't need to know about, or care about, who is listening to that event, or what is affected by that event taking place. That responsibility is left to the listening clients.

It's also easier if you're expecting n number of services to act upon a certain event. For example, if you wanted 12 different services to act upon a new user being created using gRPC, you would have to instantiate 12 clients within your user service. With pubsub, or event driven architecture, your service doesn't need to care about that.

Now, a client service will simply listen to an event. This means that you need some kind of mediator in the middle to accept these events and inform the listening clients of their publication.
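To make the mediator idea concrete, here's a minimal in-memory sketch. The Broker type and its Subscribe and Publish methods are hypothetical names made up for illustration; this is not go-micro's broker or NATS, just the bare shape of the pattern under the assumption that everything runs in one process.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler is called with the payload of every event published to a topic.
type Handler func(payload string)

// Broker is a hypothetical in-memory mediator, for illustration only.
// It is not go-micro's broker interface.
type Broker struct {
	mu       sync.RWMutex
	handlers map[string][]Handler
}

// NewBroker returns an empty broker with no subscriptions.
func NewBroker() *Broker {
	return &Broker{handlers: make(map[string][]Handler)}
}

// Subscribe registers a handler for a topic.
func (b *Broker) Subscribe(topic string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], h)
}

// Publish delivers the payload to every handler registered for the topic
// and reports how many handlers were notified. The publisher never needs
// to know who those handlers are.
func (b *Broker) Publish(topic, payload string) int {
	b.mu.RLock()
	handlers := append([]Handler(nil), b.handlers[topic]...)
	b.mu.RUnlock()

	for _, h := range handlers {
		h(payload)
	}
	return len(handlers)
}

func main() {
	b := NewBroker()

	// Two independent listeners; the publisher is unaware of both.
	b.Subscribe("user.created", func(p string) { fmt.Println("email service saw:", p) })
	b.Subscribe("user.created", func(p string) { fmt.Println("audit service saw:", p) })

	n := b.Publish("user.created", `{"name":"Ewan Valentine"}`)
	fmt.Println("handlers notified:", n)
}
```

Adding a third listener here means one more Subscribe call on the listening side; the publishing code doesn't change at all, which is the whole point.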

In this post, we'll create an event every time we create a user, and we will create a new service for sending out emails. We won't actually write the email implementation, just mock it out for now.

The code

First we need to integrate the NATS broker plug-in into our user-service:

    // shippy-user-service/main.go
    func main() {
        ...
        // Init will parse the command line flags.
        srv.Init()

        // Get instance of the broker using our defaults
        pubsub := srv.Server().Options().Broker

        // Register handler
        pb.RegisterUserServiceHandler(srv.Server(), &service{repo, tokenService, pubsub})
        ...
    }

Now let's publish the event once we create a new user (see full changes here):

    // shippy-user-service/handler.go
    const topic = "user.created"

    type service struct {
        repo         Repository
        tokenService Authable
        PubSub       broker.Broker
    }
    ...
    func (srv *service) Create(ctx context.Context, req *pb.User, res *pb.Response) error {
        // Generates a hashed version of our password
        hashedPass, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
        if err != nil {
            return err
        }
        req.Password = string(hashedPass)
        if err := srv.repo.Create(req); err != nil {
            return err
        }
        res.User = req
        if err := srv.publishEvent(req); err != nil {
            return err
        }
        return nil
    }

    func (srv *service) publishEvent(user *pb.User) error {
        // Marshal to JSON string
        body, err := json.Marshal(user)
        if err != nil {
            return err
        }

        // Create a broker message
        msg := &broker.Message{
            Header: map[string]string{
                "id": user.Id,
            },
            Body: body,
        }

        // Publish message to broker. A failed publish is only logged,
        // not returned, so creating the user still succeeds.
        if err := srv.PubSub.Publish(topic, msg); err != nil {
            log.Printf("[pub] failed: %v", err)
        }
        return nil
    }
    ...

Make sure you're running Postgres and then let's run this service:

    $ docker run -d -p 5432:5432 postgres
    $ make build
    $ make run

Now let's create our email service. I've created a new repo for this:

    // shippy-email-service
    package main

    import (
        "encoding/json"
        "log"

        pb "github.com/EwanValentine/shippy-user-service/proto/user"
        micro "github.com/micro/go-micro"
        "github.com/micro/go-micro/broker"
        _ "github.com/micro/go-plugins/broker/nats"
    )

    const topic = "user.created"

    func main() {
        srv := micro.NewService(
            micro.Name("go.micro.srv.email"),
            micro.Version("latest"),
        )

        srv.Init()

        // Get the broker instance using our environment variables
        pubsub := srv.Server().Options().Broker
        if err := pubsub.Connect(); err != nil {
            log.Fatal(err)
        }

        // Subscribe to messages on the broker
        _, err := pubsub.Subscribe(topic, func(p broker.Publication) error {
            var user *pb.User
            if err := json.Unmarshal(p.Message().Body, &user); err != nil {
                return err
            }
            log.Println(user)
            go sendEmail(user)
            return nil
        })
        if err != nil {
            log.Println(err)
        }

        // Run the server
        if err := srv.Run(); err != nil {
            log.Println(err)
        }
    }

    func sendEmail(user *pb.User) error {
        log.Println("Sending email to:", user.Name)
        return nil
    }

Before running this, we'll need to run NATS:

    $ docker run -d -p 4222:4222 nats

Also, I'd like to quickly explain a part of go-micro that I feel is important in understanding how it works as a framework. You will notice:

    srv.Init()
    pubsub := srv.Server().Options().Broker

Let's take a quick look at that. When we create a service in go-micro, srv.Init() behind the scenes will look for any configuration, such as any plugins and environment variables, or command line options. It will then instantiate these integrations as part of the service. In order to then use those instances, we need to fetch them out of the service instance. Within srv.Server().Options(), you will also find Transport and Registry.

In our case here, it will find our MICRO_BROKER environment variable and the NATS broker plugin, and create an instance of that broker, ready for us to connect to and use.

If you're creating a command line tool, you'd use cmd.Init(), ensuring you're importing "github.com/micro/go-micro/cmd". That will have the same effect.
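As a rough mental model of that lookup, here's a sketch of a plugin registry selected by an environment variable. Everything in it is hypothetical: the Broker interface, the Register and Select functions, and the "http" fallback are assumptions made for illustration, not go-micro's actual internals.

```go
package main

import (
	"fmt"
	"os"
)

// Broker is a hypothetical stand-in for a broker abstraction.
type Broker interface {
	Name() string
}

type natsBroker struct{}

func (natsBroker) Name() string { return "nats" }

type httpBroker struct{}

func (httpBroker) Name() string { return "http" }

// registry maps a plugin name to a constructor. In a real plugin system
// each plugin package would register itself when it is imported.
var registry = map[string]func() Broker{}

// Register makes a broker constructor available under the given name.
func Register(name string, newFn func() Broker) {
	registry[name] = newFn
}

func init() {
	// Stands in for the blank imports of plugin packages.
	Register("http", func() Broker { return httpBroker{} })
	Register("nats", func() Broker { return natsBroker{} })
}

// Select returns the broker registered under name, falling back to the
// assumed "http" default when the name is empty or unknown.
func Select(name string) Broker {
	if newFn, ok := registry[name]; ok {
		return newFn()
	}
	return registry["http"]()
}

func main() {
	// MICRO_BROKER is the variable name assumed by this sketch.
	b := Select(os.Getenv("MICRO_BROKER"))
	fmt.Println("using broker:", b.Name())
}
```

The design choice worth noticing is that the calling code only ever sees the interface, so swapping the implementation is a configuration change rather than a code change.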

Now build and run this service: $ make build && make run, ensuring you're running the user service as well. Then head over to our shippy-user-cli repo, and run $ make run, watching our email service output. You should see something like… 2017/12/26 23:57:23 Sending email to: Ewan Valentine.

And that's it! This is a bit of a tenuous example, as our email service is implicitly listening to a single 'user.created' event. But hopefully you can see how this approach allows you to write decoupled services.

It's worth mentioning that using JSON over NATS will have a performance overhead vs gRPC, as we're back into the realm of serialising JSON strings. But, for some use-cases, that's perfectly acceptable. NATS is incredibly efficient, and great for fire-and-forget events.

Go-micro also has support for some of the most widely used queue/pubsub technologies ready for you to use. You can see a list of them here. You don't have to change your implementation because go-micro abstracts that away for you. You just need to change the environment variable from MICRO_BROKER=nats to MICRO_BROKER=googlepubsub, then change the import in your main.go from _ "github.com/micro/go-plugins/broker/nats" to _ "github.com/micro/go-plugins/broker/googlepubsub", for example.

If you're not using go-micro, there's a NATS Go library (NATS itself is written in Go, so naturally support for Go is pretty solid).

Publishing an event:

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    // Simple Publisher: Publish takes the payload as a []byte
    nc.Publish("user.created", []byte(userJsonString))

Subscribing to an event:

    // Simple Async Subscriber
    nc.Subscribe("user.created", func(m *nats.Msg) {
        user := convertUserString(m.Data)
        go sendEmail(user)
    })
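The convertUserString helper above isn't defined anywhere in this post, so here's one way it might look. This is a sketch under assumptions: the User struct is a hypothetical stand-in for the generated pb.User type, its JSON field names are guesses, and unlike the one-value call in the snippet above, this version also returns an error so that bad payloads aren't silently ignored.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// User is a hypothetical stand-in for the generated pb.User type.
// The JSON field names are assumptions for this sketch.
type User struct {
	Id    string `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

// convertUserString decodes the raw message payload into a User.
func convertUserString(data []byte) (*User, error) {
	var user User
	if err := json.Unmarshal(data, &user); err != nil {
		return nil, fmt.Errorf("decode user: %w", err)
	}
	return &user, nil
}

func main() {
	payload := []byte(`{"id":"1","name":"Ewan Valentine","email":"ewan@example.com"}`)

	user, err := convertUserString(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("Sending email to:", user.Name)
}
```

Inside the subscriber callback you'd check the returned error before calling sendEmail, and probably just log and drop the message if decoding fails.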

I mentioned earlier that when using a third-party message broker, such as NATS, you lose the use of protobuf. That's a shame, because we lose the ability to communicate using binary streams, which of course have a much lower overhead than serialised JSON strings. But, as with most concerns, go-micro has an answer to that problem as well.

Built into go-micro is a pubsub layer, which sits on top of the broker layer, but without the need for a third-party broker such as NATS. The awesome part of this feature is that it utilises your protobuf definitions, so we're back in the realms of low-latency binary streams. Let's update our user-service to replace the existing NATS broker with go-micro's pubsub:

    // shippy-user-service/main.go
    func main() {
        ...
        publisher := micro.NewPublisher("user.created", srv.Client())

        // Register handler
        pb.RegisterUserServiceHandler(srv.Server(), &service{repo, tokenService, publisher})
        ...
    }

    // shippy-user-service/handler.go

    // The broker field is swapped for a publisher
    type service struct {
        repo         Repository
        tokenService Authable
        Publisher    micro.Publisher
    }

    func (srv *service) Create(ctx context.Context, req *pb.User, res *pb.Response) error {
        // Generates a hashed version of our password
        hashedPass, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
        if err != nil {
            return err
        }
        req.Password = string(hashedPass)
        if err := srv.repo.Create(req); err != nil {
            return err
        }
        res.User = req

        // Here's our new publisher code, much simpler
        if err := srv.Publisher.Publish(ctx, req); err != nil {
            return err
        }
        return nil
    }

Now our email service:

    // shippy-email-service
    const topic = "user.created"

    type Subscriber struct{}

    func (sub *Subscriber) Process(ctx context.Context, user *pb.User) error {
        log.Println("Picked up a new message")
        log.Println("Sending email to:", user.Name)
        return nil
    }

    func main() {
        ...
        micro.RegisterSubscriber(topic, srv.Server(), new(Subscriber))
        ...
    }

Now we're using our underlying User protobuf definition across our services, over gRPC, and using no third-party broker. Fantastic!

That's a wrap! Next tutorial we'll look at creating a user-interface for our services, and look at how a web client can begin to interact with our services.

Any bugs, mistakes, or feedback on this article, or anything you would find helpful, please drop me an email.

If you are finding this series useful, and you use an ad-blocker (who can blame you), please consider chucking me a couple of quid for my time and effort. Cheers! https://monzo.me/ewanvalentine
