微服务-09-进程内缓存

问题背景

缓存,目的是减少繁重的IO操作,增加系统并发能力。之前做的内存方案

  1. 同一类数据,设计一个双链表,然后添加一个定时器,动态删除过期内存。
  2. 实现一个简单的LRU缓存淘汰算法。

设计内进程内缓存主要需要考虑的是

  1. 可拓展性(缓存不同类型的数据)
  2. 过期处理,怎样才能友好的删除过期数据,不能无限存储
    • 定时删除,不断循环所有key。缺点:需要遍历所有key,空耗CPU
    • 惰性删除,访问的时候判断该键是否。缺点:如果不在访问,那么就会一直存在
  3. 如果保障进程内缓存一致性
    • 单节点通知:一个节点完成修改,通知其他节点(类似raft,但raft有主)。缺点:时效性、可靠性
    • MQ订阅通知:一个节点完成修改,通知其他节点进行修改。缺点:缓存维护更复杂
    • 定时更新:单个节点定时器,定时拉去新数据,更新内存数据。缺点:时效性

实现原理

这里看看go-zero是如何实现进程缓存设计的

go-zero-cache

从图中可以看出

  1. 缓存被存放在map
  2. LRU(使用双向链表+Map实现)用于对缓存的过期删除
  3. limit 进行缓存数量限制
  4. 定时逻辑则是通过时间轮进行管理
  5. 还包含一个统计逻辑,进行缓存命中情况的上报

接下来看看它们是如何运转起来的

更新缓存

设置缓存要做三件事情

  1. 更新缓存
  2. Key到LRU管理中
  3. 添加或更新定时器
go-zero-cache15

问题1:图中还有一个获取过期时间的过程,设置缓存的时候不是应该已经设置了过期时间了吗,为什么这个地方还要再次获取?

其实这里获取的与过期时间近似的时间,目的是防止缓存雪崩

deviation:表示一个正负差值

base:给定的过期时间

v=(1+deviation2deviationrandom)basev = (1 + deviation - 2 * deviation * random) * base

问题2:原理图中还有一个设置limit的过程,为什么在添加的过程中没有体现?

其实这是LRU内部实现的时候使用的,当LRU双向链表长度大于limit的时候,会直接删除尾部节点

查询缓存

当缓存存在的时候,需要更新LRU。这里还添加了一个命中的逻辑

go-zero-cache16

获取缓存

获取缓存与查询缓存不同的是,如果缓存中不存在,那么需要从DB获取数据并插入到缓存中

go-zero-cache18

注意:

  1. 使用共享调用防止缓存穿透
  2. 查询DB操作其实是自定义操作(所以缓存不关心数据来源)
  3. 当缓存不存在,启动共享调用逻辑的时候它再次查询了一次缓存。因为DB操作是IO操作,而查询缓存是 O(1)的map内存操作

删除缓存

删除分为定时删除以及主动删除,当未更新定时器的时候,那么到期就会主动删除 K/V

如果是主动删除,则直接删除缓存,LRU 和 定时器

go-zero-cache17

技术内幕

代码路径:core/collection/cache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 自定义缓存可选参数
CacheOption func(cache *Cache)

// 进程内缓存对象
Cache struct {
name string //缓存名称,默认为 defaultCacheName="proc"
lock sync.Mutex //map锁
data map[string]interface{} //存储缓存
expire time.Duration //过期时间
timingWheel *TimingWheel //时间轮
lruCache lru //LRU 缓存淘汰机制
barrier syncx.SingleFlight //共享调用
unstableExpiry mathx.Unstable //随机过期时间差值(防止雪崩)
stats *cacheStat //缓存统计
}

初始化缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
cache := &Cache{
data: make(map[string]interface{}),
expire: expire,
lruCache: emptyLruCache,
barrier: syncx.NewSingleFlight(), //共享调用
unstableExpiry: mathx.NewUnstable(expiryDeviation),
}

//自定义操作包含 limit设置、缓存过期时间范围 expiryDeviation
for _, opt := range opts {
opt(cache)
}

if len(cache.name) == 0 {
cache.name = defaultCacheName //默认名称可以忽略
}
cache.stats = newCacheStat(cache.name, cache.size)

//时间轮已经订好了1s时间间隔
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
key, ok := k.(string)
if !ok {
return
}

cache.Del(key) //到期删除缓存
})
if err != nil {
return nil, err
}

cache.timingWheel = timingWheel
return cache, nil
}

查询缓存

查询缓存很简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Get returns the item with the given key from c.
func (c *Cache) Get(key string) (interface{}, bool) {
value, ok := c.doGet(key)
if ok {
c.stats.IncrementHit()
} else {
c.stats.IncrementMiss()
}

return value, ok
}

func (c *Cache) doGet(key string) (interface{}, bool) {
c.lock.Lock()
defer c.lock.Unlock()

value, ok := c.data[key]
if ok {
c.lruCache.add(key)
}

return value, ok
}

设置缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//Set 设置缓存,默认使用缓存过期时间
func (c *Cache) Set(key string, value interface{}) {
c.SetWithExpire(key, value, c.expire)
}

// SetWithExpire 设置缓存 自定义过期时间
func (c *Cache) SetWithExpire(key string, value interface{}, expire time.Duration) {
c.lock.Lock()
_, ok := c.data[key]
c.data[key] = value
c.lruCache.add(key)
c.lock.Unlock()

expiry := c.unstableExpiry.AroundDuration(expire) //获取过期时间
if ok {
c.timingWheel.MoveTimer(key, expiry)
} else {
c.timingWheel.SetTimer(key, value, expiry)
}
}

获取缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (c *Cache) Take(key string, fetch func() (interface{}, error)) (interface{}, error) {
if val, ok := c.doGet(key); ok { //查询缓存
c.stats.IncrementHit()
return val, nil
}

var fresh bool
val, err := c.barrier.Do(key, func() (interface{}, error) {
// because O(1) on map search in memory, and fetch is an IO query
// so we do double check, cache might be taken by another call
// 再进行一次缓存操作
if val, ok := c.doGet(key); ok {
return val, nil
}

v, e := fetch()
if e != nil {
return nil, e
}

fresh = true
c.Set(key, v)
return v, nil
})
if err != nil {
return nil, err
}
if fresh {
c.stats.IncrementMiss()
return val, nil
}

// got the result from previous ongoing query
c.stats.IncrementHit()
return val, nil
}

删除缓存

1
2
3
4
5
6
7
func (c *Cache) Del(key string) {
c.lock.Lock()
delete(c.data, key) //删除k/v
c.lruCache.remove(key) //删除LRU
c.lock.Unlock()
c.timingWheel.RemoveTimer(key) //删除定时器
}

总结

  1. 设置缓存过期时间的时候,设置一个范围时间防止缓存雪崩
  2. 使用共享调用去请求数据防止缓存击穿
  3. 通过LRU进行缓存淘汰,当多副本存在可能存在缓存一致性问题
  4. 统计逻辑用于上报,并没有一个开关(可能存在性能问题)

参考链接

  1. https://talkgo.org/t/topic/2263