/*	$NetBSD$	*/

/*
 * XXX WARNING WARNING WARNING XXX
 *
 * This code is not tested!  It is a draft of an idea.  See below the
 * copyright notice for a summary.
 *
 * Third draft, with a shared pool of per-CPU task workers.
 */

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Tasks and task queues.
 *
 * For each priority at which there are task queues in use, and for
 * each CPU, there is a worker thread pool.  A worker thread pool
 * consists of an overseer thread and any number of worker threads.
 * Each worker thread is either idle or running tasks from a single
 * task queue.
 *
 * When a task is scheduled, if there is a thread assigned to the task
 * queue on the CPU, then the task will be queued up for that thread to
 * run.  If there is no thread assigned, and there are idle threads
 * available, then one of the idle threads will be chosen.  If there
 * are no idle threads, then the overseer is asked to create a worker
 * thread for the task queue.
 *
 * XXX Haven't (yet?) implemented single-threaded task queues.  Will be
 * more bookkeeping, but could reduce the footprint of seldom-used task
 * queues by a factor of ncpu -- effectively serving as a single lwp
 * cache.
 *
 * XXX Hmm...  Can the worker queues be managed without locks, by
 * restricting access to the local CPU?  That would make task_cancel
 * difficult, and cross-CPU requeueing logic would be hairy.
 *
 * Lock order:
 *
 *	1. callers' locks, including task_cancel interlock
 *	2. struct taskworker::tw_lock for one normal worker at a time
 *	3. struct taskworker::tw_lock for one drain worker at a time
 *	4. 
struct taskthread_pool::ttp_lock for one pool */ #include __KERNEL_RCSID(0, "$NetBSD$"); #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define container_of(PTR, TYPE, FIELD) \ ((void)sizeof((PTR) - \ &((TYPE *)(((char *)(PTR)) - \ offsetof(TYPE, FIELD)))->FIELD), \ ((TYPE *)(((char *)(PTR)) - offsetof(TYPE, FIELD)))) #define TASKWORKER_IDLE_TICKS mstohz(100) /* Data structures */ TAILQ_HEAD(task_head, task); TAILQ_HEAD(taskworker_head, taskworker); TAILQ_HEAD(taskthread_head, taskthread); struct taskqueue { struct percpu *tq_percpu; /* struct taskworker * */ union { struct { void *cookie; } softint; } tq_u; enum taskqueue_type { TASKQUEUE_SOFTINT, TASKQUEUE_THREAD, } tq_type; int tq_flags; pri_t tq_pri; int tq_ipl; const char *tq_name; }; struct taskworker { kmutex_t tw_lock; kcondvar_t tw_cv; struct taskqueue *tw_taskqueue; union { struct { struct taskthread_pool *pool; struct taskthread *thread; } thread; } tw_u; TAILQ_ENTRY(taskworker) tw_entry; struct task_head tw_tasks; struct task *tw_current_task; unsigned int tw_refcnt; int tw_flags; #define TASKWORKER_DONE 0x01 #define TASKWORKER_REQUEUED 0x02 #define TASKWORKER_CALLOUT_ACK 0x04 /* XXX KLUDGE */ #define TASKWORKER_DYING 0x08 }; struct taskthread { struct lwp *tt_lwp; struct taskthread_pool *tt_pool; struct taskworker *tt_worker; kcondvar_t tt_cv; TAILQ_ENTRY(taskthread) tt_entry; }; struct taskthread_pool { kmutex_t ttp_lock; struct taskthread ttp_overseer; struct taskworker_head ttp_workers; struct taskthread_head ttp_idle_threads; unsigned int ttp_refcnt; int ttp_flags; pri_t ttp_pri; #define TASKTHREAD_POOL_DYING 0x01 }; /* Forward declarations */ static size_t taskqueue_pri_index(pri_t, pri_t); static void task_error(struct task *); static int _taskqueue_create(struct taskqueue **, const char *, pri_t, int, int); static void _taskqueue_destroy(struct taskqueue *); static void taskworker_init(struct taskworker *, const char *, int, struct taskqueue *); static void taskworker_destroy(struct taskworker *); static int taskworker_hold(struct taskworker *); static void taskworker_rele(struct taskworker *); static int taskthread_pool_percpu_get(struct percpu **, pri_t); static void taskthread_pool_percpu_put(pri_t); static int taskthread_pool_percpu_create(struct percpu **, pri_t); static void taskthread_pool_percpu_destroy(struct percpu *); static int taskthread_pool_create(struct taskthread_pool **, struct cpu_info *, pri_t); static void taskthread_pool_destroy(struct taskthread_pool *); static int taskthread_pool_hold(struct taskthread_pool *); static void taskthread_pool_rele(struct taskthread_pool *); static int nstohz(nsec_t, jitter_t); static void taskworker_maybe_switch(struct taskworker **, struct task *); static void taskworker_schedule(struct taskworker *, struct task *); static void taskworker_assign_thread(struct taskworker *); static void taskworker_run_1(struct taskworker *, struct task_head *); static void taskworker_done(struct taskworker *, struct task *); static void taskworker_softintr(void *); static void taskworker_thread(void *) __dead; static void taskworker_overseer_thread(void *) __dead; static task_fn_t drain_task; static void taskqueue_drain_softint_xc(void *, void *); static void taskqueue_drain_thread_xc(void *, void *); /* Global state and initialization */ static pool_cache_t taskqueue_pc __read_mostly; static pool_cache_t taskworker_pc __read_mostly; static pool_cache_t 
taskthread_pool_pc __read_mostly; static pool_cache_t taskthread_pc __read_mostly; static kmutex_t taskqueues_lock __cacheline_aligned; static struct { struct taskqueue *taskqueue; unsigned int refcnt; } taskqueues[PRI_COUNT + 1]; /* XXX Where should these definitions go? */ #define MAXPRI_TASKQUEUE MAXPRI_SOFTINT #define MAXPRI_SOFTINT MAXPRI_KERNEL_RT #define MINPRI_SOFTINT PRI_SOFTCLOCK #define PRI_COUNT_KTHREAD (MAXPRI_KTHREAD + 1) static kmutex_t taskthread_pools_lock __cacheline_aligned; static struct { struct percpu *percpu; /* struct taskthread_pool * */ unsigned int refcnt; } taskthread_pools[PRI_COUNT_KTHREAD + 1]; struct taskqueue *system_taskqueue __read_mostly; /* * tasks_init: Initialize the task subsystem. */ void tasks_init(void) { int error; taskqueue_pc = pool_cache_init(sizeof(struct taskqueue), 0, 0, 0, "taskqueue", NULL, IPL_NONE, NULL, NULL, NULL); taskworker_pc = pool_cache_init(sizeof(struct taskworker), 0, 0, 0, "taskworker", NULL, IPL_NONE, NULL, NULL, NULL); taskthread_pool_pc = pool_cache_init(sizeof(struct taskthread_pool), 0, 0, 0, "taskthread_pool", NULL, IPL_NONE, NULL, NULL, NULL); taskthread_pc = pool_cache_init(sizeof(struct taskthread), 0, 0, 0, "taskthread", NULL, IPL_NONE, NULL, NULL, NULL); mutex_init(&taskqueues_lock, MUTEX_DEFAULT, IPL_NONE); mutex_init(&taskthread_pools_lock, MUTEX_DEFAULT, IPL_NONE); error = taskqueue_get(&system_taskqueue, PRI_NONE); if (error) panic("failed to create system task queue: %d", error); } static size_t taskqueue_pri_index(pri_t pri, pri_t maxpri __diagused) { if (pri == PRI_NONE) { return PRI_COUNT; } else { KASSERTMSG((0 <= pri), "negative priority: %d", (int)pri); KASSERTMSG((pri <= maxpri), "priority out of range: %d", (int)pri); return pri; } } /* Tasks */ static void task_error(struct task *task) { panic("destroyed task got run: %p", task); } /* * task_destroy: Destroy a task initialized with task_init. Must no * longer be scheduled. Use task_cancel first if this is not already * guaranteed. task_cancel_async is not enough. If you want to call * this inside a task callback, you must use task_done first. * * Not strictly necessary, but this is part of the documented API in * case it ever becomes necessary. */ void task_destroy(struct task *task) { KASSERTMSG((task->task_fn != &task_error), "task already destroyed: %p", task); KASSERTMSG((task->task_worker == NULL), "task still scheduled or running: %p", task); task->task_fn = task_error; } /* * task_done: To be called in a task callback to notify the task worker * that the task is done and can be scheduled again. * * If you do not call this, the task worker will do the equivalent when * the task callback returns. You MUST call this if the task callback * will free memory containing the struct task. */ void task_done(struct task *task) { struct taskworker *const worker = task->task_worker; KASSERTMSG((worker != NULL), "task not running: %p", task); KASSERTMSG((task->task_fn != &task_error), "task destroyed, can't be completing: %p", task); mutex_enter(&worker->tw_lock); KASSERTMSG((task->task_worker == worker), "task %p changed workers: %p -> %p", task, worker, task->task_worker); KASSERTMSG((worker->tw_current_task == task), "task %p is not current on worker %p", task, worker); KASSERTMSG(!ISSET(worker->tw_flags, TASKWORKER_DONE), "task %p is already done on worker %p", task, worker); taskworker_done(worker, task); mutex_exit(&worker->tw_lock); } /* Delayed tasks */ /* * delayed_task_destroy: Destroy a delayed task initialized with * delayed_task_init. 
Must no longer be scheduled. Use * delayed_task_cancel first if this is not already guaranteed. * delayed_task_cancel_async is not enough. If you want to call this * inside a task callback, you must use delayed_task_done first. */ void delayed_task_destroy(struct delayed_task *dt) { KASSERTMSG((!callout_pending(&dt->dt_callout)), "delayed task still pending timeout: %p", dt); KASSERTMSG((dt->dt_task.task_worker == NULL), "delayed task still scheduled or running: %p", dt); KASSERTMSG((dt->dt_task.task_fn != &task_error), "delayed task already destroyed: %p", dt); callout_destroy(&dt->dt_callout); task_destroy(&dt->dt_task); } /* * delayed_task_done: To be called in a delayed task callback to notify * the task worker that the task is done and can be scheduled again. * * If you do not call this, the task worker will do the equivalent when * the task callback returns. You MUST call this if the task callback * will free memory containing the struct delayed_task. */ void delayed_task_done(struct delayed_task *dt) { task_done(&dt->dt_task); } /* Reference-counted shared system task queues */ /* * taskqueue_get: Obtain a reference to a shared system task queue * running at the specified priority. The resulting task queue can be * used at any IPL up to and including IPL_VM. It must be released * with taskqueue_put -- do not pass it to taskqueue_destroy. May * sleep. */ int taskqueue_get(struct taskqueue **taskqueue_ret, pri_t pri) { const size_t i = taskqueue_pri_index(pri, PRI_COUNT); char buf[9]; struct taskqueue *taskqueue, *tmp = NULL; int error; ASSERT_SLEEPABLE(); /* Try to get it. */ mutex_enter(&taskqueues_lock); taskqueue = taskqueues[i].taskqueue; if (taskqueue == NULL) { /* Not there. Drop the lock to create a new one. */ KASSERT(taskqueues[i].refcnt == 0); mutex_exit(&taskqueues_lock); /* Three digits will fit in the buffer. */ (void)snprintf(buf, sizeof buf, "taskq%d", (int)pri); error = _taskqueue_create(&tmp, buf, pri, IPL_VM, TASKQUEUE_PERCPU); if (error) return error; /* Try again, but we may have raced. */ mutex_enter(&taskqueues_lock); taskqueue = taskqueues[i].taskqueue; if (taskqueue == NULL) { /* Nobody raced us. Commit tmp. */ KASSERT(taskqueues[i].refcnt == 0); taskqueue = taskqueues[i].taskqueue = tmp; tmp = NULL; } } /* Bump the reference count. */ if (taskqueues[i].refcnt == UINT_MAX) { mutex_exit(&taskqueues_lock); if (tmp != NULL) _taskqueue_destroy(tmp); return EBUSY; } taskqueues[i].refcnt++; mutex_exit(&taskqueues_lock); /* If we created tmp but didn't end up using it, destroy it. */ if (tmp != NULL) _taskqueue_destroy(tmp); KASSERT(taskqueue != NULL); *taskqueue_ret = taskqueue; return 0; } /* * taskqueue_put: Release a reference to a shared system taskqueue * obtained with taskqueue_get. If this is the last one, destroy it. * Do not pass a taskqueue created with taskqueue_create. May sleep. */ void taskqueue_put(struct taskqueue *taskqueue) { const size_t i = taskqueue_pri_index(taskqueue->tq_pri, PRI_COUNT); bool destroy = false; ASSERT_SLEEPABLE(); mutex_enter(&taskqueues_lock); KASSERT(taskqueues[i].taskqueue == taskqueue); KASSERT(0 < taskqueues[i].refcnt); if (--taskqueues[i].refcnt == 0) { taskqueues[i].taskqueue = NULL; destroy = true; } mutex_exit(&taskqueues_lock); if (destroy) _taskqueue_destroy(taskqueue); } /* Task queue creation */ /* * taskqueue_create: Create a task queue to run tasks at priority * task_pri which can be scheduled from IPL schedule_ipl. The task * queue must be destroyed with taskqueue_destroy -- do not pass it to * taskqueue_put. 
May sleep.
 *
 * For soft interrupt priorities, you must use taskqueue_get to get a
 * shared system task queue, not taskqueue_create.
 */
int
taskqueue_create(struct taskqueue **taskqueue_ret, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{

	KASSERTMSG(((task_pri == PRI_NONE) || (task_pri <= MAXPRI_KTHREAD)),
	    "priority too high for taskqueue_create(%s): %d",
	    name, (int)task_pri);
	return _taskqueue_create(taskqueue_ret, name, task_pri, schedule_ipl,
	    flags);
}

static int
_taskqueue_create(struct taskqueue **taskqueue_ret, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{
	struct taskqueue *taskqueue;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;
	int task_ipl;
	int error;

	ASSERT_SLEEPABLE();

	taskqueue = pool_cache_get(taskqueue_pc, PR_WAITOK);

	taskqueue->tq_percpu = percpu_alloc(sizeof(struct taskworker *));
	if (taskqueue->tq_percpu == NULL) {
		error = ENOMEM;
		goto fail0;
	}

	taskqueue->tq_flags = flags;
	taskqueue->tq_pri = task_pri;
	taskqueue->tq_ipl = schedule_ipl;
	taskqueue->tq_name = name;

	KASSERT(task_pri <= MAXPRI_TASKQUEUE);
	if (MINPRI_SOFTINT <= task_pri) {
		KASSERT(task_pri <= MAXPRI_SOFTINT);
		/* XXX This table should go somewhere else.  */
		if (PRI_SOFTSERIAL <= task_pri)
			task_ipl = SOFTINT_SERIAL;
		else if (PRI_SOFTNET <= task_pri)
			task_ipl = SOFTINT_NET;
		else if (PRI_SOFTBIO <= task_pri)
			task_ipl = SOFTINT_BIO;
		else
			task_ipl = SOFTINT_CLOCK;
		taskqueue->tq_type = TASKQUEUE_SOFTINT;
	} else {
		KASSERT(task_pri <= MAXPRI_KTHREAD);
		taskqueue->tq_type = TASKQUEUE_THREAD;
	}

	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		taskqueue->tq_u.softint.cookie =
		    softint_establish((task_ipl | SOFTINT_MPSAFE),
			&taskworker_softintr, taskqueue);
		if (taskqueue->tq_u.softint.cookie == NULL) {
			error = ENOMEM;
			goto fail1;
		}
		for (CPU_INFO_FOREACH(cii, ci)) {
			struct taskworker *const worker =
			    pool_cache_get(taskworker_pc, PR_WAITOK);

			taskworker_init(worker, name, schedule_ipl, taskqueue);

			percpu_traverse_enter();
			struct taskworker **const workerp =
			    percpu_getptr_remote(taskqueue->tq_percpu, ci);
			*workerp = worker;
			percpu_traverse_exit();
		}
		break;

	case TASKQUEUE_THREAD: {
		struct percpu *pool_percpu;

		error = taskthread_pool_percpu_get(&pool_percpu, task_pri);
		if (error)
			goto fail1;

		for (CPU_INFO_FOREACH(cii, ci)) {
			struct taskworker *const worker =
			    pool_cache_get(taskworker_pc, PR_WAITOK);

			taskworker_init(worker, name, schedule_ipl, taskqueue);

			percpu_traverse_enter();
			struct taskworker **const workerp =
			    percpu_getptr_remote(taskqueue->tq_percpu, ci);
			struct taskthread_pool **const poolp =
			    percpu_getptr_remote(pool_percpu, ci);
			*workerp = worker;
			worker->tw_u.thread.pool = *poolp;
			worker->tw_u.thread.thread = NULL;
			percpu_traverse_exit();
		}
		break;
	}

	default:
		panic("taskqueue %p has invalid type: %d",
		    taskqueue, (int)taskqueue->tq_type);
	}

	/* Success!  */
	*taskqueue_ret = taskqueue;
	return 0;

fail1:	percpu_free(taskqueue->tq_percpu, sizeof(struct taskworker *));
fail0:	pool_cache_put(taskqueue_pc, taskqueue);
	return error;
}

/* Task queue destruction */

/*
 * taskqueue_destroy: Wait for all tasks on taskqueue to complete and
 * destroy it.  The taskqueue must have been created with
 * taskqueue_create.  May sleep.
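 *
 * Illustrative lifecycle sketch (untested, like the rest of this
 * draft).  The callback example_task_fn, the queue name, and the
 * choice of PRI_NONE/IPL_NONE/flags 0 here are hypothetical:
 *
 *	struct taskqueue *tq;
 *	struct task t;
 *	int error;
 *
 *	error = taskqueue_create(&tq, "examplq", PRI_NONE, IPL_NONE, 0);
 *	if (error)
 *		return error;
 *	task_init(&t, &example_task_fn);
 *	taskqueue_schedule(tq, &t);
 *	...
 *	taskqueue_drain(tq);
 *	task_destroy(&t);
 *	taskqueue_destroy(tq);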
*/ void taskqueue_destroy(struct taskqueue *taskqueue) { #if DIAGNOSTIC { const size_t i = taskqueue_pri_index(taskqueue->tq_pri, PRI_COUNT); mutex_enter(&taskqueues_lock); KASSERTMSG((taskqueues[i].taskqueue != taskqueue), "taskqueue_destroy used with shared taskqueue at pri %d", (int)taskqueue->tq_pri); mutex_exit(&taskqueues_lock); } #endif _taskqueue_destroy(taskqueue); } static void _taskqueue_destroy(struct taskqueue *taskqueue) { struct cpu_info *ci; CPU_INFO_ITERATOR cii; ASSERT_SLEEPABLE(); for (CPU_INFO_FOREACH(cii, ci)) { percpu_traverse_enter(); struct taskworker **const workerp = percpu_getptr_remote(taskqueue->tq_percpu, ci); struct taskworker *const worker = *workerp; percpu_traverse_exit(); taskworker_destroy(worker); pool_cache_put(taskworker_pc, worker); } switch (taskqueue->tq_type) { case TASKQUEUE_SOFTINT: softint_disestablish(taskqueue->tq_u.softint.cookie); break; case TASKQUEUE_THREAD: taskthread_pool_percpu_put(taskqueue->tq_pri); break; default: panic("taskqueue %p has invalid type: %d", taskqueue, (int)taskqueue->tq_type); } percpu_free(taskqueue->tq_percpu, sizeof(struct taskworker *)); pool_cache_put(taskqueue_pc, taskqueue); } /* Task worker initialization and destruction */ static void taskworker_init(struct taskworker *worker, const char *name, int ipl, struct taskqueue *taskqueue) { mutex_init(&worker->tw_lock, MUTEX_DEFAULT, ipl); cv_init(&worker->tw_cv, name); worker->tw_taskqueue = taskqueue; /* worker->tw_u is initialized separately in taskqueue_create. */ TAILQ_INIT(&worker->tw_tasks); worker->tw_current_task = NULL; worker->tw_refcnt = 0; worker->tw_flags = 0; } static void taskworker_destroy(struct taskworker *worker) { mutex_enter(&worker->tw_lock); worker->tw_flags |= TASKWORKER_DYING; while (0 < worker->tw_refcnt) cv_wait(&worker->tw_cv, &worker->tw_lock); mutex_exit(&worker->tw_lock); KASSERT(worker->tw_flags == 0); KASSERT(worker->tw_refcnt == 0); KASSERT(worker->tw_current_task == NULL); KASSERT(TAILQ_EMPTY(&worker->tw_tasks)); worker->tw_taskqueue = NULL; cv_destroy(&worker->tw_cv); mutex_destroy(&worker->tw_lock); } static int taskworker_hold(struct taskworker *worker) { unsigned int refcnt; do { refcnt = worker->tw_refcnt; if (refcnt == UINT_MAX) return EBUSY; } while (atomic_cas_uint(&worker->tw_refcnt, refcnt, (refcnt + 1)) != refcnt); return 0; } static void taskworker_rele(struct taskworker *worker) { unsigned int refcnt; do { refcnt = worker->tw_refcnt; KASSERT(0 < refcnt); if (refcnt == 1) { mutex_enter(&worker->tw_lock); refcnt = atomic_dec_uint_nv(&worker->tw_refcnt); KASSERT(refcnt != UINT_MAX); if (refcnt == 0) cv_signal(&worker->tw_cv); mutex_exit(&worker->tw_lock); return; } } while (atomic_cas_uint(&worker->tw_refcnt, refcnt, (refcnt - 1)) != refcnt); } /* Task thread pool per-CPU state reference counting */ static int taskthread_pool_percpu_get(struct percpu **percpup, pri_t pri) { const size_t i = taskqueue_pri_index(pri, PRI_COUNT_KTHREAD); struct percpu *percpu, *tmp = NULL; int error; ASSERT_SLEEPABLE(); mutex_enter(&taskthread_pools_lock); percpu = taskthread_pools[i].percpu; if (percpu == NULL) { mutex_exit(&taskthread_pools_lock); error = taskthread_pool_percpu_create(&tmp, pri); if (error) return error; KASSERT(tmp != NULL); mutex_enter(&taskthread_pools_lock); percpu = taskthread_pools[i].percpu; if (percpu == NULL) { percpu = taskthread_pools[i].percpu = tmp; tmp = NULL; } } KASSERT(percpu != NULL); if (taskthread_pools[i].refcnt == UINT_MAX) { mutex_exit(&taskthread_pools_lock); if (tmp != NULL) 
taskthread_pool_percpu_destroy(tmp); return EBUSY; } taskthread_pools[i].refcnt++; mutex_exit(&taskthread_pools_lock); if (tmp != NULL) taskthread_pool_percpu_destroy(tmp); KASSERT(percpu != NULL); *percpup = percpu; return 0; } static void taskthread_pool_percpu_put(pri_t pri) { const size_t i = taskqueue_pri_index(pri, PRI_COUNT_KTHREAD); struct percpu *pool_percpu = NULL; ASSERT_SLEEPABLE(); mutex_enter(&taskthread_pools_lock); KASSERT(0 < taskthread_pools[i].refcnt); if (--taskthread_pools[i].refcnt == 0) { pool_percpu = taskthread_pools[i].percpu; taskthread_pools[i].percpu = NULL; } mutex_exit(&taskthread_pools_lock); if (pool_percpu) taskthread_pool_percpu_destroy(pool_percpu); } /* Task thread pool per-CPU state creation and destruction */ static int taskthread_pool_percpu_create(struct percpu **percpup, pri_t pri) { struct percpu *percpu; struct taskthread_pool **poolp, *pool; struct cpu_info *ci; CPU_INFO_ITERATOR cii; unsigned int i, j; int error; percpu = percpu_alloc(sizeof(struct taskthread_pool *)); if (percpu == NULL) return ENOMEM; for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) { error = taskthread_pool_create(&pool, ci, pri); if (error) goto fail; percpu_traverse_enter(); poolp = percpu_getptr_remote(percpu, ci); *poolp = pool; percpu_traverse_exit(); } /* Success! */ *percpup = percpu; return 0; fail: for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) { if (i <= j) break; percpu_traverse_enter(); poolp = percpu_getptr_remote(percpu, ci); pool = *poolp; percpu_traverse_exit(); taskthread_pool_destroy(pool); } percpu_free(percpu, sizeof(struct taskthread_pool *)); return error; } static void taskthread_pool_percpu_destroy(struct percpu *percpu) { struct taskthread_pool **poolp, *pool; struct cpu_info *ci; CPU_INFO_ITERATOR cii; for (CPU_INFO_FOREACH(cii, ci)) { percpu_traverse_enter(); poolp = percpu_getptr_remote(percpu, ci); pool = *poolp; percpu_traverse_exit(); taskthread_pool_destroy(pool); } percpu_free(percpu, sizeof(struct taskthread_pool *)); } /* Task thread pool creation and destruction */ static int taskthread_pool_create(struct taskthread_pool **poolp, struct cpu_info *ci, pri_t pri) { struct taskthread_pool *pool; struct lwp *lwp; int error; pool = pool_cache_get(taskthread_pool_pc, PR_WAITOK); mutex_init(&pool->ttp_lock, MUTEX_DEFAULT, IPL_VM); TAILQ_INIT(&pool->ttp_workers); TAILQ_INIT(&pool->ttp_idle_threads); pool->ttp_refcnt = 0; pool->ttp_flags = 0; pool->ttp_pri = pri; error = taskthread_pool_hold(pool); KASSERT(error == 0); pool->ttp_overseer.tt_lwp = NULL; pool->ttp_overseer.tt_pool = pool; pool->ttp_overseer.tt_worker = NULL; cv_init(&pool->ttp_overseer.tt_cv, "taskseer"); error = kthread_create(pri, KTHREAD_MPSAFE, ci, &taskworker_overseer_thread, &pool->ttp_overseer, &lwp, "taskoverseer/%u@%d", cpu_index(ci), (int)pri); if (error) goto fail0; /* Notify the overseer that it can start running. */ mutex_enter(&pool->ttp_lock); pool->ttp_overseer.tt_lwp = lwp; cv_signal(&pool->ttp_overseer.tt_cv); mutex_exit(&pool->ttp_lock); /* Success! */ *poolp = pool; return 0; fail0: cv_destroy(&pool->ttp_overseer.tt_cv); mutex_destroy(&pool->ttp_lock); pool_cache_put(taskthread_pool_pc, pool); return error; } static void taskthread_pool_destroy(struct taskthread_pool *pool) { struct taskthread *thread; /* Mark the pool dying and wait for threads to commit suicide. 
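	 * Each worker thread, and the overseer, holds a pool reference;
	 * once TASKTHREAD_POOL_DYING is set they exit and release those
	 * references, and the final taskthread_pool_rele signals
	 * ttp_overseer.tt_cv to wake us.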
*/ mutex_enter(&pool->ttp_lock); KASSERT(TAILQ_EMPTY(&pool->ttp_workers)); pool->ttp_flags |= TASKTHREAD_POOL_DYING; TAILQ_FOREACH(thread, &pool->ttp_idle_threads, tt_entry) cv_signal(&thread->tt_cv); while (0 < pool->ttp_refcnt) cv_wait(&pool->ttp_overseer.tt_cv, &pool->ttp_lock); mutex_exit(&pool->ttp_lock); KASSERT(pool->ttp_overseer.tt_worker == NULL); KASSERT(pool->ttp_overseer.tt_pool == pool); KASSERT(pool->ttp_flags == TASKTHREAD_POOL_DYING); KASSERT(pool->ttp_refcnt == 0); KASSERT(TAILQ_EMPTY(&pool->ttp_idle_threads)); KASSERT(TAILQ_EMPTY(&pool->ttp_workers)); cv_destroy(&pool->ttp_overseer.tt_cv); mutex_destroy(&pool->ttp_lock); pool_cache_put(taskthread_pool_pc, pool); } /* Task thread pool reference counting */ static int taskthread_pool_hold(struct taskthread_pool *pool) { unsigned int refcnt; do { refcnt = pool->ttp_refcnt; if (refcnt == UINT_MAX) return EBUSY; } while (atomic_cas_uint(&pool->ttp_refcnt, refcnt, (refcnt + 1)) != refcnt); return 0; } static void taskthread_pool_rele(struct taskthread_pool *pool) { unsigned int refcnt; do { refcnt = pool->ttp_refcnt; KASSERT(0 < refcnt); if (refcnt == 1) { mutex_enter(&pool->ttp_lock); refcnt = atomic_dec_uint_nv(&pool->ttp_refcnt); KASSERT(refcnt != UINT_MAX); if (refcnt == 0) cv_signal(&pool->ttp_overseer.tt_cv); mutex_exit(&pool->ttp_lock); return; } } while (atomic_cas_uint(&pool->ttp_refcnt, refcnt, (refcnt - 1)) != refcnt); } /* Task scheduling */ /* * task_schedule: Schedule task on the shared system low-priority task * queue. */ void task_schedule(struct task *task) { taskqueue_schedule(system_taskqueue, task); } /* * delayed_task_schedule: Schedule dt on the shared system low-priority * task queue with the specified delay. */ void delayed_task_schedule(struct delayed_task *dt, nsec_t nsec, jitter_t jitter) { taskqueue_schedule_delayed(system_taskqueue, dt, nsec, jitter); } /* * delayed_task_reschedule: Try to reschedule a delayed task to run the * specified time from now. It may be too late: the timeout may have * already fired. Return true if we successfully rescheduled, false if * it was too late. */ bool delayed_task_reschedule(struct delayed_task *dt, nsec_t nsec, jitter_t jitter) { struct taskworker *worker = dt->dt_task.task_worker; if (worker == NULL) /* It was not scheduled to begin with, so no reschedule. */ return false; mutex_enter(&worker->tw_lock); if (dt->dt_task.task_worker != worker) { /* Its worker already executed it, so too late. */ mutex_exit(&worker->tw_lock); return false; } if (worker->tw_current_task == &dt->dt_task) { /* The worker is already executing it, so too late. */ mutex_exit(&worker->tw_lock); return false; } if (callout_stop(&dt->dt_callout)) { /* The callout already fired, so too late. */ mutex_exit(&worker->tw_lock); return false; } /* We stopped the callout before it fired. Reschedule it. */ if (nsec == 0) { taskworker_maybe_switch(&worker, &dt->dt_task); taskworker_schedule(worker, &dt->dt_task); } else { callout_schedule(&dt->dt_callout, nstohz(nsec, jitter)); } mutex_exit(&worker->tw_lock); return true; } /* * taskworker_maybe_switch: If *workerp is not the current CPU's * worker, switch task, which must currently be assigned to *workerp, * to the current CPU's worker instead, and store the current CPU's * worker in *workerp. Returns with *workerp's lock held, whether the * same worker as on input or the new worker. In the latter case, * taskworker_maybe_switch will have dropped the old worker's lock and * acquired the new worker's lock. 
* * This is needed so that it is on the queue that the softint handler * will examine when we softint_schedule, which triggers the softint on * the current CPU only. The alternative would be to use an xcall to * call softint_schedule on the remote CPU, but that requires sleeping, * which delayed_task_timeout and delayed_task_reschedule can't do. * * This is necessary only before calling taskworker_schedule with a * worker that for a softint task queue and is not guaranteed to be the * current CPU's worker. This is the case only in delayed_task_timeout * and delayed_task_reschedule. For thread task queues or when the * worker is guaranteed to be the current CPU's worker, this is not * needed. percpu_getref disables preemption, and spin locks do not * yield, so taskqueue_schedule and taskqueue_schedule_delayed are in * no danger of switching CPUs while they're not looking. */ static void taskworker_maybe_switch(struct taskworker **workerp, struct task *task) { if ((*workerp)->tw_taskqueue->tq_type != TASKQUEUE_SOFTINT) return; KASSERT(kpreempt_disabled()); KASSERT(mutex_owned(&(*workerp)->tw_lock)); KASSERT(task->task_worker == *workerp); KASSERT((*workerp)->tw_current_task != task); /* Get the current CPU's worker. */ struct taskworker **const curcpu_workerp = percpu_getref((*workerp)->tw_taskqueue->tq_percpu); struct taskworker *const curcpu_worker = *curcpu_workerp; percpu_putref((*workerp)->tw_taskqueue->tq_percpu); KASSERT(curcpu_worker->tw_taskqueue == (*workerp)->tw_taskqueue); /* If *workerp is already it, we're good. */ if (__predict_true(curcpu_worker == *workerp)) return; /* Otherwise, atomically switch workers and change locks. */ struct taskworker *const worker0 __diagused = atomic_swap_ptr(&task->task_worker, curcpu_worker); KASSERT(worker0 == *workerp); mutex_exit(&(*workerp)->tw_lock); *workerp = curcpu_worker; mutex_enter(&(*workerp)->tw_lock); KASSERT(task->task_worker == *workerp); } /* * delayed_task_timedout: Return true if dt ran after nonzero delay. */ bool delayed_task_timedout(struct delayed_task *dt) { KASSERTMSG((dt->dt_task.task_worker != NULL), "delayed_task_timedout(%p) must be called only from task action", dt); KASSERTMSG((dt->dt_task.task_worker->tw_current_task == &dt->dt_task), "delayed_task_timedout(%p) must be called only from task action", dt); return callout_invoking(&dt->dt_callout); } /* * taskqueue_schedule: Schedule task to run on taskqueue. If task is * already scheduled, don't change it. */ void taskqueue_schedule(struct taskqueue *taskqueue, struct task *task) { struct taskworker *worker0; struct taskworker **const workerp = percpu_getref(taskqueue->tq_percpu); struct taskworker *const worker = *workerp; mutex_enter(&worker->tw_lock); /* Try to grab the task. */ while (__predict_false((worker0 = atomic_cas_ptr(&task->task_worker, NULL, worker)) != NULL)) { /* * Someone else got in first. Switch to the other * worker's lock. */ if (worker0 != worker) { mutex_exit(&worker->tw_lock); mutex_enter(&worker0->tw_lock); } if (__predict_true(task->task_worker == worker0)) { /* * Nobody else got in while we were locking. * The task is either on worker0's queue or * being run by worker0. */ if ((worker0->tw_current_task == task) && !ISSET(worker0->tw_flags, TASKWORKER_REQUEUED)) { /* * It's already begun to run. Queue it * up to run again. 
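				 * Setting TASKWORKER_REQUEUED tells
				 * taskworker_done to leave task_worker
				 * pointing at worker0, so the entry we
				 * queue here stays valid for the re-run.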
*/ worker0->tw_flags |= TASKWORKER_REQUEUED; TAILQ_INSERT_TAIL(&worker0->tw_tasks, task, task_entry); } mutex_exit(&worker0->tw_lock); goto out; } if (worker0 != worker) { mutex_exit(&worker0->tw_lock); mutex_enter(&worker->tw_lock); } } /* We grabbed it. Put it on the queue. */ taskworker_schedule(worker, task); mutex_exit(&worker->tw_lock); out: percpu_putref(taskqueue->tq_percpu); } /* * taskqueue_schedule_delayed: Schedule the delayed task dt to run on * taskqueue after the specified number of nanoseconds. If it is * already scheduled, don't change it. taskqueue must run at a * priority no higher than PRI_SOFTCLOCK. If the timeout is zero, * schedule the task to run without delay. */ void taskqueue_schedule_delayed(struct taskqueue *taskqueue, struct delayed_task *dt, nsec_t nsec, jitter_t jitter) { struct taskworker *worker0; KASSERTMSG((taskqueue->tq_pri <= PRI_SOFTCLOCK), "taskqueue %p priority too high for delayed task %p: %d", taskqueue, dt, (int)taskqueue->tq_pri); KASSERTMSG((dt->dt_task.task_fn != &task_error), "delayed task destroyed, can't schedule: %p", dt); struct taskworker **const workerp = percpu_getref(taskqueue->tq_percpu); struct taskworker *const worker = *workerp; mutex_enter(&worker->tw_lock); while (__predict_false((worker0 = atomic_cas_ptr(&dt->dt_task.task_worker, NULL, worker)) != NULL)) { if (worker != worker0) { mutex_exit(&worker->tw_lock); mutex_enter(&worker0->tw_lock); } if (__predict_true(dt->dt_task.task_worker == worker0)) { if ((worker0->tw_current_task == &dt->dt_task) && !ISSET(worker0->tw_flags, TASKWORKER_REQUEUED)) { worker0->tw_flags |= TASKWORKER_REQUEUED; if (nsec == 0) { worker0->tw_flags |= TASKWORKER_CALLOUT_ACK; TAILQ_INSERT_TAIL(&worker0->tw_tasks, &dt->dt_task, task_entry); } else { callout_schedule(&dt->dt_callout, nstohz(nsec, jitter)); } } mutex_exit(&worker0->tw_lock); goto out; } if (worker != worker0) { mutex_exit(&worker0->tw_lock); mutex_enter(&worker->tw_lock); } } if (nsec == 0) { /* Clear any prior CALLOUT_INVOKING flag first. */ callout_ack(&dt->dt_callout); taskworker_schedule(worker, &dt->dt_task); } else { callout_schedule(&dt->dt_callout, nstohz(nsec, jitter)); } mutex_exit(&worker->tw_lock); out: percpu_putref(taskqueue->tq_percpu); } static int nstohz(nsec_t nsec, jitter_t jitter) { /* * XXX Temporary stub until we get proper nanosecond-resolution * timer APIs. */ (void)jitter; return mstohz(nsec / 1000000u); } /* Task cancellation */ /* * task_cancel: Cancel task. If it has already begun, wait for it to * complete (or to call task_done). May drop interlock if it is * necessary to wait. May sleep. * * Returns true if task was scheduled and we prevented it from running. * Returns false if the task was not scheduled or had already begun. */ bool task_cancel(struct task *task, kmutex_t *interlock) { struct taskworker *const worker = task->task_worker; ASSERT_SLEEPABLE(); KASSERTMSG((task->task_fn != &task_error), "task destroyed, can't cancel: %p", task); /* If it's not scheduled, it never ran. */ if (worker == NULL) return true; mutex_enter(&worker->tw_lock); /* If it moved to another queue, it already ran so we can't cancel. */ if (task->task_worker != worker) { mutex_exit(&worker->tw_lock); return false; } /* If it is running, it's too late to cancel. Wait for completion. 
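	 * Note that the caller's interlock is dropped while we wait, so
	 * any state it protects may have changed by the time task_cancel
	 * returns false; callers must re-check after reacquiring it.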
*/ if (worker->tw_current_task == task) { mutex_exit(interlock); while ((worker->tw_current_task == task) && !ISSET(worker->tw_flags, TASKWORKER_DONE)) cv_wait(&worker->tw_cv, &worker->tw_lock); mutex_exit(&worker->tw_lock); mutex_enter(interlock); return false; } /* Got it before it ran! Remove it from the queue. */ TAILQ_REMOVE(&worker->tw_tasks, task, task_entry); mutex_exit(&worker->tw_lock); return true; } /* * task_cancel_async: Like task_cancel, but returns immediately even if * the task is in flight. Hence there is no need for an interlock. */ bool task_cancel_async(struct task *task) { struct taskworker *worker = task->task_worker; KASSERTMSG((task->task_fn != &task_error), "task destroyed, can't cancel: %p", task); if (worker == NULL) return false; mutex_enter(&worker->tw_lock); if (task->task_worker != worker) { mutex_exit(&worker->tw_lock); return false; } if (worker->tw_current_task == task) { mutex_exit(&worker->tw_lock); return false; } TAILQ_REMOVE(&worker->tw_tasks, task, task_entry); mutex_exit(&worker->tw_lock); return true; } /* Delayed task cancellation */ /* * delayed_task_cancel: Cancel the delayed task dt. If it has already * begun, wait for it to complete (or to call task_done). May drop * interlock if it is necessary to wait, either for the callout to * complete or for the task to complete. May sleep. * * Returns true if dt was scheduled and we prevented it from running. * Returns false if dt was not scheduled or had already begun. */ bool delayed_task_cancel(struct delayed_task *dt, kmutex_t *interlock) { struct taskworker *worker = dt->dt_task.task_worker; ASSERT_SLEEPABLE(); KASSERTMSG((dt->dt_task.task_fn != &task_error), "delayed task destroyed, can't cancel: %p", dt); /* If it's not scheduled, we can't cancel it. */ if (worker == NULL) return false; mutex_enter(&worker->tw_lock); /* If it moved to another queue, it already ran so we can't cancel. */ if (dt->dt_task.task_worker != worker) { mutex_exit(&worker->tw_lock); return false; } /* If it is running, it's too late to cancel. Wait for completion. */ if (worker->tw_current_task == &dt->dt_task) { mutex_exit(interlock); while ((worker->tw_current_task == &dt->dt_task) && !ISSET(worker->tw_flags, TASKWORKER_DONE)) cv_wait(&worker->tw_cv, &worker->tw_lock); mutex_exit(&worker->tw_lock); mutex_enter(interlock); return false; } /* * Try to cancel it without dropping the interlock. This is an * optimization; we don't need to do it. */ if (!callout_stop(&dt->dt_callout)) { /* Callout had not fired, so we cancelled it. */ mutex_exit(&worker->tw_lock); return true; } /* * We have to drop the interlock here because we need to drop * the interlock and the worker lock in order to sleep if the * callout has fired, but callout_halt can't drop two locks. * * XXX An alternative would be to have a caller-supplied * interlock stored in the delayed task structure. That would * simplify the logic in scheduling delayed tasks, too. * However, it would complicate the API for callers that don't * need interlocking in task_cancel (XXX are there any, * really?). */ mutex_exit(interlock); /* Try to cancel the callout. */ if (!callout_halt(&dt->dt_callout, &worker->tw_lock)) { /* Callout had not fired, so we cancelled it. */ mutex_exit(&worker->tw_lock); mutex_enter(interlock); return true; } /* It already fired. Try to cancel the task. */ if (worker->tw_current_task == &dt->dt_task) { /* The task is already running. Wait to complete. 
*/ while ((worker->tw_current_task == &dt->dt_task) && !ISSET(worker->tw_flags, TASKWORKER_DONE)) cv_wait(&worker->tw_cv, &worker->tw_lock); mutex_exit(&worker->tw_lock); mutex_enter(interlock); return false; } /* The task has not yet run. Don't let it. */ TAILQ_REMOVE(&worker->tw_tasks, &dt->dt_task, task_entry); mutex_exit(&worker->tw_lock); mutex_enter(interlock); return true; } /* * delayed_task_cancel_async: Like delayed_task_cancel, but returns * immediately even if the task is in flight. Hence there is no need * for an interlock. */ bool delayed_task_cancel_async(struct delayed_task *dt) { struct taskworker *worker = dt->dt_task.task_worker; KASSERTMSG((dt->dt_task.task_fn != &task_error), "delayed task destroyed, can't cancel: %p", dt); if (worker == NULL) return false; mutex_enter(&worker->tw_lock); if (dt->dt_task.task_worker != worker) { mutex_exit(&worker->tw_lock); return false; } if (!callout_stop(&dt->dt_callout)) { mutex_exit(&worker->tw_lock); return true; } if (worker->tw_current_task != &dt->dt_task) { TAILQ_REMOVE(&worker->tw_tasks, &dt->dt_task, task_entry); mutex_exit(&worker->tw_lock); return true; } mutex_exit(&worker->tw_lock); return false; } /* Worker scheduling */ /* * taskworker_schedule: Schedule worker to execute task, which has * already been assigned to worker. */ static void taskworker_schedule(struct taskworker *worker, struct task *task) { KASSERT(mutex_owned(&worker->tw_lock)); KASSERT(task->task_worker == worker); /* Put it on the queue. */ TAILQ_INSERT_TAIL(&worker->tw_tasks, task, task_entry); /* Notify the softint or thread. */ switch (worker->tw_taskqueue->tq_type) { case TASKQUEUE_SOFTINT: KASSERT(kpreempt_disabled()); #if DIAGNOSTIC { struct taskworker **const workerp = percpu_getref(worker->tw_taskqueue->tq_percpu); KASSERT(worker == *workerp); percpu_putref(worker->tw_taskqueue->tq_percpu); } #endif softint_schedule(worker->tw_taskqueue->tq_u.softint.cookie); break; case TASKQUEUE_THREAD: if (__predict_false(worker->tw_u.thread.thread == NULL)) taskworker_assign_thread(worker); KASSERT(worker->tw_u.thread.thread != NULL); cv_signal(&worker->tw_cv); break; default: panic("taskqueue %p has invalid type: %d", worker->tw_taskqueue, (int)worker->tw_taskqueue->tq_type); } } /* * taskworker_assign_thread: Assign a thread to worker, which currently * has none assigned to it. If there are any idle threads, assign one * to it. If there are no idle threads, assign the overseer to handle * it. */ static void taskworker_assign_thread(struct taskworker *worker) { struct taskthread_pool *const pool = worker->tw_u.thread.pool; KASSERT(mutex_owned(&worker->tw_lock)); KASSERT(worker->tw_u.thread.thread == NULL); mutex_enter(&pool->ttp_lock); if (__predict_false(TAILQ_EMPTY(&pool->ttp_idle_threads))) { worker->tw_u.thread.thread = &pool->ttp_overseer; TAILQ_INSERT_TAIL(&pool->ttp_workers, worker, tw_entry); } else { worker->tw_u.thread.thread = TAILQ_FIRST(&pool->ttp_idle_threads); TAILQ_REMOVE(&pool->ttp_idle_threads, worker->tw_u.thread.thread, tt_entry); worker->tw_u.thread.thread->tt_worker = worker; } cv_signal(&worker->tw_u.thread.thread->tt_cv); mutex_exit(&pool->ttp_lock); } /* Task execution */ /* * taskworker_run_1: Pick one task off queue and execute it. To be * called from a worker thread or softint handler. */ static void taskworker_run_1(struct taskworker *worker, struct task_head *queue) { KASSERT(mutex_owned(&worker->tw_lock)); KASSERT(!TAILQ_EMPTY(queue)); /* Grab the task from the queue. 
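	 * The worker lock will be dropped around the callback below, so
	 * the callback may block (where the queue type allows it) and may
	 * itself schedule or cancel tasks; tw_current_task marks this
	 * task as in flight for task_cancel and taskqueue_schedule.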
*/ struct task *const task = TAILQ_FIRST(queue); TAILQ_REMOVE(queue, task, task_entry); /* We had better not be working on anything yet. */ KASSERT(worker->tw_current_task == NULL); KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE)); KASSERT(!ISSET(worker->tw_flags, TASKWORKER_REQUEUED)); /* Mark it current and run it. */ worker->tw_current_task = task; mutex_exit(&worker->tw_lock); (*task->task_fn)(task); mutex_enter(&worker->tw_lock); KASSERT(worker->tw_current_task == task); /* Mark it done. */ if (!ISSET(worker->tw_flags, TASKWORKER_DONE)) taskworker_done(worker, task); /* Can't touch task after this. */ KASSERT(ISSET(worker->tw_flags, TASKWORKER_DONE)); KASSERT(!ISSET(worker->tw_flags, TASKWORKER_REQUEUED)); KASSERT(!ISSET(worker->tw_flags, TASKWORKER_CALLOUT_ACK)); /* Clear the flags related to this task. */ worker->tw_flags &= ~(TASKWORKER_DONE | TASKWORKER_REQUEUED | TASKWORKER_CALLOUT_ACK); } /* * taskworker_done: Mark task done. The worker will not touch it after * this, and the task can be executed again if scheduled afterward. If * someone had already requeued the task, acknowledge this; otherwise, * disassociate the task from the worker. */ static void taskworker_done(struct taskworker *worker, struct task *task) { KASSERT(mutex_owned(&worker->tw_lock)); KASSERT(worker->tw_current_task == task); KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE)); KASSERT(task->task_worker == worker); /* XXX KLUDGE */ if (ISSET(worker->tw_flags, TASKWORKER_CALLOUT_ACK)) { struct delayed_task *const dt = container_of(task, struct delayed_task, dt_task); /* Clear the CALLOUT_INVOKING flag before it runs again. */ callout_ack(&dt->dt_callout); worker->tw_flags &= ~TASKWORKER_CALLOUT_ACK; } if (ISSET(worker->tw_flags, TASKWORKER_REQUEUED)) { worker->tw_flags &= ~TASKWORKER_REQUEUED; } else { struct taskworker *const worker0 __diagused = atomic_swap_ptr(&task->task_worker, NULL); /* Can't touch task after this. Notify task_cancel. */ KASSERT(worker0 == worker); worker->tw_current_task = NULL; cv_broadcast(&worker->tw_cv); } worker->tw_flags |= TASKWORKER_DONE; } /* Worker soft interrupt handler and delayed task callout */ static void taskworker_softintr(void *arg) { struct taskqueue *const taskqueue = arg; struct task_head queue = TAILQ_HEAD_INITIALIZER(queue); struct taskworker **const workerp = percpu_getref(taskqueue->tq_percpu); struct taskworker *const worker = *workerp; percpu_putref(taskqueue->tq_percpu); mutex_enter(&worker->tw_lock); TAILQ_CONCAT(&queue, &worker->tw_tasks, task_entry); while (!TAILQ_EMPTY(&queue)) taskworker_run_1(worker, &queue); mutex_exit(&worker->tw_lock); } void delayed_task_timeout(void *arg) { struct delayed_task *const dt = arg; struct taskworker *worker = dt->dt_task.task_worker; /* We had better have been assigned a worker. */ KASSERT(worker != NULL); mutex_enter(&worker->tw_lock); /* We had better still be on this worker. */ KASSERT(dt->dt_task.task_worker == worker); if (worker->tw_current_task == &dt->dt_task) { /* * The task was already running and the timeout * triggered because it was scheduled again. Just put * it on the end of the queue and mark it requeued. */ if (!ISSET(worker->tw_flags, TASKWORKER_REQUEUED)) worker->tw_flags |= TASKWORKER_REQUEUED; TAILQ_INSERT_TAIL(&worker->tw_tasks, &dt->dt_task, task_entry); } else { /* * Make sure we're putting it on the current worker's * CPU, and schedule it anew. 
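		 * The callout fires in softclock context, possibly on a
		 * CPU other than the one whose worker owns the task, so
		 * for softint queues taskworker_maybe_switch may migrate
		 * the task to this CPU before softint_schedule is used.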
*/ taskworker_maybe_switch(&worker, &dt->dt_task); taskworker_schedule(worker, &dt->dt_task); } mutex_exit(&worker->tw_lock); } /* Worker thread */ static void __dead taskworker_thread(void *arg) { struct taskthread *const thread = arg; struct taskthread_pool *const pool = thread->tt_pool; /* Wait until we're initialized and on the queue. */ mutex_enter(&pool->ttp_lock); while (thread->tt_lwp == NULL) cv_wait(&thread->tt_cv, &pool->ttp_lock); mutex_exit(&pool->ttp_lock); for (;;) { bool suicide = false; /* * Wait until we are assigned a worker, or if there's * no work for a while, commit suicide. */ mutex_enter(&pool->ttp_lock); while (thread->tt_worker == NULL) { if (ISSET(pool->ttp_flags, TASKTHREAD_POOL_DYING)) { suicide = true; break; } if (cv_timedwait(&thread->tt_cv, &pool->ttp_lock, TASKWORKER_IDLE_TICKS)) { if (thread->tt_worker == NULL) { suicide = true; break; } } } if (__predict_false(suicide)) { KASSERT(thread->tt_worker == NULL); TAILQ_REMOVE(&pool->ttp_idle_threads, thread, tt_entry); } mutex_exit(&pool->ttp_lock); if (__predict_false(suicide)) { KASSERT(thread->tt_worker == NULL); break; } KASSERT(thread->tt_worker != NULL); struct taskworker *const worker = thread->tt_worker; mutex_enter(&worker->tw_lock); /* Set our lwp name to reflect whose work we're doing. */ lwp_lock(curlwp); char *const lwp_name = curlwp->l_name; curlwp->l_name = __UNCONST(worker->tw_taskqueue->tq_name); lwp_unlock(curlwp); /* Do all the tasks the worker has. */ for (;;) { while (!TAILQ_EMPTY(&worker->tw_tasks)) taskworker_run_1(worker, &worker->tw_tasks); /* * If the worker isn't dying, wait for more * work on this task queue for a little while, * but if none turns up soon, go idle. */ if (ISSET(worker->tw_flags, TASKWORKER_DYING) || cv_timedwait(&worker->tw_cv, &worker->tw_lock, TASKWORKER_IDLE_TICKS)) break; } /* Restore our lwp name. */ lwp_lock(curlwp); curlwp->l_name = lwp_name; lwp_unlock(curlwp); /* Remove our worker assignment and become idle. */ mutex_enter(&pool->ttp_lock); thread->tt_worker = NULL; TAILQ_INSERT_TAIL(&pool->ttp_idle_threads, thread, tt_entry); mutex_exit(&pool->ttp_lock); /* Release the worker. */ mutex_exit(&worker->tw_lock); taskworker_rele(worker); } cv_destroy(&thread->tt_cv); pool_cache_put(taskthread_pc, thread); taskthread_pool_rele(pool); kthread_exit(0); } /* Overseer thread */ static void __dead taskworker_overseer_thread(void *arg) { struct taskthread *const overseer = arg; struct taskthread_pool *const pool = overseer->tt_pool; struct lwp *lwp = NULL; int error; mutex_enter(&pool->ttp_lock); while (overseer->tt_lwp == NULL) cv_wait(&overseer->tt_cv, &pool->ttp_lock); for (;;) { /* Wait until a worker wants a thread. */ while (TAILQ_EMPTY(&pool->ttp_workers) && !ISSET(pool->ttp_flags, TASKTHREAD_POOL_DYING)) cv_wait(&overseer->tt_cv, &pool->ttp_lock); /* If the pool is going away, exit. */ if (__predict_false(ISSET(pool->ttp_flags, TASKTHREAD_POOL_DYING))) break; /* If there are no threads, we'll have to try to start one. 
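		 * kthread_create may sleep, so the pool lock is dropped
		 * around it; the new thread is put on ttp_idle_threads
		 * and paired with a waiting worker on a later pass
		 * through this loop.  On failure we kpause and retry
		 * rather than dropping the request.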
*/ if (TAILQ_EMPTY(&pool->ttp_idle_threads)) { error = taskthread_pool_hold(pool); if (error) { (void)kpause("taskpool", false, hz, &pool->ttp_lock); continue; } mutex_exit(&pool->ttp_lock); struct taskthread *const thread = pool_cache_get(taskthread_pc, PR_WAITOK); thread->tt_lwp = NULL; thread->tt_pool = pool; thread->tt_worker = NULL; cv_init(&thread->tt_cv, "taskthrd"); error = kthread_create(pool->ttp_pri, KTHREAD_MPSAFE, NULL, &taskworker_thread, thread, &lwp, "taskw/%u@%d", cpu_index(curcpu()), (int)pool->ttp_pri); mutex_enter(&pool->ttp_lock); if (error) { pool_cache_put(taskthread_pc, thread); taskthread_pool_rele(pool); /* XXX What to do to wait for memory? */ (void)kpause("tasklwp", false, hz, &pool->ttp_lock); continue; } KASSERT(lwp != NULL); TAILQ_INSERT_TAIL(&pool->ttp_idle_threads, thread, tt_entry); thread->tt_lwp = lwp; lwp = NULL; cv_signal(&thread->tt_cv); continue; } /* There are threads, so try getting a worker. */ bool rele_worker = true; struct taskworker *const worker = TAILQ_FIRST(&pool->ttp_workers); TAILQ_REMOVE(&pool->ttp_workers, worker, tw_entry); error = taskworker_hold(worker); if (error) { TAILQ_INSERT_TAIL(&pool->ttp_workers, worker, tw_entry); (void)kpause("taskwork", false, hz, &pool->ttp_lock); continue; } mutex_exit(&pool->ttp_lock); mutex_enter(&worker->tw_lock); /* If the worker drained, we'll no longer be its thread. */ if (worker->tw_u.thread.thread == overseer) { mutex_enter(&pool->ttp_lock); if (__predict_false( TAILQ_EMPTY(&pool->ttp_idle_threads))) { /* * Someone else snagged the thread * first. We'll have to try again. */ TAILQ_INSERT_HEAD(&pool->ttp_workers, worker, tw_entry); } else { /* * Assign the thread to the worker and * wake the thread so it starts work. */ struct taskthread *const thread = TAILQ_FIRST(&pool->ttp_idle_threads); TAILQ_REMOVE(&pool->ttp_idle_threads, thread, tt_entry); worker->tw_u.thread.thread = thread; cv_signal(&thread->tt_cv); /* Gave the thread our worker reference. */ rele_worker = false; } mutex_exit(&pool->ttp_lock); } mutex_exit(&worker->tw_lock); if (__predict_false(rele_worker)) taskworker_rele(worker); mutex_enter(&pool->ttp_lock); } mutex_exit(&pool->ttp_lock); taskthread_pool_rele(pool); kthread_exit(0); } /* Draining task queues */ struct drain { unsigned int d_count; struct taskworker d_worker; struct taskthread d_thread; struct drain_task *d_tasks; }; struct drain_task { struct task drt_task; struct drain *drt_drain; }; void taskqueue_drain(struct taskqueue *taskqueue) { xcfunc_t xcfunc; struct drain drain; /* We are going to sleep big-time. */ ASSERT_SLEEPABLE(); /* Set up some temporary state for draining. */ drain.d_count = ncpu; taskworker_init(&drain.d_worker, "tqdrain", taskqueue->tq_ipl, taskqueue); drain.d_worker.tw_u.thread.thread = &drain.d_thread; drain.d_thread.tt_lwp = curlwp; drain.d_thread.tt_pool = NULL; drain.d_thread.tt_worker = NULL; CTASSERT(MAXCPUS <= (SIZE_MAX / sizeof(drain.d_tasks[0]))); /* XXX Allocation to destroy is sketchy... */ drain.d_tasks = kmem_alloc((sizeof(drain.d_tasks[0]) * ncpu), KM_SLEEP); /* Do the per-CPU draining setup. */ switch (taskqueue->tq_type) { case TASKQUEUE_SOFTINT: xcfunc = &taskqueue_drain_softint_xc; break; case TASKQUEUE_THREAD: xcfunc = &taskqueue_drain_thread_xc; break; default: panic("taskqueue %p has invalid type: %d", taskqueue, (int)taskqueue->tq_type); } xc_wait(xc_broadcast(0, xcfunc, taskqueue, &drain)); mutex_enter(&drain.d_worker.tw_lock); /* Wait for the tasks or xcalls running on other CPUs to complete. 
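	 * d_count starts at ncpu; each cross-call either schedules a
	 * drain_task on its CPU (which decrements d_count when it runs)
	 * or, when there is nothing to wait for there, decrements
	 * d_count directly, moving any stranded tasks onto d_worker for
	 * us to run below.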
 */
	while (0 < drain.d_count)
		cv_wait(&drain.d_worker.tw_cv, &drain.d_worker.tw_lock);

	/* Run all the tasks that couldn't run on other CPUs.  */
	while (!TAILQ_EMPTY(&drain.d_worker.tw_tasks))
		taskworker_run_1(&drain.d_worker, &drain.d_worker.tw_tasks);
	mutex_exit(&drain.d_worker.tw_lock);

	/* Nuke the temporary drain state.  */
	kmem_free(drain.d_tasks, (sizeof(drain.d_tasks[0]) * ncpu));
	taskworker_destroy(&drain.d_worker);
	KASSERT(drain.d_count == 0);
}

static void
drain_task(struct task *task)
{
	struct drain_task *const drt = container_of(task, struct drain_task,
	    drt_task);

	mutex_enter(&drt->drt_drain->d_worker.tw_lock);
	if (--drt->drt_drain->d_count == 0)
		cv_signal(&drt->drt_drain->d_worker.tw_cv);
	mutex_exit(&drt->drt_drain->d_worker.tw_lock);
}

static void
taskqueue_drain_softint_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;
	struct drain_task *const drt = &drain->d_tasks[cpu_index(curcpu())];

	struct taskworker **const workerp = percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;

	/*
	 * Just schedule the drain task -- softints are supposed to be
	 * quick, are not allowed to sleep, and are not preemptive with
	 * one another, so there is no point in pooling them.
	 */
	task_init(&drt->drt_task, &drain_task);
	drt->drt_drain = drain;

	/* Claim the task and hand it to this CPU's worker.  */
	mutex_enter(&worker->tw_lock);
	struct taskworker *const worker0 __diagused =
	    atomic_swap_ptr(&drt->drt_task.task_worker, worker);
	KASSERT(worker0 == NULL);
	taskworker_schedule(worker, &drt->drt_task);
	mutex_exit(&worker->tw_lock);

	percpu_putref(taskqueue->tq_percpu);
}

static void
taskqueue_drain_thread_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;
	struct drain_task *const drt = &drain->d_tasks[cpu_index(curcpu())];
	struct task *task, *next;

	struct taskworker **const workerp = percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;
	percpu_putref(taskqueue->tq_percpu);

	mutex_enter(&worker->tw_lock);
	if (worker->tw_u.thread.thread == NULL) {
		/* No tasks on this CPU.  All done on this CPU.  */
		mutex_enter(&drain->d_worker.tw_lock);
		if (--drain->d_count == 0)
			cv_signal(&drain->d_worker.tw_cv);
		mutex_exit(&drain->d_worker.tw_lock);
	} else if (worker->tw_u.thread.thread ==
	    &worker->tw_u.thread.pool->ttp_overseer) {
		/* Tasks on this CPU but no thread to run them.  */

		/* Remove this worker from consideration.  */
		worker->tw_u.thread.thread = NULL;
		mutex_enter(&worker->tw_u.thread.pool->ttp_lock);
		TAILQ_REMOVE(&worker->tw_u.thread.pool->ttp_workers, worker,
		    tw_entry);
		mutex_exit(&worker->tw_u.thread.pool->ttp_lock);

		/* Move all the tasks to our drain worker.  */
		mutex_enter(&drain->d_worker.tw_lock);
		TAILQ_FOREACH_SAFE(task, &worker->tw_tasks, task_entry, next) {
			TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);
			TAILQ_INSERT_TAIL(&drain->d_worker.tw_tasks, task,
			    task_entry);
		}

		/* All done on this CPU.  */
		if (--drain->d_count == 0)
			cv_signal(&drain->d_worker.tw_cv);
		mutex_exit(&drain->d_worker.tw_lock);
	} else {
		/* We have a thread.  Use it to schedule the drain task.  */
		task_init(&drt->drt_task, &drain_task);
		drt->drt_drain = drain;

		struct taskworker *const worker0 __diagused =
		    atomic_swap_ptr(&drt->drt_task.task_worker, worker);
		KASSERT(worker0 == NULL);
		KASSERT(worker->tw_current_task != &drt->drt_task);
		taskworker_schedule(worker, &drt->drt_task);
	}
	mutex_exit(&worker->tw_lock);
}
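
/*
 * Example use of a shared system task queue (an illustrative sketch
 * only; this whole file is an untested draft).  The callback
 * example_fn, the mutex example_lock protecting the task's state, and
 * the choice of PRI_SOFTNET are hypothetical:
 *
 *	static void
 *	example_fn(struct task *task)
 *	{
 *		... do the deferred work; optionally task_done(task) ...
 *	}
 *
 *	struct taskqueue *tq;
 *	struct task t;
 *
 *	if (taskqueue_get(&tq, PRI_SOFTNET) != 0)
 *		return;
 *	task_init(&t, &example_fn);
 *	taskqueue_schedule(tq, &t);
 *	...
 *	mutex_enter(&example_lock);
 *	if (!task_cancel(&t, &example_lock))
 *		... the task already ran or is running ...
 *	mutex_exit(&example_lock);
 *	task_destroy(&t);
 *	taskqueue_put(tq);
 */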