问题背景
并发场景下,可能会有多个线程(协程)同时请求同一份资源,如果每个请求都要走一遍资源的请求过程,除了比较低效之外,还会对资源服务造成并发的压力。
例如:
- 缓存失效的同时多个请求同时到达某服务请求相同资源,这些请求会继续访问DB做查询,会引起数据库压力瞬间增大。而使用
SharedCalls
可以使得同时多个请求只需要发起一次拿结果的调用,其他请求"坐享其成"。有效减少了资源服务的并发压力,可以有效防止缓存击穿
- 云平台服务众多,使用
grpc
通信的时候不期望每个服务之间都建立链接,而只在向对端发送消息的时候,才在服务之间建立通信。老的逻辑是当开始建立连接的时候会将创建改为正在创建链接,后续消息会因为正在建立链接会直接返回错误或阻塞等待结果(自行实现),而使用SharedCalls
可以短时间内等待链接建立然后继续发送消息

演示代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func main() { const round = 5 var wg sync.WaitGroup barrier := syncx.NewSharedCalls()
wg.Add(round) for i := 0; i < round; i++ { go func() { defer wg.Done() val, err := barrier.Do("once", func() (interface{}, error) { time.Sleep(time.Second) return stringx.RandId(), nil }) if err != nil { fmt.Println(err) } else { fmt.Println(val) } }() } wg.Wait() }
|
技术内幕
文件目录:core/syncx/singleflight.go
SingleFlight
通过为并发的请求根据相同的key提供相同的结果
一共提供了 Do
与 DoEx
两种接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| var SingleFlight interface { Do(key string, fn func() (interface{}, error)) (interface{}, error) DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error) }
var call struct { wg sync.WaitGroup val interface{} err error }
var flightGroup struct { calls map[string]*call lock sync.Mutex }
|
首先查看两个基础函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func (g *flightGroup) createCall(key string) (c *call, done bool) { g.lock.Lock() if c, ok := g.calls[key]; ok { g.lock.Unlock() c.wg.Wait() return c, true }
c = new(call) c.wg.Add(1) g.calls[key] = c g.lock.Unlock()
return c, false }
func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) { defer func() { g.lock.Lock() delete(g.calls, key) g.lock.Unlock() c.wg.Done() }()
c.val, c.err = fn() }
|
再看实现逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { c, done := g.createCall(key) if done { return c.val, c.err } g.makeCall(c, key, fn) return c.val, c.err }
func (g *flightGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) { c, done := g.createCall(key) if done { return c.val, false, c.err }
g.makeCall(c, key, fn) return c.val, true, c.err }
func NewSingleFlight() SingleFlight { return &flightGroup{ calls: make(map[string]*call), } }
|
DoEx
相较于 Do
中增加了一个 bool
类型的返回值,表示返回的值是共享的还是首次拿到的
参考链接
- https://talkgo.org/t/topic/968