首先,让我们先来回顾 Go 运行时的 GPM 模型。这方面的介绍网上的资料都非常非常多了,但是我们也不妨回顾一下:
GPM模型中的G代表goroutine。每个goroutine只占用几KB的内存,可以轻松创建成千上万个。G包含了goroutine的栈、指令指针和其他信息,如阻塞channel的等待队列等。
P代表processor,可以理解为一个抽象的CPU核心。P的数量默认等于实际的CPU核心数,但可以通过环境变量进行调整。P维护了一个本地的goroutine队列,还负责执行goroutine并管理与之关联的上下文信息。
M代表machine,是操作系统线程。一个M必须绑定一个P才能执行goroutine。当一个M阻塞时,运行时会创建一个新的M或者复用一个空闲的M来保证P的数量总是等于GOMAXPROCS的值,从而充分利用CPU资源。
在这个模型中,P扮演了承上启下的角色。它连接了G和M,实现了用户层级的goroutine到操作系统线程的映射。这种设计允许Go在用户空间进行调度,避免了频繁的系统调用,大大提高了并发效率。
调度过程中,当一个goroutine被创建时,它会被放到P的本地队列或全局队列中。如果P的本地队列已满,一些goroutine会被放到全局队列。当P执行完当前的goroutine后,会优先从本地队列获取新的goroutine来执行。如果本地队列为空,P会尝试从全局队列或其他P的队列中偷取goroutine。
这种工作窃取(work-stealing)算法确保了负载的动态平衡。当某个P的本地队列为空时,它可以从其他P的队列中窃取一半的goroutine,这有效地平衡了各个P之间的工作负载。
Go 运行时这么做,主要还是减少 P 之间对获取 goroutine 之间的竞争。本地队列 runq 主要由持有它的 P 进行读写,只有在"被偷"的情况下,才可能有"数据竞争"的问题,而这种情况发生概率较少,所以它设计了一个高效的 runq
数据结构来应对这么场景。实际看起来和上面介绍的 PoolDequeue 有异曲同工之妙。
本文还会介绍 global queue 等数据结构,但不是本文的重点。
runq
在运行时中 P
是一个复杂的数据结构,下面列出了本文关注的它的几个字段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
type guintptr uintptr
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }
func (gp *guintptr) cas(old, new guintptr) bool {
return atomic.Casuintptr((*uintptr )(unsafe.Pointer(gp)), uintptr (old), uintptr (new ))
}
type p struct {
id int32
status uint32
link puintptr
schedtick uint32
syscalltick uint32
sysmontick sysmontick
m muintptr
mcache *mcache
pcache pageCache
raceprocctx uintptr
deferpool []*_defer
deferpoolbuf [32 ]*_defer
goidcache uint64
goidcacheend uint64
runqhead uint32
runqtail uint32
runq [256 ]guintptr
runnext guintptr
...
}
runq
是一个无锁循环队列,由数组实现,它的长度是 256,这个长度是固定的,不会动态调整。runqhead
和 runqtail
分别是队列的头和尾,runqhead
指向队列的头部,runqtail
指向队列的尾部。runq
数组的每个元素是一个 guintptr
类型,它是一个 uintptr
类型的别名,用来存储 g
的指针。
runq
的操作主要是 runqput
、runqputslow
、runqputbatch
、runqget
、runqdrain
、runqgrab
、runqsteal
等方法。
接下来我们捡重点的方法看一下它是怎么实现高效额度并发读写的.
runqput
runqput
方法是向 runq
中添加一个 g
的方法,它是一个无锁的操作,不会阻塞。它的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func runqput(pp *p, gp *g, next bool ) {
if !haveSysmon && next {
next = false
}
if randomizeScheduler && next && randn(2 ) == 0 {
next = false
}
if next {
retryNext:
oldnext := pp.runnext
if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&pp.runqhead)
t := pp.runqtail
if t-h < uint32 (len (pp.runq)) {
pp.runq[t%uint32 (len (pp.runq))].set(gp)
atomic.StoreRel(&pp.runqtail, t+1 )
return
}
if runqputslow(pp, gp, h, t) {
return
}
goto retry
}
runqput
方法的实现非常简单,它首先判断是否需要优先处理 runnext
,如果需要,就将 g
放到 runnext
中,然后再将 g
放到 runq
中。runq
的操作是无锁的,它通过 atomic
包提供的原子操作来实现。 这里使用的内部的更精细化的原子操作,这个也是我后面专门有一篇文章来讲解的。你现在大概把①、④ 理解为Load
、Store
操作即可。
②、⑤ 分别处理本地队列未满和队列已满的情况,如果队列未满,就将 g
放到队列中,然后更新队尾;如果队列已满,就调用 runqputslow
方法,将 g
放到全局队列中。
③ 处直接将 g
放到队列中,这是因为只有当前的 P
才能操作 runq
,所以不会有并发问题。 同时我们也可以看到,我们总是往尾部插入, t
总是一直增加的, 取余操作保证了循环队列的特性。
runqputslow
会把本地队列中的一半的 g
放到全局队列中,包括当前要放入的 g
。一旦涉及到全局队列,就会有一定的竞争,Go运行时使用了一把锁来控制并发,所以 runqputslow
方法是一个慢路径,是性能的瓶颈点。
runqputbatch
func runqputbatch(pp *p, q *gQueue, qsize int)
是批量往本地队列中放入 g
的方法,比如它从其它 P
那里偷来一批 g
,需要放到本地队列中,就会调用这个方法。它的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func runqputbatch(pp *p, q *gQueue, qsize int ) {
h := atomic.LoadAcq(&pp.runqhead)
t := pp.runqtail
n := uint32 (0 )
for !q.empty() && t-h < uint32 (len (pp.runq)) {
gp := q.pop()
pp.runq[t%uint32 (len (pp.runq))].set(gp)
t++
n++
}
qsize -= int (n)
if randomizeScheduler {
off := func (o uint32 ) uint32 {
return (pp.runqtail + o) % uint32 (len (pp.runq))
}
for i := uint32 (1 ); i < n; i++ {
j := cheaprandn(i + 1 )
pp.runq[off(i)], pp.runq[off(j)] = pp.runq[off(j)], pp.runq[off(i)]
}
}
atomic.StoreRel(&pp.runqtail, t)
if !q.empty() {
lock(&sched.lock)
globrunqputbatch(q, int32 (qsize))
unlock(&sched.lock)
}
}
①获取队列头,使用原子操作获取队头。
它下面一行是获取队尾的值,你可以思考下为什么不需要使用atomic.LoadAcq
。
② 逐个的将 g
放到队列中,直到放完或者放满。
如果是随机调度器,则使用混淆算法将队列中的 g
随机打乱。
最后如果队列还有剩余的 g
,则调用 globrunqputbatch
方法,将剩余的 g
放到全局队列中。
runqget
runqget
方法是从 runq
中获取一个 g
的方法,它是一个无锁的操作,不会阻塞。它的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func runqget(pp *p) (gp *g, inheritTime bool ) {
next := pp.runnext
if next != 0 && pp.runnext.cas(next, 0 ) {
return next.ptr(), true
}
for {
h := atomic.LoadAcq(&pp.runqhead)
t := pp.runqtail
if t == h {
return nil , false
}
gp := pp.runq[h%uint32 (len (pp.runq))].ptr()
if atomic.CasRel(&pp.runqhead, h, h+1 ) {
return gp, false
}
}
}
① 如果有 runnext
,则优先处理 runnext
,将 runnext
中的 g
取出来。
② 获取队列头。 如果 ③ 队列为空,直接返回。
④ 获取队头的 g
,这就是要读取的 g
。
⑤ 更新队头,这里使用的是 atomic.CasRel
方法,它是一个原子的 Compare-And-Swap
操作,用来更新队头。
可以看到这里只使用到了队列头runqhead
。
runqdrain
runqdrain
方法是从 runq
中获取所有的 g
的方法,它是一个无锁的操作,不会阻塞。它的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func runqdrain(pp *p) (drainQ gQueue, n uint32 ) {
oldNext := pp.runnext
if oldNext != 0 && pp.runnext.cas(oldNext, 0 ) {
drainQ.pushBack(oldNext.ptr())
n++
}
retry:
h := atomic.LoadAcq(&pp.runqhead)
t := pp.runqtail
qn := t - h
if qn == 0 {
return
}
if qn > uint32 (len (pp.runq)) {
goto retry
}
if !atomic.CasRel(&pp.runqhead, h, h+qn) {
goto retry
}
for i := uint32 (0 ); i < qn; i++ {
gp := pp.runq[(h+i)%uint32 (len (pp.runq))].ptr()
drainQ.pushBack(gp)
n++
}
return
}
runqgrab
runqgrab
方法是从 runq
中获取一半的 g
的方法,它是一个无锁的操作,不会阻塞。它的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func runqgrab(pp *p, batch *[256 ]guintptr, batchHead uint32 , stealRunNextG bool ) uint32 {
for {
h := atomic.LoadAcq(&pp.runqhead)
t := atomic.LoadAcq(&pp.runqtail)
n := t - h
n = n - n/2
if n == 0 {
if stealRunNextG {
if next := pp.runnext; next != 0 {
if pp.status == _Prunning {
if !osHasLowResTimer {
usleep(3 )
} else {
osyield()
}
}
if !pp.runnext.cas(next, 0 ) {
continue
}
batch[batchHead%uint32 (len (batch))] = next
return 1
}
}
return 0
}
if n > uint32 (len (pp.runq)/2 ) {
continue
}
for i := uint32 (0 ); i < n; i++ {
g := pp.runq[(h+i)%uint32 (len (pp.runq))]
batch[(batchHead+i)%uint32 (len (batch))] = g
}
if atomic.CasRel(&pp.runqhead, h, h+n) {
return n
}
}
}
① 取一半的 g
,这里是一个简单的算法,取一半的 g
。
② 如果要偷取 runnext
中的 g
,则会尝试偷取 runnext
中的 g
。
③ 如果要偷取的 g
数量超过一半,则重试。
④ 将队列中至多一半的 g
放入 batch
中。
⑤ 更新队头,这里使用的是 atomic.CasRel
方法,它是一个原子的 Compare-And-Swap
操作,用来更新队头。
runqsteal
runqsteal
方法是从其它 P
的 runq
中偷取 g
的方法,它是一个无锁的操作,不会阻塞。它的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func runqsteal(pp, p2 *p, stealRunNextG bool ) *g {
t := pp.runqtail
n := runqgrab(p2, &pp.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
gp := pp.runq[(t+n)%uint32 (len (pp.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.LoadAcq(&pp.runqhead)
if t-h+n >= uint32 (len (pp.runq)) {
throw("runqsteal: runq overflow" )
}
atomic.StoreRel(&pp.runqtail, t+n)
return gp
}
它实际使用了 runqgrab
方法来偷取 g
,然后再从 runq
中取出一个 g
。
以上就是runq
的主要操作,它针对Go调度器的特点,设计了一套特定的队列操作的函数,这些函数都是无锁的,不会阻塞,保证了高效的并发读写。
gQueue
和 gList
gQueue
和 gList
是 Go 运行时中的两个队列,它们都是用来存储 g
的,但是它们的实现方式不同。
gQueue
是一个G的双端队列,可以从首尾增加gp, 通过g.schedlink链接。一个G只能在一个gQueue或gList上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
type gQueue struct {
head guintptr
tail guintptr
}
func (q *gQueue) empty() bool {
return q.head == 0
}
func (q *gQueue) push(gp *g) {
gp.schedlink = q.head
q.head.set(gp)
if q.tail == 0 {
q.tail.set(gp)
}
}
func (q *gQueue) pushBack(gp *g) {
gp.schedlink = 0
if q.tail != 0 {
q.tail.ptr().schedlink.set(gp)
} else {
q.head.set(gp)
}
q.tail.set(gp)
}
func (q *gQueue) pushBackAll(q2 gQueue) {
if q2.tail == 0 {
return
}
q2.tail.ptr().schedlink = 0
if q.tail != 0 {
q.tail.ptr().schedlink = q2.head
} else {
q.head = q2.head
}
q.tail = q2.tail
}
func (q *gQueue) pop() *g {
gp := q.head.ptr()
if gp != nil {
q.head = gp.schedlink
if q.head == 0 {
q.tail = 0
}
}
return gp
}
func (q *gQueue) popList() gList {
stack := gList{q.head}
*q = gQueue{}
return stack
}
而gList
是一个G的链表,通过g.schedlink链接。一个G只能在一个gQueue或gList上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type gList struct {
head guintptr
}
func (l *gList) empty() bool {
return l.head == 0
}
func (l *gList) push(gp *g) {
gp.schedlink = l.head
l.head.set(gp)
}
func (l *gList) pushAll(q gQueue) {
if !q.empty() {
q.tail.ptr().schedlink = l.head
l.head = q.head
}
}
func (l *gList) pop() *g {
gp := l.head.ptr()
if gp != nil {
l.head = gp.schedlink
}
return gp
}
这是常规的数据结构中链表的实现,你可以和教科书中的介绍和实现做对比,看看书本中的内容如何应用到显示的工程中的。
global runq
一个全局的runq
用来处理太多的goroutine, 在本地runq
中的goroutine太少的情况下,从全局队列中偷取goroutine。 主要用来处理P中goroutine不均的情况。
因为它直接使用一把锁(sched.lock
),而不是lock-free的数据结构,所以代码阅读和理解起来会相对简单一些。这里就不详细介绍了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
var (
sched schedt
)
type schedt struct {
...
runq gQueue
runqsize int32
...
}
func globrunqput(gp *g) {
assertLockHeld(&sched.lock)
sched.runq.pushBack(gp)
sched.runqsize++
}
func globrunqputhead(gp *g) {
assertLockHeld(&sched.lock)
sched.runq.push(gp)
sched.runqsize++
}
func globrunqputbatch(batch *gQueue, n int32 ) {
assertLockHeld(&sched.lock)
sched.runq.pushBackAll(*batch)
sched.runqsize += n
*batch = gQueue{}
}
func globrunqget(pp *p, max int32 ) *g {
assertLockHeld(&sched.lock)
if sched.runqsize == 0 {
return nil
}
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32 (len (pp.runq))/2 {
n = int32 (len (pp.runq)) / 2
}
sched.runqsize -= n
gp := sched.runq.pop()
n--
for ; n > 0 ; n-- {
gp1 := sched.runq.pop()
runqput(pp, gp1, false )
}
return gp
}