问题背景
在 进程内缓存中说到的是直接在进程进行缓存的自管理,一般应用于业务生成的自定义数据(多副本情况下可能存在缓存一致性问题)。而随着业务量的增加,利用Redis构建专门的数据缓存,加快数据库访问。
由于数据库数据的特点,那么这里就有几个实现需要注意的点
- 数据一致性
- 缓存击穿、穿透、雪崩
- 缓存访问量、缓存命中率
实现原理
源码注释:core/stores/cache
构建缓存对象
根据缓存节点配置构建一个缓存对象用于业务层进行缓存处理
缓存支持通过多节点构建的缓存集群,也支持单个节点结构
- 多节点缓存集群通过一致性Hash进行访问,这里的多节点是指多个无关的缓存节点。而每个缓存节点可能都是一个Redis集群
- 共享调用防止缓存击穿
- 统计将环中命中,DB查询等情况上报给外部
1 2 3 4 5
| cacheCluster struct { dispatcher *hash.ConsistentHash errNotFound error }
|
关于一致性Hash 在 《go-zero-一致性Hash》有说明
集群缓存操作
集群的缓存操作都是根据一致性Hash
算法得出对应节点,然后演变成单节点的缓存操作
与单点不同的是,缓存集群批量删除Key
,而Key
有可能存在于多个节点上
当keys
数量大于1的时候,首先会 for Hash(Key)
找到所有的缓存节点,然后再通过 for del
- 单次映射
Hash
节点或者删除Key
并不会影响后续操作,而是通过 BatchError
记录每一次错误
- 通过
make(map[interface{}][]string)
保存节点与Key的关系
单点缓存操作
查询缓存
查询缓存的过程其实就是从Redis中获取数据的过程
这里有两点需要注意
- 当加载数据完毕,有一个将结果与空占位符比较的过程
value = "*"
,是为了防止缓存穿透而故意设置的占位符。那么它是什么时候怎么插入的?
- 将结果反序列如果失败,那么会去Redis删除这个
Key
,表示存储的缓存异常
设置缓存
设置缓存是直接按照redis语法设置 k/v
与 expire
删除缓存
- 删除缓存时候,如果本身就是一个缓存集群,当对
keys
进行批量删除的时候,需要依次删除每一个Key,而不是直接 del(keys...)
删除。
- 删除失败这里添加延时任务进行重试,但只会重试一次,失败后直接退出
获取缓存
获取缓存当缓存数据库没有的时候,就会直接从数据库加载并将数据保存到缓存数据库
- 当缓存中没有且数据库中也没有的时候,那么这个时候就会设置占位符(防止缓存穿透),占位符的过期时间与普通的
Key
一致
- 关于查询数据库操作,这里仅仅是将结果实体传入,由数据层进行数据加载
- 当缓存没有的时候,是先查询数据库,然后更新缓存
技术内幕
代码:core/stores/cache/cachenode.go
cacheNode
表示是单个缓存节点的对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| type cacheNode struct { rds *redis.Redis expiry time.Duration notFoundExpiry time.Duration barrier syncx.SingleFlight r *rand.Rand lock *sync.Mutex unstableExpiry mathx.Unstable stat *Stat errNotFound error }
const ( notFoundPlaceholder = "*" expiryDeviation = 0.05 )
|
- 这里几个过期分别有什么作用
expiry
:
notFoundExpiry
:
unstableExpiry
:
- 使用共享调用
barrier
减少缓存调用
rand.Rand
的随机数
新建缓存节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat, errNotFound error, opts ...Option) Cache { o := newOptions(opts...) return cacheNode{ rds: rds, expiry: o.Expiry, notFoundExpiry: o.NotFoundExpiry, barrier: barrier, r: rand.New(rand.NewSource(time.Now().UnixNano())), lock: new(sync.Mutex), unstableExpiry: mathx.NewUnstable(expiryDeviation), stat: st, errNotFound: errNotFound, } }
|
查询缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (c cacheNode) doGetCache(ctx context.Context, key string, v interface{}) error { c.stat.IncrementTotal() data, err := c.rds.GetCtx(ctx, key) if err != nil { c.stat.IncrementMiss() return err }
if len(data) == 0 { c.stat.IncrementMiss() return c.errNotFound }
c.stat.IncrementHit() if data == notFoundPlaceholder { return errPlaceholder }
return c.processCache(ctx, key, data, v) }
func (c cacheNode) processCache(ctx context.Context, key, data string, v interface{}) error { err := jsonx.Unmarshal([]byte(data), v) if err == nil { return nil }
if _, e := c.rds.DelCtx(ctx, key); e != nil { logger.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v", c.rds.Addr, key, data, e) }
return c.errNotFound }
|
设置缓存
1 2 3 4 5 6 7 8 9
| func (c cacheNode) SetWithExpireCtx(ctx context.Context, key string, val interface{}, expire time.Duration) error { data, err := jsonx.Marshal(val) if err != nil { return err }
return c.rds.SetexCtx(ctx, key, string(data), int(expire.Seconds())) }
|
获取缓存
获取缓存直接加载加载redis,并根据加载结果进行不同的处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| func (c cacheNode) doTake(ctx context.Context, v interface{}, key string, query func(v interface{}) error, cacheVal func(v interface{}) error) error { logger := logx.WithContext(ctx) val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) { if err := c.doGetCache(ctx, key, v); err != nil { if err == errPlaceholder { return nil, c.errNotFound } else if err != c.errNotFound { return nil, err }
if err = query(v); err == c.errNotFound { if err = c.setCacheWithNotFound(ctx, key); err != nil { logger.Error(err) }
return nil, c.errNotFound } else if err != nil { c.stat.IncrementDbFails() return nil, err }
if err = cacheVal(v); err != nil { logger.Error(err) } }
return jsonx.Marshal(v) }) if err != nil { return err } if fresh { return nil }
c.stat.IncrementTotal() c.stat.IncrementHit()
return jsonx.Unmarshal(val.([]byte), v) }
|
删除缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (c cacheNode) DelCtx(ctx context.Context, keys ...string) error { if len(keys) == 0 { return nil }
logger := logx.WithContext(ctx) if len(keys) > 1 && c.rds.Type == redis.ClusterType { for _, key := range keys { if _, err := c.rds.DelCtx(ctx, key); err != nil { logger.Errorf("failed to clear cache with key: %q, error: %v", key, err) c.asyncRetryDelCache(key) } } } else if _, err := c.rds.DelCtx(ctx, keys...); err != nil { logger.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err) c.asyncRetryDelCache(keys...) }
return nil }
|
缓存统计
代码:core/stores/cache/cachestat.go
用于统计缓存情况
新建统计对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func NewStat(name string) *Stat { ret := &Stat{ name: name, } go ret.statLoop()
return ret }
func (s *Stat) statLoop() { ticker := time.NewTicker(statInterval) defer ticker.Stop()
for range ticker.C { total := atomic.SwapUint64(&s.Total, 0) if total == 0 { continue } hit := atomic.SwapUint64(&s.Hit, 0) percent := 100 * float32(hit) / float32(total) miss := atomic.SwapUint64(&s.Miss, 0) dbf := atomic.SwapUint64(&s.DbFails, 0) logx.Statf("dbcache(%s) - qpm: %d, hit_ratio: %.1f%%, hit: %d, miss: %d, db_fails: %d", s.name, total, percent, hit, miss, dbf) } }
|
注意:
- 协程是一个常驻协程,缺少退出
SwapUint64
的作用是:将新的值写入 addr
,而返回addr
中旧的值
缓存清理
当缓存删除失败,这里添加一个重试机制
初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| func init() { var err error timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean) logx.Must(err) proc.AddShutdownListener(func() { timingWheel.Drain(clean) }) }
func clean(key, value interface{}) { taskRunner.Schedule(func() { dt := value.(delayTask) err := dt.task() if err == nil { return }
next, ok := nextDelay(dt.delay) if ok { dt.delay = next timingWheel.SetTimer(key, dt, next) } else { msg := fmt.Sprintf("retried but failed to clear cache with keys: %q, error: %v", formatKeys(dt.keys), err) logx.Error(msg) stat.Report(msg) } }) }
|
这里做了一个动态清理
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func nextDelay(delay time.Duration) (time.Duration, bool) { switch delay { case time.Second: return time.Second * 5, true case time.Second * 5: return time.Minute, true case time.Minute: return time.Minute * 5, true case time.Minute * 5: return time.Hour, true default: return 0, false } }
|
添加清理任务
1 2 3 4 5 6 7 8
| func AddCleanTask(task func() error, keys ...string) { timingWheel.SetTimer(stringx.Randn(taskKeyLen), delayTask{ delay: time.Second, task: task, keys: keys, }, time.Second) }
|
总结
- 采用共享调用的方式防止缓存击穿
- 采用占位符方式缓存床头
- 设置范围过期时间防止缓存雪崩
- 增加重试删除机制(时间轮)
参考链接
- https://talkgo.org/t/topic/1716
- https://talkgo.org/t/topic/1505