微服务-10-数据库缓存

问题背景

进程内缓存中说到的是直接在进程进行缓存的自管理,一般应用于业务生成的自定义数据(多副本情况下可能存在缓存一致性问题)。而随着业务量的增加,利用Redis构建专门的数据缓存,加快数据库访问。

由于数据库数据的特点,那么这里就有几个实现需要注意的点

  1. 数据一致性
  2. 缓存击穿、穿透、雪崩
  3. 缓存访问量、缓存命中率

实现原理

源码注释:core/stores/cache

构建缓存对象

根据缓存节点配置构建一个缓存对象用于业务层进行缓存处理

go-zero-cache10缓存支持通过多节点构建的缓存集群,也支持单个节点结构

  1. 多节点缓存集群通过一致性Hash进行访问,这里的多节点是指多个无关的缓存节点。而每个缓存节点可能都是一个Redis集群
  2. 共享调用防止缓存击穿
  3. 统计将环中命中,DB查询等情况上报给外部
1
2
3
4
5
//缓存集群对象
cacheCluster struct {
dispatcher *hash.ConsistentHash
errNotFound error
}

关于一致性Hash 在 《go-zero-一致性Hash》有说明

集群缓存操作

集群的缓存操作都是根据一致性Hash算法得出对应节点,然后演变成单节点的缓存操作

go-zero-cache13

与单点不同的是,缓存集群批量删除Key,而Key有可能存在于多个节点上

go-zero-cache14

keys数量大于1的时候,首先会 for Hash(Key) 找到所有的缓存节点,然后再通过 for del

  1. 单次映射Hash节点或者删除Key并不会影响后续操作,而是通过 BatchError记录每一次错误
  2. 通过 make(map[interface{}][]string)保存节点Key的关系

单点缓存操作

查询缓存

查询缓存的过程其实就是从Redis中获取数据的过程

go-zero-cachedb1

这里有两点需要注意

  1. 当加载数据完毕,有一个将结果与空占位符比较的过程 value = "*",是为了防止缓存穿透而故意设置的占位符。那么它是什么时候怎么插入的?
  2. 将结果反序列如果失败,那么会去Redis删除这个 Key,表示存储的缓存异常
设置缓存

设置缓存是直接按照redis语法设置 k/vexpire

go-zero-cacheDB2
删除缓存
go-zero-cachedb3
  1. 删除缓存时候,如果本身就是一个缓存集群,当对 keys进行批量删除的时候,需要依次删除每一个Key,而不是直接 del(keys...) 删除。
  2. 删除失败这里添加延时任务进行重试,但只会重试一次,失败后直接退出
获取缓存

获取缓存当缓存数据库没有的时候,就会直接从数据库加载并将数据保存到缓存数据库

go-zero-cachedb5
  1. 当缓存中没有且数据库中也没有的时候,那么这个时候就会设置占位符(防止缓存穿透),占位符的过期时间与普通的Key一致
  2. 关于查询数据库操作,这里仅仅是将结果实体传入,由数据层进行数据加载
  3. 当缓存没有的时候,是先查询数据库,然后更新缓存

技术内幕

代码:core/stores/cache/cachenode.go

cacheNode 表示是单个缓存节点的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//cacheNode 表示单个缓存节点
type cacheNode struct {
rds *redis.Redis //redis句柄
expiry time.Duration //缓存过期时间
notFoundExpiry time.Duration //
barrier syncx.SingleFlight //共享调用
r *rand.Rand
lock *sync.Mutex //原子锁
unstableExpiry mathx.Unstable
stat *Stat //统计(单个节点的统计)
errNotFound error
}


const (
notFoundPlaceholder = "*"
//避免缓存雪崩,这里加上随机过期时间随机值 [0.95, 1.05] * seconds
expiryDeviation = 0.05
)
  1. 这里几个过期分别有什么作用
    • expiry:
    • notFoundExpiry:
    • unstableExpiry:
  2. 使用共享调用 barrier减少缓存调用
  3. rand.Rand 的随机数

新建缓存节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat,
errNotFound error, opts ...Option) Cache {
o := newOptions(opts...)
return cacheNode{
rds: rds, //redis句柄
expiry: o.Expiry, //过期时间
notFoundExpiry: o.NotFoundExpiry, //设置占位符过期时间
barrier: barrier, //共享调用
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation), //一定范围内过期时间
stat: st, //统计逻辑
errNotFound: errNotFound, //找不到缓存
}
}

查询缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (c cacheNode) doGetCache(ctx context.Context, key string, v interface{}) error {
c.stat.IncrementTotal()
data, err := c.rds.GetCtx(ctx, key)
if err != nil {
c.stat.IncrementMiss()
return err
}

if len(data) == 0 { //数据为空
c.stat.IncrementMiss()
return c.errNotFound
}

c.stat.IncrementHit()
if data == notFoundPlaceholder { //防止缓存穿透
return errPlaceholder
}

return c.processCache(ctx, key, data, v) //缓存处理
}

func (c cacheNode) processCache(ctx context.Context, key, data string, v interface{}) error {
err := jsonx.Unmarshal([]byte(data), v) //反序列化
if err == nil {
return nil
}

//... 日志记录

if _, e := c.rds.DelCtx(ctx, key); e != nil { //删除缓存
logger.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
c.rds.Addr, key, data, e)
}

// returns errNotFound to reload the value by the given queryFn
return c.errNotFound
}

设置缓存

1
2
3
4
5
6
7
8
9
func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, val interface{},
expire time.Duration) error {
data, err := jsonx.Marshal(val) //序列化
if err != nil {
return err
}

return c.rds.SetexCtx(ctx, key, string(data), int(expire.Seconds()))
}

获取缓存

获取缓存直接加载加载redis,并根据加载结果进行不同的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (c cacheNode) doTake(ctx context.Context, v interface{}, key string,
query func(v interface{}) error, cacheVal func(v interface{}) error) error {
logger := logx.WithContext(ctx)
val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) { //共享调用加载key
if err := c.doGetCache(ctx, key, v); err != nil {
if err == errPlaceholder { //如果是占位符直接返回找不到
return nil, c.errNotFound
} else if err != c.errNotFound {
//如果是其他错误,直接返回而不要继续将错误蔓延到dbs
//如果不这样,可能高并发导致redis奔溃之后,dbs也会跟着崩溃
return nil, err
}

//数据库查询
if err = query(v); err == c.errNotFound {
//没有找到就设置占位符
if err = c.setCacheWithNotFound(ctx, key); err != nil {
logger.Error(err)
}

return nil, c.errNotFound
} else if err != nil {
c.stat.IncrementDbFails()
return nil, err
}

if err = cacheVal(v); err != nil { //缓存找到的缓存
logger.Error(err)
}
}

return jsonx.Marshal(v)
})
if err != nil {
return err
}
if fresh {
return nil
}

//放在最后的原因是共享调用,这里共享调用不记录到总数中
c.stat.IncrementTotal()
c.stat.IncrementHit()

return jsonx.Unmarshal(val.([]byte), v) //反序列化
}

删除缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error {
if len(keys) == 0 {
return nil
}

logger := logx.WithContext(ctx)
if len(keys) > 1 && c.rds.Type == redis.ClusterType { //如果是集群则for循环批量删除
for _, key := range keys {
if _, err := c.rds.DelCtx(ctx, key); err != nil {
logger.Errorf("failed to clear cache with key: %q, error: %v", key, err)
c.asyncRetryDelCache(key) //失败添加重试任务
}
}
} else if _, err := c.rds.DelCtx(ctx, keys...); err != nil {
logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
c.asyncRetryDelCache(keys...)
}

return nil
}

缓存统计

代码:core/stores/cache/cachestat.go

用于统计缓存情况

新建统计对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// NewStat returns a Stat.
func NewStat(name string) *Stat {
ret := &Stat{
name: name,
}
go ret.statLoop()

return ret
}

//statLoop 开启统计
func (s *Stat) statLoop() {
ticker := time.NewTicker(statInterval)
defer ticker.Stop()

for range ticker.C {
total := atomic.SwapUint64(&s.Total, 0)
if total == 0 {
continue
}

//计算缓存情况
hit := atomic.SwapUint64(&s.Hit, 0)
percent := 100 * float32(hit) / float32(total) //命中率
miss := atomic.SwapUint64(&s.Miss, 0) //未命中
dbf := atomic.SwapUint64(&s.DbFails, 0) //数据库调用事变

//发送统计信息
logx.Statf("dbcache(%s) - qpm: %d, hit_ratio: %.1f%%, hit: %d, miss: %d, db_fails: %d", s.name, total, percent, hit, miss, dbf)
}
}

注意:

  1. 协程是一个常驻协程,缺少退出
  2. SwapUint64的作用是:将新的值写入 addr,而返回addr中旧的值

缓存清理

当缓存删除失败,这里添加一个重试机制

初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func init() {
var err error
//时间轮
timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean)
logx.Must(err)
//监听系统异常退出
proc.AddShutdownListener(func() {
timingWheel.Drain(clean)
})
}

//clean 时间轮操作
func clean(key, value interface{}) {
taskRunner.Schedule(func() {
dt := value.(delayTask)
err := dt.task() //执行任务
if err == nil {
return
}

next, ok := nextDelay(dt.delay) //重复任务设置
if ok {
dt.delay = next
timingWheel.SetTimer(key, dt, next)
} else {
msg := fmt.Sprintf("retried but failed to clear cache with keys: %q, error: %v",
formatKeys(dt.keys), err)
logx.Error(msg)
stat.Report(msg) //暂未实现
}
})
}

这里做了一个动态清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func nextDelay(delay time.Duration) (time.Duration, bool) {
switch delay {
case time.Second:
return time.Second * 5, true
case time.Second * 5:
return time.Minute, true
case time.Minute:
return time.Minute * 5, true
case time.Minute * 5:
return time.Hour, true
default:
return 0, false
}
}
添加清理任务
1
2
3
4
5
6
7
8
// AddCleanTask adds a clean task on given keys.
func AddCleanTask(task func() error, keys ...string) {
timingWheel.SetTimer(stringx.Randn(taskKeyLen), delayTask{
delay: time.Second,
task: task,
keys: keys,
}, time.Second)
}

总结

  1. 采用共享调用的方式防止缓存击穿
  2. 采用占位符方式缓存床头
  3. 设置范围过期时间防止缓存雪崩
  4. 增加重试删除机制(时间轮)

参考链接

  1. https://talkgo.org/t/topic/1716
  2. https://talkgo.org/t/topic/1505