Go-05-Map

什么是Map

Map是一种通过key来获取value的一个数据结构,其底层存储方式为数组。

  • 一种特殊的数据结构:一种元素对(pair)的无序集合,pair 的一个元素是 key,对应的另一个元素是 value

  • 一种引用类型,未初始化的 map 的值是 nil,可以通过如下方式声明:

1
2
var map1 map[keytype]valuetype
var map1 map[string]int

key 可以是任意可以用 == 或者 != 操作符比较的类型,比如 string、int、float。所以数组、切片和结构体不能作为 key,但是指针和接口类型可以。如果要用结构体作为 key 可以提供 Key()Hash() 方法,这样可以通过结构体的域计算出唯一的数字或者字符串的 key。

value 可以是任意类型的;通过使用空接口类型,可以存储任意值,但是使用这种类型作为值时需要先做一次类型断言

Hash表

使用Hash需要解决哈希碰撞的问题,常见方法的就是开放寻址法和拉链法。

需要注意的是,这里提到的哈希碰撞不是多个键对应的哈希完全相等,可能是多个哈希的部分相等,例如:两个键对应哈希的前四个字节相同

开放寻址法

开放寻址法核心思想是依次探测和比较数组中的元素以判断目标键值对是否存在于哈希表中,使用开放寻址法来实现哈希表,那么实现哈希表底层的数据结构就是数组,不过因为数组的长度有限,想哈希表写入 (author, draven) 这个键值对时会从如下的索引开始遍历

1
index := hash("author") % array.len

向当前哈希表写入新的数据时,如果发生了冲突,就会将键值对写入到下一个索引不为空的位置

image-20201220151651759

当 Key3 与已经存入哈希表中的两个键值对 Key1 和 Key2 发生冲突时,Key3 会被写入 Key2 后面的空闲位置。当我们再去读取 Key3 对应的值时就会先获取键的哈希并取模,这会先帮助我们找到 Key1,找到 Key1 后发现它与 Key 3 不相等,所以会继续查找后面的元素,直到内存为空或者找到目标元素

当需要查找某个键对应的值时,会从索引的位置开始线性探测数组,找到目标键值对或者空内存就意味着这一次查询操作的结束。

开放寻址法中对性能影响最大的是装载因子,它是数组中元素的数量与数组大小的比值。随着装载因子的增加,线性探测的平均用时就会逐渐增加,这会影响哈希表的读写性能。当装载率超过 70% 之后,哈希表的性能就会急剧下降,而一旦装载率达到 100%,整个哈希表就会完全失效,这时查找和插入任意元素的时间复杂度都是 𝑂(𝑛)O(n) 的,这时需要遍历数组中的全部元素,所以在实现哈希表时一定要关注装载因子的变化

拉链法

与开放地址法相比,平均查找的长度也比较短,各个用于存储节点的内存都是动态申请的,可以节省比较多的存储空间。

实现拉链法一般会使用数组加上链表,一些编程语言会在拉链法的哈希中引入红黑树以优化性能,拉链法会使用链表数组作为哈希底层的数据结构,可以将它看成可以扩展的二维数组

image-20201220151533745

如上图所示,当我们需要将一个键值对 (Key6, Value6) 写入哈希表时,键值对中的键 Key6 都会先经过一个哈希函数,哈希函数返回的哈希会帮助我们选择一个桶,和开放地址法一样,选择桶的方式是直接对哈希返回的结果取模:

1
index := hash("Key6") % array.len

选择了 2 号桶后就可以遍历当前桶中的链表了,在遍历链表的过程中会遇到以下两种情况:

  1. 找到键相同的键值对 — 更新键对应的值;
  2. 没有找到键相同的键值对 — 在链表的末尾追加新的键值对;

如果要在哈希表中获取某个键对应的值,会经历如下的过程

image-20201220152455731

Key11 展示了一个键在哈希表中不存在的例子,当哈希表发现它命中 4 号桶时,它会依次遍历桶中的链表,然而遍历到链表的末尾也没有找到期望的键,所以哈希表中没有该键对应的值。

在一个性能比较好的哈希表中,每一个桶中都应该有 0~1 个元素,有时会有 2~3 个,很少会超过这个数量。计算哈希、定位桶和遍历链表三个过程是哈希表读写操作的主要开销,使用拉链法实现的哈希也有装载因子这一概念:

装载因子:=元素数量÷桶数量装载因子:=元素数量÷桶数量

与开放地址法一样,拉链法的装载因子越大,哈希的读写性能就越差。在一般情况下使用拉链法的哈希表装载因子都不会超过 1,当哈希表的装载因子较大时会触发哈希的扩容,创建更多的桶来存储哈希中的元素,保证性能不会出现严重的下降。如果有 1000 个桶的哈希表存储了 10000 个键值对,它的性能是保存 1000 个键值对的 1/10,但是仍然比在链表中直接读写好 1000 倍

Map结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// A header for a Go map.
type hmap struct {
count int //表示当前hash表元素个数
flags uint8 //记录当前hash表状态,map是非线程安全的
B uint8 //表示当前哈希表持有的 buckets 数量,桶的数量都是2的倍数,该字段存储对数,也就是len(buckets) == 2^B
noverflow uint16 // overflow 的 bucket 近似数
hash0 uint32 // 哈希种子,能为哈希函数的结果引入随机性, 值在创建哈希表时确定,并在调用哈希函数时作为参数传入

buckets unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
oldbuckets unsafe.Pointer // 哈希在扩容时用于保存之前 buckets 的字段,它的大小是当前 buckets 的一半
nevacuate uintptr // 指示扩容进度,小于此地址的 buckets 迁移完成

extra *mapextra // optional fields
}

type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
nextOverflow *bmap
}

这里B是map的bucket数组长度的对数,每个bucket里面存储了kv对。buckets是一个指针,指向实际存储的bucket数组的首地址。 bucket的结构体如下:

1
2
3
4
5
6
7
8
9
// A bucket for a Go map.
type bmap struct {
//top hash通常包含该bucket中每个键的hash值的高八位
//如果tophash[0]小于mintophash,则tophash[0]为桶疏散状态
//bucketCnt 的初始值是8
tophash [bucketCnt]uint8
//注意:将所有键打包在一起,然后将所有值打包在一起,使得代码比交替key/elem/key/elem/...更复杂。
//但它允许我们消除可能需要的填充,例如map[int64]int8./后面跟一个溢出指针}
}

上面这个数据结构并不是 golang runtime 时的结构,在编译时候编译器会给它动态创建一个新的结构,如下:

1
2
3
4
5
6
7
type bmap struct {
topbits [8]uint8
keys [8]keytype
values [8]valuetype
pad uintptr
overflow uintptr
}

bmap 就是我们常说的“bucket”结构,每个 bucket 里面最多存储 8 个 key,这些 key 之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果是“一类”的。在桶内,又会根据 key 计算出来的 hash 值的高 8 位来决定 key 到底落入桶内的哪个位置(一个桶内最多有8个位置)

hashmap bmap

当map的key和value

image-20201220153852637

每个 bmap 都能存储 8 个键值对,当哈希表中存储的数据过多,单个桶无法装满时就会使用 extra.nextOverflow 中桶存储溢出数据

当作为函数传参时候,传递的实例其实是指针

  1. map是一个hash表,数据被存放在一组buckets(滚筒)中,每个buckets包含8个键值对,低位的buckets用来给数据选择存储的buckets,每个存储buckets包含每个哈希的一些高阶位,以区分单个存储buckets中的条目
  2. 如果一个buckets中超过8个,那么将使用其他buckets存储
  3. 当哈希表增长时,分配一组新buckets是原来的两倍大,并且会将旧数据复制到新buckets中存储
  4. 映射迭代器遍历存储桶数组,并按行走顺序返回键(存储桶编号,然后是溢出链顺序,然后是存储桶索引)。 为了维持迭代语义,我们绝不会在键的存储桶中移动键(如果这样做,键可能会返回0或2次)。 在增加表时,迭代器将继续在旧表中进行迭代,并且必须检查新表是否将要迭代的存储桶(“撤离”)到新表中

常用常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
const (
// 一个桶能装的最大键值对 1<<3
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits

//负载因子计算 uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
loadFactorNum = 13
loadFactorDen = 2

/*
if t.key.size > maxKeySize && (!t.indirectkey() || t.keysize != uint8(sys.PtrSize)) ||
t.key.size <= maxKeySize && (t.indirectkey() || t.keysize != uint8(t.key.size)) {
throw("key size wrong")
}
if t.elem.size > maxElemSize && (!t.indirectelem() || t.elemsize != uint8(sys.PtrSize)) ||
t.elem.size <= maxElemSize && (t.indirectelem() || t.elemsize != uint8(t.elem.size)) {
throw("elem size wrong")
}
*/
maxKeySize = 128
maxElemSize = 128

//bmap 偏移 int64 topHash
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)

emptyRest = 0 // 这bmap中这一格为空,在更高的索引中没有更多的非空细胞
emptyOne = 1 // 这bmap中这一格为空
evacuatedX = 2 // 元素可得,但已经迁移到新桶的前半部分
evacuatedY = 3 // 元素可得,但已经迁移到新桶的后半部分
evacuatedEmpty = 4 // 格子为空,bucket已经迁移.
minTopHash = 5 // ophash 的最小正常值

// flags
iterator = 1 // 可能有迭代器在使用buckets
oldIterator = 2 // 可能有迭代器在使用oldbuckets
hashWriting = 4 // 有协程正在写
sameSizeGrow = 8 // 等量扩容

// sentinel bucket ID for iterator checks
noCheck = 1<<(8*sys.PtrSize) - 1
)

常用函数

1
2
3
4
//查找下一个overflow 的bmap
func (b *bmap) overflow(t *maptype) *bmap {
return *(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-sys.PtrSize))
}

创建Map

通过调用make来创建map,底层为 makemap 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func makemap(t *maptype, hint int, h *hmap) *hmap {
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}

// 初始化 hmap
if h == nil {
h = new(hmap)
}
h.hash0 = fastrand()

// 找到一个 B使 hint > B && hint > (2^B) * 6.5 为false 否则B就增大
// 当hint为1 则 B == 0 表示存1个数,只需要用一个桶
// 当hint为6 则 B == 0 表示存6个数,也只需要一个桶
// 当hint为7 则 B == 1 表示存7个树,就需要用两个桶
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B

//这里使用了懒加载
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}

注意:

  1. 返回的是*hmap而 func makeslice(et *_type, len, cap int) slice返回的是结构体

    虽然Go 语言中的函数传参都是值传递,在函数内部,map参数会影响外部

    而slice部分会,因为slice内部的数组是指针类型,对数组的修改会影响外部,但是长度和容量不会

  2. 判断装载因子,13 * 1<<B / 2 即 桶数目的6.5倍

1
2
3
4
//判断装载因子 
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

Key定位

key 经过哈希计算后得到哈希值,共 64 个 bit 位(64位机,32位机忽略),计算它到底要落在哪个桶时,只会用到最后 B 个 bit 位。如果 B = 5,则桶数量即 buckets 数组的长度是 2^5 = 32

mapacess

Key的Hash值计算为:

1
10010111 | 000011110110110010001111001010100010010110010101010 │ 00110

低5 位 00110,6号桶

高 8 位10010111,十进制为151 在6好bucket中找到tophash值(HOBhash)为151的key,对应2号槽位,

这是在寻找已有的 key。最开始桶内还没有 key,新加入的 key 会找到第一个空位,放入

  1. 如果冲突了怎么办

  2. 是怎么存入的,什么顺序

  3. 查找怎么办

    如果在 bucket 中没找到,并且 overflow 不为空,还要继续去 overflow bucket 中寻找,直到找到或是所有的 key 槽位都找遍了,包括所有的 overflow bucket

  4. 迁移过程中怎么查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
//如果map为空,或者map中数量为0 则返回0值
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return unsafe.Pointer(&zeroVal[0])
}
//如果写冲突,则直接报错
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
// 计算哈希值,加入hash0引入随机值
hash := t.hasher(key, uintptr(h.hash0))
// 比如 B=5,那 m 就是31,二进制是全 1
// m = 1<<B - 1
m := bucketMask(h.B)
// b 就是 bucket 的地址,hash&m*uintptr(t.bucketsize) 第几个bucket
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
//如果oldbuckets不为空,说明发生了扩容
if c := h.oldbuckets; c != nil {
// 如果不是同 size 扩容(看后面扩容的内容)
// 对应条件 1 的解决方案
if !h.sameSizeGrow() {
// There used to be half as many buckets; mask down one more power of two.
// 新 bucket 数量是老的 2 倍
m >>= 1
}
// 求出 key 在老的 map 中的 bucket 位置
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
// 如果 oldb 没有搬迁到新的 bucket
// 那就在老的 bucket 中寻找
if !evacuated(oldb) {
b = oldb
}
}
// 计算出高 8 位的 hash
// 相当于右移 56 位,只取高8位
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) { //如果b中没有,则查看b的overflow
for i := uintptr(0); i < bucketCnt; i++ { //遍历topHash
if b.tophash[i] != top { //如果topHash不等于top
if b.tophash[i] == emptyRest { //如果位置为空,后面都没有非空元素
break bucketloop //说明不是顺序填充,相等的topHash值会存到下一个bmap
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
//key 是指针
if t.indirectkey() {
//解引用
k = *((*unsafe.Pointer)(k))
}
//如果key相等(可能存在topHash相等,key不想等的情况)
if t.key.equal(key, k) {
//定位value的位置
//b bmap 地址
// dataOffset(数据对齐)
// bucketCnt*t.keysize 8个key的大小(固定8个)
//i*t.elemsize 第i个元素
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// val解引用
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
return e
}
}
}
return unsafe.Pointer(&zeroVal[0])
}
  1. 这里的dataOffsetkey 相对于 bmap 起始地址的偏移,包含了topHash数组 int64
1
2
3
4
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)

因此 bucket 里 key 的起始地址就是 unsafe.Pointer(b)+dataOffset

  1. minTopHash

minTopHash当一个 cell 的 tophash 值小于 minTopHash 时,标志这个 cell 的状态。因为这个状态值是放在 tophash 数组里,为了和正常的哈希值区分开,会给 key 计算出来的哈希值一个增量:minTopHash。这样就能区分正常的 tophash 值和表示状态的哈希值

  1. 源码里判断是否搬迁完毕使用函数evacuated,当第一个元素状态为迁移标识符中的三个
1
2
3
4
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}

Map获取

Go 语言中读取 map 有两种语法:带 comma 和 不带 comma,两种分别对应不同的函数

1
2
3
// src/runtime/hashmap.go
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
//这里不会panic
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return unsafe.Pointer(&zeroVal[0]), false
}
//并发读写错误
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
hash := t.hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + (hash&m)*uintptr(t.bucketsize)))
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
// There used to be half as many buckets; mask down one more power of two.
m >>= 1
}
oldb := (*bmap)(unsafe.Pointer(uintptr(c) + (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
b = oldb
}
}
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if t.key.equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
return e, true
}
}
}
return unsafe.Pointer(&zeroVal[0]), false
}

Map扩容

使用哈希表的目的就是要快速查找到目标 key,然而,随着向 map 中添加的 key 越来越多,key 发生碰撞的概率也越来越大。bucket 中的 8 个 cell 会被逐渐塞满,查找、插入、删除 key 的效率也会越来越低。

Go有一个衡量标准 装载因子 来描述存储情况

1
loadFactor := count / (2^B)

触发 map 扩容的时机:在向 map 插入新 key 的时候,会进行条件检测,符合下面这 2 个条件,就会触发扩容:

  1. 装载因子超过阈值,源码里定义的阈值是 6.5。
  2. overflow 的 bucket 数量过多:
    • 当 B < 15,即 bucket 总数 2^B < 2^15 时,如果 overflow 的 bucket 数量超过 2^B;
    • 当 B >= 15,即 bucket 总数 2^B >= 2^15,如果 overflow 的 bucket 数量超过 2^15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}

// 装载因子超过 6.5
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

// overflow buckets 太多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B > 15 {
B = 15
}
// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
return noverflow >= uint16(1)<<(B&15)
}

第 1 点:每个 bucket 有 8 个空位,在没有溢出,且所有的桶都装满了的情况下,装载因子算出来的结果是 8。因此当装载因子超过 6.5 时,表明很多 bucket 都快要装满了。在这个时候进行扩容是有必要的。

扩容策略:将 B 加 1,bucket 最大数量(2^B)直接变成原来 bucket 数量的 2 倍。于是,就有新老 bucket 了。注意,这时候元素都在老 bucket 里,还没迁移到新的 bucket 来。而且,新 bucket 只是最大数量变为原来最大数量(2^B)的 2 倍(2^B * 2)

B 还没有变,新bucket怎么记录大小呢?

第 2 点:是对第 1 点的补充。当 map 里元素总数少,但是 bucket 数量多(真实分配的 bucket 数量多,包括大量的 overflow bucket)

扩容策略:开辟一个新 bucket 空间,将老 bucket 中的元素移动到新 bucket,使得同一个 bucket 中的 key 排列地更紧密。原来在 overflow bucket 中的 key 可以移动到 bucket 中来。

一个极端的情况:如果插入 map 的 key 哈希都一样,就会落到同一个 bucket 里,超过 8 个就会产生 overflow bucket,结果也会造成 overflow bucket 数过多。移动元素其实解决不了问题,因为这时整个哈希表已经退化成了一个链表,操作效率变成了 O(n)

Go map 的扩容采取了一种称为“渐进式”地方式,并不会一次性搬迁完毕,每次最多只会搬迁 2 个 bucket。

hashGrow() 函数实际上并没有真正地“搬迁”,它只是分配好了新的 buckets,并将老的 buckets 挂到了 oldbuckets 字段上。真正搬迁 buckets 的动作在 growWork() 函数中,而调用 growWork() 函数的动作是在 mapassign 和 mapdelete 函数中。也就是插入或修改、删除 key 的时候,都会尝试进行搬迁 buckets 的工作。先检查 oldbuckets 是否搬迁完毕,具体来说就是检查 oldbuckets 是否为 nil。

准备扩容

hashGrow() 函数表示开始扩容前的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func hashGrow(t *maptype, h *hmap) {
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) { //判断是否超过负载因子
bigger = 0
h.flags |= sameSizeGrow //没有说明是第二种情况
}
oldbuckets := h.buckets //先将桶指向旧的指针
//h.B+bigger 说明是桶是原来的两倍
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

//&^按位置为0
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 提交 grow 的动作
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
//搬迁进度为0
h.nevacuate = 0
//overflow buckets数量为0
h.noverflow = 0

//搬迁extra到老的extra
if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}

// the actual copying of the hash table data is done incrementally
// by growWork() and evacuate().
}

扩容工作

1
2
3
4
5
6
7
8
9
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 确认搬迁老的 bucket 对应正在使用的 bucket
evacuate(t, h, bucket&h.oldbucketmask())

// 再搬迁一个 bucket,以加快搬迁进程
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
  1. h.growing 用来判断是否在搬迁
1
2
3
4
func (h *hmap) growing() bool {
return h.oldbuckets != nil
}
//当oldbuckets非空,表示正在扩容
  1. bucket&h.oldbucketmask()是为了确认搬迁的 bucket 是我们正在使用的 bucket。oldbucketmask() 函数返回扩容前的 map 的 bucketmask。

    bucketmask,作用就是将 key 计算出来的哈希值与 bucketmask 相与,得到的结果就是 key 应该落入的桶。比如 B = 5,则返回11111

搬迁核心evacuate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
type evacDst struct {
b *bmap // 当前搬迁的桶
i int // b中的k/v索引
k unsafe.Pointer // 指针指向当前k存储位置
e unsafe.Pointer // 指针指向当前v存储位置
}

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
//定位老的buckets地址(桶的开始位置+桶的数目*桶的大小)
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
//返回旧的桶数,如果B为5,那么结果为32
newbit := h.noldbuckets()
//如果没有搬迁过
if !evacuated(b) {
//xy 包含高低两个搬迁目的地
var xy [2]evacDst
//低搬迁目的地
x := &xy[0]
//记录新的buckets地址(桶开始的位置+桶数目*大小) ==> 地址是连续的? TODO
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
//k的位置为bmap开始的位置+topHash
x.k = add(unsafe.Pointer(x.b), dataOffset)
//v的位置为k结束的位置
x.e = add(x.k, bucketCnt*uintptr(t.keysize))

//如果不是等量扩容
if !h.sameSizeGrow() {
//使用y来进行搬迁
y := &xy[1]
//y.b表示获取新桶的地址
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
//y.b表示key开始的地址
y.k = add(unsafe.Pointer(y.b), dataOffset)
//y.e表示v开始的地址
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}

// 遍历所有的 bucket,包括 overflow buckets
// b 是老的 bucket 地址
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
// 遍历 bucket 中的所有 cell
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
// 当前 cell 的 top hash 值
top := b.tophash[i]
// 如果 cell 为空,即没有 key
if isEmpty(top) { //x <= emptyOne
b.tophash[i] = evacuatedEmpty // 那就标志它被"搬迁"过
continue // 继续下个 cell
}
// 正常不会出现这种情况
// 未被搬迁的 cell 只可能是 emptyRest 或者 emptyOne
// 正常的 top hash(大于 minTopHash)
if top < minTopHash {
throw("bad map state")
}
k2 := k
// 如果 key 是指针,则解引用
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
//如果不是等量扩容
if !h.sameSizeGrow() {
//计算hash值
hash := t.hasher(k2, uintptr(h.hash0))
//如果正在遍历map 且 出现过相同key 的两者相同(float变得NAN)
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
useY = top & 1
top = tophash(hash)
} else {
if hash&newbit != 0 {
useY = 1
}
}
}

if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}

b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // evacuation destination
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
typedmemmove(t.key, dst.k, k) // copy elem
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
// These updates might push these pointers past the end of the
// key or elem arrays. That's ok, as we have the overflow pointer
// at the end of the bucket to protect against pointing past the
// end of the bucket.
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}

// 如果没有协程在使用老的 buckets,就把老 buckets 清除掉,帮助gc
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// Preserve b.tophash because the evacuation
// state is maintained there.
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}

// 更新搬迁进度
// 如果此次搬迁的 bucket 等于当前进度
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
//进度+1
h.nevacuate++
// Experiments suggest that 1024 is overkill by at least an order of magnitude.
// Put it in there as a safeguard anyway, to ensure O(1) behavior.
//尝试往后看 1024 个 bucket
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
// 寻找没有搬迁的 bucket
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
// 现在 h.nevacuate 之前的 bucket 都被搬迁完毕
if h.nevacuate == newbit { // newbit == # of oldbuckets
h.oldbuckets = nil
// 清除老的 overflow bucket
// 回忆一下:[0] 表示当前 overflow bucket
// [1] 表示 old overflow bucket
if h.extra != nil {
h.extra.oldoverflow = nil
}
// 清除正在扩容的标志位
h.flags &^= sameSizeGrow
}
}

搬迁的目的就是将老的 buckets 搬迁到新的 buckets。

  • 应对条件 1,新的 buckets 数量是之前的一倍,所以要重新计算 key 的哈希,才能决定它到底落在哪个 bucket。例如,原来 B = 5,计算出 key 的哈希后,只用看它的低 5 位,就能决定它落在哪个 bucket。扩容后,B 变成了 6,因此需要多看一位,它的低 6 位决定 key 落在哪个 bucket。这称为 rehash
  • 应对条件 2,新的 buckets 数量和之前相等。因此可以按序号来搬,比如原来在 0 号 bucktes,到新的地方后,仍然放在 0 号 buckets。
map rehash

因此,某个 key 在搬迁前后 bucket 序号可能和原来相等,也可能是相比原来加上 2^B(原来的 B 值),取决于 hash 值 第 6 bit 位是 0 还是 1。

那么为什么遍历 map 是无序的?

第一种情况插入时遍历:map 在扩容后,会发生 key 的搬迁,原来落在同一个 bucket 中的 key,可能就落在了不同的槽中

第二种情况不插入遍历:在遍历 map 时,并不是固定地从 0 号 bucket 开始遍历,每次都是从一个随机值序号的 bucket 开始遍历,并且是从这个 bucket 的一个随机序号的 cell 开始遍历。这样,即使你是一个写死的 map,仅仅只是遍历它,也不太可能会返回一个固定序列的 key/value 对了

1
2
3
4
5
6
7
8
9
10
11
12
//测试 map中存入6个值,则按照规则,只会有一个桶,通过十次遍历
d := make(map[int]int)
for i := 0; i < 6; i++ {
d[i] = i
}
for i := 0; i < 10; i++ {
fmt.Printf("count:%d \n", i)
for _, v := range d {
fmt.Print(v)
}
}
//结果发现,每次开始的位置没有变化,但是整体的前后顺序不变

“迭代 map 的结果是无序的”这个特性是从 go 1.0 开始加入的

再明确一个问题:如果扩容后,B 增加了 1,意味着 buckets 总数是原来的 2 倍,原来 1 号的桶“裂变”到两个桶。

例如,原始 B = 2,1号 bucket 中有 2 个 key 的哈希值低 3 位分别为:010,110。由于原来 B = 2,所以低 2 位 10 决定它们落在 2 号桶,现在 B 变成 3,所以 010110 分别落入 2、6 号桶

bucket split

evacuate 函数每次只完成一个 bucket 的搬迁工作会有 2 层循环,外层遍历 bucket 和 overflow bucket,内层遍历 bucket 的所有 cell

源码里提到 X, Y part,桶的数量是原来的 2 倍,前一半桶被称为 X part,后一半桶被称为 Y part。

一个 bucket 中的 key 可能会分裂落到 2 个桶,一个位于 X part,一个位于 Y part。所以在搬迁一个 cell 之前,需要知道这个 cell 中的 key 是落到哪个 Part。很简单,重新计算 cell 中 key 的 hash,并向前“多看”一位,决定落入哪个 Part,如果 tophash 的最低位是 0 ,分配到 X part;如果是 1 ,则分配到 Y part

有一个特殊情况是:有一种 key,每次对它计算 hash,得到的结果都不一样。这个 key 就是 math.NaN() 的结果,它的含义是 not a number,类型是 float64。当它作为 map 的 key,在搬迁的时候,会遇到一个问题:再次计算它的哈希值和它当初插入 map 时的计算出来的哈希值不一样

你可能想到了,这样带来的一个后果是,这个 key 是永远不会被 Get 操作获取的!当我使用 m[math.NaN()] 语句的时候,是查不出来结果的。这个 key 只有在遍历整个 map 的时候,才有机会现身。所以,可以向一个 map 插入任意数量的 math.NaN() 作为 key

1
2
3
4
5
6
7
8
9
10
11
12
13
//通过tophash值与新算出来的哈希值进行运算得到
if top&1 != 0 {
// top hash 最低位为 1
// 新算出来的 hash 值的 B 位置 1
hash |= newbit
} else {
// 新算出来的 hash 值的 B 位置 0
hash &^= newbit
}

// hash 值的 B 位为 0,则搬迁到 x part
// 当 B = 5时,newbit = 32,二进制低 6 位为 10 0000
useX = hash&newbit == 0

确定了要搬迁到的目标 bucket 后,搬迁操作就比较好进行了。将源 key/value 值 copy 到目的地相应的位置。

设置 key 在原始 buckets 的 tophash 为 evacuatedX 或是 evacuatedY,表示已经搬迁到了新 map 的 x part 或是 y part。新 map 的 tophash 则正常取 key 哈希值的高 8 位

下面通过图来宏观地看一下扩容前后的变化

扩容前,B = 2,共有 4 个 buckets,lowbits 表示 hash 值的低位。假设我们不关注其他 buckets 情况,专注在 2 号 bucket。并且假设 overflow 太多,触发了等量扩容(对应于前面的条件 2)

扩容前

扩容完成后,overflow bucket 消失了,key 都集中到了一个 bucket,更为紧凑了,提高了查找的效率。

same size 扩容

假设触发了 2 倍的扩容,那么扩容完成后,老 buckets 中的 key 分裂到了 2 个 新的 bucket。一个在 x part,一个在 y 的 part。依据是 hash 的 lowbits。新 map 中 0-3 称为 x part,4-7 称为 y part

2倍扩容

注意,上面的两张图忽略了其他 buckets 的搬迁情况,表示所有的 bucket 都搬迁完毕后的情形。实际上,我们知道,搬迁是一个“渐进”的过程,并不会一下子就全部搬迁完毕。所以在搬迁过程中,oldbuckets 指针还会指向原来老的 []bmap,并且已经搬迁完毕的 key 的 tophash 值会是一个状态值,表示 key 的搬迁去向。

Map遍历

正常情况下,遍历所有的 bucket 以及它后面挂的 overflow bucket,然后挨个遍历 bucket 中的所有 cell。每个 bucket 中包含 8 个 cell,从有 key 的 cell 中取出 key 和 value,这个过程就完成了。

而扩容过程不是一个原子的操作,它每次最多只搬运 2 个 bucket,所以如果触发了扩容操作,那么在很长时间里,map 的状态都是处于一个中间态:有些 bucket 已经搬迁到新家,而有些 bucket 还待在老地方,它又是怎样遍历的呢?

先是调用 mapiterinit 函数初始化迭代器,然后循环调用 mapiternext 函数进行 map 迭代,其中迭代器的结构体定义是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type hiter struct {
// key 指针
key unsafe.Pointer
// value 指针
value unsafe.Pointer
// map 类型,包含如 key size 大小等
t *maptype
// map header
h *hmap
// 初始化时指向的 bucket
buckets unsafe.Pointer
// 当前遍历到的 bmap
bptr *bmap
overflow [2]*[]*bmap
// 起始遍历的 bucet 编号
startBucket uintptr
// 遍历开始时 cell 的编号(每个 bucket 中有 8 个 cell)
offset uint8
// 是否从头遍历了
wrapped bool
// B 的大小
B uint8
// 指示当前 cell 序号
i uint8
// 指向当前的 bucket
bucket uintptr
// 因为扩容,需要检查的 bucket
checkBucket uintptr
}

之前说到每次遍历都是无序的

1
2
3
4
5
6
7
8
9
10
// 生成随机数 r
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}

// 从哪个 bucket 开始遍历
it.startBucket = r & (uintptr(1)<<h.B - 1)
// 从 bucket 的哪个 cell 开始遍历
it.offset = uint8(r >> h.B & (bucketCnt - 1))

例如,B = 2,那 uintptr(1)<<h.B - 1 结果就是 3,低 8 位为 0000 0011,将 r 与之相与,就可以得到一个 0~3 的 bucket 序号;bucketCnt - 1 等于 7,低 8 位为 0000 0111,将 r 右移 2 位后,与 7 相与,就可以得到一个 0~7 号的 cell。

于是,在 mapiternext 函数中就会从 it.startBucket 的 it.offset 号的 cell 开始遍历,取出其中的 key 和 value,直到又回到起点 bucket,完成遍历过程。

假设我们有下图所示的一个 map,起始时 B = 1,有两个 bucket,后来触发了扩容(这里不要深究扩容条件,只是一个设定),B 变成 2。并且, 1 号 bucket 中的内容搬迁到了新的 bucket,1 号裂变成 1 号3 号0 号 bucket 暂未搬迁。老的 bucket 挂在在 *oldbuckets 指针上面,新的 bucket 则挂在 *buckets 指针上面

map origin

这时,我们对此 map 进行遍历。假设经过初始化后,startBucket = 3,offset = 2。于是,遍历的起点将是 3 号 bucket 的 2 号 cell,下面这张图就是开始遍历时的状态

map init

标红的表示起始位置,bucket 遍历顺序为:3 -> 0 -> 1 -> 2。

因为 3 号 bucket 对应老的 1 号 bucket,因此先检查老 1 号 bucket 是否已经被搬迁过。判断方法就是:

1
2
3
4
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > empty && h < minTopHash
}

如果 b.tophash[0] 的值在标志值范围内,即在 (0,4) 区间里,说明已经被搬迁过了

在本例中,老 1 号 bucket 已经被搬迁过了。所以它的 tophash[0] 值在 (0,4) 范围内,因此只用遍历新的 3 号 bucket。

依次遍历 3 号 bucket 的 cell,这时候会找到第一个非空的 key:元素 e。到这里,mapiternext 函数返回,这时我们的遍历结果仅有一个元素:

iter res

由于返回的 key 不为空,所以会继续调用 mapiternext 函数。

继续从上次遍历到的地方往后遍历,从新 3 号 overflow bucket 中找到了元素 f 和 元素 g。

遍历结果集也因此壮大:

iter res

新 3 号 bucket 遍历完之后,回到了新 0 号 bucket。0 号 bucket 对应老的 0 号 bucket,经检查,老 0 号 bucket 并未搬迁,因此对新 0 号 bucket 的遍历就改为遍历老 0 号 bucket。那是不是把老 0 号 bucket 中的所有 key 都取出来呢?

并没有这么简单,回忆一下,老 0 号 bucket 在搬迁后将裂变成 2 个 bucket:新 0 号、新 2 号。而我们此时正在遍历的只是新 0 号 bucket(注意,遍历都是遍历的 *bucket 指针,也就是所谓的新 buckets)。所以,我们只会取出老 0 号 bucket 中那些在裂变之后,分配到新 0 号 bucket 中的那些 key。

因此,lowbits == 00 的将进入遍历结果集:

iter res

和之前的流程一样,继续遍历新 1 号 bucket,发现老 1 号 bucket 已经搬迁,只用遍历新 1 号 bucket 中现有的元素就可以了。结果集变成:

iter res

继续遍历新 2 号 bucket,它来自老 0 号 bucket,因此需要在老 0 号 bucket 中那些会裂变到新 2 号 bucket 中的 key,也就是 lowbit == 10 的那些 key。

这样,遍历结果集变成:

iter res

最后,继续遍历到新 3 号 bucket 时,发现所有的 bucket 都已经遍历完毕,整个迭代过程执行完毕。

顺便说一下,如果碰到 key 是 math.NaN() 这种的,处理方式类似。核心还是要看它被分裂后具体落入哪个 bucket。只不过只用看它 top hash 的最低位。如果 top hash 的最低位是 0 ,分配到 X part;如果是 1 ,则分配到 Y part。据此决定是否取出 key,放到遍历结果集里。

map 遍历的核心在于理解 2 倍扩容时,老 bucket 会分裂到 2 个新 bucket 中去。而遍历操作,会按照新 bucket 的序号顺序进行,碰到老 bucket 未搬迁的情况时,要在老 bucket 中找到将来要搬迁到新 bucket 来的 key

Map插入

通过汇编语言可以看到,向 map 中插入或者修改 key,最终调用的是 mapassign 函数。

插入或修改 key 的语法是一样的,前者操作的 key 在 map 中不存在,而后者操作的 key 存在 map 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
if h.flags&hashWriting != 0 { //表示正在写
throw("concurrent map writes")
}
hash := t.hasher(key, uintptr(h.hash0)) //获取hash值

//在调用t.hasher之后设置hashWriting,因为t.hasher可能会出现紧急情况,在这种情况下,我们实际上并未执行写操作
h.flags ^= hashWriting //设置标记位,hashWriting

//如果bucket数组一开始为空,则初始化
if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}

again:
bucket := hash & bucketMask(h.B) // 定位存储在哪一个bucket中
if h.growing() { //如果现正在扩容,则扩容
growWork(t, h, bucket)
}
//得到bucket的结构体
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
top := tophash(hash) //获取高八位hash值

var inserti *uint8
var insertk unsafe.Pointer
var elem unsafe.Pointer
bucketloop:
for { //死循环
for i := uintptr(0); i < bucketCnt; i++ { //循环bucket中的tophash数组
if b.tophash[i] != top { //如果hash不相等
if isEmpty(b.tophash[i]) && inserti == nil { //判断是否为空,为空则插入
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
if b.tophash[i] == emptyRest { //为空,且后续也没有非空cell
break bucketloop
}
continue
}
//到这里说明高八位hash一样,获取已存在的key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
//判断两个key是否相等,不相等就循环下一个
if !t.key.equal(key, k) {
continue
}
// already have a mapping for key. Update it.
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
//获取已存在的value
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
goto done
}
//如果上一个bucket没能插入,则通过overflow获取链表上的下一个bucket
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}

//如果超过负载,则扩容
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}

//如果当前bmap满了,就新建一个
if inserti == nil {
// all current buckets are full, allocate a new one.
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}

//存储元素
if t.indirectkey() {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectelem() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
typedmemmove(t.key, insertk, key)
*inserti = top
h.count++

done:
//更新状态,返回元素
if h.flags&hashWriting == 0 { //如果是状态异常
throw("concurrent map writes")
}
h.flags &^= hashWriting //设置标志位状态表示完成
if t.indirectelem() {
elem = *((*unsafe.Pointer)(elem))
}
return elem
}

整体来看,核心还是一个双层循环,外层遍历 bucket 和它的 overflow bucket,内层遍历整个 bucket 的各个 cell:对 key 计算 hash 值,根据 hash 值按照之前的流程,找到要赋值的位置(可能是插入新 key,也可能是更新老 key),对相应位置进行赋值。

注意:

  1. 函数首先会检查 map 的标志位 flags。如果 flags 的写标志位此时被置 1 了,说明有其他协程在执行“写”操作,进而导致程序 panic。这也说明了 map 对协程是不安全的。

  2. 通过前文我们知道扩容是渐进式的,如果 map 处在扩容的过程中,那么当 key 定位到了某个 bucket 后,需要确保这个 bucket 对应的老 bucket 完成了迁移过程。即老 bucket 里的 key 都要迁移到新的 bucket 中来(分裂到 2 个新 bucket),才能在新的 bucket 中进行插入或者更新的操作。

  3. 上面说的操作是在函数靠前的位置进行的,只有进行完了这个搬迁操作后,我们才能放心地在新 bucket 里定位 key 要安置的地址,再进行之后的操作。

  4. 现在到了定位 key 应该放置的位置了,所谓找准自己的位置很重要。准备两个指针,一个(inserti)指向 key 的 hash 值在 tophash 数组所处的位置,另一个(insertk)指向 cell 的位置(也就是 key 最终放置的地址),当然,对应 value 的位置就很容易定位出来了。这三者实际上都是关联的,在 tophash 数组中的索引位置决定了 key 在整个 bucket 中的位置(共 8 个 key),而 value 的位置需要“跨过” 8 个 key 的长度。

  5. 在循环的过程中,inserti 和 insertk 分别指向第一个找到的空闲的 cell。如果之后在 map 没有找到 key 的存在,也就是说原来 map 中没有此 key,这意味着插入新 key。那最终 key 的安置地址就是第一次发现的“空位”(tophash 是 empty)。

  6. 如果这个 bucket 的 8 个 key 都已经放置满了,那在跳出循环后,发现 inserti 和 insertk 都是空,这时候需要在 bucket 后面挂上 overflow bucket。当然,也有可能是在 overflow bucket 后面再挂上一个 overflow bucket。这就说明,太多 key hash 到了此 bucket。

  7. 在正式安置 key 之前,还要检查 map 的状态,看它是否需要进行扩容。如果满足扩容的条件,就主动触发一次扩容操作。

  8. 这之后,整个之前的查找定位 key 的过程,还得再重新走一次。因为扩容之后,key 的分布都发生了变化。

  9. 最后,会更新 map 相关的值,如果是插入新 key,map 的元素数量字段 count 值会加 1;在函数之初设置的 hashWriting 写标志出会清零。

  10. 另外,有一个重要的点要说一下。前面说的找到 key 的位置,进行赋值操作,实际上并不准确。我们看 mapassign 函数的原型就知道,函数并没有传入 value 值,所以赋值操作是什么时候执行的呢?

1
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer

答案还得从汇编语言中寻找。我直接揭晓答案,有兴趣可以私下去研究一下。mapassign 函数返回的指针就是指向的 key 所对应的 value 值位置,有了地址,就很好操作赋值了。

Map删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
//如果map为空,直接panic
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return
}
//如果正在写,则异常并发读写
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}

//计算hash值
hash := t.hasher(key, uintptr(h.hash0))

//设置标志位
h.flags ^= hashWriting

//获取槽
bucket := hash & bucketMask(h.B)
if h.growing() { //如果在扩容,直接触发扩容
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
top := tophash(hash)
search:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top { //双循环,如果topHash不一致,且不为emptyRest则继续
if b.tophash[i] == emptyRest { //否则下一个overflow
break search
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
if !t.key.equal(key, k2) {
continue
}
// Only clear key if there are pointers in it.
if t.indirectkey() {
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
memclrHasPointers(k, t.key.size)
}
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
memclrHasPointers(e, t.elem.size)
} else {
memclrNoHeapPointers(e, t.elem.size)
}
b.tophash[i] = emptyOne
// If the bucket now ends in a bunch of emptyOne states,
// change those to emptyRest states.
// It would be nice to make this a separate function, but
// for loops are not currently inlineable.
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
for {
b.tophash[i] = emptyRest
if i == 0 {
if b == bOrig {
break // beginning of initial bucket, we're done.
}
// Find previous bucket, continue at its last entry.
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {

}
i = bucketCnt - 1
} else {
i--
}
if b.tophash[i] != emptyOne {
break
}
}
notLast:
h.count-- //数量减1
break search
}
}
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
//读写恢复
h.flags &^= hashWriting
}

当然,我们只关心 mapdelete 函数。它首先会检查 h.flags 标志,如果发现写标位是 1,直接 panic,因为这表明有其他协程同时在进行写操作。

计算 key 的哈希,找到落入的 bucket。检查此 map 如果正在扩容的过程中,直接触发一次搬迁操作。

删除操作同样是两层循环,核心还是找到 key 的具体位置。寻找过程都是类似的,在 bucket 中挨个 cell 寻找。

找到对应位置后,对 key 或者 value 进行“清零”操作

最后,将 count 值减 1,将对应位置的 tophash 值置成 Empty

Map类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type maptype struct {
typ _type
key *_type
elem *_type
bucket *_type // internal type representing a hash bucket
// function for hashing keys (ptr to key, seed) -> hash
hasher func(unsafe.Pointer, uintptr) uintptr
keysize uint8 // size of key slot
elemsize uint8 // size of elem slot
bucketsize uint16 // size of bucket
flags uint32
}

type _type struct {
size uintptr
ptrdata uintptr // size of memory prefix holding all pointers
hash uint32
tflag tflag
align uint8
fieldAlign uint8
kind uint8
// function for comparing objects of this type
// (ptr to object A, ptr to object B) -> ==?
equal func(unsafe.Pointer, unsafe.Pointer) bool
// gcdata stores the GC type data for the garbage collector.
// If the KindGCProg bit is set in kind, gcdata is a GC program.
// Otherwise it is a ptrmask bitmap. See mbitmap.go for details.
gcdata *byte
str nameOff
ptrToThis typeOff
}

Map进阶

  1. Key可以是float型

    从语法上看,是可以的。Go 语言中只要是可比较的类型都可以作为 key。除开 slice,map,functions 这几种类型,其他类型都是 OK 的。具体包括:布尔值、数字、字符串、指针、通道、接口类型、结构体、只包含上述类型的数组。这些类型的共同特征是支持 ==!= 操作符k1 == k2 时,可认为 k1 和 k2 是同一个 key。如果是结构体,则需要它们的字段值都相等,才被认为是相同的 key

    chan是怎么比较的?

  2. 删除Key之后,内存是直接释放吗?

    首先看看删除逻辑代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    if t.indirectkey() {	//如果key是指针
    *(*unsafe.Pointer)(k) = nil //则将key置位nil
    } else if t.key.ptrdata != 0 { //key中含有指针
    memclrHasPointers(k, t.key.size)
    }
    e := add(unsafe.Pointer(b),dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
    if t.indirectelem() { //如果val是指针
    *(*unsafe.Pointer)(e) = nil
    } else if t.elem.ptrdata != 0 { //如果val含有指针
    memclrHasPointers(e, t.elem.size)
    } else {
    memclrNoHeapPointers(e, t.elem.size) //执行飞对指针
    }
    b.tophash[i] = emptyOne

    其中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func memclrHasPointers(ptr unsafe.Pointer, n uintptr) {
    bulkBarrierPreWrite(uintptr(ptr), 0, n) //添加写屏障
    memclrNoHeapPointers(ptr, n) //clears n bytes starting at ptr 非堆
    }

    func typedmemclr(typ *_type, ptr unsafe.Pointer) {
    if writeBarrier.needed && typ.ptrdata != 0 { //类型含有指针字段
    bulkBarrierPreWrite(uintptr(ptr), 0, typ.ptrdata)
    }
    memclrNoHeapPointers(ptr, typ.size)
    }

    首先 memclrHasPointerstypedmemclr 的区别仅仅是 调用者是否直到清理的类型中是否含有堆指针

参考文献

  1. 如何设计并实现一个线程安全的Map
  2. https://www.kancloud.cn/kancloud/the-way-to-go/72489
  3. https://blog.csdn.net/u010853261/article/details/99699350
  4. https://cloud.tencent.com/developer/article/1468799
  5. https://www.jianshu.com/p/7782d82f5154
  6. https://qcrao.com/2019/05/22/dive-into-go-map/
  7. https://github.com/golang/go/issues/20135