Advanced Go

GMP

协程

协程是用户态的概念。多个协程实际上映射为1个线程。

协程是用户态概念,因此创建、销毁、调度都在用户态完成,不需要切换内核态。
由于协程从属于同一个内核级线程,因此实际上无法并行;而一个协程的阻塞最终也会导致整个线程下的所有协程阻塞。

Goroutine

Go解耦了协程和线程的绑定关系,从而使线程变为一个中间层,协程可以灵活地映射到不同的线程上,相当于“虚拟线程”。

好处如下:

  • 可以利用多个线程,实现并行
  • 通过调度器,实现灵活的映射
  • 栈空间动态扩展(线程大小固定,会产生内存浪费)

GMP

Goroutine Machine Processor
GMP就是协程调度器。
GMP有一个全局队列存储Goroutine;不过实际上Processor都会优先在自己的本地队列调度Goroutine(没有则向全局队列获取),并映射Goroutine到Machine上执行。
如果全局队列没有Goroutine,那么会尝试获取就绪态(正在IO)的协程。
如果仍然失败,那么会从其他Processor中窃取一半的Goroutine,实现负载均衡。

全局队列是互斥的,获取Goroutine要防止获取多次。

type schedt struct {
...
lock mutex
runq gQueue
runqsize int32
}

G

Goroutine需要绑定到Processor才能运行,Processor就是对CPU资源的抽象。

type g struct {
...
m *m // g与m映射
sched gobuf
}

type gobuf struct {
sp uintptr
pc uintptr
ret uintptr
bp uintptr
}

M

Machine是对线程的抽象。
Machine不能直接执行Goroutine,而需要首先与Processor绑定,由Processor实现代理。
同时,由于Processor中间层的存在,Goroutine与Machine不是紧耦合的,Goroutine完全可以跨Machine运行。

type m struct {
...
g0 *g // Goroutine,特殊的协程调度,与m一对一绑定,负责执行g之间的切换调度
tls // Thread Local Storage,m.tls[0]存储当前运行的g
}

P

Processor是Golang的调度器。Processor代理Machine执行,提供一个透明(不可见)的调度机制。Processor的数量决定了Goroutine的并行程度。(当然,最终由CPU核数决定)

type p struct {
...
// Head of Queue
runqhead uint32
runqtail uint32
runq [256]guintptr // Runnable Goroutine Queue

runnext guintptr // 下一个Runnable状态的Goroutine
}

g0与g的转换

g0与m一对一绑定,负责执行g之间的切换调度

// g0 -> g,g0将执行权交给对应的g
func gogo()

// g -> g0,g阻塞或协程切换使,交换控制权
func m_call()

Goroutine调度

g0 -> schedule() -> execute() -> gogo() -> g

g -> m_call()
m_call() -> gosched_m() -> schedule()

m_call() -> park_m(): 暂停goroutine
park_m() -> schedule()

m_call() -> goexit0(): Monitor g
goexit0() -> schedule()

主动调度

用户发起调度,主动执行让渡

func Gosched() {
checkTimeouts()
mcall(gosched_m)
}

被动调度

互斥锁、等待等状态

// 暂停goroutine,与processor解绑
func gopark()
// 唤醒,processor优先运行唤醒goroutine
func goready()
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
mcall(park_m)
}

func park_m(gp *g) {
_g_ := getg()

// 改变Goroutine状态
casgstatus(gp, _Grunning, _Gwaiting)
// 出队
dropg()

...
// 新一轮调度
schedule()
}
func goready(gp *g, traceskip int) {
systemstack(func()) {
ready(gp, traceskip, true)
}
}

func ready(gp *g, traceskip int, next bool) {
...
_g_ := getg()
...
// CAS状态切换
casgstatus(gp, _Gwaiting, _Grunnable)
// 入队
runqput(_g_.m.p.ptr(), gp, next)
...
}

正常调度

Goroutine正常执行结束,通过m_call()返回控制权给g0

func Gosched() {
...
mcall(gosched_m)
}

func gosched_m(gp *g) {
goshedImpl(gp)
}

func goschedImpl(gp *g) {
status := readgstatus(gp)
if (status&^_Gscan != _Grunning) {
dumpgstatus(gp)
throw("bad g status")
}
// CAS切换Goroutine状态
casgstatus(gp, _Grunning, _Grunnable)
// 解绑当前Goroutine和Processor
dropg()
// 加锁入队全局队列
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)

schedule()
}

抢占调度

Monitor g全局监控完成
如果某个Goroutine发起系统调用,并过长时间占据Processor(如恶意抢占系统资源),Monitor g将会转移这个Goroutine所在的Processor与Machine的绑定,从而避免该Processor的阻塞。
注意,Monitor g并没有办法中断系统调用中的Goroutine(此时已经在内核态)。

schedule()

调度流程主干方法:

  1. 寻找下一个可执行的Goroutine
  2. 执行Goroutine
func schedule() {
...
gp, inheritTime, tryWakeP := findRunnable()

...
execute(gp, inheritTime)
}

findRunnable()

为了防止Processor过于繁忙,全局队列的Goroutine饿死,每61次调度后Processor就会优先从全局队列取Goroutine。
此时,如果本地Processor队列满了,会将本地Goroutine踢出,以换取全局Goroutine入队,负载均衡。
如果全局队列是空的,会获取因为IO操作而处于就绪态的Goroutine。
如果没有获取到IO中的Goroutine,当前Processor将会为其他Processor负载均衡,获取其他Processor队列中一半的Goroutine到本地队列。

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
_g_ := getg()

top:
_p_ := _g_.m.p.ptr()
...
// 每61次调度,优先从全局队列获取
if _p_.schedtick % 61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_p_, 1)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}

...
// 正常情况下,从当前Processor本地队列获取Goroutine
// go特殊语法,初始化后判断
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime, false
}

...
// 本地Processor队列没有获取成功,未返回(如队列为空)
// 此时尝试从全局队列获取Goroutine
if sched.runqsize != 0 {
lock(&sched.lock)
gp = globrunqget(_p_, 0)
unlock(&sched.lock)
}

// 全局队列获取Goroutine失败
// 尝试从IO流获取
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll != 0) {
if list := netpoll(0); !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, +Grunnable)
return gp, false, false
}
}

...
// 从IO流获取失败,未返回
procs := uint32(gomaxprocs)
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}

// 负载均衡,从其他Processor中获取一半Goroutine
gp, inheritTime, tnow, w, newWork := stealWork(now)
now = tnow
if gp != nil {
return gp, inheritTime, false
}
if newWork {
goto top
}
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
}
}

stealWork()

负载均衡,从其他Processor中获取Goroutine时,stealWork

  • 最多遍历4次队列。其中一次成功就会return
  • 每一次尝试获取Processor之前,都会对队列局部加锁(锁住队列头和队尾即可)

execute()

func execute(gp *g, inheritTime bool) {
_g_ := getg()

_g_.m.curg = gp
// 映射Processor与Machine
gp.m = _g_.m
// CAS切换状态
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
// 更新调度次数
_g_.m.p.ptr().schedtick++
}

// 执行Goroutine任务
gogo(&gp.sched)
}

GeoHash