数据的流处理利器

流处理(Stream processing)是一种计算机编程范式,其允许给定一个数据序列(流处理数据源),一系列数据操作(函数)被应用到流中的每个元素。同时流处理工具可以显著提高程序员的开发效率,允许他们编写有效、干净和简洁的代码。

流数据处理在我们的日常工作中非常常见,举个例子,我们在业务开发中往往会记录许多业务日志,这些日志一般是先发送到Kafka,然后再由Job消费Kafaka写到elasticsearch,在进行日志流处理的过程中,往往还会对日志做一些处理,比如过滤无效的日志,做一些计算以及重新组合日志等等,示意图如下:

fx_log

流处理工具fx

gozero是一个功能完备的微服务框架,框架中内置了很多非常实用的工具,其中就包含流数据处理工具fx,下面我们通过一个简单的例子来认识下该工具:

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "os/signal"
  6. "syscall"
  7. "time"
  8. "github.com/zeromicro/go-zero/core/fx"
  9. )
  10. func main() {
  11. ch := make(chan int)
  12. go inputStream(ch)
  13. go outputStream(ch)
  14. c := make(chan os.Signal, 1)
  15. signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
  16. <-c
  17. }
  18. func inputStream(ch chan int) {
  19. count := 0
  20. for {
  21. ch <- count
  22. time.Sleep(time.Millisecond * 500)
  23. count++
  24. }
  25. }
  26. func outputStream(ch chan int) {
  27. fx.From(func(source chan<- interface{}) {
  28. for c := range ch {
  29. source <- c
  30. }
  31. }).Walk(func(item interface{}, pipe chan<- interface{}) {
  32. count := item.(int)
  33. pipe <- count
  34. }).Filter(func(item interface{}) bool {
  35. itemInt := item.(int)
  36. if itemInt%2 == 0 {
  37. return true
  38. }
  39. return false
  40. }).ForEach(func(item interface{}) {
  41. fmt.Println(item)
  42. })
  43. }

inputStream函数模拟了流数据的产生,outputStream函数模拟了流数据的处理过程,其中From函数为流的输入,Walk函数并发的作用在每一个item上,Filter函数对item进行过滤为true保留为false不保留,ForEach函数遍历输出每一个item元素。

流数据处理中间操作

一个流的数据处理可能存在许多的中间操作,每个中间操作都可以作用在流上。就像流水线上的工人一样,每个工人操作完零件后都会返回处理完成的新零件,同理流处理中间操作完成后也会返回一个新的流。

fx_middle

fx的流处理中间操作:

操作函数 功能 输入
Distinct 去除重复的item KeyFunc,返回需要去重的key
Filter 过滤不满足条件的item FilterFunc,Option控制并发量
Group 对item进行分组 KeyFunc,以key进行分组
Head 取出前n个item,返回新stream int64保留数量
Map 对象转换 MapFunc,Option控制并发量
Merge 合并item到slice并生成新stream
Reverse 反转item
Sort 对item进行排序 LessFunc实现排序算法
Tail 与Head功能类似,取出后n个item组成新stream int64保留数量
Walk 作用在每个item上 WalkFunc,Option控制并发量

下图展示了每个步骤和每个步骤的结果:

fx_step_result

用法与原理分析

From

通过From函数构建流并返回Stream,流数据通过channel进行存储:

  1. // 例子
  2. s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
  3. fx.From(func(source chan<- interface{}) {
  4. for _, v := range s {
  5. source <- v
  6. }
  7. })
  8. // 源码
  9. func From(generate GenerateFunc) Stream {
  10. source := make(chan interface{})
  11. go func() {
  12. defer close(source)
  13. // 构造流数据写入channel
  14. generate(source)
  15. }()
  16. return Range(source)
  17. }

Filter

Filter函数提供过滤item的功能,FilterFunc定义过滤逻辑true保留item,false则不保留:

  1. // 例子 保留偶数
  2. s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
  3. fx.From(func(source chan<- interface{}) {
  4. for _, v := range s {
  5. source <- v
  6. }
  7. }).Filter(func(item interface{}) bool {
  8. if item.(int)%2 == 0 {
  9. return true
  10. }
  11. return false
  12. })
  13. // 源码
  14. func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  15. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  16. // 执行过滤函数true保留,false丢弃
  17. if fn(item) {
  18. pipe <- item
  19. }
  20. }, opts...)
  21. }

Group

Group对流数据进行分组,需定义分组的key,数据分组后以slice存入channel:

  1. // 例子 按照首字符"g"或者"p"分组,没有则分到另一组
  2. ss := []string{"golang", "google", "php", "python", "java", "c++"}
  3. fx.From(func(source chan<- interface{}) {
  4. for _, s := range ss {
  5. source <- s
  6. }
  7. }).Group(func(item interface{}) interface{} {
  8. if strings.HasPrefix(item.(string), "g") {
  9. return "g"
  10. } else if strings.HasPrefix(item.(string), "p") {
  11. return "p"
  12. }
  13. return ""
  14. }).ForEach(func(item interface{}) {
  15. fmt.Println(item)
  16. })
  17. }
  18. // 源码
  19. func (p Stream) Group(fn KeyFunc) Stream {
  20. // 定义分组存储map
  21. groups := make(map[interface{}][]interface{})
  22. for item := range p.source {
  23. // 用户自定义分组key
  24. key := fn(item)
  25. // key相同分到一组
  26. groups[key] = append(groups[key], item)
  27. }
  28. source := make(chan interface{})
  29. go func() {
  30. for _, group := range groups {
  31. // 相同key的一组数据写入到channel
  32. source <- group
  33. }
  34. close(source)
  35. }()
  36. return Range(source)
  37. }

Reverse

reverse可以对流中元素进行反转处理:

流处理组件 fx - 图4

  1. // 例子
  2. fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
  3. fmt.Println(item)
  4. })
  5. // 源码
  6. func (p Stream) Reverse() Stream {
  7. var items []interface{}
  8. // 获取流中数据
  9. for item := range p.source {
  10. items = append(items, item)
  11. }
  12. // 反转算法
  13. for i := len(items)/2 - 1; i >= 0; i-- {
  14. opp := len(items) - 1 - i
  15. items[i], items[opp] = items[opp], items[i]
  16. }
  17. // 写入流
  18. return Just(items...)
  19. }

Distinct

distinct对流中元素进行去重,去重在业务开发中比较常用,经常需要对用户id等做去重操作:

  1. // 例子
  2. fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
  3. return item
  4. }).ForEach(func(item interface{}) {
  5. fmt.Println(item)
  6. })
  7. // 结果为 1,2,3,4,5,6
  8. // 源码
  9. func (p Stream) Distinct(fn KeyFunc) Stream {
  10. source := make(chan interface{})
  11. threading.GoSafe(func() {
  12. defer close(source)
  13. // 通过key进行去重,相同key只保留一个
  14. keys := make(map[interface{}]lang.PlaceholderType)
  15. for item := range p.source {
  16. key := fn(item)
  17. // key存在则不保留
  18. if _, ok := keys[key]; !ok {
  19. source <- item
  20. keys[key] = lang.Placeholder
  21. }
  22. }
  23. })
  24. return Range(source)
  25. }

Walk

Walk函数并发的作用在流中每一个item上,可以通过WithWorkers设置并发数,默认并发数为16,最小并发数为1,如设置unlimitedWorkers为true则并发数无限制,但并发写入流中的数据由defaultWorkers限制,WalkFunc中用户可以自定义后续写入流中的元素,可以不写入也可以写入多个元素:

  1. // 例子
  2. fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
  3. newItem := strings.ToUpper(item.(string))
  4. pipe <- newItem
  5. }).ForEach(func(item interface{}) {
  6. fmt.Println(item)
  7. })
  8. // 源码
  9. func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  10. pipe := make(chan interface{}, option.workers)
  11. go func() {
  12. var wg sync.WaitGroup
  13. pool := make(chan lang.PlaceholderType, option.workers)
  14. for {
  15. // 控制并发数量
  16. pool <- lang.Placeholder
  17. item, ok := <-p.source
  18. if !ok {
  19. <-pool
  20. break
  21. }
  22. wg.Add(1)
  23. go func() {
  24. defer func() {
  25. wg.Done()
  26. <-pool
  27. }()
  28. // 作用在每个元素上
  29. fn(item, pipe)
  30. }()
  31. }
  32. // 等待处理完成
  33. wg.Wait()
  34. close(pipe)
  35. }()
  36. return Range(pipe)
  37. }

并发处理

fx工具除了进行流数据处理以外还提供了函数并发功能,在微服务中实现某个功能往往需要依赖多个服务,并发的处理依赖可以有效的降低依赖耗时,提升服务的性能。

concurrent_denpendency

  1. fx.Parallel(func() {
  2. userRPC() // 依赖1
  3. }, func() {
  4. accountRPC() // 依赖2
  5. }, func() {
  6. orderRPC() // 依赖3
  7. })

注意fx.Parallel进行依赖并行处理的时候不会有error返回,如需有error返回或者有一个依赖报错需要立马结束依赖请求请使用MapReduce工具进行处理。

总结

本篇文章介绍了流处理的基本概念和gozero中的流处理工具fx,在实际的生产中流处理场景应用也非常多,希望本篇文章能给大家带来一定的启发,更好的应对工作中的流处理场景。