The Go scheduler

The scheduler's job is to distribute goroutines onto worker threads for execution.

Three kinds of objects are involved:

G - goroutine

M - worker thread, i.e. an OS thread

P - processor, an abstract resource required to run Go code; there are at most GOMAXPROCS of them, and a P must be attached to an M while Go code is running

The global run queue:

G *runtime·sched.runqhead;
G *runtime·sched.runqtail;
int runtime·sched.runqsize;

Main members of the P struct, which include its local run queue:

struct P
{
    uint32      status;         // one of Pidle/Prunning/...
    uint32      schedtick;      // incremented on every scheduler call
 
    M*  m;              // back-link to associated M (nil if idle)
 
    // Queue of runnable goroutines.
    uint32      runqhead;
    uint32      runqtail;
    G*  runq[256];
};

The core of the schedule function:

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
static void
schedule(void)
{
    G *gp;
    uint32 tick;
 
    ......
top:
    ......
    
    gp = nil;
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    tick = g->m->p->schedtick;
    // This is a fancy way to say tick%61==0,
    // it uses 2 MUL instructions instead of a single DIV and so is faster on modern processors.
    if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime·sched.runqsize > 0) {
        runtime·lock(&runtime·sched.lock);
        gp = globrunqget(g->m->p, 1);
        runtime·unlock(&runtime·sched.lock);
        if(gp)
                resetspinning();
    }
    if(gp == nil) {
        gp = runqget(g->m->p);
        if(gp && g->m->spinning)
                runtime·throw("schedule: spinning with local work");
    }
    if(gp == nil) {
        gp = findrunnable();  // blocks until work is available
        resetspinning();
    }
 
    execute(gp);
}

schedule picks a G in three steps:

  • For fairness, once every 61 scheduler ticks it first tries to take a G from the global run queue, so that two goroutines cannot monopolize the local run queue by constantly respawning each other (this is the schedtick check at the top of the code; a standalone verification of the fast-modulo trick follows this list).

  • Otherwise it takes a G from the current P's local run queue.

  • If both queues are empty, it calls findrunnable, which blocks until a runnable G appears.
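
As an aside, the "fancy way to say tick%61==0" in schedule can be checked in isolation. The small standalone program below is not part of the runtime; it only verifies that the multiply-and-shift with 0x4325c53f, which is ceil(2^36/61), agrees with tick%61 for every 32-bit tick.

#include <stdint.h>
#include <stdio.h>

// Standalone check (not runtime code): 0x4325c53f is ceil(2^36/61), so
// (tick*0x4325c53f)>>36 computes tick/61 exactly for every 32-bit tick, and
// the subtraction below reproduces tick%61 using two MULs instead of a DIV.
int main(void) {
    uint32_t t = 0;
    do {
        uint32_t fast = t - (uint32_t)((((uint64_t)t * 0x4325c53fu) >> 36) * 61);
        if (fast != t % 61) {
            printf("mismatch at %u: fast=%u, t%%61=%u\n",
                   (unsigned)t, (unsigned)fast, (unsigned)(t % 61));
            return 1;
        }
    } while (t++ != UINT32_MAX);
    printf("multiply-and-shift matches t%%61 for all uint32 values\n");
    return 0;
}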

globrunqget takes a G from the global run queue and, at the same time, moves a batch of additional Gs onto the P's local run queue, so the global queue is drained in proportional chunks rather than one G at a time.
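
globrunqget itself is not part of the excerpt; the following sketch reconstructs its batch-transfer logic from the same generation of the runtime. Treat the exact clamping and field handling as an approximation rather than the verbatim function. It uses runqput, the local-queue producer sketched further below.

// Sketch of globrunqget (simplified, not the verbatim runtime function).
// Takes one G from the global run queue to return to the caller and moves a
// proportional batch of further Gs onto p's local run queue.
// max limits how many Gs may be taken; max==0 means no explicit limit.
// Caller must hold runtime·sched.lock.
static G*
globrunqget(P *p, int32 max)
{
    G *gp, *gp1;
    int32 n;

    if(runtime·sched.runqsize == 0)
        return nil;
    n = runtime·sched.runqsize/runtime·gomaxprocs + 1;  // proportional share
    if(n > runtime·sched.runqsize)
        n = runtime·sched.runqsize;
    if(max > 0 && n > max)
        n = max;
    if(n > nelem(p->runq)/2)
        n = nelem(p->runq)/2;       // never overflow the local queue
    runtime·sched.runqsize -= n;
    if(runtime·sched.runqsize == 0)
        runtime·sched.runqtail = nil;
    // Return the first G directly, push the rest onto p's local run queue.
    gp = runtime·sched.runqhead;
    runtime·sched.runqhead = gp->schedlink;
    n--;
    while(n--) {
        gp1 = runtime·sched.runqhead;
        runtime·sched.runqhead = gp1->schedlink;
        runqput(p, gp1);            // local-queue producer, sketched below
    }
    return gp;
}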

runqget takes a G from the local run queue; the local run queue is implemented lock-free:

// Get g from local runnable queue.
// Executed only by the owner P.
static G*
runqget(P *p)
{
    G *gp;
    uint32 t, h;
 
    for(;;) {
        h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with other consumers
        t = p->runqtail;
        if(t == h)
                return nil;
        gp = p->runq[h%nelem(p->runq)];
        if(runtime·cas(&p->runqhead, h, h+1))  // cas-release, commits consume
                return gp;
    }
}
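
runqget above is the consumer half of this lock-free ring. For completeness, here is a sketch of the producer half, runqput, which the owner P uses to enqueue a G. It is an illustration, not the verbatim runtime code; the slow path that spills half of a full queue to the global run queue is omitted.

// Sketch of runqput, the producer half of the same lock-free ring (simplified,
// not the verbatim runtime function). Executed only by the owner P.
static void
runqput(P *p, G *gp)
{
    uint32 h, t;

    h = runtime·atomicload(&p->runqhead);   // load-acquire, synchronize with consumers/stealers
    t = p->runqtail;
    if(t - h < nelem(p->runq)) {
        p->runq[t%nelem(p->runq)] = gp;
        runtime·atomicstore(&p->runqtail, t+1);  // store-release, publishes the new item
        return;
    }
    // Queue full: the real runtime moves half of the local queue plus gp
    // to the global run queue under runtime·sched.lock and retries (omitted here).
}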

findrunnable blocks until it can return a runnable G. It looks for work in the following order:

  • check the local run queue

  • check the global run queue

  • poll the network in non-blocking mode

  • try to steal from other Ps' local run queues

  • if after all of this no G can be found in the system, poll the network in blocking mode

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
static G*
findrunnable(void)
{
    G *gp;
    P *p;
    int32 i;

top:
    if(runtime·sched.gcwaiting) {
        gcstopm();
        goto top;
    }
    if(runtime·fingwait && runtime·fingwake && (gp = runtime·wakefing()) != nil)
        runtime·ready(gp);
    // local runq
    gp = runqget(g->m->p);
    if(gp)
        return gp;
    // global runq
    if(runtime·sched.runqsize) {
        runtime·lock(&runtime·sched.lock);
        gp = globrunqget(g->m->p, 0);
        runtime·unlock(&runtime·sched.lock);
        if(gp)
            return gp;
    }
    // poll network
    gp = runtime·netpoll(false);  // non-blocking
    if(gp) {
        injectglist(gp->schedlink);
        runtime·casgstatus(gp, Gwaiting, Grunnable);
        return gp;
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    if(!g->m->spinning && 2 * runtime·atomicload(&runtime·sched.nmspinning) >= runtime·gomaxprocs - runtime·atomicload(&runtime·sched.npidle))  // TODO: fast atomic
        goto stop;
    if(!g->m->spinning) {
        g->m->spinning = true;
        runtime·xadd(&runtime·sched.nmspinning, 1);
    }
    // random steal from other P's
    for(i = 0; i < 2*runtime·gomaxprocs; i++) {
        if(runtime·sched.gcwaiting)
            goto top;
        p = runtime·allp[runtime·fastrand1()%runtime·gomaxprocs];
        if(p == g->m->p)
            gp = runqget(p);
        else
            gp = runqsteal(g->m->p, p);
        if(gp)
            return gp;
    }

stop:
    // return P and block
    runtime·lock(&runtime·sched.lock);
    if(runtime·sched.gcwaiting) {
        runtime·unlock(&runtime·sched.lock);
        goto top;
    }
    if(runtime·sched.runqsize) {
        gp = globrunqget(g->m->p, 0);
        runtime·unlock(&runtime·sched.lock);
        return gp;
    }
    p = releasep();
    pidleput(p);
    runtime·unlock(&runtime·sched.lock);
    if(g->m->spinning) {
        g->m->spinning = false;
        runtime·xadd(&runtime·sched.nmspinning, -1);
    }
    // check all runqueues once again
    for(i = 0; i < runtime·gomaxprocs; i++) {
        p = runtime·allp[i];
        if(p && p->runqhead != p->runqtail) {
            runtime·lock(&runtime·sched.lock);
            p = pidleget();
            runtime·unlock(&runtime·sched.lock);
            if(p) {
                acquirep(p);
                goto top;
            }
            break;
        }
    }
    // poll network
    if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
        if(g->m->p)
            runtime·throw("findrunnable: netpoll with p");
        if(g->m->spinning)
            runtime·throw("findrunnable: netpoll with spinning");
        gp = runtime·netpoll(true);  // block until new work is available
        runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
        if(gp) {
            runtime·lock(&runtime·sched.lock);
            p = pidleget();
            runtime·unlock(&runtime·sched.lock);
            if(p) {
                acquirep(p);
                injectglist(gp->schedlink);
                runtime·casgstatus(gp, Gwaiting, Grunnable);
                return gp;
            }
            injectglist(gp);
        }
    }
    stopm();
    goto top;
}
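
The random-steal loop above calls runqsteal, which is not included in the excerpt. The following is a simplified sketch of the stealing idea, not the runtime's exact code (the real version grabs the batch through a helper and handles more race cases): snapshot the victim's ring, copy out roughly half of the entries, and commit the steal with a single CAS on the victim's runqhead.

// Simplified sketch of runqsteal (not the runtime's exact code).
// Steals roughly half of p2's local run queue, returns one stolen G to run
// now and appends the rest to p's own local queue.
static G*
runqsteal(P *p, P *p2)
{
    G *gp;
    G *batch[nelem(p2->runq)/2];
    uint32 t, h, n, i;

    for(;;) {
        h = runtime·atomicload(&p2->runqhead);  // load-acquire
        t = runtime·atomicload(&p2->runqtail);
        n = t - h;
        n = n - n/2;                            // take half, rounded up
        if(n == 0)
            return nil;
        if(n > nelem(p2->runq)/2)               // inconsistent h/t snapshot, retry
            continue;
        for(i = 0; i < n; i++)
            batch[i] = p2->runq[(h+i)%nelem(p2->runq)];
        if(runtime·cas(&p2->runqhead, h, h+n))  // commit the steal with one CAS
            break;
    }
    // The caller's local queue is empty at this point (runqget already failed),
    // so the stolen batch always fits.
    n--;
    gp = batch[n];
    t = p->runqtail;
    for(i = 0; i < n; i++, t++)
        p->runq[t%nelem(p->runq)] = batch[i];
    runtime·atomicstore(&p->runqtail, t);       // store-release
    return gp;
}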