微服务-04-共享调用

问题背景

并发场景下,可能会有多个线程(协程)同时请求同一份资源,如果每个请求都要走一遍资源的请求过程,除了比较低效之外,还会对资源服务造成并发的压力。

例如:

  1. 缓存失效的同时多个请求同时到达某服务请求相同资源,这些请求会继续访问DB做查询,会引起数据库压力瞬间增大。而使用 SharedCalls 可以使得同时多个请求只需要发起一次拿结果的调用,其他请求"坐享其成"。有效减少了资源服务的并发压力,可以有效防止缓存击穿
  2. 云平台服务众多,使用grpc通信的时候不期望每个服务之间都建立链接,而只在向对端发送消息的时候,才在服务之间建立通信。老的逻辑是当开始建立连接的时候会将创建改为正在创建链接,后续消息会因为正在建立链接会直接返回错误或阻塞等待结果(自行实现),而使用SharedCalls可以短时间内等待链接建立然后继续发送消息

sharecalls

演示代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
const round = 5
var wg sync.WaitGroup
barrier := syncx.NewSharedCalls()

wg.Add(round)
for i := 0; i < round; i++ {
// 多个线程同时执行
go func() {
defer wg.Done()
// 可以看到,多个线程在同一个 key 上去请求资源,获取资源的实际函数只会被调用一次
val, err := barrier.Do("once", func() (interface{}, error) {
// sleep 1秒,为了让多个线程同时取 once 这个 key 上的数据
time.Sleep(time.Second)
// 生成了一个随机的 id
return stringx.RandId(), nil
})
if err != nil {
fmt.Println(err)
} else {
fmt.Println(val)
}
}()
}
wg.Wait()
}

技术内幕

文件目录:core/syncx/singleflight.go

SingleFlight 通过为并发的请求根据相同的key提供相同的结果

一共提供了 DoDoEx 两种接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var SingleFlight interface {
Do(key string, fn func() (interface{}, error)) (interface{}, error)
DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
}

var call struct { //代指一次调用
wg sync.WaitGroup //用于等待call结束
val interface{}
err error
}

var flightGroup struct {
calls map[string]*call
lock sync.Mutex
}

首先查看两个基础函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (g *flightGroup) createCall(key string) (c *call, done bool) {
// 先申请加锁
g.lock.Lock()
if c, ok := g.calls[key]; ok { //如果key存在,那么等待
// 拿到 call 以后,释放锁,此处 call 可能还没有实际数据,只是一个空的内存占位
g.lock.Unlock()
//调用 wg.Wait,判断是否有其他 goroutine 正在申请资源,如果阻塞,说明有其他 goroutine 正在获取资源
c.wg.Wait() //等待相同的call的结束
// 当 wg.Wait 不再阻塞,表示资源获取已经结束,可以直接返回结果
return c, true
}

c = new(call) //创建一个新的call
c.wg.Add(1) //并为这个call添加一个的等待
g.calls[key] = c
g.lock.Unlock()

return c, false
}

func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
defer func() {
g.lock.Lock()
delete(g.calls, key) //删除
g.lock.Unlock()
c.wg.Done() //结束call
}()

c.val, c.err = fn() //执行函数并返回结果
}

再看实现逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
c, done := g.createCall(key) //根据key定义一个call
if done { //如果是等待结束了,且等待结束了,则返回Call的value
return c.val, c.err
}
//如果是新的,那么就首次执行call,并返回结果
g.makeCall(c, key, fn)
return c.val, c.err
}

func (g *flightGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
c, done := g.createCall(key)
if done { //等待结束
return c.val, false, c.err
}

g.makeCall(c, key, fn)
return c.val, true, c.err //新的结束
}

// NewSingleFlight returns a SingleFlight.
func NewSingleFlight() SingleFlight {
return &flightGroup{
calls: make(map[string]*call),
}
}

DoEx 相较于 Do 中增加了一个 bool 类型的返回值,表示返回的值是共享的还是首次拿到的

参考链接

  1. https://talkgo.org/t/topic/968