Go的schedt调度(runtime/proc.go)

1. 创建go的入口函数

// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {gp := getg()pc := sys.GetCallerPC()systemstack(func() {newg := newproc1(fn, gp, pc, false, waitReasonZero)pp := getg().m.p.ptr()runqput(pp, newg, true)//true代表放置为runnext优先级g;内部随机数使得优先级g降级为普通g;runnext位置仅1,cas进行新旧替换;普通g放置在p队列尾if mainStarted {wakep() //内部使用cas 和 判断自旋m数量 来保证不过度创建m 和 避免抢占}})
}

2.schedule函数

// 调度器的一轮操作:查找可运行的goroutine并执行它
// 该函数不会返回
func schedule() {mp := getg().m// 如果当前M持有锁,则抛出错误(锁的存在可能破坏调度逻辑)if mp.locks != 0 {throw("schedule: holding locks")}// 如果当前M被锁定到某个goroutine,需要先释放P再执行该Gif mp.lockedg != 0 {stoplockedm()execute(mp.lockedg.ptr(), false) // 该函数不会返回}// 我们不能从执行cgo调用的G中调度新的G,因为cgo调用正在使用M的g0栈if mp.incgo {throw("schedule: in cgo")}top:pp := mp.p.ptr()pp.preempt = false// 安全性检查:如果当前M处于自旋状态,本地运行队列应该为空// 在调用checkTimers之前执行此检查,因为checkTimers可能会调用goready// 将就绪的G放入本地运行队列if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {throw("schedule: spinning with local work")}// 找到可运行的G(findRunnable会阻塞直到有工作可用)gp, inheritTime, tryWakeP := findRunnable()// findRunnable可能收集了allp快照。快照仅在findRunnable内部需要,这里清除它// 以便GC可以回收该slicemp.clearAllpSnapshot()// 如果即将调度一个非普通G(如GCworker或tracereader),需要唤醒一个P(如果有的话)if tryWakeP {wakep()}// 如果G被锁定到某个M,则将当前P交给该锁定的M,然后阻塞等待新的Pif gp.lockedm != 0 {startlockedm(gp)goto top}// 执行G(该函数不会返回)execute(gp, inheritTime)
}

3.schedule中的findrunnable函数

// 查找可运行的goroutine执行
// 优先尝试从其他P窃取工作,或从本地/全局队列获取,或轮询网络
// tryWakeP表示返回的goroutine是非正常的(如GC工作线程、追踪读取器),调用者需要尝试唤醒P
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {mp := getg().m// 此处与handoffp中的条件需保持一致:如果findrunnable会返回可运行的G,handoffp必须启动一个M
top:// 可能已收集allp快照。快照仅在每次循环迭代时需要。清空快照以便GC回收切片mp.clearAllpSnapshot()pp := mp.p.ptr()// 如果处于gcwaitting状态 将当前m暂停 重新进行查找if sched.gcwaiting.Load() {gcstopm()goto top}if pp.runSafePointFn != 0 {runSafePointFn()}// now和pollUntil为后续工作窃取保存,可能窃取定时器// 在now到执行工作窃取期间不能阻塞,以确保这些数值相关now, pollUntil, _ := pp.timers.check(0, nil)// 尝试调度追踪读取器if traceEnabled() || traceShuttingDown() {gp := traceReader()if gp != nil {trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, true}}// 尝试调度GC工作线程if gcBlackenEnabled != 0 {gp, tnow := gcController.findRunnableGCWorker(pp, now)if gp != nil {return gp, false, true}now = tnow}// 偶尔检查全局可运行队列以确保公平性// 否则两个goroutine可能通过不断重启彼此完全占用本地队列if pp.schedtick%61 == 0 && !sched.runq.empty() {lock(&sched.lock)gp := globrunqget()unlock(&sched.lock)if gp != nil {return gp, false, false}}// 唤醒终结器Gif fingStatus.Load()&(fingWait|fingWake) == fingWait|fingWake {if gp := wakefing(); gp != nil {ready(gp, 0, true)}}// 唤醒一个或多个清理Gif gcCleanups.needsWake() {gcCleanups.wake()}if *cgo_yield != nil {asmcgocall(*cgo_yield, nil)}// 本地运行队列if gp, inheritTime := runqget(pp); gp != nil {return gp, inheritTime, false}// 全局运行队列if !sched.runq.empty() {lock(&sched.lock)gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)unlock(&sched.lock)if gp != nil {if runqputbatch(pp, &q); !q.empty() {throw("Couldn't put Gs into empty local runq")}return gp, false, false}}// 网络轮询// 该netpoll是工作窃取前的优化操作// 如果没有等待者或线程已阻塞在netpoll中,可以安全跳过// 若与阻塞线程存在逻辑竞争(如已返回但未设置lastpoll),后续仍会处理// 为避免多核机器内核争用,每次仅允许一个线程进行轮询if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 && sched.pollingNet.Swap(1) == 0 {list, delta := netpoll(0)sched.pollingNet.Store(0)if !list.empty() { // 非阻塞gp := list.pop()injectglist(&list)netpollAdjustWaiters(delta)trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}}// 自旋线程:从其他P窃取工作// 自旋线程数量限制为忙碌P数量的一半// 这是为了防止GOMAXPROCS>>1但程序并行度低时的CPU过载if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {if !mp.spinning {mp.becomeSpinning()}gp, inheritTime, tnow, w, newWork := stealWork(now)if gp != nil {// 成功窃取return gp, inheritTime, false}if newWork {// 可能有新的定时器或GC工作,重启循环goto top}now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {// 更早的定时器需要等待pollUntil = w}}// 没有工作可做// 如果处于GC标记阶段且有安全扫描/染色对象的工作,则运行空闲时间标记而非放弃Pif gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) && gcController.addIdleMarkWorker() {node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())if node != nil {pp.gcMarkWorkerMode = gcMarkWorkerIdleModegp := node.gp.ptr()trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}gcController.removeIdleMarkWorker()}// WASM平台专用逻辑// 如果回调返回且没有其他goroutine唤醒,则唤醒事件处理goroutine// 该goroutine会暂停执行直到回调被触发gp, otherReady := beforeIdle(now, pollUntil)if gp != nil {trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}if otherReady {goto top}// 释放P前获取allp快照,该切片可能在不再阻塞安全点时被修改// 不需要快照内容因为直到cap(allp)都是不可变的// 通过mp.clearAllpSnapshot(在schedule中)和每次循环迭代后清除快照allpSnapshot := mp.snapshotAllp()// 同时快照掩码。值变化可以接受,但长度不能在我们处理时变化idlepMaskSnapshot := idlepMasktimerpMaskSnapshot := timerpMask// 释放P并阻塞lock(&sched.lock)if sched.gcwaiting.Load() || pp.runSafePointFn != 0 {unlock(&sched.lock)goto top}if !sched.runq.empty() {gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)unlock(&sched.lock)if gp == nil {throw("global runq empty with non-zero runqsize")}if runqputbatch(pp, &q); !q.empty() {throw("Couldn't put Gs into empty local runq")}return gp, false, false}if !mp.spinning && sched.needspinning.Load() == 1 {// 参考下方"Delicate dance"注释mp.becomeSpinning()unlock(&sched.lock)goto top}if releasep() != pp {throw("findrunnable: wrong p")}now = pidleput(pp, now)unlock(&sched.lock)// 精密的舞蹈:线程从自旋状态转为非自旋状态时,可能与新工作提交并发// 必须先减少nmspinning计数,再通过StoreLoad内存屏障检查所有源// 若顺序颠倒,其他线程可能在我们检查完所有源后提交工作但之前已减少nmspinning// 导致无人唤醒线程执行工作//// 适用于以下工作源:// * 加入全局或P本地运行队列的goroutine// * P本地定时器堆的新/修改定时器// * 空闲优先级的GC工作(除非golang.org/issue/19112)//// 如果发现新工作,需要恢复m.spinning状态以唤醒新工作线程// (因为可能有多个饥饿的goroutine)//// 但若发现新工作后又观察到没有空闲P(在此处或resetspinning中),则存在问题// 我们可能与上方非自旋M的释放P并发竞争,导致P进入空闲状态// 这会丢失工作守恒(空闲P时仍有可运行工作),极端情况下可能导致死锁//// 通过sched.needspinning与非自旋M同步// 当非自旋M准备释放P时,若发现needspinning被设置则中止释放并转为自旋// 若没有并发竞争且系统完全负载,则无需自旋线程,下一个自然转为自旋的线程会清除标志// 另见文件顶部的"Worker thread parking/unparking"注释wasSpinning := mp.spinningif mp.spinning {mp.spinning = falseif sched.nmspinning.Add(-1) < 0 {throw("findrunnable: negative nmspinning")}// 注意正确性要求:只有最后一个从自旋转为非自旋的线程需要重新检查// 但运行时存在一些nmspinning的临时增加未通过此路径减少的情况// 因此必须保守地对所有自旋线程执行检查// 参考https://go.dev/issue/43997// 再次检查全局和P运行队列lock(&sched.lock)if !sched.runq.empty() {pp, _ := pidlegetSpinning(0)if pp != nil {gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)unlock(&sched.lock)if gp == nil {throw("global runq empty with non-zero runqsize")}if runqputbatch(pp, &q); !q.empty() {throw("Couldn't put Gs into empty local runq")}acquirep(pp)mp.becomeSpinning()return gp, false, false}}unlock(&sched.lock)pp := checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)if pp != nil {acquirep(pp)mp.becomeSpinning()goto top}// 再次检查空闲优先级GC工作pp, gp := checkIdleGCNoP()if pp != nil {acquirep(pp)mp.becomeSpinning()// 运行空闲工作线程pp.gcMarkWorkerMode = gcMarkWorkerIdleModetrace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}// 最后检查定时器创建/过期是否与自旋状态转换并发// 注意此处不能使用checkTimers因为它可能分配内存,而我们没有活动P时不允许分配pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)}// 此时不再需要allp快照,但没有P时不能清除(需写屏障)// 轮询网络直到下一个定时器if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {sched.pollUntil.Store(pollUntil)if mp.p != 0 {throw("findrunnable: netpoll with p")}if mp.spinning {throw("findrunnable: netpoll with spinning")}delay := int64(-1)if pollUntil != 0 {if now == 0 {now = nanotime()}delay = pollUntil - nowif delay < 0 {delay = 0}}if faketime != 0 {// 使用假时间时直接轮询delay = 0}list, delta := netpoll(delay) // 阻塞直到有新工作// 刷新时间戳(可能阻塞后)now = nanotime()sched.pollUntil.Store(0)sched.lastpoll.Store(now)if faketime != 0 && list.empty() {// 使用假时间且无就绪工作时停止M// 当所有M停止时,checkdead会调用timejumpstopm()goto top}lock(&sched.lock)pp, _ := pidleget(now)unlock(&sched.lock)if pp == nil {injectglist(&list)netpollAdjustWaiters(delta)} else {acquirep(pp)if !list.empty() {gp := list.pop()injectglist(&list)netpollAdjustWaiters(delta)trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}if wasSpinning {mp.becomeSpinning()}goto top}} else if pollUntil != 0 && netpollinited() {pollerPollUntil := sched.pollUntil.Load()if pollerPollUntil == 0 || pollerPollUntil > pollUntil {netpollBreak()}}stopm()goto top
}

3.1 适当检查全局队列策略

    // 偶尔检查全局可运行队列以确保公平性// 否则两个goroutine可能通过不断重启彼此完全占用本地队列if pp.schedtick%61 == 0 && !sched.runq.empty() {lock(&sched.lock)gp := globrunqget()unlock(&sched.lock)if gp != nil {return gp, false, false}}// local runqif gp, inheritTime := runqget(pp); gp != nil {return gp, inheritTime, false}// global runqif !sched.runq.empty() {lock(&sched.lock)gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)unlock(&sched.lock)if gp != nil {if runqputbatch(pp, &q); !q.empty() {throw("Couldn't put Gs into empty local runq")}return gp, false, false}}

        这是findRunnable函数的一部分,其中检查全局队列,每61次检查全局队列,并取一个g,日常使用本地队列获取g。

定次数检查全局队列的状态转换图

转换前

转换后

检查本地队列的状态转换图

// 从本地可运行队列获取Goroutine
// 如果 inheritTime 为 true,gp 应该继承当前时间片中剩余的时间
// 否则,它应该开始一个新的时间片
// 该函数仅由当前P(逻辑处理器)的拥有者执行
func runqget(pp *p) (gp *g, inheritTime bool) {// 如果存在 runnext,则优先获取该Goroutinenext := pp.runnext// 如果 runnext 不为0且CAS(比较并交换)操作成功,说明该Goroutine未被其它P抢占// 注意:若CAS失败,可能是其它P将runnext置为0,无需重试// 因为只有当前P能将runnext设置为非0值if next != 0 && pp.runnext.cas(next, 0) {return next.ptr(), true}// 无限循环尝试从环形队列中获取Goroutinefor {// 原子读取队列头指针(load-acquire语义:与其它消费者同步)h := atomic.LoadAcq(&pp.runqhead)t := pp.runqtail// 如果队列为空(尾指针等于头指针),返回nilif t == h {return nil, false}// 计算Goroutine在环形队列中的索引// 并获取对应的Goroutine指针gp = pp.runq[h%uint32(len(pp.runq))].ptr()// 原子更新头指针(cas-release语义:提交消耗,保证操作可见性)// 若成功,则返回获取到的Goroutineif atomic.CasRel(&pp.runqhead, h, h+1) {return gp, false}}
}

        队列使用环形队列的方式进行存储,首先检查runnext优先级g是否存在(runnext只能被当前p置为0);然后检查本地队列,获取其中的g,并且返回。

转换前

转换后

检查全局队列状态转移图

// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqgetbatch(n int32) (gp *g, q gQueue) {assertLockHeld(&sched.lock) //必须拥有全局队列锁if sched.runq.size == 0 {return}n = min(n, sched.runq.size, sched.runq.size/gomaxprocs+1)gp = sched.runq.pop()n--for ; n > 0; n-- {gp1 := sched.runq.pop()q.pushBack(gp1)}return
}

执行到本函数时,前提是上一步骤本地队列为空的情况下,将从全局队列中取g,g的数量为给定的n(输入为本地队列长度的一半),全局队列长度,全局队列长度/最大设置p数量 + 1的最小值。

// runqputbatch tries to put all the G's on q on the local runnable queue.
// If the local runq is full the input queue still contains unqueued Gs.
// Executed only by the owner P.
func runqputbatch(pp *p, q *gQueue) {if q.empty() {return}h := atomic.LoadAcq(&pp.runqhead)t := pp.runqtailn := uint32(0)for !q.empty() && t-h < uint32(len(pp.runq)) {gp := q.pop()pp.runq[t%uint32(len(pp.runq))].set(gp)t++n++}// 随机化处理 打乱g加入队列的顺序 防止饥饿等问题if randomizeScheduler {// 计算偏移量off := func(o uint32) uint32 {return (pp.runqtail + o) % uint32(len(pp.runq))}for i := uint32(1); i < n; i++ {j := cheaprandn(i + 1)pp.runq[off(i)], pp.runq[off(j)] = pp.runq[off(j)], pp.runq[off(i)]}}atomic.StoreRel(&pp.runqtail, t)return
}

将从全局队列获取的g,尽可能的存放入本地p队列当中。

转换前

转换后

3.2 网络轮询的处理

// 网络轮询
// 此netpoll仅作为优化措施,用于在工作窃取之前尝试获取网络事件。
// 如果没有等待者或线程已阻塞在netpoll中,可以安全地跳过此步骤。
// 当与阻塞线程存在逻辑竞争时(例如该线程已从netpoll返回但尚未设置lastpoll),
// 本线程仍会执行阻塞式netpoll操作。
// 为避免多核机器上的内核争用,我们确保同一时间只有一个线程在进行轮询。
if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 && sched.pollingNet.Swap(1) == 0 {list, delta := netpoll(0) // 非阻塞式轮询sched.pollingNet.Store(0) // 重置轮询状态标志if !list.empty() {gp := list.pop() // 获取等待的goroutineinjectglist(&list) // 将goroutine列表注入运行队列netpollAdjustWaiters(delta) // 调整等待计数trace := traceAcquire()// 原子状态转换:从等待状态转为可运行状态casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0) // 跟踪goroutine解除阻塞traceRelease(trace)  // 释放跟踪资源}return gp, false, false // 返回可运行的goroutine}
}func injectglist(glist *gList) {if glist.empty() {return}lock(&sched.lock)var n intfor n = 0; !glist.empty(); n++ {gp := glist.pop()casgstatus(gp, _Gwaiting, _Grunnable)globrunqput(gp) // 其余goroutine放入全局队列}unlock(&sched.lock)// 尝试启动新的M来处理这些goroutinefor ; n != 0 && sched.npidle.Load() != 0; n-- {startm(nil, false, false)}
}

状态转移前

状态转移后

3.3 窃取其他p的本地g操作

// 自旋的M:从其他P偷取工作
//
// 限制自旋M的数量不超过忙碌P的一半。
// 这是为了防止当GOMAXPROCS远大于1但程序并行度较低时
// 出现过高的CPU消耗。
if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {if !mp.spinning {mp.becomeSpinning()}gp, inheritTime, tnow, w, newWork := stealWork(now)if gp != nil {// 成功偷取工作return gp, inheritTime, false}if newWork {// 可能有新的定时器或GC工作;需要重新启动以发现goto top}now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {// 有更早的定时器需要等待pollUntil = w}
}
核心操作函数

        在本地p队列为空,随机获取其他p的g(最大偷取数量为目标p队列长度的一半),持有目标p的runq.lock;在本地队列p为空且定时器堆也为空,则获取其他p的定时器来获取关联g,持有目标p的mu。

// stealWork 尝试从任何P中偷取可运行的goroutine或定时器
//
// 如果newWork为true,表示可能有新工作被就绪
//
// 如果now不为0,则表示传入的当前时间。stealWork返回传入的时间或
// 当now为0时返回当前时间
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {pp := getg().m.p.ptr()ranTimer := falseconst stealTries = 4for i := 0; i < stealTries; i++ {stealTimersOrRunNextG := i == stealTries-1for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {if sched.gcwaiting.Load() {// GC工作可能已就绪return nil, false, now, pollUntil, true}p2 := allp[enum.position()]if pp == p2 {continue}// 从p2偷取定时器。这个checkTimers调用是唯一可能持有// 其他P的定时器锁的地方。我们在这个循环的最后阶段检查// runnext之前先检查定时器,因为从其他P的runnext偷取// 应该是最后的选择,如果存在可偷取的定时器优先处理//// 我们只在其中一个偷取循环中检查定时器,因为now的值// 在这个循环中不会变化,多次检查相同时间点的定时器// 可能浪费性能//// timerpMask告诉我们P是否可能拥有定时器。如果P不可能// 拥有定时器,则无需检查if stealTimersOrRunNextG && timerpMask.read(enum.position()) {tnow, w, ran := p2.timers.check(now, nil)now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {pollUntil = w}if ran {// 运行定时器可能使任意数量的G就绪// 并将它们添加到本P的本地运行队列中// 这会破坏runqsteal的假设(运行队列有足够空间)// 所以现在需要检查本P本地队列是否有G可运行if gp, inheritTime := runqget(pp); gp != nil {return gp, inheritTime, now, pollUntil, ranTimer}ranTimer = true}}// 如果p2处于空闲状态,无需尝试偷取if !idlepMask.read(enum.position()) {if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {return gp, false, now, pollUntil, ranTimer}}}}// 未找到可偷取的goroutine。不管怎样,运行定时器可能使// 某些goroutine就绪。指示需要等待的下一个定时器return nil, false, now, pollUntil, ranTimer
}

状态转移前

状态转移后

        最后,当所有的工作都找不到可用的g来运行,就会休眠m,等待唤醒。

3.4 execute唤醒

// 将gp调度到当前M上运行
// 如果inheritTime为true,则gp继承当前时间片的剩余时间,否则开始新的时间片
// 此函数不会返回// 允许写屏障操作,因为该函数在多个位置获取P后立即调用
//go:yeswritebarrierrec
func execute(gp *g, inheritTime bool) {mp := getg().mif goroutineProfile.active {// 确保gp的堆栈信息已记录到goroutine profile中// 记录时保持goroutine profiler首次暂停世界时的状态tryRecordGoroutineProfile(gp, nil, osyield)}// 在进入_Grunning状态前为运行中的G绑定Mmp.curg = gpgp.m = mpgp.syncSafePoint = false // 清除可能由morestack设置的标志casgstatus(gp, _Grunnable, _Grunning) // 原子操作修改goroutine状态gp.waitsince = 0         // 清除等待时间戳gp.preempt = false       // 清除抢占标志gp.stackguard0 = gp.stack.lo + stackGuard // 设置栈保护指针if !inheritTime {mp.p.ptr().schedtick++ // 时间片不继承时增加调度计数器}// 检查是否需要开启/关闭CPU profilerhz := sched.profilehzif mp.profilehz != hz {setThreadCPUProfiler(hz) // 设置线程CPU分析频率}trace := traceAcquire()if trace.ok() {trace.GoStart()       // 启动追踪事件traceRelease(trace)   // 释放追踪资源}gogo(&gp.sched) // 进入goroutine执行入口
}

状态转移前

状态转移

3.5 gosched 主动让出

// Gosched让出处理器,允许其他goroutine运行。该函数不会挂起当前goroutine,
// 因此当前goroutine会在后续自动恢复执行。
//
//go:nosplit // 编译器指令:禁止函数分割栈(保持原栈结构执行)
func Gosched() {// 检查是否有定时器需要处理(如超时事件)checkTimeouts()// 调用系统调用处理函数,将控制权交还调度器// mcall会切换到系统栈执行gosched_m函数mcall(gosched_m)
}// goschedImpl函数让出处理器,允许其他goroutine运行。preempted参数表示是否是被抢占的情况。
// 该函数不会挂起当前goroutine,执行会自动恢复。
func goschedImpl(gp *g, preempted bool) {// 获取追踪资源trace := traceAcquire()// 读取当前goroutine状态status := readgstatus(gp)// 验证当前状态是否为运行中状态(排除扫描状态)if status&^_Gscan != _Grunning {dumpgstatus(gp) // 打印状态信息用于调试throw("bad g status") // 状态异常时抛出错误}// 如果追踪可用,记录相关事件if trace.ok() {// 在状态转换前记录追踪事件,可能需要获取堆栈信息// 但转换后将不再拥有当前堆栈if preempted {trace.GoPreempt() // 抢占事件追踪} else {trace.GoSched()   // 主动让出处理器的追踪}}// 原子操作将当前goroutine状态从运行中改为可运行casgstatus(gp, _Grunning, _Grunnable)// 释放追踪资源if trace.ok() {traceRelease(trace)}// 从当前M的绑定关系中解绑goroutinedropg()// 加锁操作lock(&sched.lock)// 将当前goroutine放入全局运行队列globrunqput(gp)// 解锁操作unlock(&sched.lock)// 如果主程序已启动,唤醒空闲的处理器if mainStarted {wakep()}// 调度器开始寻找下一个要运行的goroutineschedule()
}// Gosched函数在g0栈上的执行入口(被抢占后执行)
func gosched_m(gp *g) {// 调用核心实现,传入preempted=false表示主动让出而非被抢占goschedImpl(gp, false)
}

转移前

转移后

3.6 系统调用

// 当前goroutine g 即将进入系统调用
// 记录其不再占用CPU的状态
// 此函数仅由Go系统调用库和cgo调用调用,不会被运行时的低层系统调用使用// entersyscall不能分割栈:保存操作必须使g->sched指向调用者的栈段,因为
// entersyscall将在立即返回后执行。在此期间g处于Gsyscall状态,但g.sched字段
// 的结构可能不完整,不能让GC观察到这种不一致状态// entersyscall调用的任何函数都不能分割栈
// 在活跃的系统调用期间,我们无法安全地移动栈,因为不知道uintptr参数中哪些
// 是实际的指针(指向栈内部)。实践中,这意味着快速路径必须使用无分割操作,
// 慢速路径则需要通过systemstack在系统栈上执行更大操作// reentersyscall是cgo回调使用的入口点,用于恢复显式保存的SP和PC
// 这在需要从调用栈更上层的函数调用exitsyscall时是必要的,因为g.syscallsp
// 必须始终指向有效的栈帧。下面的entersyscall是正常系统调用入口,从调用者获取SP和PC//go:nosplit
func reentersyscall(pc, sp, bp uintptr) {trace := traceAcquire()gp := getg()// 禁用抢占,因为此时g处于Gsyscall状态但g.sched字段可能不一致gp.m.locks++// entersyscall不能调用可能分割/扩展堆栈的函数(详情见上方注释)// 通过替换堆栈保护指针为会触发堆栈检查的值,并设置标志位让newstack终止gp.stackguard0 = stackPreemptgp.throwsplit = true// 保留SP用于GC和追踪回溯save(pc, sp, bp)gp.syscallsp = spgp.syscallpc = pcgp.syscallbp = bpcasgstatus(gp, _Grunning, _Gsyscall)if staticLockRanking {// 静态锁排序时,casgstatus可能调用systemstack并覆盖g.schedsave(pc, sp, bp)}if gp.syscallsp < gp.stack.lo || gp.stack.hi < gp.syscallsp {systemstack(func() {print("entersyscall inconsistent sp ", hex(gp.syscallsp), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")throw("entersyscall")})}if gp.syscallbp != 0 && gp.syscallbp < gp.stack.lo || gp.stack.hi < gp.syscallbp {systemstack(func() {print("entersyscall inconsistent bp ", hex(gp.syscallbp), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")throw("entersyscall")})}if trace.ok() {systemstack(func() {trace.GoSysCall()  // 记录系统调用事件traceRelease(trace) // 释放追踪资源})// systemstack本身会覆盖g.sched.{pc,sp},而我们可能需要这些信息// 当G真正被系统调用阻塞时save(pc, sp, bp)}if sched.sysmonwait.Load() {// 系统监控等待期间需要特殊处理systemstack(entersyscall_sysmon)save(pc, sp, bp)}if gp.m.p.ptr().runSafePointFn != 0 {// 在当前栈上执行runSafePointFn可能导致栈分割// 通过系统栈执行确保安全systemstack(runSafePointFn)save(pc, sp, bp)}// 记录当前系统调用计数gp.m.syscalltick = gp.m.p.ptr().syscalltickpp := gp.m.p.ptr()pp.m = 0gp.m.oldp.set(pp)gp.m.p = 0atomic.Store(&pp.status, _Psyscall) // 更新P状态为系统调用中if sched.gcwaiting.Load() {// 如果GC正在等待,需要特殊处理systemstack(entersyscall_gcwait)save(pc, sp, bp)}// 减少锁计数器,允许抢占恢复gp.m.locks--
}

转移前

转移后

3.7 网络阻塞

// 将当前goroutine置于等待状态,并在系统栈上调用unlockf函数
// 
// 如果unlockf返回false,则当前goroutine会被恢复执行
// 
// unlockf不能访问该G的栈,因为G可能在调用gopark和unlockf之间被移动到其他M
// 
// 注意:由于unlockf是在将G置于等待状态后调用的,调用时G可能已经被其他goroutine准备就绪
//       除非有外部同步机制阻止G被准备。如果unlockf返回false,必须保证G不能被外部准备
// 
// reason参数说明goroutine被阻塞的原因,会在堆栈跟踪和堆转储中显示
// 原因应该保持唯一性和描述性,不要复用原因,应添加新原因
// 
// gopark应该作为运行时内部实现细节
// 但广泛使用的包通过linkname指令访问它
// 羞耻堂成员包括:
//   - gvisor.dev/gvisor
//   - github.com/sagernet/gvisor
// 
// 不要删除或修改类型签名(见go.dev/issue/67401)
// 
//go:linkname gopark
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {if reason != waitReasonSleep {checkTimeouts() // 两个goroutine可能同时让调度器繁忙,此时需要检查超时}mp := acquirem()      // 获取当前Mgp := mp.curg         // 当前M绑定的Gstatus := readgstatus(gp)// 验证当前G状态是否合法(运行中或扫描运行中)if status != _Grunning && status != _Gscanrunning {throw("gopark: bad g status")}mp.waitlock = lock       // 保存等待锁mp.waitunlockf = unlockf // 保存解锁函数gp.waitreason = reason   // 设置等待原因mp.waitTraceBlockReason = traceReason // 设置追踪阻塞原因mp.waitTraceSkip = traceskip        // 设置追踪跳过层级releasem(mp)             // 释放当前M// 不能在此处执行可能导致G在多个M之间移动的操作mcall(park_m)            // 调用系统调用处理函数
}

状态转移前

状态转移后

3.8 定时器操作

// time.Sleep函数的实现
// 
// 该函数使当前goroutine休眠至少ns纳秒
// 
//go:linkname timeSleep time.Sleep
func timeSleep(ns int64) {if ns <= 0 {return}gp := getg() // 获取当前goroutinet := gp.timer // 获取当前goroutine的定时器if t == nil {// 如果没有定时器则创建新定时器t = new(timer)t.init(goroutineReady, gp) // 初始化定时器回调为goroutineReadyif gp.bubble != nil {      // 如果处于时间气泡中t.isFake = true // 标记为虚拟定时器}gp.timer = t // 绑定定时器到当前goroutine}var now int64if bubble := gp.bubble; bubble != nil {// 如果处于时间气泡中,使用气泡内的时间戳now = bubble.now} else {// 否则获取当前实际时间now = nanotime()}// 计算唤醒时间(当前时间+休眠时长)when := now + ns// 检查溢出情况if when < 0 {when = maxWhen // 设置为最大时间值}gp.sleepWhen = when // 保存唤醒时间// 根据是否为虚拟定时器选择不同处理方式if t.isFake {// 在协程内部调用定时器重置(因为处于时间气泡中)// 不需要担心定时器函数在协程挂起前执行,因为时间不会在挂起前推进resetForSleep(gp, nil)// 挂起当前协程,等待时间气泡中的时间推进gopark(nil, nil, waitReasonSleep, traceBlockSleep, 1)} else {// 使用系统调度器进行定时器重置gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1)}
}

定时器阻塞前

定时器唤醒

3.9 Goroutine正常结束

// Finishes execution of the current goroutine.
func goexit1() {if raceenabled {if gp := getg(); gp.bubble != nil {racereleasemergeg(gp, gp.bubble.raceaddr())}racegoend()}trace := traceAcquire()if trace.ok() {trace.GoEnd()traceRelease(trace)}mcall(goexit0)
}// goexit continuation on g0.
func goexit0(gp *g) {gdestroy(gp)schedule()
}

3.10 抢占式调度 - 信号/栈增长

func retake(now int64) uint32 {n := 0// 防止allp切片发生变化。这个锁只有在暂停世界时才可能被竞争// This lock will be completely uncontended unless we're already stopping the world.lock(&allpLock)// 我们不能在循环中使用range遍历allp,因为可能会// 临时释放allpLock。因此需要每次循环都重新获取allp。for i := 0; i < len(allp); i++ {pp := allp[i]if pp == nil {// 这可能发生在procresize已经扩容// allp但尚未创建新的P时continue}pd := &pp.sysmonticks := pp.statussysretake := falseif s == _Prunning || s == _Psyscall {// 如果某个G在同一个schedtick上运行太久就抢占// 可能是单个长时间运行的goroutine,或通过// runnext运行的一系列goroutine共享的调度时间片t := int64(pp.schedtick)if int64(pd.schedtick) != t {pd.schedtick = uint32(t)pd.schedwhen = now} else if pd.schedwhen+forcePreemptNS <= now {preemptone(pp)// 如果是系统调用状态,preemptone()可能失效// 因为此时M和P没有绑定sysretake = true}}if s == _Psyscall {// 如果系统调用时间超过1个sysmon tick(至少20us)就回收P// 一方面我们不希望在没有其他工作时回收P// 另一方面我们需要回收P以避免sysmon线程无法进入深度睡眠t := int64(pp.syscalltick)if !sysretake && int64(pd.syscalltick) != t {pd.syscalltick = uint32(t)pd.syscallwhen = nowcontinue}// 在CAS操作前需要减少空闲锁定的M数量// 否则从系统调用返回的M可能增加nmidle并报告死锁// (假装有1个M在运行)unlock(&allpLock)incidlelocked(-1)trace := traceAcquire()if atomic.Cas(&pp.status, s, _Pidle) {if trace.ok() {trace.ProcSteal(pp, false)traceRelease(trace)}n++pp.syscalltick++handoffp(pp)} else if trace.ok() {traceRelease(trace)}incidlelocked(1)lock(&allpLock)}}unlock(&allpLock)return uint32(n)
}// 通知在处理器P上运行的goroutine停止
// 该函数是尽力而为的,可能会失败或通知错误的goroutine
// 即使通知了正确的goroutine,如果它同时正在执行newstack操作,也可能忽略请求
// 无需持有任何锁
// 返回true表示已发起抢占请求
// 实际抢占将在未来某个时刻发生,并通过gp->status不再是Grunning状态来体现
func preemptone(pp *p) bool {mp := pp.m.ptr()if mp == nil || mp == getg().m {return false}gp := mp.curgif gp == nil || gp == mp.g0 {return false}gp.preempt = true// 每个goroutine中的调用都会检查栈溢出// 通过比较当前栈指针和gp->stackguard0的值// 将gp->stackguard0设置为StackPreempt值// 可以将抢占请求合并到正常的栈溢出检查流程中gp.stackguard0 = stackPreempt// 请求对这个P进行异步抢占if preemptMSupported && debug.asyncpreemptoff == 0 {pp.preempt = truepreemptM(mp)}return true
}

        检测是否有g占用过久cpu,通过信号机制强制进行调度切换。

抢占检测

抢占执行

4.其他操作原因

4.1 cgo调用为什么不能抢占

(1) 栈隔离

  • Go 调用 C 时,会创建一个新的 C 栈,与 Go 栈分离。
  • C 代码无法直接操作 Go 栈,反之亦然。

(2) 调度器的限制

  • 抢占机制失效:Go 的抢占式调度依赖 g0 栈。在 cgo 调用期间,g0 栈被占用,调度器无法中断当前 goroutine。
  • M 与 P 的绑定:执行 cgo 调用的 M 会与 P(逻辑处理器)解绑,直到 C 调用返回。

(3) 内存和垃圾回收

  • C 代码不能被 Go 的垃圾回收器(GC)管理,因此需要手动处理内存(如 C.free 释放 C 分配的内存)。
  • 如果 C 代码中分配的内存未释放,可能导致内存泄漏。

4.2 为什么需要clearAllpSnapshot()

  1. 避免内存泄漏
    allpSnapshot是对所有P(逻辑处理器)的引用快照。若在循环中重复保留旧快照,会导致内存中存在大量不再使用的P引用。Go的GC无法回收被强引用占用的内存,长期积累可能引发内存泄漏。

  2. 解除GC压力
    快照作为slice类型,其底层数组会持有P对象的引用。即使当前迭代结束后不再使用该快照,GC仍需跟踪这些引用以判断是否可回收。主动清零(clearAllpSnapshot())可立即解除引用关系,允许GC回收相关内存。

  3. 确保数据一致性
    P的状态在运行时可能被动态修改(如迁移、销毁)。若保留旧快照,后续操作可能基于过期数据,导致逻辑错误。每次迭代后清理快照,可强制下一次迭代重新获取最新状态。

  4. 并发安全需求
    在多线程环境中,未清理的快照可能被其他goroutine访问。通过及时释放快照,减少竞态条件的风险,确保每次迭代的数据来源独立且最新。

4.3 为什么stealwork里需要窃取定时器

        每个p都有独属于自己的定时器和定时器队列,结构体如下,会维护以g的过期时间作为值维护的最小堆,保证最早到期的g优先处理。同时每个p私有化定时器,也可以减少锁的竞争。

// A timers is a per-P set of timers.
// timers 是每个P(逻辑处理器)的定时器集合
type timers struct {// mu protects timers; timers are per-P, but the scheduler can// access the timers of another P, so we have to lock.// mu 用于保护定时器;虽然定时器是每个P私有的,但调度器可能访问其他P的定时器// 因此需要锁来保证并发安全mu mutex// heap is the set of timers, ordered by heap[i].when.// Must hold lock to access.// heap 是定时器数组,按 heap[i].when(触发时间)排序// 访问此字段时必须持有锁heaptimerWhen// len is an atomic copy of len(heap).// len 是 heap 长度的原子副本,用于并发读取len atomic.Uint32// zombies is the number of timers in the heap// that are marked for removal.// zombies 是堆中标记为待移除的定时器数量zombies atomic.Int32// raceCtx is the race context used while executing timer functions.// raceCtx 是执行定时器函数时使用的竞态检测上下文raceCtx uintptr// minWhenHeap is the minimum heap[i].when value (= heap[0].when).// The wakeTime method uses minWhenHeap and minWhenModified// to determine the next wake time.// If minWhenHeap = 0, it means there are no timers in the heap.// minWhenHeap 是堆中最小的触发时间(即 heap[0].when)// wakeTime 方法会结合 minWhenHeap 和 minWhenModified// 计算下一个唤醒时间// 若 minWhenHeap = 0,表示堆中无定时器minWhenHeap atomic.Int64// minWhenModified is a lower bound on the minimum// heap[i].when over timers with the timerModified bit set.// If minWhenModified = 0, it means there are no timerModified timers in the heap.// minWhenModified 是所有标记为 timerModified 的定时器中最小的触发时间的下界// 若 minWhenModified = 0,表示堆中无 timerModified 的定时器// timerModified 标志用于表示定时器被修改过(例如重新设置触发时间)minWhenModified atomic.Int64
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/98031.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/98031.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu 服务器配置转发网络访问

配置文档&#xff1a;Ubuntu 服务器转发网络访问 一、网络拓扑以以下网络拓扑为示例Ubuntu 服务器&#xff08;两个网卡&#xff09; eth1 10.66.71.222 &#xff08;接入内网&#xff09;eno1 192.168.2.100 &#xff08;直连相机&#xff09; 相机ip 192.168.2.1 Windows 客…

为什么企业需要高防IP

1. 抵御日益猖獗的DDoS攻击 现代DDoS攻击规模已突破Tbps级别 传统防火墙无法应对大规模流量攻击 高防IP采用分布式清洗中心&#xff0c;可轻松抵御300Gbps以上的攻击流量 2. 保障业务连续性 网络中断1小时可能造成数百万损失 高防IP确保服务99.99%可用性 智能切换机制实…

CSS基础 - 选择器备忘录 --笔记5

目录基础选择器组合器伪类选择器属性选择器选择器可以选中页面上的特定元素并为其指定样式。 CSS有多种选择器。 基础选择器 标签选择器 – tagname&#xff1a;匹配目标元素的标签名。优先级是0,0,1。如&#xff1a;p、h1、div类选择器 – .class&#xff1a;匹配class属性中…

自动驾驶中的传感器技术46——Radar(7)

卫星雷达&#xff08;又称为分布式雷达&#xff09;主要讲当前雷达的雷达信号处理计算以及雷达目标相关的一些感知算法都迁移到中央域控进行&#xff0c;雷达端基本只负责数据采集&#xff0c;这样做的影响如下&#xff1a; 雷达端成本与功耗降低&#xff1b; 雷达端采样得到的…

【论文阅读】Diff-Privacy: Diffusion-based Face Privacy Protection

基于扩散模型的人脸隐私保护方法——DiffPrivacy&#xff0c;解决了两类人脸隐私任务&#xff1a;匿名化&#xff08;anonymization&#xff09;和视觉身份信息隐藏&#xff08;visual identity information hiding&#xff09;。1. 研究背景随着人工智能和大数据技术的普及&am…

React 原理篇 - 深入理解虚拟 DOM

一、什么是虚拟 DOM&#xff1f; 在前端开发中&#xff0c;“虚拟 DOM” 是一个高频出现的术语&#xff0c;尤其在 React 生态中被广泛讨论。但很多开发者对它的理解往往停留在 “JS 对象” 这个表层认知上。 实际上&#xff0c;虚拟 DOM 是一种编程概念—— 在这个概念里&…

对汇编的初理解

此处是一个简单的函数&#xff0c;里面将调用了一个函数add&#xff08;&#xff09;函数这里是函数的原型这里是调用lcd函数产生的汇编语言&#xff0c;翻译过来就是r11&#xff0c;r0cnt(r4cnt,前文有提及)&#xff0c;然后调用add函数&#xff0c;此处BL是指会回到指令的下一…

《Python 自动化实战:从零构建一个文件同步工具》

《Python 自动化实战:从零构建一个文件同步工具》 一、开篇引入:为什么我们需要文件同步? 你是否有过这样的困扰: 公司电脑和家里电脑上都有工作项目,每次更新都要手动复制? U 盘频繁传输文件,不仅麻烦还容易出错? 项目文件夹动辄几 G,每次同步都耗时长、效率低? 在…

工业相机与镜头的靶面尺寸详解:选型避坑指南

在机器视觉系统中&#xff0c;相机与镜头的靶面尺寸匹配是一个非常关键却又经常被忽略的细节。选错了&#xff0c;不但影响图像质量&#xff0c;还可能导致画面“黑角”、视野不符、镜头浪费等问题。 今天我们就用通俗易懂的方式&#xff0c;聊一聊相机与镜头靶面尺寸的那些事儿…

使用 Go 和 go-commons 实现内存指标采集并对接 Prometheus

文章目录一、准备工作二、编写内存采集代码三、运行 Exporter四、接入 Prometheus五、可扩展思路总结在运维和监控领域&#xff0c;资源指标采集 是必不可少的一环。CPU、内存、磁盘、网络这些系统资源&#xff0c;需要实时采集并上报到监控系统中。 本文以 内存指标采集 为例&…

webrtc弱网-IntervalBudget类源码分析与算法原理

一、核心功能 IntervalBudget 类用于基于时间窗口的带宽预算管理。它根据设定的目标比特率&#xff08;kbps&#xff09;和一个固定时间窗口&#xff08;500ms&#xff09;&#xff0c;计算在该时间窗口内可用的字节数&#xff08;即“预算”&#xff09;&#xff0c;并支持预…

深度学习基本模块:RNN 循环神经网络

循环神经网络&#xff08;RNN&#xff09;是一种专门用于处理序列数据的神经网络架构。与处理空间数据的卷积神经网络&#xff08;Conv2D&#xff09;不同&#xff0c;RNN通过引入循环连接使网络具有"记忆"能力&#xff0c;能够利用之前的信息来影响当前的输出&#…

React18学习笔记(二) React的状态管理工具--Redux,案例--移动端外卖平台

文章目录一.Redux的基础用法1.示例:普通网页中的Redux计步器2.Redux管理数据的流程3.配套工具和环境准备3.1.配套工具3.2.环境准备4.示例:React项目中的Redux计步器思路步骤step1:创建子模块step2:导入子模块step3:注入store实例step4:React组件内使用store中的数据step5:在组件…

34.Socket编程(UDP)(上)

点分十进制字符串IP 转 32位网络序列IP 分析&#xff1a;1&#xff09;IP转成4字节 2&#xff09;4字节转成网络序列 思路&#xff1a; "192.168.1.1" 进行字符串划分&#xff0c;以 "." 为分割符&#xff0c;分割出"192"&#xff0c;&qu…

Redis的持久化工具包—RDB AOF

文章目录 前言 一、RDB 持久化&#xff08;快照持久化&#xff09; 1. 定义 2. RDB 触发机制 &#xff08;1&#xff09;手动触发 &#xff08;2&#xff09;自动触发 3. RDB 持久化流程 4. RDB 核心配置 5. RDB 优缺点 二、AOF 持久化&#xff08;日志持久化&#xff09; 1. 定…

【Web安全】XXL-JOB框架SRC高频漏洞分析总结

文章目录前言一、核心漏洞分类与技术细节二、漏洞关联利用与攻击路径三、版本演进与修复策略四、安全运维建议五、典型漏洞复现环境搭建六、总结前言 XXL-JOB是国内主流的开源分布式任务调度框架&#xff0c;由徐雪里开发维护&#xff0c;以轻量易用、高可用、适配分布式场景等…

Capacitor 打包后接口访问不到的排查经历

我最近在用 Quasar Capacitor 6 做一个 Android App&#xff0c;前端用的是 Vue3 Quasar&#xff0c;打包交给 Capacitor 去跑在手机的 WebView 里&#xff0c;后端是 FastAPI 提供接口。开发模式下一切顺利&#xff0c;浏览器里访问接口没有任何问题&#xff0c;我甚至觉得打…

【正点原子】Linux应用编程入门~概念及环境介绍

应用编程概念 应用编程&#xff08;也可称为系统编程&#xff09;与驱动编程、裸机编程有何不同&#xff1f;系统调用&#xff1b;何为库函数&#xff1b;应用程序的 main()函数&#xff1b;应用程序开发环境的介绍&#xff1b;系统调用 定义系统调用&#xff08;system call&a…

一、HTML 完全指南:从零开始构建网页

文章目录前言一、 HTML 结构认识 HTML 标签HTML 文件基本结构标签层次结构快速生成代码框架二、 HTML 常见标签详解2.1 注释标签2.2 标题标签 (h1 - h6)2.3 段落标签 (p)2.4 换行标签 (br)2.5 格式化标签2.6 图片标签 (img)2.7 超链接标签 (a)2.8 表格标签基本使用合并单元格2.…

基于POI-TL实现动态Word模板的数据填充:【散点图】特殊处理方案

基于POI-TL实现动态Word模板的数据填充:散点图特殊处理方案 在使用POI-TL进行Word模板动态数据填充时,图表生成是一个常见需求。最近在项目中使用POI-TL处理散点图时遇到了一个特殊问题,经过研究后找到了解决方案,特此记录分享。 问题背景 POI-TL作为一款优秀的Java Wor…