Closing the producer goroutine from the consumer

The producer-consumer pattern is widely used in concurrent Go programs. When the consumer stops, we want the producer to stop gracefully as well.

Problem

When a gRPC server receives a streaming request, it usually calls a function that returns a channel, reads the results from that channel, and sends them to the client one by one.

Take the following code for instance: upon receiving a request, the main goroutine Service calls launchJob. launchJob starts a separate goroutine as an anonymous function call and returns a channel. The anonymous function sends items to the channel, and Service, on the other side of the channel, reads from it.

    func Service(req *Request, stream *StreamResponse) error {
        result := launchJob(req.Content)
        for r := range result {
            if e := stream.Send(r); e != nil {
                // should we signal the running goroutine so it will stop sending?
                return e
            }
        }
        return nil
    }

    func launchJob(content string) chan Item {
        c := make(chan Item)
        go func() {
            defer close(c)
            acquireScarceResources()
            defer releaseScarceResources()
            ...
            // if stream.Send(r) returns an error and Service returns, this send blocks forever
            c <- Item{}
            ...
        }()
        return c
    }

There is a major problem in this implementation. As pointed out by the comment, if the Send in Service returns an error, the Service function returns and nobody receives from the unbuffered channel any more, so the anonymous function stays blocked on c <- Item{} forever.

This problem matters because the leaked goroutine usually holds scarce system resources, such as network connections and memory, and its deferred releaseScarceResources call never runs.
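
The leak can be observed in a small standalone program. The following is only a sketch under stated assumptions: produce and consume are hypothetical stand-ins for launchJob and Service, int replaces Item, and the short sleep is a crude way to let the producer reach its next send before goroutines are counted.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
	"time"
)

// produce is a hypothetical stand-in for launchJob: it sends n items
// on an unbuffered channel from a separate goroutine.
func produce(n int) chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for i := 0; i < n; i++ {
			c <- i // blocks forever once nobody receives
		}
	}()
	return c
}

// consume is a hypothetical stand-in for Service: it receives one
// item and then fails, as if stream.Send had returned an error.
func consume(c <-chan int) error {
	<-c
	return errors.New("send failed")
}

// leakedGoroutines returns how many goroutines are still alive,
// relative to the starting count, after the consumer gave up early.
func leakedGoroutines() int {
	before := runtime.NumGoroutine()
	_ = consume(produce(10))
	// Assumption: 50ms is enough for the producer to block on its next send.
	time.Sleep(50 * time.Millisecond)
	return runtime.NumGoroutine() - before
}

func main() {
	fmt.Println("leaked goroutines:", leakedGoroutines())
}
```

With this setup the count should stay above zero, because the producer goroutine is still parked on its send and has no way to learn that the consumer is gone.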

Solution: pipeline explicit cancellation

Inspired by the Explicit cancellation section of the Go blog post on pipelines (https://blog.golang.org/pipelines), we can signal cancellation by closing a separate channel. We follow the terminology of io.Pipe.
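
In its barest form, the idea is that the producer selects between sending an item and receiving from a done channel that the consumer closes. The sketch below illustrates only that mechanism; generate, firstN, and the exited channel are hypothetical names introduced for this example and are not part of the design described here.

```go
package main

import "fmt"

// generate sends 0, 1, 2, ... on out until done is closed.
// exited is closed when the producer goroutine has returned.
func generate(done <-chan struct{}) (out <-chan int, exited <-chan struct{}) {
	o := make(chan int)
	e := make(chan struct{})
	go func() {
		defer close(e)
		defer close(o)
		for i := 0; ; i++ {
			select {
			case o <- i: // the consumer took the item
			case <-done: // the consumer cancelled; stop producing
				return
			}
		}
	}()
	return o, e
}

// firstN reads n values, cancels the producer by closing done,
// and waits until the producer goroutine has actually exited.
func firstN(n int) []int {
	done := make(chan struct{})
	out, exited := generate(done)
	var got []int
	for len(got) < n {
		got = append(got, <-out)
	}
	close(done)
	<-exited
	return got
}

func main() {
	fmt.Println(firstN(3)) // prints [0 1 2]
}
```

Closing done, rather than sending a value on it, works because a receive from a closed channel never blocks, so the producer's select can always proceed.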

    package sql

    import (
        "errors"
    )

    // ErrClosedPipe is returned by Write after the reader has been closed.
    var ErrClosedPipe = errors.New("pipe: write on closed pipe")

    // pipe follows the design at https://blog.golang.org/pipelines
    // - wrCh: chan for piping data
    // - done: chan for signaling Close from Reader to Writer
    type pipe struct {
        wrCh chan interface{}
        done chan struct{}
    }

    // PipeReader reads real data
    type PipeReader struct {
        p *pipe
    }

    // PipeWriter writes real data
    type PipeWriter struct {
        p *pipe
    }

    // Pipe creates a synchronous in-memory pipe.
    //
    // It is safe to call ReadAll and Write in parallel with each other or
    // with PipeReader.Close. Parallel calls to Write are also safe:
    // the individual calls will be gated sequentially.
    func Pipe() (*PipeReader, *PipeWriter) {
        p := &pipe{
            wrCh: make(chan interface{}),
            done: make(chan struct{}),
        }
        return &PipeReader{p}, &PipeWriter{p}
    }

    // Close closes the reader; subsequent writes to the
    // write half of the pipe will return ErrClosedPipe.
    // It must be called at most once.
    func (r *PipeReader) Close() {
        close(r.p.done)
    }

    // ReadAll returns the data chan. The caller should
    // use it as `for r := range pr.ReadAll()`
    func (r *PipeReader) ReadAll() chan interface{} {
        return r.p.wrCh
    }

    // Close closes the writer; subsequent ReadAll from the
    // read half of the pipe will return a closed channel.
    // It must be called at most once, after the last Write.
    func (w *PipeWriter) Close() {
        close(w.p.wrCh)
    }

    // Write writes the item to the underlying data stream.
    // It returns ErrClosedPipe when the reader has been closed.
    func (w *PipeWriter) Write(item interface{}) error {
        select {
        case w.p.wrCh <- item:
            return nil
        case <-w.p.done:
            return ErrClosedPipe
        }
    }

The consumer and producer can then be implemented as follows:

    func Service(req *Request, stream *StreamResponse) error {
        pr := launchJob(req.Content)
        // Closing the reader unblocks any pending Write in the producer.
        defer pr.Close()
        for r := range pr.ReadAll() {
            if e := stream.Send(r); e != nil {
                return e
            }
        }
        return nil
    }

    func launchJob(content string) *PipeReader {
        pr, pw := Pipe()
        go func() {
            defer pw.Close()
            // Write fails with ErrClosedPipe once Service has returned.
            if err := pw.Write(Item{}); err != nil {
                return
            }
        }()
        return pr
    }
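
To check that the producer really stops, the design can be exercised end to end in a standalone program. This is a rough sketch, not the implementation above: the pipe type is condensed so the example compiles on its own, launchJob takes an item count instead of request content, service takes a failAt index that simulates stream.Send failing, and the result channel and run helper are hypothetical additions used only to observe how the producer exits.

```go
package main

import (
	"errors"
	"fmt"
)

var errClosedPipe = errors.New("pipe: write on closed pipe")

// pipe is a condensed copy of the pipe type, reduced to what this example needs.
type pipe struct {
	wrCh chan interface{}
	done chan struct{}
}

// write sends item unless the consumer has closed done.
func (p *pipe) write(item interface{}) error {
	select {
	case p.wrCh <- item:
		return nil
	case <-p.done:
		return errClosedPipe
	}
}

// launchJob starts a producer that tries to write n items. The result
// channel (hypothetical, for observation only) reports how it exited.
func launchJob(n int) (*pipe, <-chan error) {
	p := &pipe{wrCh: make(chan interface{}), done: make(chan struct{})}
	result := make(chan error, 1)
	go func() {
		defer close(p.wrCh)
		for i := 0; i < n; i++ {
			if err := p.write(i); err != nil {
				result <- err
				return
			}
		}
		result <- nil
	}()
	return p, result
}

// service consumes items and fails on the item equal to failAt,
// standing in for stream.Send returning an error.
func service(p *pipe, failAt int) error {
	defer close(p.done) // plays the role of pr.Close()
	for item := range p.wrCh {
		if item.(int) == failAt {
			return errors.New("send failed")
		}
	}
	return nil
}

// run wires both halves together and waits for the producer to exit.
func run(n, failAt int) (serviceErr, producerErr error) {
	p, result := launchJob(n)
	serviceErr = service(p, failAt)
	producerErr = <-result
	return serviceErr, producerErr
}

func main() {
	s, p := run(10, 2)
	fmt.Println("service:", s, "producer:", p)
}
```

Because run receives from result after service returns, the program would hang if the producer were still blocked. That it finishes, with the producer reporting the closed-pipe error, suggests the cancellation signal reached it.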

Further Reading