description
    自定义Body Parser

    elton-body-parser只提供对application/json以及application/x-www-form-urlencoded转换为json字节的处理,在实际使用中还存在一些其它的场景。如xml,自定义数据结构等。

    在实际项目中,统计数据一般记录至influxdb,为了性能的考虑,统计数据是批量提交(如每1000个统计点提交一次)。数据提交的时候,重复的字符比较多,为了减少带宽的占用,所以先做压缩处理。考虑到性能的原因,采用了snappy压缩处理。下面是抽取出来的示例代码:

    1. package main
    2. import (
    3. "bytes"
    4. "fmt"
    5. "io/ioutil"
    6. "net/http"
    7. "strconv"
    8. "strings"
    9. "time"
    10. "github.com/golang/snappy"
    11. "github.com/vicanso/elton"
    12. )
    13. // 仅示例,对于出错直接panic
    14. func post() {
    15. // weather,location=us-midwest temperature=82 1465839830100400200
    16. max := 1000
    17. arr := make([]string, max)
    18. for i := 0; i < max; i++ {
    19. arr[i] = "weather,location=us-midwest temperature=82 " + strconv.FormatInt(time.Now().UnixNano(), 10)
    20. }
    21. var dst []byte
    22. data := snappy.Encode(dst, []byte(strings.Join(arr, "\n")))
    23. req, err := http.NewRequest("POST", "http://127.0.0.1:3000/influx", bytes.NewReader(data))
    24. req.Header.Set(elton.HeaderContentType, ContentTypeIfx)
    25. if err != nil {
    26. panic(err)
    27. }
    28. resp, err := http.DefaultClient.Do(req)
    29. if err != nil {
    30. panic(err)
    31. }
    32. result, _ := ioutil.ReadAll(resp.Body)
    33. fmt.Println(string(result))
    34. }
    35. const (
    36. // ContentTypeIfx influx data type
    37. ContentTypeIfx = "application/ifx"
    38. )
    39. // NewInfluxParser influx parser
    40. func NewInfluxParser() elton.Handler {
    41. return func(c *elton.Context) (err error) {
    42. // 对于非POST请求,以及数据类型不匹配的,则跳过
    43. if c.Request.Method != http.MethodPost ||
    44. c.GetRequestHeader(elton.HeaderContentType) != ContentTypeIfx {
    45. return c.Next()
    46. }
    47. body, err := ioutil.ReadAll(c.Request.Body)
    48. // 如果读取数据时出错,直接返回
    49. if err != nil {
    50. return
    51. }
    52. var dst []byte
    53. data, err := snappy.Decode(dst, body)
    54. // 如果解压出错,直接返回(也可再自定义出错类型,方便排查)
    55. if err != nil {
    56. return
    57. }
    58. // 至此则解压生成提交的数据了
    59. c.RequestBody = data
    60. return c.Next()
    61. }
    62. }
    63. func main() {
    64. e := elton.New()
    65. go func() {
    66. // 等待一秒让elton启动(仅为了测试方便,直接客户端服务端同一份代码)
    67. time.Sleep(time.Second)
    68. post()
    69. }()
    70. e.Use(NewInfluxParser())
    71. e.POST("/influx", func(c *elton.Context) (err error) {
    72. points := strings.SplitN(string(c.RequestBody), "\n", -1)
    73. c.BodyBuffer = bytes.NewBufferString("add " + strconv.Itoa(len(points)) + " points to influxdb done")
    74. return
    75. })
    76. err := e.ListenAndServe(":3000")
    77. if err != nil {
    78. panic(err)
    79. }
    80. }

    通过各类自定义的中间件,可以实现各种不同的提交数据的解析,只要将解析结果保存至Context.RequestBody中,后续则由处理函数再将字节转换为相对应的结构,简单易用。

    elton-body-parser提供自定义Decoder方式,可以按实际使用添加Decoder,上面的实现可以简化为:

    1. package main
    2. import (
    3. "bytes"
    4. "fmt"
    5. "io/ioutil"
    6. "net/http"
    7. "regexp"
    8. "strconv"
    9. "strings"
    10. "time"
    11. "github.com/golang/snappy"
    12. "github.com/vicanso/elton"
    13. "github.com/vicanso/elton/middleware"
    14. )
    15. // 仅示例,对于出错直接panic
    16. func post() {
    17. // weather,location=us-midwest temperature=82 1465839830100400200
    18. max := 1000
    19. arr := make([]string, max)
    20. for i := 0; i < max; i++ {
    21. arr[i] = "weather,location=us-midwest temperature=82 " + strconv.FormatInt(time.Now().UnixNano(), 10)
    22. }
    23. var dst []byte
    24. data := snappy.Encode(dst, []byte(strings.Join(arr, "\n")))
    25. req, err := http.NewRequest("POST", "http://127.0.0.1:3000/influx", bytes.NewReader(data))
    26. req.Header.Set(elton.HeaderContentType, ContentTypeIfx)
    27. if err != nil {
    28. panic(err)
    29. }
    30. resp, err := http.DefaultClient.Do(req)
    31. if err != nil {
    32. panic(err)
    33. }
    34. result, _ := ioutil.ReadAll(resp.Body)
    35. fmt.Println(string(result))
    36. }
    37. const (
    38. // ContentTypeIfx influx data type
    39. ContentTypeIfx = "application/ifx"
    40. )
    41. func main() {
    42. e := elton.New()
    43. go func() {
    44. // 等待一秒让elton启动(仅为了测试方便,直接客户端服务端同一份代码)
    45. time.Sleep(time.Second)
    46. post()
    47. }()
    48. conf := middleware.BodyParserConfig{
    49. // 设置对哪些content type处理,默认只处理application/json
    50. ContentTypeValidate: func(c *elton.Context) bool {
    51. ct := c.GetRequestHeader(elton.HeaderContentType)
    52. return regexp.MustCompile("application/json|" + ContentTypeIfx).MatchString(ct)
    53. },
    54. }
    55. // gzip解压
    56. conf.AddDecoder(middleware.NewGzipDecoder())
    57. // json decoder
    58. conf.AddDecoder(middleware.NewJSONDecoder())
    59. // 添加自定义influx的decoder
    60. conf.AddDecoder(&middleware.BodyDecoder{
    61. // 判断是否符合该decoder
    62. Validate: func(c *elton.Context) bool {
    63. return c.GetRequestHeader(elton.HeaderContentType) == ContentTypeIfx
    64. },
    65. // 解压snappy
    66. Decode: func(c *elton.Context, orginalData []byte) (data []byte, err error) {
    67. var dst []byte
    68. data, err = snappy.Decode(dst, orginalData)
    69. return
    70. },
    71. })
    72. e.Use(middleware.NewBodyParser(conf))
    73. e.POST("/influx", func(c *elton.Context) (err error) {
    74. points := strings.SplitN(string(c.RequestBody), "\n", -1)
    75. c.BodyBuffer = bytes.NewBufferString("add " + strconv.Itoa(len(points)) + " points to influxdb done")
    76. return
    77. })
    78. err := e.ListenAndServe(":3000")
    79. if err != nil {
    80. panic(err)
    81. }
    82. }