gRPC

Our transporter/grpc is developed upon gRPC, and implements Transporter interface. You could use it for the communication between services on gRPC protocol.

Server

Options

Network()

To set communication protocol such as tcp.

Address()

To set server’s listening address.

Timeout()

To set the server-side timeout.

Logger()

To set logger.

Middleware()

To set middleware for gRPC server.

UnaryInterceptor()

To set interceptors for gRPC server.

Options()

To set some extra grpc.ServerOption

Implementation Details

NewServer()

  1. func NewServer(opts ...ServerOption) *Server {
  2. // grpc server default configuration
  3. srv := &Server{
  4. network: "tcp",
  5. address: ":0",
  6. timeout: 1 * time.Second,
  7. health: health.NewServer(),
  8. log: log.NewHelper(log.DefaultLogger),
  9. }
  10. // apply opts
  11. for _, o := range opts {
  12. o(srv)
  13. }
  14. // convert middleware to grpc interceptor
  15. var ints = []grpc.UnaryServerInterceptor{
  16. srv.unaryServerInterceptor(),
  17. }
  18. if len(srv.ints) > 0 {
  19. ints = append(ints, srv.ints...)
  20. }
  21. // convert UnaryInterceptor to ServerOption
  22. var grpcOpts = []grpc.ServerOption{
  23. grpc.ChainUnaryInterceptor(ints...),
  24. }
  25. if len(srv.grpcOpts) > 0 {
  26. grpcOpts = append(grpcOpts, srv.grpcOpts...)
  27. }
  28. // create grpc server
  29. srv.Server = grpc.NewServer(grpcOpts...)
  30. // create metadata server
  31. srv.metadata = apimd.NewServer(srv.Server)
  32. // register these internal API
  33. grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
  34. apimd.RegisterMetadataServer(srv.Server, srv.metadata)
  35. reflection.Register(srv.Server)
  36. return srv
  37. }

unaryServerInterceptor()

  1. func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
  2. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  3. // merge two ctx
  4. ctx, cancel := ic.Merge(ctx, s.ctx)
  5. defer cancel()
  6. // get metadata from ctx
  7. md, _ := grpcmd.FromIncomingContext(ctx)
  8. // bind some information into ctx
  9. ctx = transport.NewServerContext(ctx, &Transport{
  10. endpoint: s.endpoint.String(),
  11. operation: info.FullMethod,
  12. header: headerCarrier(md),
  13. })
  14. // set timeout
  15. if s.timeout > 0 {
  16. ctx, cancel = context.WithTimeout(ctx, s.timeout)
  17. defer cancel()
  18. }
  19. // middleware
  20. h := func(ctx context.Context, req interface{}) (interface{}, error) {
  21. return handler(ctx, req)
  22. }
  23. if len(s.middleware) > 0 {
  24. h = middleware.Chain(s.middleware...)(h)
  25. }
  26. return h(ctx, req)
  27. }
  28. }

Usage

These are some basic usage of gRPC, you could refer to gRPC Docs for advanced examples.

Register gRPC Server

  1. gs := grpc.NewServer()
  2. app := kratos.New(
  3. kratos.Name("kratos"),
  4. kratos.Version("v1.0.0"),
  5. kratos.Server(gs),
  6. )

Set middleware in gRPC Server

  1. grpcSrv := grpc.NewServer(
  2. grpc.Address(":9000"),
  3. grpc.Middleware(
  4. logging.Server(),
  5. ),
  6. )

Process Request in gRPC Middleware

  1. if info, ok := transport.FromServerContext(ctx); ok {
  2. kind = info.Kind().String()
  3. operation = info.Operation()
  4. }

client

Options

WithEndpoint()

To set the endpoint which the client will connect to.

WithTimeout()

To set the client-side timeout.

WithMiddleware()

To set middleware.

WithDiscovery()

To set the discovery for gRPC client.

WithUnaryInterceptor()

To set interceptors for gRPC client.

WithOptions()

To set some extra grpc.ClientOption

Implementation Details

dial()

  1. func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
  2. // default options
  3. options := clientOptions{
  4. timeout: 500 * time.Millisecond,
  5. }
  6. // apply opts
  7. for _, o := range opts {
  8. o(&options)
  9. }
  10. // convert middleware to grpc interceptor
  11. var ints = []grpc.UnaryClientInterceptor{
  12. unaryClientInterceptor(options.middleware, options.timeout),
  13. }
  14. if len(options.ints) > 0 {
  15. ints = append(ints, options.ints...)
  16. }
  17. var grpcOpts = []grpc.DialOption{
  18. // client side balancer
  19. grpc.WithBalancerName(roundrobin.Name),
  20. grpc.WithChainUnaryInterceptor(ints...),
  21. }
  22. if options.discovery != nil {
  23. // To use service discovery
  24. grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery)))
  25. }
  26. if insecure {
  27. // to disable transport security for connection
  28. grpcOpts = append(grpcOpts, grpc.WithInsecure())
  29. }
  30. if len(options.grpcOpts) > 0 {
  31. grpcOpts = append(grpcOpts, options.grpcOpts...)
  32. }
  33. return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
  34. }

unaryClientInterceptor()

  1. func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
  2. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  3. // bind some information into ctx
  4. ctx = transport.NewClientContext(ctx, &Transport{
  5. endpoint: cc.Target(),
  6. operation: method,
  7. header: headerCarrier{},
  8. })
  9. if timeout > 0 {
  10. // set the timeout
  11. var cancel context.CancelFunc
  12. ctx, cancel = context.WithTimeout(ctx, timeout)
  13. defer cancel()
  14. }
  15. // middleware
  16. h := func(ctx context.Context, req interface{}) (interface{}, error) {
  17. if tr, ok := transport.FromClientContext(ctx); ok {
  18. keys := tr.Header().Keys()
  19. keyvals := make([]string, 0, len(keys))
  20. for _, k := range keys {
  21. keyvals = append(keyvals, k, tr.Header().Get(k))
  22. }
  23. ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
  24. }
  25. return reply, invoker(ctx, method, req, reply, cc, opts...)
  26. }
  27. if len(ms) > 0 {
  28. h = middleware.Chain(ms...)(h)
  29. }
  30. _, err := h(ctx, req)
  31. return err
  32. }
  33. }

Usage

Client Connection

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. grpc.WithEndpoint("127.0.0.1:9000"),
  4. )

Middleware

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. transport.WithEndpoint("127.0.0.1:9000"),
  4. transport.WithMiddleware(
  5. recovery.Recovery(),
  6. ),
  7. )

Service Discovery

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. grpc.WithEndpoint("discovery:///helloworld"),
  4. grpc.WithDiscovery(r),
  5. )

References