限流(并发控制)

概述

限流器是一种服务治理能力,用于限制服务的并发调用量,以保护服务的稳定性。在本次文档中仅介绍对 rest,grpc 服务的限流器的使用,不介绍限流算法。

限流一般有单节点限流、集群限流(将限流数值对集群节点数求平均值,其本质还是单节点限流)、分布式限流。

本次介绍的是单节点限流,与其说是限流,倒不如说是并发控制更贴切。

准备工作

  1. 创建一个 go mod 的 demo 工程
  1. $ mkdir -p ~/workspace/demo && cd ~/workspace/demo
  2. $ go mod init demo

rest 服务

限流配置

  1. RestConf struct {
  2. ...
  3. MaxConns int `json:",default=10000"` // 最大并发连接数,默认值为 10000 qps
  4. ...
  5. }

示例

我们用一个 demo 来介绍一下限流器的使用。

  1. demo 工程中新建目录 rest-limit-demo
  1. $ cd ~/workspace/demo
  2. $ mkdir rest-limit-demo && cd rest-limit-demo
  1. 新建一个 limit.api 文件,将如下内容拷贝到文件中

limit.api

  1. syntax = "v1"
  2. service limit {
  3. @handler ping
  4. get /ping
  5. }
  1. 生成 rest 代码
  1. $ cd ~/workspace/demo/rest-limit-demo
  2. $ goctl api go -api limit.api -dir .
  1. 查看目录结构
  1. $ tree
  2. .
  3. ├── etc
  4. └── limit.yaml
  5. ├── internal
  6. ├── config
  7. └── config.go
  8. ├── handler
  9. ├── pinghandler.go
  10. └── routes.go
  11. ├── logic
  12. └── pinglogic.go
  13. ├── svc
  14. └── servicecontext.go
  15. └── types
  16. └── types.go
  17. ├── limit.api
  18. └── limit.go

我们修改一下配置,将 qps 限制为 100,然后逻辑中加一点阻塞逻辑。

  1. 修改配置文件

~/workspace/demo/rest-limit-demo/etc/limit.yaml 中的 maxConns 修改为 100

  1. 添加逻辑代码

~/workspace/demo/rest-limit-demo/internal/logic/pinglogic.go 中的 Ping 方法添加阻塞逻辑

最终代码内容如下

  • limit.yaml
  • pinglogic.go

limit.yaml

  1. Name: limit
  2. Host: 0.0.0.0
  3. Port: 8888
  4. MaxConns: 100
  1. package logic
  2. import (
  3. "context"
  4. "time"
  5. "github.com/zeromicro/go-zero/core/logx"
  6. "demo/rest-limit-demo/internal/svc"
  7. )
  8. type PingLogic struct {
  9. logx.Logger
  10. ctx context.Context
  11. svcCtx *svc.ServiceContext
  12. }
  13. func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic {
  14. return &PingLogic{
  15. Logger: logx.WithContext(ctx),
  16. ctx: ctx,
  17. svcCtx: svcCtx,
  18. }
  19. }
  20. func (l *PingLogic) Ping() error {
  21. time.Sleep(50 * time.Millisecond)
  22. return nil
  23. }

我们先来运行一下这个最简单的 rest 服务,我们用 hey 工具来简单压测一下接口。

先启动服务

  1. $ cd ~/workspace/demo/rest-limit-demo
  2. $ go run limit.go

单独开个终端压测

  1. # 我们用 hey 工具来进行压测,压测 90 个并发,执行 1 秒
  2. $ hey -z 1s -c 90 -q 1 'http://localhost:8888/ping'
  3. Summary:
  4. Total: 1.1084 secs
  5. Slowest: 0.1066 secs
  6. Fastest: 0.0607 secs
  7. Average: 0.0890 secs
  8. Requests/sec: 81.1980
  9. Response time histogram:
  10. 0.061 [1] |■
  11. 0.065 [2] |■■■
  12. 0.070 [8] |■■■■■■■■■■■
  13. 0.074 [13] |■■■■■■■■■■■■■■■■■
  14. 0.079 [5] |■■■■■■■
  15. 0.084 [0] |
  16. 0.088 [0] |
  17. 0.093 [2] |■■■
  18. 0.097 [23] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
  19. 0.102 [30] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
  20. 0.107 [6] |■■■■■■■■
  21. Latency distribution:
  22. 10% in 0.0682 secs
  23. 25% in 0.0742 secs
  24. 50% in 0.0961 secs
  25. 75% in 0.0983 secs
  26. 90% in 0.0996 secs
  27. 95% in 0.1039 secs
  28. 0% in 0.0000 secs
  29. Details (average, fastest, slowest):
  30. DNS+dialup: 0.0054 secs, 0.0607 secs, 0.1066 secs
  31. DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
  32. req write: 0.0002 secs, 0.0000 secs, 0.0011 secs
  33. resp wait: 0.0832 secs, 0.0576 secs, 0.0942 secs
  34. resp read: 0.0001 secs, 0.0000 secs, 0.0012 secs
  35. Status code distribution:
  36. [200] 90 responses

从压测结果来看,90 个请求全部成功,我们来加大并发数,看看会发生什么。

  1. # 我们用 hey 工具来进行压测,压测 110 个并发,执行 1 秒
  2. $ hey -z 1s -c 110 -q 1 'http://127.0.0.1:8888/ping'
  3. Summary:
  4. Total: 1.0833 secs
  5. Slowest: 0.0756 secs
  6. Fastest: 0.0107 secs
  7. Average: 0.0644 secs
  8. Requests/sec: 101.5403
  9. Response time histogram:
  10. 0.011 [1] |■
  11. 0.017 [9] |■■■■■■■
  12. 0.024 [0] |
  13. 0.030 [0] |
  14. 0.037 [0] |
  15. 0.043 [0] |
  16. 0.050 [0] |
  17. 0.056 [0] |
  18. 0.063 [2] |■■
  19. 0.069 [45] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
  20. 0.076 [53] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
  21. Latency distribution:
  22. 10% in 0.0612 secs
  23. 25% in 0.0665 secs
  24. 50% in 0.0690 secs
  25. 75% in 0.0726 secs
  26. 90% in 0.0737 secs
  27. 95% in 0.0740 secs
  28. 99% in 0.0756 secs
  29. Details (average, fastest, slowest):
  30. DNS+dialup: 0.0040 secs, 0.0107 secs, 0.0756 secs
  31. DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
  32. req write: 0.0001 secs, 0.0000 secs, 0.0025 secs
  33. resp wait: 0.0602 secs, 0.0064 secs, 0.0741 secs
  34. resp read: 0.0000 secs, 0.0000 secs, 0.0001 secs
  35. Status code distribution:
  36. [200] 100 responses
  37. [503] 10 responses

从压测结果来看,我们的服务只能支持 100 个并发,超过 100 个并发的请求都会被限流,返回 503 状态码。 在服务的日志中也会出现限流相关的错误:

  1. {"@timestamp":"2023-02-09T18:41:55.500+08:00","caller":"internal/log.go:62","content":"(/ping - 127.0.0.1:64163) concurrent connections over 100, rejected with code 503","level":"error","span":"08fa61d49b694e63","trace":"622b2287f32ff2d45f4dfce3eec8e62c"}

grpc 服务

grpc 服务属于内网服务,不对外提供服务,只对内部的其他服务提供服务,所以我们默认情况下不需要对其进行限流。

如果真要对其进行限流,可以通过 grpc 中间件来实现,参考示例如下。

我们用一个最简单的 grpc server 服务来演示一下。

  1. demo 工程中新建目录 grpc-limit-demo
  1. $ cd ~/workspace/demo
  2. $ mkdir grpc-limit-demo && cd grpc-limit-demo
  1. 新建一个 limit.proto 文件,将如下内容拷贝到文件中

limit.proto

  1. syntax = "proto3";
  2. package proto;
  3. option go_package = "./proto";
  4. message PingReq{}
  5. message PingResp{}
  6. service limit{
  7. rpc Ping(PingReq) returns (PingResp);
  8. }
  1. 生成 grpc 代码
  1. $ cd ~/workspace/demo/grpc-limit-demo
  2. $ goctl rpc protoc limit.proto --go_out=. --go-grpc_out=. --zrpc_out=.
  1. 查看目录
  1. $ tree
  2. .
  3. ├── etc
  4. └── limit.yaml
  5. ├── internal
  6. ├── config
  7. └── config.go
  8. ├── logic
  9. └── pinglogic.go
  10. ├── server
  11. └── limitserver.go
  12. └── svc
  13. └── servicecontext.go
  14. ├── limit
  15. └── limit.go
  16. ├── limit.go
  17. ├── limit.proto
  18. └── proto
  19. ├── limit.pb.go
  20. └── limit_grpc.pb.go
  21. 8 directories, 10 files

我们在 ~/workspace/demo/grpc-limit-demo/internal/logic/limitlogic.go 中实现 Ping 方法,代码如下:

imitlogic.go

  1. package logic
  2. import (
  3. "context"
  4. "time"
  5. "demo/grpc-limit-demo/internal/svc"
  6. "demo/grpc-limit-demo/proto"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. )
  9. type PingLogic struct {
  10. ctx context.Context
  11. svcCtx *svc.ServiceContext
  12. logx.Logger
  13. }
  14. func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic {
  15. return &PingLogic{
  16. ctx: ctx,
  17. svcCtx: svcCtx,
  18. Logger: logx.WithContext(ctx),
  19. }
  20. }
  21. func (l *PingLogic) Ping(in *proto.PingReq) (*proto.PingResp, error) {
  22. time.Sleep(50*time.Millisecond)
  23. return &proto.PingResp{}, nil
  24. }
  1. ~/workspace/demo/grpc-limit-demo/limit.go 中添加中间件:

limit.go

  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "net/http"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "github.com/zeromicro/go-zero/core/syncx"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. "demo/grpc-limit-demo/internal/config"
  12. "demo/grpc-limit-demo/internal/server"
  13. "demo/grpc-limit-demo/internal/svc"
  14. "demo/grpc-limit-demo/proto"
  15. "github.com/zeromicro/go-zero/core/conf"
  16. "github.com/zeromicro/go-zero/core/service"
  17. "github.com/zeromicro/go-zero/zrpc"
  18. "google.golang.org/grpc"
  19. "google.golang.org/grpc/reflection"
  20. )
  21. var configFile = flag.String("f", "etc/limit.yaml", "the config file")
  22. func main() {
  23. flag.Parse()
  24. var c config.Config
  25. conf.MustLoad(*configFile, &c)
  26. ctx := svc.NewServiceContext(c)
  27. s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
  28. proto.RegisterLimitServer(grpcServer, server.NewLimitServer(ctx))
  29. if c.Mode == service.DevMode || c.Mode == service.TestMode {
  30. reflection.Register(grpcServer)
  31. }
  32. })
  33. var n = 100
  34. l := syncx.NewLimit(n)
  35. s.AddUnaryInterceptors(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  36. if l.TryBorrow() {
  37. defer func() {
  38. if err := l.Return(); err != nil {
  39. logx.Error(err)
  40. }
  41. }()
  42. return handler(ctx, req)
  43. } else {
  44. logx.Errorf("concurrent connections over %d, rejected with code %d",
  45. n, http.StatusServiceUnavailable)
  46. return nil, status.Error(codes.Unavailable, "concurrent connections over limit")
  47. }
  48. })
  49. defer s.Stop()
  50. fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
  51. s.Start()
  52. }
  53. }

我们将生成的配置文件 ~/workspace/demo/grpc-limit-demo/etc/limit.yaml 文件中的 etcd 配置删除掉,用直连的方式来启动 grpc server。

limit.yaml

  1. Name: limit.rpc
  2. ListenOn: 0.0.0.0:8080

启动服务

  1. $ cd ~/workspace/demo/grpc-limit-demo
  2. $ go run limit.go

现在 grpc server 服务有了,我们用 ghz 来压测一下 。

  1. $ cd ~/workspace/demo/grpc-limit-demo
  2. # 压测 90 qps,共请求 110 次
  3. $ ghz --insecure --proto=limit.proto --call=proto.limit.Ping -d '{}' -c 90 -n 110 127.0.0.1:8080
  4. Summary:
  5. Count: 110
  6. Total: 108.94 ms
  7. Slowest: 60.51 ms
  8. Fastest: 50.24 ms
  9. Average: 55.73 ms
  10. Requests/sec: 1009.75
  11. Response time histogram:
  12. 50.240 [1] |∎∎
  13. 51.266 [14] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  14. 52.293 [9] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  15. 53.320 [3] |∎∎∎∎∎∎
  16. 54.347 [9] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  17. 55.374 [5] |∎∎∎∎∎∎∎∎∎∎
  18. 56.401 [9] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  19. 57.427 [20] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  20. 58.454 [19] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  21. 59.481 [14] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  22. 60.508 [7] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  23. Latency distribution:
  24. 10 % in 50.88 ms
  25. 25 % in 53.40 ms
  26. 50 % in 56.81 ms
  27. 75 % in 57.96 ms
  28. 90 % in 58.93 ms
  29. 95 % in 59.52 ms
  30. 99 % in 60.26 ms
  31. Status code distribution:
  32. [OK] 110 responses

可以看到所有的请求都是成功的,我们再来压测 110 qps。

  1. $ cd ~/workspace/demo/grpc-limit-demo
  2. # 压测 110 qps,共请求 110 次
  3. $ ghz --insecure --proto=limit.proto --call=proto.limit.Ping -d '{}' -c 110 -n 110 127.0.0.1:8080
  4. Summary:
  5. Count: 110
  6. Total: 68.35 ms
  7. Slowest: 67.53 ms
  8. Fastest: 58.76 ms
  9. Average: 58.03 ms
  10. Requests/sec: 1609.33
  11. Response time histogram:
  12. 58.763 [1] |∎∎
  13. 59.640 [7] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  14. 60.517 [12] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  15. 61.394 [7] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  16. 62.272 [10] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  17. 63.149 [15] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  18. 64.026 [14] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  19. 64.903 [19] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  20. 65.780 [9] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  21. 66.657 [3] |∎∎∎∎∎∎
  22. 67.534 [3] |∎∎∎∎∎∎
  23. Latency distribution:
  24. 10 % in 59.81 ms
  25. 25 % in 60.96 ms
  26. 50 % in 62.96 ms
  27. 75 % in 64.52 ms
  28. 90 % in 65.29 ms
  29. 95 % in 66.09 ms
  30. 99 % in 67.03 ms
  31. Status code distribution:
  32. [Unavailable] 10 responses
  33. [OK] 100 responses
  34. Error distribution:
  35. [10] rpc error: code = Unavailable desc = concurrent connections over limit

可以看到,当并发量超过 100 时,就会返回 rpc error: code = Unavailable desc = concurrent connections over limit

参考文献

  1. 《轻量级压测工具 hey》
  2. 《grpc 压测工作 ghz》