微服务-05-MapReduce

什么是MapReduce?

MapReduce是Google提出了一个软件架构,用于大规模数据集的并行运算。

MapReduce通过把对数据集的大规模操作分发给网络上的每个节点实现可靠性;每个节点会周期性的把完成的工作和状态的更新报告回来。如果一个节点保持沉默超过一个预设的时间间隔,主节点记录下这个节点状态为死亡,并把分配给这个节点的数据发到别的节点。

go-zeroMapReduce则借鉴其中的思想,接下来一起看下go-zero是如何应用这一思想的

问题背景

在微服务中开发中,如果多个服务串行依赖的话那么整个API的耗时将会大大增加。通过什么手段来优化?

  1. 传输层面通过MQ的解耦特性来降低API的耗时
    • MQ通信效率没有grpc高(消息通过MQ服务器进行中转)
  2. 业务层面通过Go语言的WaitGroup工具来进行并发控制
    • 自行封装AddDone

实际业务场景中:

  • 如果接口的多个依赖有一个出错,则期望能立即返回且不必等待所有依赖都执行完毕。已经完成的接口调用也应该回滚
  • 多个依赖可能有部分依赖之间也存在着相互依赖,或者上下关系

go-zero的主要应用场景为:需要从不同的rpc服务中获取相应属性组装成复杂对象,比如要查询商品详情:

  1. 商品服务-查询商品属性
  2. 库存服务-查询库存属性
  3. 价格服务-查询价格属性
  4. 营销服务-查询营销属性

如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,简单场景下使用 WaitGroup 也能够满足需求,但如果要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?

go-zero通过mapreduce来处理这种对输入数据进行处理最后输出清洗数据的问题。是一种经典的模式:生产者消费者模式。将数据处理分为三个阶段:

  1. 数据生产 generate(查询,必选)
  2. 数据加工 mapper(加工,可选)
  3. 数据聚合 reducer(聚合,可选)

利用协程处理以及管道通信,实现数据的加速处理

应用场景

场景1

对数据批处理,比如对一批用户id,效验每个用户的合法性并且效验过程中有一个出错就认为效验失败,返回的结果为效验合法的用户id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package mapreduce

import (
"errors"
"log"

"github.com/zeromicro/go-zero/core/mr"
)

func Run(uids []int) ([]int, error) {
r, err := mr.MapReduce(func(source chan<- interface{}) {
for _, uid := range uids {
source <- uid
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
uid := item.(int)
ok, err := check(uid)
if err != nil { //如果校验逻辑有问题,这里执行cancel整个校验过程停止
cancel(err)
}
if ok { //如果校验失败,那么不返回该uid
writer.Write(uid)
}
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
var uids []int
for p := range pipe {
uids = append(uids, p.(int))
}
writer.Write(uids)
})
if err != nil {
log.Printf("check error: %v", err)
return nil, err
}

return r.([]int), nil
}

func check(uid int) (bool, error) {
// do something check user legal
if uid == 0 {
return false, errors.New("uid wrong")
}
return true, nil
}

其实是利用N个协程等待数据生产者的数据传输然后转交给聚合逻辑处理

mapReduce-1

场景2

某些功能的结果往往需要依赖多个服务,比如商品详情的结果往往会依赖用户服务、库存服务、订单服务等等,一般被依赖的服务都是以rpc的形式对外提供,为了降低依赖的耗时我们往往需要对依赖做并行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func productDetail(uid, pid int64) (*ProductDetail, error) {
var pd ProductDetail
err := mr.Finish(func() (err error) {
pd.User, err = userRpc.User(uid)
return
}, func() (err error) {
pd.Store, err = storeRpc.Store(pid)
return
}, func() (err error) {
pd.Order, err = orderRpc.Order(pid)
return
})

if err != nil {
log.Printf("product detail error: %v", err)
return nil, err
}

return &pd, nil
}

技术内幕

源码目录:core/mr/mapreduce.go

其中利用到的对外函数有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// MapReduce 包含数据生产、数据处理以及数据聚合阶段并返回结果
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {}

// MapReduceChan 包含数据生产、数据处理以及数据聚合阶段并返回结果。其中利用chan代替数据生产
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
panicChan := &onceChan{channel: make(chan interface{})}
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}

// ForEach 只包含数据生产和数据处理阶段,但没有任何输出
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {}

// FinishVoid 并行运行 fns
func FinishVoid(fns ...func()) {

// Finish 并行运行 fns,在任何错误时取消
func Finish(fns ...func() error) error {}

//WithWorkers 定义一个 mapreduce 有几个协程
func WithWorkers(workers int) Option {}

数据生产阶段

首先定义 buildSource使用协程进行数据生产

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//buildSource 使用协程执行generate 并将协程的参数一个非缓冲的channel返回。如果generate发生panic,则将错误写入 onceChan
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
source := make(chan interface{})
go func() {
defer func() {
if r := recover(); r != nil {
panicChan.write(r)
}
close(source)
}()

generate(source) //返回的非缓冲隧道也是数据生产的入口
}()

return source
}

其中的 onceChan则是一个非阻塞会缓冲的channel,当channel中还有数据没有处理完,则直接返回

1
2
3
4
5
6
7
8
9
10
11
12
type onceChan struct {
channel chan interface{}
wrote int32
}

func (oc *onceChan) write(val interface{}) {
if atomic.AddInt32(&oc.wrote, 1) > 1 {
return
}

oc.channel <- val
}

数据处理阶段

接着利用mapReduceWithPanicChan进行数据处理mapper和数据聚合reducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
panicChan := &onceChan{channel: make(chan interface{})}
source := buildSource(generate, panicChan) //使用协程执行generate并返回数据生产无缓冲隧道source
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) //将隧道和处理函数传入
}

//source就是数据来源隧道
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
reducer ReducerFunc, opts ...Option) (interface{}, error) {
options := buildOptions(opts...)
// output is used to write the final result
output := make(chan interface{})
//...defer

//collector 用于从mapper收集数据,消费者是数据聚合
collector := make(chan interface{}, options.workers)

//done表示结束,所有的mappers和reducer都要结束
done := make(chan lang.PlaceholderType)
writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once
// use atomic.Value to avoid data race
//...

go func() { //起一个协程进行 数据聚合
//panic.wirte()
reducer(collector, writer, cancel)
}()

go executeMappers(mapperContext{ //进行 数据处理
ctx: options.ctx,
mapper: func(item interface{}, w Writer) {
mapper(item, w, cancel)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})

select { //等待结果
case <-options.ctx.Done():
cancel(context.DeadlineExceeded)
return nil, context.DeadlineExceeded
case v := <-panicChan.channel:
panic(v)
case v, ok := <-output:
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return v, nil
} else {
return nil, ErrReduceNoOutput
}
}
}

其中数据处理阶段又定义了一个协程进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func executeMappers(mCtx mapperContext) {
var wg sync.WaitGroup
defer func() {
wg.Wait()
close(mCtx.collector)
drain(mCtx.source)
}()

var failed int32
pool := make(chan lang.PlaceholderType, mCtx.workers) //协程池
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
for atomic.LoadInt32(&failed) == 0 {
select {
case <-mCtx.ctx.Done():
return
case <-mCtx.doneChan:
return
case pool <- lang.Placeholder: //这里是定义的N个works的chan,也就是会在下面创建n个协程
item, ok := <-mCtx.source
if !ok { //如果来源关闭,那么将pool数据释放
<-pool
return
}

wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
atomic.AddInt32(&failed, 1)
mCtx.panicChan.write(r)
}
wg.Done()
<-pool
}()

//item:生产的数据
//writer: 数据处理对象
mCtx.mapper(item, writer) //执行map
}()
}
}
}

数据聚合阶段

最后来看一下是数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type guardedWriter struct {	//写入接口
ctx context.Context //保证超时退出
channel chan<- interface{} //接收数据,这里传输的就是 长度为 N 的 collect channel
done <-chan lang.PlaceholderType //主动结束退出
}

func newGuardedWriter(ctx context.Context, channel chan<- interface{},
done <-chan lang.PlaceholderType) guardedWriter {
return guardedWriter{
ctx: ctx,
channel: channel,
done: done,
}
}

Finish

Finish逻辑只进行并发处理,其实内部是将执行函数做为数据生产的生产的数据,然后又数据处理逻辑进行处理

mapreduce_finish

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func Finish(fns ...func() error) error {
if len(fns) == 0 { //n个外部调用
return nil
}

return MapReduceVoid(func(source chan<- interface{}) {
for _, fn := range fns {
source <- fn //数据生产者将函数传入
}
}, func(item interface{}, writer Writer, cancel func(error)) {
fn := item.(func() error)
if err := fn(); err != nil { //数据处理逻辑执行函数
cancel(err) //这里并没有写入,所以第三个函数其实并没有执行
}
}, func(pipe <-chan interface{}, cancel func(error)) {
}, WithWorkers(len(fns)))
}

//底层还是执行的MapReduce逻辑
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel)
}, opts...)
if errors.Is(err, ErrReduceNoOutput) {
return nil
}
return err
}

总结:

  1. 适用于并发无顺序依赖的并发调用,如果是多个调用具有前后依赖关系,依然需要有先后调用顺序(废话)
  2. 也不存在回滚的操作,内部只是将不在等待处理结果直接退出 select

参考链接

  1. https://talkgo.org/t/topic/1452
  2. https://zh.wikipedia.org/wiki/MapReduce