// Take blocks to ensure that the time spent between multiple // Take calls is on average time.Second/rate. func(t *atomicLimiter)Take()time.Time { var ( newState state // 状态 taken bool// 用于表示原子操作是否成功 interval time.Duration // 需要 sleep 的时间 ) for !taken { // 如果 CAS 操作不成功就一直尝试 now := t.clock.Now() //获取当前时间
// 如果 last 是零值的话,表示之前就没用过,直接保存返回即可 if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue }
// A RateLimiter limits the rate at which an action can be performed. It // applies neither smoothing (like one could achieve in a token bucket system) // nor does it offer any conception of warmup, wherein the rate of actions // granted are steadily increased until a steady throughput equilibrium is // reached. type RateLimiter struct { limit int interval time.Duration mtx sync.Mutex times list.List //双向链表 }
// New creates a new rate limiter for the limit and interval. funcNew(limit int, interval time.Duration) *RateLimiter { lim := &RateLimiter{ limit: limit, interval: interval, } lim.times.Init() return lim }
// Wait blocks if the rate limit has been reached. Wait offers no guarantees // of fairness for multiple actors if the allowed rate has been temporarily // exhausted. func(r *RateLimiter)Wait() { for { ok, remaining := r.Try() if ok { break } time.Sleep(remaining) } }
// Try returns true if under the rate limit, or false if over and the // remaining time before the rate limit expires. func(r *RateLimiter)Try()(ok bool, remaining time.Duration) { r.mtx.Lock() defer r.mtx.Unlock() now := time.Now() if l := r.times.Len(); l < r.limit { r.times.PushBack(now) returntrue, 0 } frnt := r.times.Front() if diff := now.Sub(frnt.Value.(time.Time)); diff < r.interval { returnfalse, r.interval - diff } frnt.Value = now r.times.MoveToBack(frnt) returntrue, 0 }