输入输出中间件

输入输出中间件可以完全代替 Hprose 过滤器。使用输入输出中间件还是使用 Hprose 过滤器完全看开发者喜好。

输入输出中间件的形式为:

  1. func(
  2. request []byte,
  3. context Context,
  4. next NextFilterHandler) (response []byte, err error) {
  5. ...
  6. response, err = next(request, context)
  7. ...
  8. return response, err

request 是原始请求数据,对于客户端来说它是输出数据,对于服务器端来说,它是输入数据。

context 是调用上下文对象。

next 表示下一个中间件。通过调用 next 将各个中间件串联起来。

next 的返回值 response 是返回的响应数据。对于客户端来说,它是输入数据。对于服务器端来说,它是输出数据。

跟踪调试

下面我们来看一下 Hprose 过滤器中的跟踪调试的例子在这里如何实现。

  1. package main
  2.  
  3. import (
  4. "fmt"
  5.  
  6. "github.com/hprose/hprose-golang/rpc"
  7. )
  8.  
  9. type logFilter struct {
  10. Prompt string
  11. }
  12.  
  13. func (lf logFilter) handler(
  14. request []byte,
  15. context rpc.Context,
  16. next rpc.NextFilterHandler) (response []byte, err error) {
  17. fmt.Printf("%v: %s\r\n", lf.Prompt, request)
  18. response, err = next(request, context)
  19. fmt.Printf("%v: %s\r\n", lf.Prompt, response)
  20. return
  21. }
  22.  
  23. func hello(name string) string {
  24. return "Hello " + name + "!"
  25. }
  26.  
  27. type HelloService struct {
  28. Hello func(string) (string, error)
  29. }
  30.  
  31. func main() {
  32. server := rpc.NewTCPServer("")
  33. server.AddFunction("hello", hello)
  34. server.AddBeforeFilterHandler(logFilter{"Server"}.handler)
  35. server.Handle()
  36. client := rpc.NewClient(server.URI())
  37. client.AddBeforeFilterHandler(logFilter{"Client"}.handler)
  38. var helloService *HelloService
  39. client.UseService(&helloService)
  40. helloService.Hello("World")
  41. client.Close()
  42. server.Close()
  43. }

执行结果为:


  1. Client: Cs5"Hello"a1{s5"World"}z
  2. Server: Cs5"Hello"a1{s5"World"}z
  3. Server: Rs12"Hello World!"z
  4. Client: Rs12"Hello World!"z

这个结果跟使用 Hprose 过滤器的例子的结果一样。

但是我们发现,这里使用 Hprose 中间件要写的代码比起 Hprose 过滤器更简单,只需要一个方法就可以了。

另外,因为这个例子中,我们没有使用过滤器功能,因此使用 AddBeforeFilterHandler(或 BeforeFilter.Use)方法或者 AddAfterFilterHandler(或 AfterFilter.Use)方法添加中间件处理器效果都是一样的。

但如果我们使用了过滤器的话,那么 AddBeforeFilterHandler 添加的中间件处理器的 request 数据是未经过过滤器处理的。过滤器的处理操作在 next 的最后一环中执行。next 返回的响应 response 是经过过滤器处理的。

如果某个通过 AddBeforeFilterHandler 添加的中间件处理器跳过了 next 而直接返回了结果的话,则返回的 response 也是未经过过滤器处理的。而且如果某个 AddBeforeFilterHandler 添加的中间件处理器跳过了 next,不但过滤器不会执行,而且在它之后使用 AddBeforeFilterHandler 所添加的中间件处理器也不会执行,AddAfterFilterHandler 方法所添加的所有中间件处理器也都不会执行。

AddAfterFilterHandler 添加的处理器所收到的 request 都是经过过滤器处理以后的,但它当中使用 next 方法返回的 response 是未经过过滤器处理的。

下面,我们在来看一个结合了压缩过滤器和输入输出缓存中间件的例子。

压缩、缓存、计时

压缩我们使用 Hprose 过滤器 一章中的 compress_handler.go,这里就不列代码了。

  1. package main
  2.  
  3. import (
  4. "fmt"
  5.  
  6. "time"
  7.  
  8. "sync"
  9.  
  10. "github.com/hprose/hprose-golang/rpc"
  11. )
  12.  
  13. type cacheFilter struct {
  14. Cache map[string][]byte
  15. sync.RWMutex
  16. }
  17.  
  18. func (cf *cacheFilter) handler(
  19. request []byte,
  20. context rpc.Context,
  21. next rpc.NextFilterHandler) (response []byte, err error) {
  22. if context.GetBool("cache") {
  23. var ok bool
  24. cf.RLock()
  25. if response, ok = cf.Cache[string(request)]; ok {
  26. cf.RUnlock()
  27. return response, nil
  28. }
  29. cf.RUnlock()
  30. response, err = next(request, context)
  31. if err != nil {
  32. return
  33. }
  34. cf.Lock()
  35. cf.Cache[string(request)] = response
  36. cf.Unlock()
  37. return response, nil
  38. }
  39. return next(request, context)
  40. }
  41.  
  42. type sizeFilter struct {
  43. Message string
  44. }
  45.  
  46. func (sf sizeFilter) handler(
  47. request []byte,
  48. context rpc.Context,
  49. next rpc.NextFilterHandler) (response []byte, err error) {
  50. fmt.Printf("%v request size is %d\r\n", sf.Message, len(request))
  51. response, err = next(request, context)
  52. fmt.Printf("%v response size is %d\r\n", sf.Message, len(response))
  53. return
  54. }
  55.  
  56. type statFilter struct {
  57. Message string
  58. }
  59.  
  60. func (sf statFilter) handler(
  61. request []byte,
  62. context rpc.Context,
  63. next rpc.NextFilterHandler) (response []byte, err error) {
  64. start := time.Now()
  65. response, err = next(request, context)
  66. end := time.Now()
  67. fmt.Printf("%v takes %d ms.\r\n", sf.Message, end.Sub(start)/time.Millisecond)
  68. return
  69. }
  70.  
  71. // TestService is ...
  72. type TestService struct {
  73. Test func([]int) ([]int, error) `userdata:"{\"cache\":true}"`
  74. }
  75.  
  76. func main() {
  77. server := rpc.NewTCPServer("")
  78. server.AddFunction("test", func(data []int) []int {
  79. return data
  80. }).
  81. AddBeforeFilterHandler(
  82. statFilter{"Server: BeforeFilter"}.handler,
  83. sizeFilter{"Server: Compressed"}.handler,
  84. ).
  85. AddFilter(CompressFilter{}).
  86. AddAfterFilterHandler(
  87. statFilter{"Server: AfterFilter"}.handler,
  88. sizeFilter{"Server: Non Compressed"}.handler,
  89. )
  90. server.Handle()
  91. client := rpc.NewClient(server.URI())
  92. client.AddFilter(CompressFilter{}).
  93. AddBeforeFilterHandler(
  94. (&cacheFilter{Cache: make(map[string][]byte)}).handler,
  95. statFilter{"Client: BeforeFilter"}.handler,
  96. sizeFilter{"Client: Compressed"}.handler,
  97. ).
  98. AddAfterFilterHandler(
  99. statFilter{"Client: AfterFilter"}.handler,
  100. sizeFilter{"Client: Non Compressed"}.handler,
  101. )
  102. var testService *TestService
  103. client.UseService(&testService)
  104. args := make([]int, 100000)
  105. for i := range args {
  106. args[i] = i
  107. }
  108. result, err := testService.Test(args)
  109. fmt.Println(len(result), err)
  110. result, err = testService.Test(args)
  111. fmt.Println(len(result), err)
  112. client.Close()
  113. server.Close()
  114. }

该程序运行结果为:


  1. Client: Compressed request size is 688893
  2. Client: Non Compressed request size is 213244
  3. Server: Compressed request size is 213244
  4. Server: Non Compressed request size is 688893
  5. Server: Non Compressed response size is 688881
  6. Server: AfterFilter takes 5 ms.
  7. Server: Compressed response size is 213223
  8. Server: BeforeFilter takes 38 ms.
  9. Client: Non Compressed response size is 213223
  10. Client: AfterFilter takes 38 ms.
  11. Client: Compressed response size is 688881
  12. Client: BeforeFilter takes 69 ms.
  13. 100000 <nil>
  14. 100000 <nil>

我们可以看到两次的执行结果都出来了,但是中间件的输出内容只有一次。原因就是第二次执行时,缓存中间件将缓存的结果直接返回了。因此后面所有的步骤就都略过了。

通过这个例子,我们可以看出,将 Hprose 中间件和 Hprose 过滤器结合,可以实现非常强大的扩展功能。如果你有什么特殊的需求,直接使用 Hprose 无法实现的话,就考虑一下是否可以添加几个 Hprose 中间件和 Hprose 过滤器吧。