grpc Data Flow

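This article assumes you already understand how the grpc protocol is encoded and decoded, and how its messages are packed and unpacked. If you do not, read "10 - The grpc Protocol Codec" and "11 - A Full Walkthrough of grpc Protocol Unpacking" first.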

The Protocol, Revisited

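The protocol is the foundation of any rpc framework. It defines what a client must carry on each request: the name of the backend service (ServiceName), the method name (Method), the timeout (Timeout), the encoding (Encoding), the Authority (the host name of the target, sent in the `:authority` header), and so on.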

As mentioned earlier, grpc is built on the http2 protocol. Let's look at some of the key information in the grpc protocol:

[Figure: the grpc protocol]

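As you can see, a single request carries a lot of information, and the server acts on what the client sends. So how does the content defined by the protocol get passed along inside the framework?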
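To make the fields above concrete, here is a minimal sketch of how they might be pulled out of the decoded request headers. The header names (`:path`, `:authority`, `content-type`, `grpc-encoding`, `grpc-timeout`) come from the grpc over http2 protocol; the types `headerField` and `requestInfo` and the functions `decodeTimeout` and `decodeHeaders` are hypothetical names invented for this illustration, not grpc-go's actual API. The sketch ignores overflow and content-type parameters.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// headerField is a hypothetical stand-in for one decoded http2 header field.
type headerField struct {
	Name, Value string
}

// requestInfo collects the values this sketch extracts from the headers.
type requestInfo struct {
	Method         string        // from ":path", e.g. "/helloworld.Greeter/SayHello"
	Authority      string        // from ":authority"
	ContentSubtype string        // from "content-type", e.g. "proto"
	Encoding       string        // from "grpc-encoding"
	Timeout        time.Duration // from "grpc-timeout"
}

// decodeTimeout parses a grpc-timeout value: decimal digits followed by a
// single unit letter (H, M, S, m, u or n).
func decodeTimeout(s string) (time.Duration, error) {
	if len(s) < 2 {
		return 0, fmt.Errorf("timeout %q is too short", s)
	}
	units := map[byte]time.Duration{
		'H': time.Hour, 'M': time.Minute, 'S': time.Second,
		'm': time.Millisecond, 'u': time.Microsecond, 'n': time.Nanosecond,
	}
	unit, ok := units[s[len(s)-1]]
	if !ok {
		return 0, fmt.Errorf("timeout %q has an unknown unit", s)
	}
	n, err := strconv.ParseUint(s[:len(s)-1], 10, 64)
	if err != nil {
		return 0, err
	}
	return time.Duration(n) * unit, nil
}

// decodeHeaders walks the header fields once and fills a requestInfo.
func decodeHeaders(fields []headerField) (requestInfo, error) {
	const baseContentType = "application/grpc"
	var info requestInfo
	for _, f := range fields {
		switch f.Name {
		case ":path":
			info.Method = f.Value
		case ":authority":
			info.Authority = f.Value
		case "content-type":
			if !strings.HasPrefix(f.Value, baseContentType) {
				return info, fmt.Errorf("unexpected content-type %q", f.Value)
			}
			// "application/grpc+proto" -> "proto"; plain "application/grpc" -> "".
			rest := strings.TrimPrefix(f.Value, baseContentType)
			info.ContentSubtype = strings.TrimPrefix(rest, "+")
		case "grpc-encoding":
			info.Encoding = f.Value
		case "grpc-timeout":
			d, err := decodeTimeout(f.Value)
			if err != nil {
				return info, err
			}
			info.Timeout = d
		}
	}
	return info, nil
}

func main() {
	info, err := decodeHeaders([]headerField{
		{":path", "/helloworld.Greeter/SayHello"},
		{":authority", "localhost:50051"},
		{"content-type", "application/grpc+proto"},
		{"grpc-encoding", "gzip"},
		{"grpc-timeout", "2S"},
	})
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", info)
}
```

The point of the sketch is only that every piece of protocol information arrives as a header value and ends up in one struct, which then needs a place to live for the rest of the request.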

The Data Carrier

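To answer that question, we need a data carrier: a structure that holds the important pieces of protocol information that must be passed through, such as Method. In grpc this structure is Stream. Here is its definition: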

    // Stream represents an RPC in the transport layer.
    type Stream struct {
        id uint32
        st ServerTransport // nil for client side Stream
        ctx context.Context // the associated context of the stream
        cancel context.CancelFunc // always nil for client side Stream
        done chan struct{} // closed at the end of stream to unblock writers. On the client side.
        ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
        method string // the associated RPC method of the stream
        recvCompress string
        sendCompress string
        buf *recvBuffer
        trReader io.Reader
        fc *inFlow
        wq *writeQuota

        // Callback to state application's intentions to read data. This
        // is used to adjust flow control, if needed.
        requestRead func(int)

        headerChan chan struct{} // closed to indicate the end of header metadata.
        headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.

        // hdrMu protects header and trailer metadata on the server-side.
        hdrMu sync.Mutex
        // On client side, header keeps the received header metadata.
        //
        // On server side, header keeps the header set by SetHeader(). The complete
        // header will be merged into this after t.WriteHeader() is called.
        header metadata.MD
        trailer metadata.MD // the key-value map of trailer metadata.

        noHeaders bool // set if the client never received headers (set only after the stream is done).

        // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
        headerSent uint32

        state streamState

        // On client-side it is the status error received from the server.
        // On server-side it is unused.
        status *status.Status

        bytesReceived uint32 // indicates whether any bytes have been received on this stream
        unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream

        // contentSubtype is the content-subtype for requests.
        // this must be lowercase or the behavior is undefined.
        contentSubtype string
    }

Constructing the Stream on the Server

Next, let's see how the Stream is constructed on the server. The server's processing flow was covered earlier, so we go straight to the serveStreams method. The call path is: s.Serve(lis) -> s.handleRawConn(rawConn) -> s.serveStreams(st)

    func (s *Server) serveStreams(st transport.ServerTransport) {
        defer st.Close()
        var wg sync.WaitGroup
        st.HandleStreams(func(stream *transport.Stream) {
            wg.Add(1)
            go func() {
                defer wg.Done()
                s.handleStream(st, stream, s.traceInfo(st, stream))
            }()
        }, func(ctx context.Context, method string) context.Context {
            if !EnableTracing {
                return ctx
            }
            tr := trace.New("grpc.Recv."+methodFamily(method), method)
            return trace.NewContext(ctx, tr)
        })
        wg.Wait()
    }

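At the top level, HandleStreams processes http2 frames. It switches on seven frame types: MetaHeadersFrame, DataFrame, RSTStreamFrame, SettingsFrame, PingFrame, WindowUpdateFrame and GoAwayFrame. Note that in this version the GoAwayFrame case is matched but left as a TODO.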

    // HandleStreams receives incoming streams using the given handler. This is
    // typically run in a separate goroutine.
    // traceCtx attaches trace to ctx and returns the new context.
    func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
        defer close(t.readerDone)
        for {
            frame, err := t.framer.fr.ReadFrame()
            atomic.StoreUint32(&t.activity, 1)
            if err != nil {
                if se, ok := err.(http2.StreamError); ok {
                    warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
                    t.mu.Lock()
                    s := t.activeStreams[se.StreamID]
                    t.mu.Unlock()
                    if s != nil {
                        t.closeStream(s, true, se.Code, false)
                    } else {
                        t.controlBuf.put(&cleanupStream{
                            streamID: se.StreamID,
                            rst: true,
                            rstCode: se.Code,
                            onWrite: func() {},
                        })
                    }
                    continue
                }
                if err == io.EOF || err == io.ErrUnexpectedEOF {
                    t.Close()
                    return
                }
                warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
                t.Close()
                return
            }
            switch frame := frame.(type) {
            case *http2.MetaHeadersFrame:
                if t.operateHeaders(frame, handle, traceCtx) {
                    t.Close()
                    break
                }
            case *http2.DataFrame:
                t.handleData(frame)
            case *http2.RSTStreamFrame:
                t.handleRSTStream(frame)
            case *http2.SettingsFrame:
                t.handleSettings(frame)
            case *http2.PingFrame:
                t.handlePing(frame)
            case *http2.WindowUpdateFrame:
                t.handleWindowUpdate(frame)
            case *http2.GoAwayFrame:
                // TODO: Handle GoAway from the client appropriately.
            default:
                errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
            }
        }
    }
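The loop above can be hard to follow because of the error handling. Stripped down, the idea is that a headers frame opens a stream, and later frames are routed to that stream by its stream ID. Here is a minimal sketch of that routing, assuming heavily simplified frame types; `headersFrame`, `dataFrame`, `rstFrame`, `stream` and `server` are all hypothetical stand-ins for this illustration, not the real http2 or grpc-go types.

```go
package main

import "fmt"

// Hypothetical, simplified frames. Real code reads them from an http2 framer.
type headersFrame struct {
	streamID uint32
	method   string
}

type dataFrame struct {
	streamID uint32
	payload  []byte
}

type rstFrame struct {
	streamID uint32
}

// stream is a stand-in for the transport-layer Stream.
type stream struct {
	id     uint32
	method string
	buf    []byte
}

// server keeps the streams that are currently open, keyed by stream ID.
type server struct {
	active map[uint32]*stream
}

// handleFrame routes a single frame. It returns the new stream when the
// frame opened one, and nil otherwise.
func (s *server) handleFrame(f interface{}) *stream {
	switch f := f.(type) {
	case headersFrame:
		// A headers frame starts a request: build the stream and register it.
		st := &stream{id: f.streamID, method: f.method}
		s.active[f.streamID] = st
		return st
	case dataFrame:
		// A data frame belongs to an existing stream; unknown IDs are dropped.
		if st, ok := s.active[f.streamID]; ok {
			st.buf = append(st.buf, f.payload...)
		}
	case rstFrame:
		// A reset ends the stream.
		delete(s.active, f.streamID)
	default:
		fmt.Printf("unhandled frame type %T\n", f)
	}
	return nil
}

func main() {
	s := &server{active: make(map[uint32]*stream)}
	frames := []interface{}{
		headersFrame{streamID: 1, method: "/echo.Echo/Say"},
		headersFrame{streamID: 3, method: "/echo.Echo/Shout"},
		dataFrame{streamID: 1, payload: []byte("hello")},
		dataFrame{streamID: 3, payload: []byte("HI")},
		dataFrame{streamID: 1, payload: []byte(" world")},
	}
	for _, f := range frames {
		s.handleFrame(f)
	}
	for _, id := range []uint32{1, 3} {
		st := s.active[id]
		fmt.Printf("stream %d %s: %q\n", st.id, st.method, st.buf)
	}
}
```

Because frames from different requests interleave on one connection, the stream ID is the only thing that ties a data frame back to the request that the headers frame started.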

For every request, the client always sends a HEADERS frame first. grpc relies on the http2 package here and handles MetaHeadersFrame directly. That type is defined as:

    // A MetaHeadersFrame is the representation of one HEADERS frame and
    // zero or more contiguous CONTINUATION frames and the decoding of
    // their HPACK-encoded contents.
    //
    // This type of frame does not appear on the wire and is only returned
    // by the Framer when Framer.ReadMetaHeaders is set.
    type MetaHeadersFrame struct {
        *HeadersFrame
        Fields []hpack.HeaderField
        Truncated bool
    }

The request headers are therefore processed in the MetaHeadersFrame case, which calls operateHeaders. That method constructs a stream holding the transport-layer context of the request, including the method name:

    s := &Stream{
        id: streamID,
        st: t,
        buf: buf,
        fc: &inFlow{limit: uint32(t.initialWindowSize)},
        recvCompress: state.data.encoding,
        method: state.data.method,
        contentSubtype: state.data.contentSubtype,
    }

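Once the stream has been constructed, the transport passes it down layer by layer in everything it does afterwards. Any data needed within the request can be read from the stream, and that is how data flows on the server side.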
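As an illustration of a lower layer reading what it needs from the stream, here is a minimal sketch of dispatching on the method string, which has the form "/service/method". The names `stream`, `handler`, `registry`, `splitMethod` and `dispatch` are hypothetical and exist only for this example; it is a simplified picture of the idea, not grpc-go's actual handler lookup.

```go
package main

import (
	"fmt"
	"strings"
)

// stream is a stand-in that carries only the field this sketch needs.
type stream struct {
	method string // full method name, e.g. "/helloworld.Greeter/SayHello"
}

// handler processes one stream and returns a result for demonstration.
type handler func(s *stream) string

// registry maps service name -> method name -> handler.
type registry map[string]map[string]handler

// splitMethod splits "/service/method" into its service and method parts.
func splitMethod(full string) (service, method string, err error) {
	trimmed := strings.TrimPrefix(full, "/")
	pos := strings.LastIndex(trimmed, "/")
	if pos == -1 {
		return "", "", fmt.Errorf("malformed method name: %q", full)
	}
	return trimmed[:pos], trimmed[pos+1:], nil
}

// dispatch finds the handler for the stream's method and runs it. Everything
// it needs comes from the stream that was passed down to it.
func dispatch(reg registry, s *stream) (string, error) {
	service, method, err := splitMethod(s.method)
	if err != nil {
		return "", err
	}
	methods, ok := reg[service]
	if !ok {
		return "", fmt.Errorf("unknown service %q", service)
	}
	h, ok := methods[method]
	if !ok {
		return "", fmt.Errorf("unknown method %q for service %q", method, service)
	}
	return h(s), nil
}

func main() {
	reg := registry{
		"helloworld.Greeter": {
			"SayHello": func(s *stream) string { return "handled " + s.method },
		},
	}
	out, err := dispatch(reg, &stream{method: "/helloworld.Greeter/SayHello"})
	fmt.Println(out, err)

	_, err = dispatch(reg, &stream{method: "/helloworld.Greeter/Missing"})
	fmt.Println(err)
}
```

The dispatcher takes no argument besides the registry and the stream, which is the practical benefit of having one carrier for the request's data.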

Data Flow on the Client

Mirroring the server, the client has a clientStream struct, defined as follows:

    // clientStream implements a client side Stream.
    type clientStream struct {
        callHdr *transport.CallHdr
        opts []CallOption
        callInfo *callInfo
        cc *ClientConn
        desc *StreamDesc

        codec baseCodec
        cp Compressor
        comp encoding.Compressor

        cancel context.CancelFunc // cancels all attempts

        sentLast bool // sent an end stream
        beginTime time.Time

        methodConfig *MethodConfig

        ctx context.Context // the application's context, wrapped by stats/tracing

        retryThrottler *retryThrottler // The throttler active when the RPC began.

        binlog *binarylog.MethodLogger // Binary logger, can be nil.
        // serverHeaderBinlogged is a boolean for whether server header has been
        // logged. Server header will be logged when the first time one of those
        // happens: stream.Header(), stream.Recv().
        //
        // It's only read and used by Recv() and Header(), so it doesn't need to be
        // synchronized.
        serverHeaderBinlogged bool

        mu sync.Mutex
        firstAttempt bool // if true, transparent retry is valid
        numRetries int // exclusive of transparent retry attempt(s)
        numRetriesSincePushback int // retries since pushback; to reset backoff
        finished bool // TODO: replace with atomic cmpxchg or sync.Once?
        attempt *csAttempt // the active client stream attempt
        // TODO(hedging): hedging will have multiple attempts simultaneously.
        committed bool // active attempt committed for retry?
        buffer []func(a *csAttempt) error // operations to replay on retry
        bufferSize int // current size of buffer
    }

Construction on the client is even more direct: when invoke issues a downstream call, it builds the clientStream before calling SendMsg, as shown here:

    func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
        cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
        if err != nil {
            return err
        }
        if err := cs.SendMsg(req); err != nil {
            return err
        }
        return cs.RecvMsg(reply)
    }

Besides carrying data through a request, the stream structure is also what grpc's streaming calls are built on.
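One way to see why the same structure serves both purposes: a unary call is a stream that sends exactly one message and receives exactly one, while a streaming call repeats those steps. Here is a minimal in-memory sketch of that relationship. `memStream`, `serve`, `invokeUnary` and `invokeStreaming` are hypothetical names for this illustration, and Go channels stand in for the network transport, so nothing here reflects grpc-go's real implementation.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// memStream is a hypothetical in-memory stream between a client and a server.
type memStream struct {
	method   string
	toServer chan string
	toClient chan string
}

func newMemStream(method string) *memStream {
	return &memStream{
		method:   method,
		toServer: make(chan string),
		toClient: make(chan string),
	}
}

// SendMsg sends one request message to the server side.
func (s *memStream) SendMsg(m string) error {
	s.toServer <- m
	return nil
}

// CloseSend signals that the client will send no more messages.
func (s *memStream) CloseSend() { close(s.toServer) }

// RecvMsg receives one reply, or io.EOF once the server has finished.
func (s *memStream) RecvMsg() (string, error) {
	m, ok := <-s.toClient
	if !ok {
		return "", io.EOF
	}
	return m, nil
}

// serve is a toy server: it upper-cases every message it receives and
// closes the reply side when the client stops sending.
func serve(s *memStream) {
	for m := range s.toServer {
		s.toClient <- strings.ToUpper(m)
	}
	close(s.toClient)
}

// invokeUnary is a stream used once: one send, one receive.
func invokeUnary(method, req string) (string, error) {
	s := newMemStream(method)
	go serve(s)
	if err := s.SendMsg(req); err != nil {
		return "", err
	}
	s.CloseSend()
	return s.RecvMsg()
}

// invokeStreaming uses the same stream type, sending many messages and
// receiving replies until io.EOF.
func invokeStreaming(method string, reqs []string) ([]string, error) {
	s := newMemStream(method)
	go serve(s)
	go func() {
		for _, r := range reqs {
			s.SendMsg(r)
		}
		s.CloseSend()
	}()
	var replies []string
	for {
		m, err := s.RecvMsg()
		if errors.Is(err, io.EOF) {
			return replies, nil
		}
		if err != nil {
			return nil, err
		}
		replies = append(replies, m)
	}
}

func main() {
	reply, err := invokeUnary("/echo.Echo/Say", "hello")
	fmt.Println(reply, err)

	replies, err := invokeStreaming("/echo.Echo/SayMany", []string{"a", "b", "c"})
	fmt.Println(replies, err)
}
```

Here invokeUnary has the same shape as the invoke function above: create the stream, send once, receive once.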