gRPC

Our transporter/grpc is developed upon gRPC, and implements Transporter interface. You could use it for the communication between services on gRPC protocol.

Server

Options

Network(network string) ServerOption

To set communication protocol such as tcp.

Address(addr string) ServerOption

To set server’s listening address.

Timeout(timeout time.Duration) ServerOption

To set the server-side timeout.

Logger(logger log.Logger) ServerOption

To set logger.

Middleware(m ...middleware.Middleware) ServerOption

To set middleware for gRPC server.

TLSConfig(c *tls.Config) ServerOption

To set TLS config.

UnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption

To set interceptors for gRPC server.

StreamInterceptor(in ...grpc.StreamServerInterceptor) ServerOption

To set stream interceptors for gRPC server.

Options(opts ...grpc.ServerOption) ServerOption

To set some extra grpc.ServerOption.

Implementation Details

NewServer()

  1. func NewServer(opts ...ServerOption) *Server {
  2. // grpc server default configuration
  3. srv := &Server{
  4. network: "tcp",
  5. address: ":0",
  6. timeout: 1 * time.Second,
  7. health: health.NewServer(),
  8. log: log.NewHelper(log.GetLogger()),
  9. }
  10. // apply opts
  11. for _, o := range opts {
  12. o(srv)
  13. }
  14. // convert middleware to grpc interceptor
  15. unaryInts := []grpc.UnaryServerInterceptor{
  16. srv.unaryServerInterceptor(),
  17. }
  18. streamInts := []grpc.StreamServerInterceptor{
  19. srv.streamServerInterceptor(),
  20. }
  21. if len(srv.unaryInts) > 0 {
  22. unaryInts = append(unaryInts, srv.unaryInts...)
  23. }
  24. if len(srv.streamInts) > 0 {
  25. streamInts = append(streamInts, srv.streamInts...)
  26. }
  27. // convert UnaryInterceptor and StreamInterceptor to ServerOption
  28. var grpcOpts = []grpc.ServerOption{
  29. grpc.ChainUnaryInterceptor(unaryInts...),
  30. grpc.ChainStreamInterceptor(streamInts...),
  31. }
  32. // convert LTS config to ServerOption
  33. if srv.tlsConf != nil {
  34. grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(srv.tlsConf)))
  35. }
  36. // convert srv.grpcOpts to ServerOption
  37. if len(srv.grpcOpts) > 0 {
  38. grpcOpts = append(grpcOpts, srv.grpcOpts...)
  39. }
  40. // create grpc server
  41. srv.Server = grpc.NewServer(grpcOpts...)
  42. // create metadata server
  43. srv.metadata = apimd.NewServer(srv.Server)
  44. // set lis and endpoint
  45. srv.err = srv.listenAndEndpoint()
  46. // register these internal API
  47. grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
  48. apimd.RegisterMetadataServer(srv.Server, srv.metadata)
  49. reflection.Register(srv.Server)
  50. return srv
  51. }

unaryServerInterceptor()

  1. func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
  2. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  3. // merge two ctx
  4. ctx, cancel := ic.Merge(ctx, s.ctx)
  5. defer cancel()
  6. // get metadata from ctx
  7. md, _ := grpcmd.FromIncomingContext(ctx)
  8. // bind some information into ctx
  9. replyHeader := grpcmd.MD{}
  10. ctx = transport.NewServerContext(ctx, &Transport{
  11. endpoint: s.endpoint.String(),
  12. operation: info.FullMethod,
  13. reqHeader: headerCarrier(md),
  14. replyHeader: headerCarrier(replyHeader),
  15. })
  16. // set timeout
  17. if s.timeout > 0 {
  18. ctx, cancel = context.WithTimeout(ctx, s.timeout)
  19. defer cancel()
  20. }
  21. // middleware
  22. h := func(ctx context.Context, req interface{}) (interface{}, error) {
  23. return handler(ctx, req)
  24. }
  25. if len(s.middleware) > 0 {
  26. h = middleware.Chain(s.middleware...)(h)
  27. }
  28. // execute handler
  29. reply, err := h(ctx, req)
  30. if len(replyHeader) > 0 {
  31. _ = grpc.SetHeader(ctx, replyHeader)
  32. }
  33. return reply, err
  34. }
  35. }

Usage

These are some basic usage of gRPC, you could refer to gRPC Docs for advanced examples.

Register gRPC Server

  1. gs := grpc.NewServer()
  2. app := kratos.New(
  3. kratos.Name("kratos"),
  4. kratos.Version("v1.0.0"),
  5. kratos.Server(gs),
  6. )

Set middleware in gRPC Server

  1. grpcSrv := grpc.NewServer(
  2. grpc.Address(":9000"),
  3. grpc.Middleware(
  4. logging.Server(),
  5. ),
  6. )

Process Request in gRPC Middleware

  1. if info, ok := transport.FromServerContext(ctx); ok {
  2. kind = info.Kind().String()
  3. operation = info.Operation()
  4. }

Client

Options

WithEndpoint(endpoint string) ClientOption

To set the endpoint which the client will connect to.

WithTimeout(timeout time.Duration) ClientOption

To set the client-side timeout.

WithMiddleware(m ...middleware.Middleware) ClientOption

To set middleware.

WithDiscovery(d registry.Discovery) ClientOption

To set the discovery for gRPC client.

WithTLSConfig(c *tls.Config) ClientOption

To set TLS config.

WithUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption

To set interceptors for gRPC client.

WithOptions(opts ...grpc.DialOption) ClientOption

To set some extra grpc.ClientOption.

Implementation Details

dial()

  1. func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
  2. // default options
  3. options := clientOptions{
  4. timeout: 2000 * time.Millisecond,
  5. balancerName: wrr.Name,
  6. logger: log.GetLogger(),
  7. }
  8. // apply opts
  9. for _, o := range opts {
  10. o(&options)
  11. }
  12. // convert middleware to grpc interceptor
  13. ints := []grpc.UnaryClientInterceptor{
  14. unaryClientInterceptor(options.middleware, options.timeout, options.filters),
  15. }
  16. if len(options.ints) > 0 {
  17. ints = append(ints, options.ints...)
  18. }
  19. // client side balancer
  20. grpcOpts := []grpc.DialOption{
  21. grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, options.balancerName)),
  22. grpc.WithChainUnaryInterceptor(ints...),
  23. }
  24. if options.discovery != nil {
  25. // To use service discovery
  26. grpcOpts = append(grpcOpts,
  27. grpc.WithResolvers(
  28. discovery.NewBuilder(
  29. options.discovery,
  30. discovery.WithInsecure(insecure),
  31. discovery.WithLogger(options.logger),
  32. )))
  33. }
  34. if insecure {
  35. // to disable transport security for connection
  36. grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials()))
  37. }
  38. // TLS config
  39. if options.tlsConf != nil {
  40. grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(options.tlsConf)))
  41. }
  42. if len(options.grpcOpts) > 0 {
  43. grpcOpts = append(grpcOpts, options.grpcOpts...)
  44. }
  45. return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
  46. }

unaryClientInterceptor()

  1. func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
  2. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  3. // bind some information into ctx
  4. ctx = transport.NewClientContext(ctx, &Transport{
  5. endpoint: cc.Target(),
  6. operation: method,
  7. reqHeader: headerCarrier{},
  8. filters: filters,
  9. })
  10. if timeout > 0 {
  11. // set the timeout
  12. var cancel context.CancelFunc
  13. ctx, cancel = context.WithTimeout(ctx, timeout)
  14. defer cancel()
  15. }
  16. // middleware
  17. h := func(ctx context.Context, req interface{}) (interface{}, error) {
  18. if tr, ok := transport.FromClientContext(ctx); ok {
  19. header := tr.RequestHeader()
  20. keys := header.Keys()
  21. keyvals := make([]string, 0, len(keys))
  22. for _, k := range keys {
  23. keyvals = append(keyvals, k, header.Get(k))
  24. }
  25. ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
  26. }
  27. return reply, invoker(ctx, method, req, reply, cc, opts...)
  28. }
  29. if len(ms) > 0 {
  30. h = middleware.Chain(ms...)(h)
  31. }
  32. _, err := h(ctx, req)
  33. return err
  34. }
  35. }

Usage

Client Connection

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. grpc.WithEndpoint("127.0.0.1:9000"),
  4. )

Middleware

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. grpc.WithEndpoint("127.0.0.1:9000"),
  4. grpc.WithTimeout(3600 * time.Second),
  5. grpc.WithMiddleware(
  6. recovery.Recovery(),
  7. validate.Validator(),
  8. ),
  9. )

Service Discovery

  1. conn, err := grpc.DialInsecure(
  2. context.Background(),
  3. grpc.WithEndpoint("discovery:///helloworld"),
  4. grpc.WithDiscovery(r),
  5. )

References