http2 协议帧格式

我们知道网络传输都是以二进制的形式,所以所有的协议底层的传输也是二进制。那么问题来了,client 往 server 发一个数据包,server 如何知道数据包是完成还是还在发送呢?又或者,假如一个数据包过大,client 需要拆成几个包发送,或者数据包过小,client 需要合成一个包发送,server 如何识别呢?为了解决这些问题,client 和 server 都会约定好一些双方都能理解的“规则“,这就是协议。

我们知道 grpc 传输层是基于 http2 协议规范,所以我们先要了解 http2 协议帧的格式。http 2 协议帧格式如下:

  1. Frame Format
  2. All frames begin with a fixed 9-octet header followed by a variable-length payload.
  3. +-----------------------------------------------+
  4. | Length (24) |
  5. +---------------+---------------+---------------+
  6. | Type (8) | Flags (8) |
  7. +-+-------------+---------------+-------------------------------+
  8. |R| Stream Identifier (31) |
  9. +=+=============================================================+
  10. | Frame Payload (0...) ...
  11. +---------------------------------------------------------------+

对于一个网络包而言,首先要知道这个包的格式,然后才能按照约定的格式解析出这个包。那么 grpc 的包是什么样的格式呢? 看了源码后,先直接揭晓出来,它其实是这样的

11. http2 协议帧格式 - 图1

http 帧格式为:length (3 byte) + type(1 byte) + flag (1 byte) + R (1 bit) + stream identifier (31 bit) + paypoad,payload 是消息具体内容

前 9 个字节是 http 包头,length 表示消息长度,type 表示 http 帧的类型,http 一共规定了 10 种帧类型:

  • HEADERS帧 头信息,对应于HTTP HEADER
  • DATA帧 对应于HTTP Response Body
  • PRIORITY帧 用于调整流的优先级
  • RST_STREAM帧 流终止帧,用于中断资源的传输
  • SETTINGS帧 用户客户服务器交流连接配置信息
  • PUSH_PROMISE帧 服务器向客户端主动推送资源
  • GOAWAY帧 通知对方断开连接
  • PING帧 心跳帧,检测往返时间和连接可用性
  • WINDOW_UPDATE帧 调整帧大小
  • CONTINUATION帧 HEADERS太大时的续帧

flag 表示标志位,http 一共三种标志位:

  • END_STREAM 流结束标志,表示当前帧是流的最后一个帧
  • END_HEADERS 头结束表示,表示当前帧是头信息的最后一个帧
  • PADDED 填充标志,在数据Payload里填充无用信息,用于干扰信道监听

R 是 1bit 的保留位,stream identifier 是流 id,http 会为每一个数据流分配一个 id

具体可以参考:http frame

解析 http 帧头

回到 examples 目录的 helloworld 目录, 在 server main 函数中跟踪 s.Serve 方法: s.Serve() ——> s.handleRawConn(rawConn) ——> s.serveStreams(st)

来看一下 serveStreams 这个方法

  1. func (s *Server) serveStreams(st transport.ServerTransport) {
  2. defer st.Close()
  3. var wg sync.WaitGroup
  4. st.HandleStreams(func(stream *transport.Stream) {
  5. wg.Add(1)
  6. go func() {
  7. defer wg.Done()
  8. s.handleStream(st, stream, s.traceInfo(st, stream))
  9. }()
  10. }, func(ctx context.Context, method string) context.Context {
  11. if !EnableTracing {
  12. return ctx
  13. }
  14. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  15. return trace.NewContext(ctx, tr)
  16. })
  17. wg.Wait()
  18. }

这里调用了 transport 的 HandleStreams 方法, 这个方法就是 http 帧的处理的具体实现。它的底层直接调用的 http2 包的 ReadFrame 方法去读取一个 http 帧数据。

  1. type framer struct {
  2. writer *bufWriter
  3. fr *http2.Framer
  4. }
  5. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  6. defer close(t.readerDone)
  7. for {
  8. frame, err := t.framer.fr.ReadFrame()
  9. atomic.StoreUint32(&t.activity, 1)
  10. ...
  11. switch frame := frame.(type) {
  12. case *http2.MetaHeadersFrame:
  13. if t.operateHeaders(frame, handle, traceCtx) {
  14. t.Close()
  15. break
  16. }
  17. case *http2.DataFrame:
  18. t.handleData(frame)
  19. case *http2.RSTStreamFrame:
  20. t.handleRSTStream(frame)
  21. case *http2.SettingsFrame:
  22. t.handleSettings(frame)
  23. case *http2.PingFrame:
  24. t.handlePing(frame)
  25. case *http2.WindowUpdateFrame:
  26. t.handleWindowUpdate(frame)
  27. case *http2.GoAwayFrame:
  28. // TODO: Handle GoAway from the client appropriately.
  29. default:
  30. errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  31. }
  32. }
  33. }

通过 http2 包的 ReadFrame 直接读取出一个帧的数据。

  1. func (fr *Framer) ReadFrame() (Frame, error) {
  2. fr.errDetail = nil
  3. if fr.lastFrame != nil {
  4. fr.lastFrame.invalidate()
  5. }
  6. fh, err := readFrameHeader(fr.headerBuf[:], fr.r)
  7. if err != nil {
  8. return nil, err
  9. }
  10. if fh.Length > fr.maxReadSize {
  11. return nil, ErrFrameTooLarge
  12. }
  13. payload := fr.getReadBuf(fh.Length)
  14. if _, err := io.ReadFull(fr.r, payload); err != nil {
  15. return nil, err
  16. }
  17. f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
  18. if err != nil {
  19. if ce, ok := err.(connError); ok {
  20. return nil, fr.connError(ce.Code, ce.Reason)
  21. }
  22. return nil, err
  23. }
  24. if err := fr.checkFrameOrder(f); err != nil {
  25. return nil, err
  26. }
  27. if fr.logReads {
  28. fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
  29. }
  30. if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
  31. return fr.readMetaFrame(f.(*HeadersFrame))
  32. }
  33. return f, nil
  34. }

fh, err := readFrameHeader(fr.headerBuf[:], fr.r) 这一行代码读取了 http 的包头数据,我们来看一下 headerBuf 的长度,发现果然是 9 个字节。

  1. const frameHeaderLen = 9
  2. func readFrameHeader(buf []byte, r io.Reader) (FrameHeader, error) {
  3. _, err := io.ReadFull(r, buf[:frameHeaderLen])
  4. if err != nil {
  5. return FrameHeader{}, err
  6. }
  7. return FrameHeader{
  8. Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
  9. Type: FrameType(buf[3]),
  10. Flags: Flags(buf[4]),
  11. StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
  12. valid: true,
  13. }, nil
  14. }

解析业务数据

经过上面的过程,我们终于将 http 包头给读出来了。前面说到了,读出 http 包体之后,还需要解析 grpc 协议头。那这部分是怎么去解析的呢?

回到 http2 读帧的部分,当发现帧的格式是 MetaHeadersFrame,也就是第一个帧时,会调用 operateHeaders 方法

  1. case *http2.MetaHeadersFrame:
  2. if t.operateHeaders(frame, handle, traceCtx) {
  3. t.Close()
  4. break
  5. }

看一下 operateHeaders ,里面会去调用 handle(s) , 这个handle 其实是 之前 s.Serve() ——> s.handleRawConn(rawConn) ——> s.serveStreams(st) 这个路径下的 HandleStreams 方法传入的,也就是会去调用 handleStream 这个方法

  1. st.HandleStreams(func(stream *transport.Stream) {
  2. wg.Add(1)
  3. go func() {
  4. defer wg.Done()
  5. s.handleStream(st, stream, s.traceInfo(st, stream))
  6. }()
  7. }, func(ctx context.Context, method string) context.Context {
  8. if !EnableTracing {
  9. return ctx
  10. }
  11. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  12. return trace.NewContext(ctx, tr)
  13. })

s.handleStream(st, stream, s.traceInfo(st, stream)) ——> s.processUnaryRPC(t, stream, srv, md, trInfo) ———> d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) ,进入 recvAndDecompress 这个函数,里面调用了

  1. pf, d, err := p.recvMsg(maxReceiveMessageSize)

进入 recvMsg,发现它就是解析 grpc 协议 的函数,先把协议头读出来,用了 5 个字节。从协议头中得知协议体消息的长度,然后用一个相应长度的 byte 数组把协议体读出来

  1. type parser struct {
  2. r io.Reader
  3. header [5]byte
  4. }
  5. func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
  6. if _, err := p.r.Read(p.header[:]); err != nil {
  7. return 0, nil, err
  8. }
  9. pf = payloadFormat(p.header[0])
  10. length := binary.BigEndian.Uint32(p.header[1:])
  11. ...
  12. msg = make([]byte, int(length))
  13. if _, err := p.r.Read(msg); err != nil {
  14. if err == io.EOF {
  15. err = io.ErrUnexpectedEOF
  16. }
  17. return 0, nil, err
  18. }
  19. return pf, msg, nil
  20. }

继续回到这个图,前面说到了 grpc 协议头是 5个字节。 11. http2 协议帧格式 - 图2 compressed-flag 表示是否压缩,值为 1 是压缩消息体数据,0 不压缩。 length 表示消息体数据长度。现在终于知道了这个数据结构的由来!

读取出来的数据是二进制的,读出来原数据之后呢,我们就可以针对相应的数据做解包操作了。这里可以参考我的另一篇文章 :10-grpc 协议编解码器