GMP
协程
协程是用户态的概念。多个协程实际上映射为1个线程。
协程是用户态概念,因此创建、销毁、调度都在用户态完成,不需要切换内核态。
由于协程从属于同一个内核级线程,因此实际上无法并行;而一个协程的阻塞最终也会导致整个线程下的所有协程阻塞。
Goroutine
Go解耦了协程和线程的绑定关系,从而使线程变为一个中间层,协程可以灵活地映射到不同的线程上,相当于“虚拟线程”。
好处如下:
- 可以利用多个线程,实现并行
- 通过调度器,实现灵活的映射
- 栈空间动态扩展(线程大小固定,会产生内存浪费)
GMP
Goroutine Machine Processor
GMP就是协程调度器。
GMP有一个全局队列存储Goroutine;不过实际上Processor都会优先在自己的本地队列调度Goroutine(没有则向全局队列获取),并映射Goroutine到Machine上执行。
如果全局队列没有Goroutine,那么会尝试获取就绪态(正在IO)的协程。
如果仍然失败,那么会从其他Processor中窃取一半的Goroutine,实现负载均衡。
全局队列是互斥的,获取Goroutine要防止获取多次。
type schedt struct { ... lock mutex runq gQueue runqsize int32 }
|
G
Goroutine需要绑定到Processor才能运行,Processor就是对CPU资源的抽象。
type g struct { ... m *m sched gobuf }
type gobuf struct { sp uintptr pc uintptr ret uintptr bp uintptr }
|
M
Machine是对线程的抽象。
Machine不能直接执行Goroutine,而需要首先与Processor绑定,由Processor实现代理。
同时,由于Processor中间层的存在,Goroutine与Machine不是紧耦合的,Goroutine完全可以跨Machine运行。
type m struct { ... g0 *g tls }
|
P
Processor是Golang的调度器。Processor代理Machine执行,提供一个透明(不可见)的调度机制。Processor的数量决定了Goroutine的并行程度。(当然,最终由CPU核数决定)
type p struct { ... runqhead uint32 runqtail uint32 runq [256]guintptr
runnext guintptr }
|
g0与g的转换
g0与m一对一绑定,负责执行g之间的切换调度
Goroutine调度
g0 -> schedule() -> execute() -> gogo() -> g
g -> m_call() m_call() -> gosched_m() -> schedule()
m_call() -> park_m(): 暂停goroutine park_m() -> schedule()
m_call() -> goexit0(): Monitor g goexit0() -> schedule()
|
主动调度
用户发起调度,主动执行让渡
func Gosched() { checkTimeouts() mcall(gosched_m) }
|
被动调度
互斥锁、等待等状态
func gopark()
func goready()
|
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { mcall(park_m) }
func park_m(gp *g) { _g_ := getg()
casgstatus(gp, _Grunning, _Gwaiting) dropg()
... schedule() }
|
func goready(gp *g, traceskip int) { systemstack(func()) { ready(gp, traceskip, true) } }
func ready(gp *g, traceskip int, next bool) { ... _g_ := getg() ... casgstatus(gp, _Gwaiting, _Grunnable) runqput(_g_.m.p.ptr(), gp, next) ... }
|
正常调度
Goroutine正常执行结束,通过m_call()
返回控制权给g0
func Gosched() { ... mcall(gosched_m) }
func gosched_m(gp *g) { goshedImpl(gp) }
func goschedImpl(gp *g) { status := readgstatus(gp) if (status&^_Gscan != _Grunning) { dumpgstatus(gp) throw("bad g status") } casgstatus(gp, _Grunning, _Grunnable) dropg() lock(&sched.lock) globrunqput(gp) unlock(&sched.lock)
schedule() }
|
抢占调度
Monitor g全局监控完成
如果某个Goroutine发起系统调用,并过长时间占据Processor(如恶意抢占系统资源),Monitor g将会转移这个Goroutine所在的Processor与Machine的绑定,从而避免该Processor的阻塞。
注意,Monitor g并没有办法中断系统调用中的Goroutine(此时已经在内核态)。
schedule()
调度流程主干方法:
- 寻找下一个可执行的Goroutine
- 执行Goroutine
func schedule() { ... gp, inheritTime, tryWakeP := findRunnable()
... execute(gp, inheritTime) }
|
findRunnable()
为了防止Processor过于繁忙,全局队列的Goroutine饿死,每61次调度后Processor就会优先从全局队列取Goroutine。
此时,如果本地Processor队列满了,会将本地Goroutine踢出,以换取全局Goroutine入队,负载均衡。
如果全局队列是空的,会获取因为IO操作而处于就绪态的Goroutine。
如果没有获取到IO中的Goroutine,当前Processor将会为其他Processor负载均衡,获取其他Processor队列中一半的Goroutine到本地队列。
func findRunnable() (gp *g, inheritTime, tryWakeP bool) { _g_ := getg()
top: _p_ := _g_.m.p.ptr() ... if _p_.schedtick % 61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_p_, 1) unlock(&sched.lock) if gp != nil { return gp, false, false } }
... if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime, false }
... if sched.runqsize != 0 { lock(&sched.lock) gp = globrunqget(_p_, 0) unlock(&sched.lock) }
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll != 0) { if list := netpoll(0); !list.empty() { gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, +Grunnable) return gp, false, false } }
... procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) }
gp, inheritTime, tnow, w, newWork := stealWork(now) now = tnow if gp != nil { return gp, inheritTime, false } if newWork { goto top } if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w } } }
|
stealWork()
负载均衡,从其他Processor中获取Goroutine时,stealWork
- 最多遍历4次队列。其中一次成功就会return
- 每一次尝试获取Processor之前,都会对队列局部加锁(锁住队列头和队尾即可)
execute()
func execute(gp *g, inheritTime bool) { _g_ := getg()
_g_.m.curg = gp gp.m = _g_.m casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard if !inheritTime { _g_.m.p.ptr().schedtick++ }
gogo(&gp.sched) }
|
GeoHash