说明

gRPC暴露了两个拦截器接口,分别是:

  • grpc.UnaryServerInterceptor服务端拦截器
  • grpc.UnaryClientInterceptor客户端拦截器

基于两个拦截器可以针对性的定制公共模块的封装代码,比如warden/logging.go是通用日志逻辑。

分析

服务端拦截器

让我们先看一下grpc.UnaryServerInterceptor的声明,官方代码位置

  1. // UnaryServerInfo consists of various information about a unary RPC on
  2. // server side. All per-rpc information may be mutated by the interceptor.
  3. type UnaryServerInfo struct {
  4. // Server is the service implementation the user provides. This is read-only.
  5. Server interface{}
  6. // FullMethod is the full RPC method string, i.e., /package.service/method.
  7. FullMethod string
  8. }
  9. // UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal
  10. // execution of a unary RPC. If a UnaryHandler returns an error, it should be produced by the
  11. // status package, or else gRPC will use codes.Unknown as the status code and err.Error() as
  12. // the status message of the RPC.
  13. type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)
  14. // UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
  15. // contains all the information of this RPC the interceptor can operate on. And handler is the wrapper
  16. // of the service method implementation. It is the responsibility of the interceptor to invoke handler
  17. // to complete the RPC.
  18. type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

看起来很简单包括:

  • 一个UnaryServerInfo结构体用于ServerFullMethod字段传递,ServergRPC server的对象实例,FullMethod为请求方法的全名
  • 一个UnaryHandler方法用于传递Handler,就是基于proto文件service内声明而生成的方法
  • 一个UnaryServerInterceptor用于拦截Handler方法,可在Handler执行前后插入拦截代码

为了更形象的说明拦截器的执行过程,请看基于proto生成的以下代码代码位置

  1. func _Demo_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
  2. in := new(HelloReq)
  3. if err := dec(in); err != nil {
  4. return nil, err
  5. }
  6. if interceptor == nil {
  7. return srv.(DemoServer).SayHello(ctx, in)
  8. }
  9. info := &grpc.UnaryServerInfo{
  10. Server: srv,
  11. FullMethod: "/demo.service.v1.Demo/SayHello",
  12. }
  13. handler := func(ctx context.Context, req interface{}) (interface{}, error) {
  14. return srv.(DemoServer).SayHello(ctx, req.(*HelloReq))
  15. }
  16. return interceptor(ctx, in, info, handler)
  17. }

这个_Demo_SayHello_Handler方法是关键,该方法会被包装为grpc.ServiceDesc结构,被注册到gRPC内部,具体可在生成的pb.go代码内查找s.RegisterService(&_Demo_serviceDesc, srv)

  • gRPC server收到一次请求时,首先根据请求方法从注册到server内的grpc.ServiceDesc找到该方法对应的Handler如:_Demo_SayHello_Handler并执行
  • _Demo_SayHello_Handler执行过程请看上面具体代码,当interceptor不为nil时,会将SayHello包装为grpc.UnaryHandler结构传递给interceptor

这样就完成了UnaryServerInterceptor的执行过程。那么_Demo_SayHello_Handler内的interceptor是如何注入到gRPC server内,则看下面这段代码官方代码位置

  1. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  2. // server. Only one unary interceptor can be installed. The construction of multiple
  3. // interceptors (e.g., chaining) can be implemented at the caller.
  4. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  5. return func(o *options) {
  6. if o.unaryInt != nil {
  7. panic("The unary server interceptor was already set and may not be reset.")
  8. }
  9. o.unaryInt = i
  10. }
  11. }

请一定注意这方法的注释!!!

Only one unary interceptor can be installed. The construction of multiple interceptors (e.g., chaining) can be implemented at the caller.

gRPC本身只支持一个interceptor,想要多interceptors需要自己实现~~所以warden基于grpc.UnaryClientInterceptor实现了interceptor chain,请看下面代码代码位置

  1. // Use attachs a global inteceptor to the server.
  2. // For example, this is the right place for a rate limiter or error management inteceptor.
  3. func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server {
  4. finalSize := len(s.handlers) + len(handlers)
  5. if finalSize >= int(_abortIndex) {
  6. panic("warden: server use too many handlers")
  7. }
  8. mergedHandlers := make([]grpc.UnaryServerInterceptor, finalSize)
  9. copy(mergedHandlers, s.handlers)
  10. copy(mergedHandlers[len(s.handlers):], handlers)
  11. s.handlers = mergedHandlers
  12. return s
  13. }
  14. // interceptor is a single interceptor out of a chain of many interceptors.
  15. // Execution is done in left-to-right order, including passing of context.
  16. // For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
  17. // will see context changes of one and two.
  18. func (s *Server) interceptor(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  19. var (
  20. i int
  21. chain grpc.UnaryHandler
  22. )
  23. n := len(s.handlers)
  24. if n == 0 {
  25. return handler(ctx, req)
  26. }
  27. chain = func(ic context.Context, ir interface{}) (interface{}, error) {
  28. if i == n-1 {
  29. return handler(ic, ir)
  30. }
  31. i++
  32. return s.handlers[i](ic, ir, args, chain)
  33. }
  34. return s.handlers[0](ctx, req, args, chain)
  35. }

很简单的逻辑:

  • warden server使用Use方法进行grpc.UnaryServerInterceptor的注入,而func (s *Server) interceptor本身就实现了grpc.UnaryServerInterceptor
  • func (s *Server) interceptor可以根据注册的grpc.UnaryServerInterceptor顺序从前到后依次执行

warden在初始化的时候将该方法本身注册到了gRPC server,在NewServer方法内可以看到下面代码:

  1. opt = append(opt, keepParam, grpc.UnaryInterceptor(s.interceptor))
  2. s.server = grpc.NewServer(opt...)

如此完整的服务端拦截器逻辑就串联完成。

客户端拦截器

让我们先看一下grpc.UnaryClientInterceptor的声明,官方代码位置

  1. // UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
  2. type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
  3. // UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC
  4. // and it is the responsibility of the interceptor to call it.
  5. // This is an EXPERIMENTAL API.
  6. type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

看起来和服务端拦截器并没有什么太大的区别,比较简单包括:

  • 一个UnaryInvoker表示客户端具体要发出的执行方法
  • 一个UnaryClientInterceptor用于拦截Invoker方法,可在Invoker执行前后插入拦截代码

具体执行过程,请看基于proto生成的下面代码代码位置

  1. func (c *demoClient) SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) {
  2. out := new(google_protobuf1.Empty)
  3. err := grpc.Invoke(ctx, "/demo.service.v1.Demo/SayHello", in, out, c.cc, opts...)
  4. if err != nil {
  5. return nil, err
  6. }
  7. return out, nil
  8. }

当客户端调用SayHello时可以看到执行了grpc.Invoke方法,并且将fullMethod和其他参数传入,最终会执行下面代码官方代码位置

  1. // Invoke sends the RPC request on the wire and returns after response is
  2. // received. This is typically called by generated code.
  3. //
  4. // All errors returned by Invoke are compatible with the status package.
  5. func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
  6. // allow interceptor to see all applicable call options, which means those
  7. // configured as defaults from dial option as well as per-call options
  8. opts = combine(cc.dopts.callOptions, opts)
  9. if cc.dopts.unaryInt != nil {
  10. return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
  11. }
  12. return invoke(ctx, method, args, reply, cc, opts...)
  13. }

其中的unaryInt即为客户端连接创建时注册的拦截器,使用下面代码注册官方代码位置

  1. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for
  2. // unary RPCs.
  3. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  4. return newFuncDialOption(func(o *dialOptions) {
  5. o.unaryInt = f
  6. })
  7. }

需要注意的是客户端的拦截器在官方gRPC内也只能支持注册一个,与服务端拦截器interceptor chain逻辑类似warden在客户端拦截器也做了相同处理,并且在客户端连接时进行注册,请看下面代码代码位置

  1. // Use attachs a global inteceptor to the Client.
  2. // For example, this is the right place for a circuit breaker or error management inteceptor.
  3. func (c *Client) Use(handlers ...grpc.UnaryClientInterceptor) *Client {
  4. finalSize := len(c.handlers) + len(handlers)
  5. if finalSize >= int(_abortIndex) {
  6. panic("warden: client use too many handlers")
  7. }
  8. mergedHandlers := make([]grpc.UnaryClientInterceptor, finalSize)
  9. copy(mergedHandlers, c.handlers)
  10. copy(mergedHandlers[len(c.handlers):], handlers)
  11. c.handlers = mergedHandlers
  12. return c
  13. }
  14. // chainUnaryClient creates a single interceptor out of a chain of many interceptors.
  15. //
  16. // Execution is done in left-to-right order, including passing of context.
  17. // For example ChainUnaryClient(one, two, three) will execute one before two before three.
  18. func (c *Client) chainUnaryClient() grpc.UnaryClientInterceptor {
  19. n := len(c.handlers)
  20. if n == 0 {
  21. return func(ctx context.Context, method string, req, reply interface{},
  22. cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  23. return invoker(ctx, method, req, reply, cc, opts...)
  24. }
  25. }
  26. return func(ctx context.Context, method string, req, reply interface{},
  27. cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  28. var (
  29. i int
  30. chainHandler grpc.UnaryInvoker
  31. )
  32. chainHandler = func(ictx context.Context, imethod string, ireq, ireply interface{}, ic *grpc.ClientConn, iopts ...grpc.CallOption) error {
  33. if i == n-1 {
  34. return invoker(ictx, imethod, ireq, ireply, ic, iopts...)
  35. }
  36. i++
  37. return c.handlers[i](ictx, imethod, ireq, ireply, ic, chainHandler, iopts...)
  38. }
  39. return c.handlers[0](ctx, method, req, reply, cc, chainHandler, opts...)
  40. }
  41. }

如此完整的客户端拦截器逻辑就串联完成。

实现自己的拦截器

以服务端拦截器logging为例:

  1. // serverLogging warden grpc logging
  2. func serverLogging() grpc.UnaryServerInterceptor {
  3. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  4. // NOTE: handler执行之前的拦截代码:主要获取一些关键参数,如耗时计时、ip等
  5. // 如果自定义的拦截器只需要在handler执行后,那么可以直接执行handler
  6. startTime := time.Now()
  7. caller := metadata.String(ctx, metadata.Caller)
  8. if caller == "" {
  9. caller = "no_user"
  10. }
  11. var remoteIP string
  12. if peerInfo, ok := peer.FromContext(ctx); ok {
  13. remoteIP = peerInfo.Addr.String()
  14. }
  15. var quota float64
  16. if deadline, ok := ctx.Deadline(); ok {
  17. quota = time.Until(deadline).Seconds()
  18. }
  19. // call server handler
  20. resp, err := handler(ctx, req) // NOTE: 以具体执行的handler为分界线!!!
  21. // NOTE: handler执行之后的拦截代码:主要进行耗时计算、日志记录
  22. // 如果自定义的拦截器在handler执行后不需要逻辑,这可直接返回
  23. // after server response
  24. code := ecode.Cause(err).Code()
  25. duration := time.Since(startTime)
  26. // monitor
  27. statsServer.Timing(caller, int64(duration/time.Millisecond), info.FullMethod)
  28. statsServer.Incr(caller, info.FullMethod, strconv.Itoa(code))
  29. logFields := []log.D{
  30. log.KVString("user", caller),
  31. log.KVString("ip", remoteIP),
  32. log.KVString("path", info.FullMethod),
  33. log.KVInt("ret", code),
  34. // TODO: it will panic if someone remove String method from protobuf message struct that auto generate from protoc.
  35. log.KVString("args", req.(fmt.Stringer).String()),
  36. log.KVFloat64("ts", duration.Seconds()),
  37. log.KVFloat64("timeout_quota", quota),
  38. log.KVString("source", "grpc-access-log"),
  39. }
  40. if err != nil {
  41. logFields = append(logFields, log.KV("error", err.Error()), log.KV("stack", fmt.Sprintf("%+v", err)))
  42. }
  43. logFn(code, duration)(ctx, logFields...)
  44. return resp, err
  45. }
  46. }

内置拦截器

自适应限流拦截器

更多关于自适应限流的信息,请参考:kratos 自适应限流

  1. package grpc
  2. import (
  3. pb "kratos-demo/api"
  4. "kratos-demo/internal/service"
  5. "github.com/go-kratos/kratos/pkg/conf/paladin"
  6. "github.com/go-kratos/kratos/pkg/net/rpc/warden"
  7. "github.com/go-kratos/kratos/pkg/net/rpc/warden/ratelimiter"
  8. )
  9. // New new a grpc server.
  10. func New(svc *service.Service) *warden.Server {
  11. var rc struct {
  12. Server *warden.ServerConfig
  13. }
  14. if err := paladin.Get("grpc.toml").UnmarshalTOML(&rc); err != nil {
  15. if err != paladin.ErrNotExist {
  16. panic(err)
  17. }
  18. }
  19. ws := warden.NewServer(rc.Server)
  20. // 挂载自适应限流拦截器到 warden server,使用默认配置
  21. limiter := ratelimiter.New(nil)
  22. ws.Use(limiter.Limit())
  23. // 注意替换这里:
  24. // RegisterDemoServer方法是在"api"目录下代码生成的
  25. // 对应proto文件内自定义的service名字,请使用正确方法名替换
  26. pb.RegisterDemoServer(ws.Server(), svc)
  27. ws, err := ws.Start()
  28. if err != nil {
  29. panic(err)
  30. }
  31. return ws
  32. }

扩展阅读

warden快速开始
warden基于pb生成
warden负载均衡
warden服务发现