算法之美-滑动窗口

在TCP原理中使用滑动窗口进行流量控制,平时的算法中也会使用双指针的滑动窗口进行问题处理。这里看看滑动窗口的具体原理

实现原理

滑动窗口不仅仅可以通过长度来限制流量,还能通过在窗口中保存最近一段时间数据,以此来进行更加复杂的实时调整

  1. 为什么不使用链表:使用切片构建类似环的结构,有效节省空间

windows

技术内幕

算法实现中的类似滑动窗口框架为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
string s, t;
// 在 s 中寻找 t 的「最小覆盖子串」
int left = 0, right = 0;
string res = s;

while(right < s.size()) {
window.add(s[right]);
right++;
// 如果符合要求,移动 left 缩小窗口
while (window 符合要求) {
// 如果这个窗口的子串更短,则更新 res
res = minLen(res, window);
window.remove(s[left]);
left++;
}
}
return res;

看看go-zero中滑动窗口的实现

代码路径:core/collection/rollingwindow.go

对象定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type (
//自定义可选参数
RollingWindowOption func(rollingWindow *RollingWindow)

// 滑动窗口结构
RollingWindow struct {
lock sync.RWMutex //窗口读写锁
size int //窗口大小
win *window //窗口对洗那个
interval time.Duration //时间间隔
offset int //当前位置
ignoreCurrent bool //忽略当前
lastTime time.Duration // start time of the last bucket
}
)

窗口中的每个槽可用于存储更加复杂的数据进行最近一段时间数据统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//槽
type Bucket struct {
Sum float64
Count int64
}

func (b *Bucket) add(v float64) {
b.Sum += v
b.Count++
}

func (b *Bucket) reset() {
b.Sum = 0
b.Count = 0
}

窗口操作

使用切片与取余进行类似环形操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//增加 % 类型唤醒操作
func (w *window) add(offset int, v float64) {
w.buckets[offset%w.size].add(v)
}

func (w *window) reduce(start, count int, fn func(b *Bucket)) {
for i := 0; i < count; i++ {
fn(w.buckets[(start+i)%w.size])
}
}

func (w *window) resetBucket(offset int) {
w.buckets[offset%w.size].reset()
}

滑动窗口操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//窗口移动
func (rw *RollingWindow) span() int {
offset := int(timex.Since(rw.lastTime) / rw.interval) //移动格子
if 0 <= offset && offset < rw.size {
return offset
}

return rw.size //当偏移量超过窗口大小,那么直接返回窗口大小(不需要移动offset的原因抛弃旧窗口即可)
}

//更新偏移量
func (rw *RollingWindow) updateOffset() {
span := rw.span() //窗口移动位置
if span <= 0 {
return
}

offset := rw.offset //当前窗口起始起始
for i := 0; i < span; i++ { //重置过期的槽
rw.win.resetBucket((offset + i + 1) % rw.size)
}

rw.offset = (offset + span) % rw.size //更新起始位置
now := timex.Now() //当前时间
// align to interval time boundary
rw.lastTime = now - (now-rw.lastTime)%rw.interval //最新移动时间
}

//滑动窗口更新槽
func (rw *RollingWindow) Add(v float64) {
rw.lock.Lock()
defer rw.lock.Unlock()
rw.updateOffset() //更新索引
rw.win.add(rw.offset, v) //增加值
}

//对窗口进行 fn 操作
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
rw.lock.RLock()
defer rw.lock.RUnlock()

var diff int
span := rw.span()
// ignore current bucket, because of partial data
if span == 0 && rw.ignoreCurrent {
diff = rw.size - 1
} else {
diff = rw.size - span
}
if diff > 0 {
offset := (rw.offset + span + 1) % rw.size
rw.win.reduce(offset, diff, fn)
}
}