/*
 * Sketch of a global pool of per-CPU task workers.
 *
 * Every priority level at which there is a task queue has at least one
 * worker per CPU.  We aim for one idle worker per CPU, by having every
 * idle worker try to start a new one as soon as it gets work to do,
 * but that's not always possible.
 *
 * XXX What we really want is one worker per CPU per task queue, if
 * there is a task to run there.  That way, if your tasks sleep, mine
 * don't block, even if they are running on the same CPU.
 *
 * So each worker works on only one task queue's tasks at a time.  Once
 * done, it becomes idle and can be reassigned to another task queue,
 * or it commits suicide if there's nothing to be done for a little
 * while.
 *
 * Problem: Resource exhaustion.  task_schedule needs to be cheap and
 * can't start kthreads.  If a task queue has no worker on a CPU, and
 * there are no idle workers on that CPU, what to do?  We can have one
 * overseer per CPU which only holds onto tasks until they can be
 * assigned to a worker, but does not execute them.
 *
 * Problem: Draining.  If a task is on the overseer's queue, how do we
 * know whether to drain it or not?
 *
 * Lock order:
 *
 *	1. struct taskqueue_percpu::tqpc_lock
 *	2. struct taskworker_pool::twp_lock
 *	3. struct drain::d_lock
 */

struct taskqueue {
	...
	struct percpu		*tq_percpu;	/* struct taskqueue_percpu */
	...
};

struct taskqueue_percpu {
	kmutex_t			tqpc_lock;
	struct taskworker		*tqpc_worker;
	struct task_head		tqpc_overflow;
	TAILQ_ENTRY(taskqueue_percpu)	tqpc_entry;

	/*
	 * tqpc_cv is signalled when
	 *
	 *	(a) there is a new task to be executed, or
	 *	(b) a task has finished executing.
	 */
	kcondvar_t			tqpc_cv;
};

TAILQ_HEAD(taskqueue_percpu_head, taskqueue_percpu);

struct taskworker {
	struct taskqueue_percpu		*tw_tqpc;
	TAILQ_ENTRY(taskworker)		tw_entry;	/* on twp_workers */
	TAILQ_ENTRY(taskworker)		tw_idle_entry;	/* on twp_idle_workers */
	kmutex_t			tw_lock;
	kcondvar_t			tw_cv;
	pri_t				tw_pri;
	struct taskqueue_percpu_head	tw_taskqueues;	/* tqpcs we serve */
	union {
		struct {
			struct lwp	*lwp;
		}			thread;
	}				tw_u;
	struct task_head		tw_queue;
	struct task			*tw_current_task;
	int				tw_flags;
#define	TASKWORKER_DONE			0x01
#define	TASKWORKER_RESCHEDULED		0x02
#define	TASKWORKER_EXIT			0x04
};

TAILQ_HEAD(taskworker_head, taskworker);

struct taskworker_pool {
	kmutex_t			twp_lock;
	struct taskworker		*twp_overseer;
	struct taskworker_head		twp_workers;	/* all workers, idle first */
	struct taskworker_head		twp_idle_workers;
	struct taskqueue_percpu_head	twp_overflow;

	/* twp_cv is signalled when twp_idle_workers becomes non-empty.  */
	kcondvar_t			twp_cv;
};

/* One pool per priority level, plus one for PRI_NONE.  */
static struct percpu *taskworker_pools[PRI_COUNT + 1];

static struct percpu *
taskworker_pool_percpu(pri_t pri)
{
	const unsigned int i = (pri == PRI_NONE ? PRI_COUNT : pri);

	KASSERT(i < __arraycount(taskworker_pools));
	return taskworker_pools[i];
}

static struct taskworker_pool *
taskworker_pool_get(pri_t pri)
{

	return percpu_getref(taskworker_pool_percpu(pri));
}

static void
taskworker_pool_put(pri_t pri)
{

	percpu_putref(taskworker_pool_percpu(pri));
}

void
taskqueue_destroy(struct taskqueue *taskqueue)
{
	...
#ifdef DIAGNOSTIC
	percpu_foreach(taskqueue->tq_percpu, &taskqueue_destroy_percpu,
	    taskqueue);
#endif
	...
}

#ifdef DIAGNOSTIC
static void
taskqueue_destroy_percpu(void *tqpc_v, void *taskqueue_v, struct cpu_info *ci)
{
	struct taskqueue_percpu *const tqpc = tqpc_v;
	struct taskqueue *const taskqueue = taskqueue_v;

	KASSERT(tqpc->tqpc_worker == NULL);
	KASSERT(TAILQ_EMPTY(&tqpc->tqpc_overflow));
}
#endif

struct drain {
	kmutex_t		d_lock;
	kcondvar_t		d_cv;
	unsigned int		d_count;
	struct taskworker	d_worker;
	struct drain_task	*d_tasks;	/* one per CPU */
};

struct drain_task {
	struct task	dt_task;
	struct drain	*dt_drain;
};
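
/*
 * Sketch, not part of the design notes above: the callback a
 * drain_task would run.  Each per-CPU worker runs its queue in FIFO
 * order, so by the time this task runs, every task that was scheduled
 * on this CPU before the drain began has completed; all we do is drop
 * this CPU's count and wake the drainer, mirroring what
 * taskqueue_drain_thread_xc does when it empties an overseer's queue
 * directly.  The name drain_task_fn, the task_fn signature, and the
 * container_of step are assumptions; taskqueue_drain below would
 * presumably initialize each d_tasks[i] with this function and a back
 * pointer to the drain.
 */
static void
drain_task_fn(struct task *task)
{
	struct drain_task *const dt = container_of(task, struct drain_task,
	    dt_task);
	struct drain *const drain = dt->dt_drain;

	mutex_enter(&drain->d_lock);
	if (--drain->d_count == 0)
		cv_signal(&drain->d_cv);
	mutex_exit(&drain->d_lock);
}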

void
taskqueue_drain(struct taskqueue *taskqueue)
{
	xcfunc_t xcfunc;
	struct drain drain;

	/* We are going to sleep big-time.  */
	ASSERT_SLEEPABLE();

	/* Set up some temporary state for draining.  */
	mutex_init(&drain.d_lock, MUTEX_DEFAULT, IPL_VM);
	cv_init(&drain.d_cv, "tqdrain");
	drain.d_count = taskqueue->tq_nworkers;
	taskworker_init(&drain.d_worker, taskqueue->tq_pri, IPL_VM);
	drain.d_worker.tw_u.thread.lwp = curlwp;
	CTASSERT(MAXCPUS <= (SIZE_MAX / sizeof(drain.d_tasks[0])));
	drain.d_tasks = kmem_alloc((sizeof(drain.d_tasks[0]) * ncpu),
	    KM_SLEEP);

	/* Do the per-CPU draining setup.  */
	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		xcfunc = &taskqueue_drain_softint_xc;
		break;

	case TASKQUEUE_THREAD:
		xcfunc = &taskqueue_drain_thread_xc;
		break;

	default:
		panic("taskqueue %p has invalid type: %d", taskqueue,
		    (int)taskqueue->tq_type);
	}
	xc_wait(xc_broadcast(0, xcfunc, taskqueue, &drain));

	/* Wait for the tasks running on other CPUs to complete.  */
	mutex_spin_enter(&drain.d_lock);
	while (0 < drain.d_count)
		cv_wait(&drain.d_cv, &drain.d_lock);
	mutex_spin_exit(&drain.d_lock);

	/* Run all the tasks that couldn't run on other CPUs.  */
	while (!TAILQ_EMPTY(&drain.d_worker.tw_queue))
		taskworker_run_1(&drain.d_worker, &drain.d_worker.tw_queue);

	/* Nuke the temporary drain state.  */
	taskworker_destroy(&drain.d_worker);
	KASSERT(drain.d_count == 0);
	kmem_free(drain.d_tasks, (sizeof(drain.d_tasks[0]) * ncpu));
	cv_destroy(&drain.d_cv);
	mutex_destroy(&drain.d_lock);
}

static void
taskqueue_drain_softint_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;
	struct drain_task *const dt = &drain->d_tasks[cpu_index(curcpu())];

	/* Just enqueue the drain task and schedule the softint.  */
	kpreempt_disable();
	taskworker_enqueue(...the local softint taskworker..., &dt->dt_task);
	softint_schedule(taskqueue->tq_u.softint.cookie);
	kpreempt_enable();
}

static void
taskqueue_drain_thread_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;
	struct drain_task *const dt = &drain->d_tasks[cpu_index(curcpu())];
	struct taskqueue_percpu *const tqpc =
	    percpu_getref(taskqueue->tq_percpu);

	mutex_enter(&tqpc->tqpc_lock);

	/* If there are no tasks assigned to this CPU, we're done.  */
	if (tqpc->tqpc_worker == NULL)
		goto out;

	struct taskworker_pool *const pool =
	    taskworker_pool_get(taskqueue->tq_pri);
	if (tqpc->tqpc_worker == pool->twp_overseer) {
		/*
		 * Tasks are assigned to this CPU but there is no
		 * worker to run them.  We can't run them now because
		 * we're in the xcall thread, which prohibits sleeping.
		 * Take them from the overseer, move them to the drain
		 * worker, and signal that this CPU is done.
		 */
		struct task *task, *next;

		mutex_enter(&pool->twp_lock);
		TAILQ_FOREACH_SAFE(task, &pool->twp_overseer->tw_queue,
		    task_entry, next) {
			if (task->task_tqpc != tqpc)
				continue;
			TAILQ_REMOVE(&pool->twp_overseer->tw_queue, task,
			    task_entry);
			mutex_enter(&drain->d_lock);
			TAILQ_INSERT_TAIL(&drain->d_worker.tw_queue, task,
			    task_entry);
			mutex_exit(&drain->d_lock);
		}
		mutex_exit(&pool->twp_lock);

		mutex_enter(&drain->d_lock);
		if (--drain->d_count == 0)
			cv_signal(&drain->d_cv);
		mutex_exit(&drain->d_lock);
	} else {
		/*
		 * Tasks are assigned to this CPU and there is a worker
		 * to run them.  Schedule the drain task.
		 */
		taskworker_schedule(tqpc, &dt->dt_task);
	}
	taskworker_pool_put(taskqueue->tq_pri);

out:	mutex_exit(&tqpc->tqpc_lock);
	percpu_putref(taskqueue->tq_percpu);
}
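
/*
 * Sketch, an assumption: taskworker_enqueue, which the softint xcall
 * above and taskworker_schedule below use but which is not written
 * out.  It just appends the task to the assigned worker's queue and
 * wakes the worker -- case (a) of tqpc_cv.  (The softint call site
 * above passes a worker rather than a tqpc; this version follows the
 * taskworker_schedule call site, so one of the two would have to
 * change.)
 */
static void
taskworker_enqueue(struct taskqueue_percpu *tqpc, struct task *task)
{

	KASSERT(mutex_owned(&tqpc->tqpc_lock));
	KASSERT(tqpc->tqpc_worker != NULL);

	TAILQ_INSERT_TAIL(&tqpc->tqpc_worker->tw_queue, task, task_entry);
	cv_signal(&tqpc->tqpc_cv);	/* case (a): a new task to execute */
}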

void
task_schedule(struct taskqueue *taskqueue, struct task *task)
{
	struct cpu_info *ci;
	struct taskqueue_percpu *tqpc;

	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't schedule: %p", task);

retry:	tqpc = percpu_getref(taskqueue->tq_percpu);
	ci = curcpu();
	mutex_enter(&tqpc->tqpc_lock);
	if (__predict_false(curcpu() != ci)) {
		/* XXX Better way to avoid switching CPUs here?  */
		mutex_exit(&tqpc->tqpc_lock);
		percpu_putref(taskqueue->tq_percpu);
		goto retry;
	}

	if (__predict_false(tqpc->tqpc_worker == NULL)) {
		struct taskworker_pool *const pool =
		    taskworker_pool_get(taskqueue->tq_pri);

		mutex_enter(&pool->twp_lock);
		if (TAILQ_EMPTY(&pool->twp_idle_workers)) {
			/* No idle worker: park the queue on the overseer. */
			tqpc->tqpc_worker = pool->twp_overseer;
			TAILQ_INSERT_TAIL(&pool->twp_overflow, tqpc,
			    tqpc_entry);
		} else {
			/* Grab an idle worker and assign it to us.  */
			tqpc->tqpc_worker =
			    TAILQ_FIRST(&pool->twp_idle_workers);
			TAILQ_REMOVE(&pool->twp_idle_workers,
			    tqpc->tqpc_worker, tw_idle_entry);
			tqpc->tqpc_worker->tw_tqpc = tqpc;
		}
		cv_signal(&pool->twp_cv);
		mutex_exit(&pool->twp_lock);
		taskworker_pool_put(taskqueue->tq_pri);
	}

	KASSERT(tqpc->tqpc_worker != NULL);
	taskworker_schedule(tqpc, task);
	mutex_exit(&tqpc->tqpc_lock);
	percpu_putref(taskqueue->tq_percpu);
}

static void
taskworker_schedule(struct taskqueue_percpu *tqpc, struct task *task)
{
	struct taskqueue_percpu *tqpc0;

	KASSERT(mutex_owned(&tqpc->tqpc_lock));
	KASSERT(tqpc->tqpc_worker != NULL);

	/* Try to claim the task for this CPU.  */
	if (__predict_false((tqpc0 = atomic_cas_ptr(&task->task_tqpc, NULL,
		    tqpc)) != NULL)) {
		/* Some CPU, possibly ours, has already claimed it.  */
		if (tqpc0 != tqpc) {
			mutex_exit(&tqpc->tqpc_lock);
			mutex_enter(&tqpc0->tqpc_lock);
		}

		if (task->task_tqpc != tqpc0)
			/* tqpc0->tqpc_worker already ran it.  */
			goto out;
		if (tqpc0->tqpc_worker->tw_current_task != task)
			/* tqpc0->tqpc_worker is still scheduled to run it. */
			goto out;
		if (ISSET(tqpc0->tqpc_worker->tw_flags,
			TASKWORKER_RESCHEDULED))
			/* It's running and already rescheduled.  */
			goto out;

		/*
		 * It's running.  Put it back on the queue and notify
		 * everyone else that it's been rescheduled.
		 */
		tqpc0->tqpc_worker->tw_flags |= TASKWORKER_RESCHEDULED;
		TAILQ_INSERT_TAIL(&tqpc0->tqpc_worker->tw_queue, task,
		    task_entry);

out:		if (tqpc0 != tqpc) {
			mutex_exit(&tqpc0->tqpc_lock);
			mutex_enter(&tqpc->tqpc_lock);
		}
		return;
	}

	/* We claimed it.  Hand it to our worker.  */
	taskworker_enqueue(tqpc, task);
}

static void __dead
taskworker_overseer_thread(void *arg)
{
	struct taskworker *const worker = arg;
	struct taskworker_pool *const pool =
	    taskworker_pool_get(worker->tw_pri);
	bool suicide = false;

	/* Dropping the reference is safe because we're bound to the CPU.  */
	taskworker_pool_put(worker->tw_pri);

	mutex_enter(&pool->twp_lock);
	for (;;) {
		while (TAILQ_EMPTY(&worker->tw_queue) && !suicide)
			...;
	}
	mutex_exit(&pool->twp_lock);
	kthread_exit(0);
}
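
/*
 * Sketch, an assumption: taskworker_run_1, which taskqueue_drain above
 * and taskworker_thread below call but which is not written out.  The
 * point is the protocol taskworker_schedule relies on: while a task
 * runs, task_tqpc stays set and tw_current_task points at it, so a
 * concurrent task_schedule takes the slow path and merely sets
 * TASKWORKER_RESCHEDULED instead of letting the task run twice at
 * once.  Locking is elided here (the sketch is not yet consistent
 * about whether tw_lock or tqpc_lock protects the queue), and the
 * task_fn signature is assumed.
 */
static void
taskworker_run_1(struct taskworker *worker, struct task_head *queue)
{
	struct task *const task = TAILQ_FIRST(queue);

	KASSERT(task != NULL);

	TAILQ_REMOVE(queue, task, task_entry);
	worker->tw_current_task = task;
	worker->tw_flags &= ~TASKWORKER_RESCHEDULED;

	/* Run the task.  (Dropping the caller's lock is elided.)  */
	(*task->task_fn)(task);

	/*
	 * If nobody rescheduled the task while it ran, dissociate it
	 * so the next task_schedule starts from scratch.  If somebody
	 * did, it is already back on our queue and still points at
	 * this CPU.
	 */
	if (!ISSET(worker->tw_flags, TASKWORKER_RESCHEDULED))
		task->task_tqpc = NULL;
	worker->tw_current_task = NULL;
}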

static void __dead
taskworker_thread(void *arg)
{
	struct taskworker *const worker = arg;
	struct taskworker_pool *pool;
	bool suicide = false;

	mutex_enter(&worker->tw_lock);
	for (;;) {
		while (TAILQ_EMPTY(&worker->tw_queue) && !suicide)
			suicide = taskworker_thread_wait(worker);
		if (suicide)
			break;
		KASSERT(!TAILQ_EMPTY(&worker->tw_queue));

		/*
		 * While we're working, move ourselves to the end of
		 * the worker list, take ourselves off the idle list,
		 * and, if all workers are busy, create a new one.
		 */
		pool = taskworker_pool_get(worker->tw_pri);
		TAILQ_REMOVE(&pool->twp_workers, worker, tw_entry);
		TAILQ_INSERT_TAIL(&pool->twp_workers, worker, tw_entry);
		TAILQ_REMOVE(&pool->twp_idle_workers, worker, tw_idle_entry);
		const bool all_busy = TAILQ_EMPTY(&pool->twp_idle_workers);
		taskworker_pool_put(worker->tw_pri);

		/*
		 * If all the workers in this pool are busy, create a
		 * new one.  We must drop the lock to do this because
		 * creating a kthread requires allocation.  This is not
		 * a problem: we rely on no invariants here, and nobody
		 * will remove entries from our queue anyway.
		 */
		if (all_busy) {
			mutex_exit(&worker->tw_lock);
			taskworker_create(worker->tw_pri);
			mutex_enter(&worker->tw_lock);
			KASSERT(!TAILQ_EMPTY(&worker->tw_queue));
		}

		/* Run all the tasks we have to run.  */
		KASSERT(!TAILQ_EMPTY(&worker->tw_queue));
		while (!TAILQ_EMPTY(&worker->tw_queue))
			taskworker_run_1(worker, &worker->tw_queue);

		/*
		 * Now that we're idle, move ourselves back to the
		 * front of the worker list and put ourselves back on
		 * the idle worker list.
		 */
		pool = taskworker_pool_get(worker->tw_pri);
		TAILQ_REMOVE(&pool->twp_workers, worker, tw_entry);
		TAILQ_INSERT_HEAD(&pool->twp_workers, worker, tw_entry);
		TAILQ_INSERT_TAIL(&pool->twp_idle_workers, worker,
		    tw_idle_entry);
		taskworker_pool_put(worker->tw_pri);
	}
	mutex_exit(&worker->tw_lock);
	kthread_exit(0);
}

static bool
taskworker_thread_wait(struct taskworker *worker)
{
	struct taskworker_pool *pool;

	/*
	 * If we're the last idle worker on this CPU, we want to wait
	 * indefinitely until someone wakes us to schedule a task or to
	 * kill us.  Otherwise, we wait only until we've been idle for
	 * a while and then commit suicide.
	 */
	pool = taskworker_pool_get(worker->tw_pri);
	const bool last_idle =
	    (TAILQ_NEXT(TAILQ_FIRST(&pool->twp_idle_workers), tw_idle_entry)
		== NULL);
	KASSERT(!last_idle || TAILQ_FIRST(&pool->twp_idle_workers) == worker);
	taskworker_pool_put(worker->tw_pri);

	/* Wait.  If someone wakes us, continue.  */
	if (last_idle) {
		cv_wait(&worker->tw_cv, &worker->tw_lock);
		return false;
	} else if (cv_timedwait(&worker->tw_cv, &worker->tw_lock,
		    TASKWORKER_IDLE_TICKS) == 0) {
		return false;
	}

	/* We timed out.  Get ready to commit suicide.  */
	pool = taskworker_pool_get(worker->tw_pri);
	if (!TAILQ_EMPTY(&worker->tw_queue) ||
	    ISSET(worker->tw_flags, TASKWORKER_EXIT) ||
	    (TAILQ_NEXT(TAILQ_FIRST(&pool->twp_idle_workers), tw_idle_entry)
		== NULL)) {
		/*
		 * Someone gave us a task or killed us, or the other
		 * idle workers all committed suicide.
		 */
		taskworker_pool_put(worker->tw_pri);
		return false;
	}

	/* No more work.  Commit suicide.  */
	TAILQ_REMOVE(&pool->twp_workers, worker, tw_entry);
	TAILQ_REMOVE(&pool->twp_idle_workers, worker, tw_idle_entry);
	while (!TAILQ_EMPTY(&worker->tw_taskqueues)) {
		struct taskqueue_percpu *const tqpc =
		    TAILQ_FIRST(&worker->tw_taskqueues);

		KASSERT(tqpc->tqpc_worker == worker);
		tqpc->tqpc_worker = NULL;
		TAILQ_REMOVE(&worker->tw_taskqueues, tqpc, tqpc_entry);
	}
	taskworker_pool_put(worker->tw_pri);
	return true;
}

static void
taskworker_create(pri_t pri)
{
	int error;

	struct taskworker *const worker = pool_cache_get(&taskworker_pool,
	    PR_NOWAIT);
	if (worker == NULL)
		return;

	taskworker_init(worker, pri);

	/* XXX KTHREAD_NOWAIT?  */
	error = kthread_create(pri, KTHREAD_MPSAFE, curcpu(),
	    &taskworker_thread, worker, &worker->tw_u.thread.lwp,
	    "taskw%d", pri);
	if (error) {
		pool_cache_put(&taskworker_pool, worker);
		return;
	}

	struct taskworker_pool *const pool = taskworker_pool_get(pri);
	TAILQ_INSERT_HEAD(&pool->twp_workers, worker, tw_entry);
	TAILQ_INSERT_TAIL(&pool->twp_idle_workers, worker, tw_idle_entry);
	taskworker_pool_put(pri);
}
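
/*
 * Sketch, an assumption: taskworker_init and taskworker_destroy, which
 * taskqueue_drain and taskworker_create above call but which are not
 * written out.  The two init call sites disagree about an IPL
 * argument; the drain path's three-argument form is used here, on the
 * guess that the IPL is for the worker's lock, so taskworker_create's
 * two-argument call would need to be reconciled with it.  The field
 * initializations are just the obvious ones for struct taskworker as
 * declared above.
 */
static void
taskworker_init(struct taskworker *worker, pri_t pri, int ipl)
{

	mutex_init(&worker->tw_lock, MUTEX_DEFAULT, ipl);
	cv_init(&worker->tw_cv, "taskwork");
	worker->tw_pri = pri;
	worker->tw_tqpc = NULL;
	TAILQ_INIT(&worker->tw_taskqueues);
	TAILQ_INIT(&worker->tw_queue);
	worker->tw_current_task = NULL;
	worker->tw_flags = 0;
}

static void
taskworker_destroy(struct taskworker *worker)
{

	KASSERT(TAILQ_EMPTY(&worker->tw_queue));
	KASSERT(TAILQ_EMPTY(&worker->tw_taskqueues));
	KASSERT(worker->tw_current_task == NULL);

	cv_destroy(&worker->tw_cv);
	mutex_destroy(&worker->tw_lock);
}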