微服务-03-缓存一致性

问题背景

一致性有很多种

  • 强一致性:保证写入后立即可以读取
  • 弱一致性:在系统写入后,不承诺立即可以读到写入的值,也不承诺多久之后数据能够达到一致,但会尽可能地保证到某个时间级别(比如秒级别)后,数据能够达到一致状态
  • 最终一致性:最终一致性是弱一致性的一个特例,系统会保证在一定时间内,能够达到一个数据一致的状态

缓存可以提升性能、缓解数据库压力,使用缓存也会导致数据不一致性的问题

缓存系统的数据一致性通常包括持久化层和缓存层的一致性、以及多级缓存之间的一致性,这里讨论是前者。持久化层和缓存层的一致性问题也通常被称为双写一致性问题。

实现原理

引入 Cache 之后,延迟或程序失败等都会导致缓存和实际存储层数据不一致,下面几种模式减少不一致风险

  1. Cache-Aside Pattern,即旁路缓存模式
  2. Read-Through/Write-Through,读写穿透模式
  3. Write behind,异步缓存写入模式

Cache-Aside

读模式

当缓存命中则直接返回,否则从数据库读取数据并更新缓存

cache-same-6
写模式

首先更新数据库,然后删除缓存

cache-same-3

问题1:为什么是删除缓存,而不是更新缓存

  1. 如果缓存需要通过大量的计算(联表查询更新),那么更新缓存会是一笔不小的开销
  2. 另外如果写操作比较多,可能存在刚更新的缓存还没有读取就又要更新的情况(称为缓存扰动),所以此模式适用于读多写少的模式
  3. 等到读请求未命中再去更新,符合懒加载思路
  4. 并发更新可能导致缓存落后与数据库,读请求读到的仍然是旧缓存
cache-same-8

问题2:为什么是先更新数据库,而不是先删除缓存

cahche-same-10

数据库查询请求往往比更新请求更快,可能这种异常更容易出现

Read/Write Through

读模式

当缓存命中则直接返回,否则从数据库读取数据并更新缓存

cache-same-6

Read/Write Through模式中,服务端把缓存作为主要数据存储。应用程序跟数据库缓存交互,都是通过抽象缓存层完成的

cache-same-11
写模式

Write Through模式在发生Cache Miss的时候,只会在读请求中更新缓存。

  • 写请求在发生Cache Miss的时候不会更新缓存,而是直接写入数据库;

  • 如果命中缓存则先更新缓存,由缓存自己再将数据写回到数据库中

cache-same-12

注意这个时候如果命中缓存,是先更新缓存的。也就说和 Cache-Aside一样存在并发场景下的一致性问题

这个策略的核心原则:用户只与缓存打交道,由缓存组件和DB通信,写入或者读取数据。在一些本地进程缓存组件可以考虑这种策略

Write-Through 存在的缺陷:写数据时缓存和数据库同步,但是我们知道这两块存储介质的速度差几个数量级,对写入性能是有很大影响。那我们是否异步更新数据库

Write behind

Write behind 跟有相似的地方,都是由Cache Provider来负责缓存和数据库的读写。它两又有个很大的不同:Read/Write Through是同步更新缓存和数据的,Write Behind则是只更新缓存,不直接更新数据库,通过批量异步的方式来更新数据库

cache-same-13

缓存和数据库的一致性不强,对一致性要求高的系统要谨慎使用。但是它适合频繁写的场景,MySQL的InnoDB Buffer Pool机制就使用到这种模式

延时双删

延时双删主要用于 Redis主从节点的场景,延时的原因是,mysql 和 redis 主从节点数据不是实时同步的,同步数据需要时间。

  1. 服务节点删除 redis 主库数据
  2. 服务节点修改 mysql 主库数据
  3. 服务节点使得当前业务处理 等待一段时间,等 redis 和 mysql 主从节点数据同步成功。
  4. 服务节点从 redis 主库删除数据。
  5. 当前或其它服务节点读取 redis 从库数据,发现 redis 从库没有数据,从 mysql 从库读取数据,并写入 redis 主库

注意:

  1. 延时双删,有等待环节,如果系统要求低延时,这种场景就不合适了。
  2. 延时双删,不适合“秒杀”这种频繁修改数据和要求数据强一致的场景。
  3. 延时双删,延时时间是一个预估值,不能确保 mysql 和 redis 数据在这个时间段内都实时同步或持久化成功了

重试保障

方案1:服务自行订阅删除缓存消息
  1. 更新数据库数据;
  2. 缓存因为种种问题删除失败;
  3. 将需要删除的key发送至消息队列;
  4. 自己消费消息,获得需要删除的key;
  5. 继续重试删除操作,直到成功
方案2:利用第三方服务删除缓存
  1. 更新数据库数据;
  2. 数据库会将操作信息写入binlog日志当中;
  3. 订阅程序提取出所需要的数据以及key;
  4. 另起一段非业务代码,获得该信息;
  5. 尝试删除缓存操作,发现删除失败;
  6. 将这些信息发送至消息队列;
  7. 重新从消息队列中获得该数据,重试操作

注意:

  1. 删除缓存也可能存储缓存击穿的问题
  2. 使用方案1进行消息订阅的时候可能出现消息队列也失败的情况

强一致性肯定会有性能影响(比如 raft协议需要等待超过半数节点做出响应),另外强一致性的异常处理

技术内幕

来看看 rockscache 如何解决缓存一致性的,

地址:https://github.com/dtm-labs/rockscache

The First Redis Cache Library To Ensure Eventual Consistency And Strong Consistency With DB.

变量定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//rockscache client 可选参数
type Options struct {
//标记删除key的延时删除时间 默认10s
Delay time.Duration
//EmptyExpire 是空结果的过期时间。默认为 60 秒
EmptyExpire time.Duration
// LockExpire 是更新缓存时分配的锁的过期时间。默认为 3s
LockExpire time.Duration
//锁失败后的重试等待时间 100ms
LockSleep time.Duration
// 等待副本数量
WaitReplicas int
// 副本等待超时时间 默认300ms
WaitReplicasTimeout time.Duration
//随机过期时间,0.1的偏移(缓存雪崩)
RandomExpireAdjustment float64
// 标识缓存禁止读,默认关闭。用于缓存宕机时候的降级
DisableCacheRead bool
// 标识缓存删除,默认关闭。用于缓存宕机时候的降级
DisableCacheDelete bool
// 强一致性,默认关闭
StrongConsistency bool
}

lua脚本

使用脚本进行redis操作,lua的好处是一次性执行,执行过程其他脚本或命令无法执行(注意不确定参数)。

这里使用hash进行数据存储,同时保存 key/valuekey/lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (c *Client) luaGet(key string, owner string) ([]interface{}, error) {
res, err := callLua(c.rdb.Context(), c.rdb, ` -- luaGet
local v = redis.call('HGET', KEYS[1], 'value') //获取值
local lu = redis.call('HGET', KEYS[1], 'lockUtil') //获取过期时间
if lu ~= false and tonumber(lu) < tonumber(ARGV[1]) or lu == false and v == false then
redis.call('HSET', KEYS[1], 'lockUtil', ARGV[2]) //如果锁已经过期或者不存在,则更新锁
redis.call('HSET', KEYS[1], 'lockOwner', ARGV[3])
return { v, 'LOCKED' }
end
return {v, lu}
`, []string{key}, []interface{}{now(), now() + int64(c.Options.LockExpire/time.Second), owner})
debugf("luaGet return: %v, %v", res, err)
if err != nil {
return nil, err
}
return res.([]interface{}), nil
}

func (c *Client) luaSet(key string, value string, expire int, owner string) error {
_, err := callLua(c.rdb.Context(), c.rdb, `-- luaSet
local o = redis.call('HGET', KEYS[1], 'lockOwner')
if o ~= ARGV[2] then
return
end
redis.call('HSET', KEYS[1], 'value', ARGV[1])
redis.call('HDEL', KEYS[1], 'lockUtil')
redis.call('HDEL', KEYS[1], 'lockOwner')
redis.call('EXPIRE', KEYS[1], ARGV[3])
`, []string{key}, []interface{}{value, owner, expire})
return err
}

加锁和解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//加锁
func (c *Client) LockForUpdate(key string, owner string) error {
lockUtil := math.Pow10(10)
res, err := callLua(c.rdb.Context(), c.rdb, ` -- luaLock
local lu = redis.call('HGET', KEYS[1], 'lockUtil')
local lo = redis.call('HGET', KEYS[1], 'lockOwner')
if lu == false or tonumber(lu) < tonumber(ARGV[2]) or lo == ARGV[1] then
redis.call('HSET', KEYS[1], 'lockUtil', ARGV[2])
redis.call('HSET', KEYS[1], 'lockOwner', ARGV[1])
return 'LOCKED'
end
return lo
`, []string{key}, []interface{}{owner, lockUtil})
if err == nil && res != "LOCKED" {
return fmt.Errorf("%s has been locked by %s", key, res)
}
return err
}

//解锁
func (c *Client) UnlockForUpdate(key string, owner string) error {
_, err := callLua(c.rdb.Context(), c.rdb, ` -- luaUnlock
local lo = redis.call('HGET', KEYS[1], 'lockOwner')
if lo == ARGV[1] then
redis.call('HSET', KEYS[1], 'lockUtil', 0)
redis.call('HDEL', KEYS[1], 'lockOwner')
end
`, []string{key}, []interface{}{owner})
return err
}

读取缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// new a client for rockscache using the default options
rc := rockscache.NewClient(redisClient, NewDefaultOptions())

v, err := rc.Fetch("key1", 300, func()(string, error) {
// fetch data from database or other sources
return "value1", nil
})

func (c *Client) Fetch(key string, expire time.Duration, fn func() (string, error)) (string, error) {
ex := expire - c.Options.Delay - time.Duration(rand.Float64()*c.Options.RandomExpireAdjustment*float64(expire))
v, err, _ := c.group.Do(key, func() (interface{}, error) { //同样使用共享调用进行操作
if c.Options.DisableCacheRead { //缓存崩溃直接读数据库
return fn()
} else if c.Options.StrongConsistency { //强一致性
return c.strongFetch(key, ex, fn)
}
return c.weakFetch(key, ex, fn)
})
return v.(string), err
}

这里也提供了忽略锁的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
func (c *Client) RawGet(key string) (string, error) {
return c.rdb.HGet(c.rdb.Context(), key, "value").Result()
}

func (c *Client) RawSet(key string, value string, expire time.Duration) error {
err := c.rdb.HSet(c.rdb.Context(), key, "value", value).Err()
if err == nil {
//如果过期操作失败了,那么缓存可能永远不过期(根据AOF策略,默认每秒)
//操作失败可能是网络或者redis宕机,如果是宕机,那么key可能都还没有落盘。所以这里得考虑网络异常情况
err = c.rdb.Expire(c.rdb.Context(), key, expire).Err()
}
return err
}

强一致性获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (c *Client) weakFetch(key string, expire time.Duration, fn func() (string, error)) (string, error) {
debugf("weakFetch: key=%s", key)
owner := shortuuid.New()
r, err := c.luaGet(key, owner)
for err == nil && r[0] == nil && r[1].(string) != locked {
debugf("empty result for %s locked by other, so sleep %s", key, c.Options.LockSleep.String())
time.Sleep(c.Options.LockSleep)
r, err = c.luaGet(key, owner)
}
if err != nil {
return "", err
}
if r[1] != locked {
return r[0].(string), nil
}
if r[0] == nil {
return c.fetchNew(key, expire, owner, fn)
}
go withRecover(func() {
_, _ = c.fetchNew(key, expire, owner, fn)
})
return r[0].(string), nil
}

func (c *Client) strongFetch(key string, expire time.Duration, fn func() (string, error)) (string, error) {
debugf("strongFetch: key=%s", key)
owner := shortuuid.New()
r, err := c.luaGet(key, owner)
for err == nil && r[1] != nil && r[1] != locked { // locked by other
debugf("locked by other, so sleep %s", c.Options.LockSleep)
time.Sleep(c.Options.LockSleep)
r, err = c.luaGet(key, owner)
}
if err != nil {
return "", err
}
if r[1] != locked { // normal value
return r[0].(string), nil
}
return c.fetchNew(key, expire, owner, fn)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (c *Client) fetchNew(key string, expire time.Duration, owner string, fn func() (string, error)) (string, error) {
result, err := fn() //自定义读取数据
if err != nil { //成功则删除锁
_ = c.UnlockForUpdate(key, owner)
return "", err
}
if result == "" { //如果结果为空
if c.Options.EmptyExpire == 0 { // if empty expire is 0, then delete the key
err = c.rdb.Del(c.rdb.Context(), key).Err()
return "", err
}
expire = c.Options.EmptyExpire
}
err = c.luaSet(key, result, int(expire/time.Second), owner) //更新缓存
return result, err
}

总结

应该根据场景来设计合适的方案解决缓存一致性问题

  1. 读多写少的场景下,可以选择采用 Cache-Aside 结合消费数据库日志做补偿 的方案
  2. 写多的场景下,可以选择采用 Write-Through 结合分布式锁的方案
  3. 写多的极端场景下,可以选择采用 Write-Behind 的方案
  4. 可以通过读取 binlog (阿里云canal)异步删除缓存缓存

参考文档

  1. https://blog.csdn.net/qq_34827674/article/details/123463175
  2. https://learn.lianglianglee.com/专栏/300分钟吃透分布式缓存-完
  3. 分布式之数据库和缓存双写一致性方式解析
  4. Cache-Aside Pattern
  5. Scaling Memcache at Facebook
  6. https://www.w3cschool.cn/architectroad/architectroad-cache-architecture-design.html
  7. https://cloud.tencent.com/developer/article/1932934
  8. https://segmentfault.com/a/1190000040976439
  9. https://talkgo.org/t/topic/1505
  10. https://github.com/dtm-labs/rockscache/blob/main/helper/README-cn.md