gRPC

transporter/grpc 中基于谷歌的 grpc 框架实现了Transporter,用以注册 grpc 到 kratos.Server() 中。

server

配置

Network()

配置服务端的 network 协议,如 tcp

Address()

配置服务端监听的地址

Timeout()

配置服务端的超时设置

Logger()

配置服务端使用日志

Middleware()

配置服务端的 kratos 中间件

UnaryInterceptor()

配置服务端使用的 grpc 拦截器

Options()

配置一些额外的 grpc.ServerOption

主要的实现细节

NewServer()

  1. func NewServer(opts ...ServerOption) *Server {
  2. // grpc server 默认配置
  3. srv := &Server{
  4. network: "tcp",
  5. address: ":0",
  6. timeout: 1 * time.Second,
  7. health: health.NewServer(),
  8. log: log.NewHelper(log.DefaultLogger),
  9. }
  10. // 递归 opts
  11. for _, o := range opts {
  12. o(srv)
  13. }
  14. // kratos middleware 转换成 grpc 拦截器,并处理一些细节
  15. var ints = []grpc.UnaryServerInterceptor{
  16. srv.unaryServerInterceptor(),
  17. }
  18. if len(srv.ints) > 0 {
  19. ints = append(ints, srv.ints...)
  20. }
  21. // 将 UnaryInterceptor 转换成 ServerOption
  22. var grpcOpts = []grpc.ServerOption{
  23. grpc.ChainUnaryInterceptor(ints...),
  24. }
  25. if len(srv.grpcOpts) > 0 {
  26. grpcOpts = append(grpcOpts, srv.grpcOpts...)
  27. }
  28. // 创建 grpc server
  29. srv.Server = grpc.NewServer(grpcOpts...)
  30. // 创建 metadata server
  31. srv.metadata = apimd.NewServer(srv.Server)
  32. // 内部注册
  33. grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
  34. apimd.RegisterMetadataServer(srv.Server, srv.metadata)
  35. reflection.Register(srv.Server)
  36. return srv
  37. }

unaryServerInterceptor()

  1. func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
  2. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  3. // 把两个 ctx 合并成一个
  4. ctx, cancel := ic.Merge(ctx, s.ctx)
  5. defer cancel()
  6. // 从 ctx 中取出 metadata
  7. md, _ := grpcmd.FromIncomingContext(ctx)
  8. // 把一些信息绑定到 ctx 上
  9. ctx = transport.NewServerContext(ctx, &Transport{
  10. endpoint: s.endpoint.String(),
  11. operation: info.FullMethod,
  12. header: headerCarrier(md),
  13. })
  14. // ctx 超时设置
  15. if s.timeout > 0 {
  16. ctx, cancel = context.WithTimeout(ctx, s.timeout)
  17. defer cancel()
  18. }
  19. // 中间件处理
  20. h := func(ctx context.Context, req interface{}) (interface{}, error) {
  21. return handler(ctx, req)
  22. }
  23. if len(s.middleware) > 0 {
  24. h = middleware.Chain(s.middleware...)(h)
  25. }
  26. return h(ctx, req)
  27. }
  28. }

使用方式

简单列举了一些 kratos 中 grpc 的用法,其他 grpc 用法可以到 grpc 仓库中查看。

注册 grpc server

  1. gs := grpc.NewServer()
  2. app := kratos.New(
  3. kratos.Name("kratos"),
  4. kratos.Version("v1.0.0"),
  5. kratos.Server(gs),
  6. )

grpc server 中使用 kratos middleware

  1. grpcSrv := grpc.NewServer(
  2. grpc.Address(":9000"),
  3. grpc.Middleware(
  4. logging.Server(),
  5. ),
  6. )

middleware 中处理 grpc 请求

  1. if info, ok := transport.FromServerContext(ctx); ok {
  2. kind = info.Kind().String()
  3. operation = info.Operation()
  4. }

client

配置

WithEndpoint()

配置客户端使用的对端连接地址,如果不使用服务发现则为ip:port,如果使用服务发现则格式为discovery://\<authority>/\<serviceName>

WithTimeout()

配置客户端的请求默认超时时间,如果有链路超时优先使用链路超时时间

WithMiddleware()

配置客户端使用的 kratos 中间件

WithDiscovery()

配置客户端使用的服务发现

WithUnaryInterceptor()

配置客户端使用的 grpc 原生拦截器

WithOptions()

配置一些额外的 grpc.ClientOption

主要的实现细节

dial()

  1. func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
  2. // 默认配置
  3. options := clientOptions{
  4. timeout: 500 * time.Millisecond,
  5. }
  6. // 遍历 opts
  7. for _, o := range opts {
  8. o(&options)
  9. }
  10. // 将 kratos 中间件转化成 grpc 拦截器
  11. var ints = []grpc.UnaryClientInterceptor{
  12. unaryClientInterceptor(options.middleware, options.timeout),
  13. }
  14. if len(options.ints) > 0 {
  15. ints = append(ints, options.ints...)
  16. }
  17. var grpcOpts = []grpc.DialOption{
  18. // 负载均衡
  19. grpc.WithBalancerName(roundrobin.Name),
  20. grpc.WithChainUnaryInterceptor(ints...),
  21. }
  22. if options.discovery != nil {
  23. // 如果存在服务发现配置,就配置 grpc 的 Resolvers
  24. grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery)))
  25. }
  26. if insecure {
  27. // 跳过证书验证
  28. grpcOpts = append(grpcOpts, grpc.WithInsecure())
  29. }
  30. if len(options.grpcOpts) > 0 {
  31. grpcOpts = append(grpcOpts, options.grpcOpts...)
  32. }
  33. return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
  34. }
  1. func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
  2. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  3. // 把一些信息绑定到 ctx 上
  4. ctx = transport.NewClientContext(ctx, &Transport{
  5. endpoint: cc.Target(),
  6. operation: method,
  7. header: headerCarrier{},
  8. })
  9. if timeout > 0 {
  10. // timeout 如果大于 0,就重新设置一下 ctx 的超时时间
  11. var cancel context.CancelFunc
  12. ctx, cancel = context.WithTimeout(ctx, timeout)
  13. defer cancel()
  14. }
  15. // 中间件处理
  16. h := func(ctx context.Context, req interface{}) (interface{}, error) {
  17. if tr, ok := transport.FromClientContext(ctx); ok {
  18. keys := tr.Header().Keys()
  19. keyvals := make([]string, 0, len(keys))
  20. for _, k := range keys {
  21. keyvals = append(keyvals, k, tr.Header().Get(k))
  22. }
  23. ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
  24. }
  25. return reply, invoker(ctx, method, req, reply, cc, opts...)
  26. }
  27. if len(ms) > 0 {
  28. h = middleware.Chain(ms...)(h)
  29. }
  30. _, err := h(ctx, req)
  31. return err
  32. }
  33. }

使用方式

创建客户端连接

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. grpc.WithEndpoint("127.0.0.1:9000"),
  4. )

使用中间件

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. transport.WithEndpoint("127.0.0.1:9000"),
  4. transport.WithMiddleware(
  5. recovery.Recovery(),
  6. ),
  7. )

使用服务发现

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. grpc.WithEndpoint("discovery:///helloworld"),
  4. grpc.WithDiscovery(r),
  5. )

References