/*	$NetBSD$	*/

/*
 * XXX WARNING WARNING WARNING XXX
 *
 * This code does not run!  I have not even compile-tested it.  It is a
 * draft of an idea.  See below the copyright notice for a summary.
 *
 * Third draft, with a shared pool of per-CPU task workers.
 */

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Tasks
 *
 * A /task/ is an action to be executed once asynchronously.  Tasks are
 * cheap: each one is four pointers long, initializing them is quick,
 * and scheduling them requires constant time and writing only to
 * per-CPU memory.
 *
 * Tasks are executed by putting them on /task queues/ to be executed
 * by workers, either in soft interrupt context or in thread context,
 * and at any priority level, depending on the task queue.  To choose a
 * task queue, either get one of the shared per-CPU system ones,
 *
 *	struct taskqueue *taskqueue;
 *
 *	error = taskqueue_get(&taskqueue, PRI_SOFTNET);
 *	if (error)
 *		goto fail;
 *	...
 *	taskqueue_put(taskqueue);
 *
 * or create your own, for interrupt handlers running at IPL_BIO to run
 * tasks at priority PRI_NONE in per-CPU threads:
 *
 *	error = taskqueue_create(&taskqueue, "mytaskq", PRI_NONE, IPL_BIO,
 *	    TASKQUEUE_PERCPU);
 *	if (error)
 *		goto fail;
 *	...
 *	taskqueue_destroy(taskqueue);
 *
 * Once you have a task queue, and you have allocated storage for a
 * struct task, you can execute your task on the task queue by
 * initializing and scheduling the task:
 *
 *	task_init(task, &my_task_action);
 *	task_schedule(taskqueue, task);
 *
 *	static void
 *	my_task_action(struct task *task)
 *	{
 *		printf("my task ran!\n");
 *	}
 *
 * task_init and task_schedule never fail, and you can use them in any
 * context, including hard interrupt context, as long as the interrupt
 * priority level is not above the one used to create the task queue --
 * IPL_BIO in the taskqueue_create example above, or IPL_VM for all the
 * shared system task queues.
 *
 * If you schedule a task multiple times before it runs, it will be
 * executed only once.  If you schedule a task again after it has begun
 * executing, it will not begin executing again until its action has
 * either returned or chosen to call task_done.
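 *
 * For example, a hypothetical driver interrupt handler might hand the
 * real work off to a task like this (mydev_claim_interrupt, sc_taskqueue,
 * and sc_task are made-up names this sketch assumes the driver provides):
 *
 *	static int
 *	mydev_intr(void *cookie)
 *	{
 *		struct mydev_softc *sc = cookie;
 *
 *		if (!mydev_claim_interrupt(sc))
 *			return 0;	// not ours
 *		// Defer the real work to a task.  If the task is
 *		// already scheduled, this is harmless: it still runs
 *		// only once.
 *		task_schedule(sc->sc_taskqueue, &sc->sc_task);
 *		return 1;
 *	}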
 *
 * Once you are done with a task, and you have made sure it has
 * finished executing, you must destroy it with task_destroy:
 *
 *	task_destroy(task);
 *
 * If you're not sure whether a task is scheduled or not, or whether it
 * has finished executing or not, you can cancel it and wait for it to
 * complete with task_cancel.  There are two tricky details about task
 * cancellation:
 *
 * 1. The task might need a lock that the caller of task_cancel holds.
 *    In that case, you must pass the lock to task_cancel so that it
 *    can drop the lock before waiting for the task.
 *
 * 2. The task might be responsible for releasing a resource, even a
 *    resource such as the memory containing its struct task.  In that
 *    case, if the task was about to run but is cancelled, task_cancel
 *    returns true to indicate the caller must take responsibility for
 *    the resource.  Otherwise, task_cancel returns false.
 *
 * The following contrived example illustrates a pattern that might
 * arise in a device driver using tasks.  Since the task action frees
 * the struct task, it must first call task_done before returning;
 * otherwise the worker running the task would continue to use the newly
 * freed memory.
 *
 *	static void
 *	my_driver_action(struct task *task)
 *	{
 *		printf("my driver's task ran!\n");
 *		mutex_enter(&sc->sc_lock);
 *		KASSERT(sc->sc_curtask == task);
 *		sc->sc_curtask = NULL;
 *		mutex_exit(&sc->sc_lock);
 *		task_done(task);
 *		task_destroy(task);
 *		kmem_free(task, sizeof(*task));
 *	}
 *
 *	// Set up a task, if we need one.
 *	struct task *tmp = kmem_alloc(sizeof(*tmp), KM_SLEEP);
 *	mutex_enter(&sc->sc_lock);
 *	if (sc->sc_curtask == NULL) {
 *		sc->sc_curtask = tmp;
 *		tmp = NULL;
 *		task_init(sc->sc_curtask, &my_driver_action);
 *		task_schedule(taskqueue, sc->sc_curtask);
 *	}
 *	mutex_exit(&sc->sc_lock);
 *	if (tmp != NULL)
 *		kmem_free(tmp, sizeof(*tmp));
 *
 *	...
 *
 *	// Cancel the task, if there is one.
 *	struct task *task = NULL;
 *	mutex_enter(&sc->sc_lock);
 *	if (sc->sc_curtask != NULL) {
 *		if (task_cancel(sc->sc_curtask, &sc->sc_lock)) {
 *			// We cancelled it, so we have to clean it up.
 *			task = sc->sc_curtask;
 *			sc->sc_curtask = NULL;
 *		}
 *	}
 *	mutex_exit(&sc->sc_lock);
 *	if (task != NULL) {
 *		task_destroy(task);
 *		kmem_free(task, sizeof(*task));
 *	}
 *
 * If you haven't kept track of all your driver's tasks, but your
 * device is detaching and you used a shared system task queue, instead
 * of cancelling them all you can wait for them to complete by draining
 * the task queue with taskqueue_drain.
 *
 * Draining means waiting for any tasks that anyone else sharing the
 * task queue has scheduled on it as well, so tasks on the shared system
 * task queues are not allowed long sleeps!  Nevertheless, taskqueue_drain
 * may take a long time simply because it must wait for all tasks on
 * the queue on every CPU to complete.
 *
 *	static void
 *	mydev_attach(device_t parent, device_t self, void *aux)
 *	{
 *		struct mydev_softc *sc = device_private(self);
 *		int error;
 *
 *		...
 *		error = taskqueue_get(&sc->sc_taskqueue, PRI_SOFTNET);
 *		if (error) {
 *			sc->sc_taskqueue = NULL;
 *			aprint_error_dev(self,
 *			    "unable to get softnet taskqueue: %d\n", error);
 *			goto fail;
 *		}
 *		...
 *	}
 *
 *	static int
 *	mydev_detach(device_t self, int flags)
 *	{
 *		struct mydev_softc *sc = device_private(self);
 *
 *		...
 *		if (sc->sc_taskqueue != NULL) {
 *			taskqueue_drain(sc->sc_taskqueue);
 *			taskqueue_put(sc->sc_taskqueue);
 *			sc->sc_taskqueue = NULL;
 *		}
 *		return 0;
 *	}
 *
 * === Appendix A (interface notes)
 *
 * Tasks are meant to replace most applications of workqueues,
 * callouts, and softints.  However, that doesn't mean that those
 * abstractions will go away.  This code itself is an application of
 * callouts and softints, and while scheduling a task is more flexible
 * than queueing work, the latter is even lighter-weight, so
 * applications not needing the extra flexibility may reasonably stay
 * with workqueues.
 *
 * The task abstraction was influenced by Linux workqueues, FreeBSD
 * taskqueues, NetBSD workqueues/callouts/softints, and design notes
 * for NetBSD kconts (`kernel continuations').  There are a few more
 * operations not shown in this little tutorial, including delayed
 * tasks; for details, see the comments below, above each routine.
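 *
 * A delayed task, for instance, runs its action only after a timeout
 * has passed.  (Another contrived sketch: delayed_task_init is assumed
 * here to take the same arguments as task_init, and the softc members
 * are made up.  The task queue must run at a priority no higher than
 * PRI_SOFTCLOCK.)
 *
 *	delayed_task_init(&sc->sc_dt, &my_delayed_action);
 *	delayed_task_schedule_ticks(taskqueue, &sc->sc_dt, mstohz(1000));
 *
 *	static void
 *	my_delayed_action(struct task *task)
 *	{
 *		struct mydev_softc *sc = container_of(task,
 *		    struct mydev_softc, sc_dt.dt_task);
 *
 *		printf("my delayed task ran, about a second later!\n");
 *	}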
 *
 * === Appendix B (implementation notes)
 *
 * For each priority at which there are task queues in use, and for
 * each CPU, there is a worker thread pool.  A worker thread pool
 * consists of an overseer thread and any number of worker threads.
 * Each worker thread is either idle or running tasks from a single
 * task queue.
 *
 * When a task is scheduled, if there is a thread assigned to the task
 * queue on the CPU, then the task will be queued up for that thread to
 * run.  If there is no thread assigned, and there are idle threads
 * available, then one of the idle threads will be chosen.  If there
 * are no idle threads, then the overseer is asked to create a worker
 * thread for the task queue.
 *
 * XXX Haven't (yet?) implemented single-threaded task queues.
 *
 * Lock order:
 *
 *	1. callers' locks, including task_cancel interlock
 *	2. struct taskworker::tw_lock for normal workers
 *	3. struct taskworker::tw_lock for drain workers
 *	4. struct taskthread_pool::ttp_lock
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD$");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/callout.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/intr.h>
#include <sys/kmem.h>
#include <sys/kthread.h>
#include <sys/mutex.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/queue.h>
#include <sys/systm.h>
#include <sys/task.h>
#include <sys/xcall.h>

/* Data structures */

TAILQ_HEAD(task_head, task);

struct taskqueue {
	struct percpu		*tq_percpu;	/* struct taskworker * */
	union {
		struct {
			void	*cookie;
		}		softint;
	}			tq_type_u;
	enum taskqueue_type {
		TASKQUEUE_SOFTINT,
		TASKQUEUE_THREAD,
	}			tq_type;
	int			tq_flags;
	int			tq_ipl;		/* IPL tasks are scheduled from */
	pri_t			tq_pri;
};

struct taskworker {
	kmutex_t		tw_lock;
	kcondvar_t		tw_cv;		/* new work/refcnt drained */
	kcondvar_t		tw_done_cv;	/* current task completed */
	struct taskqueue	*tw_taskqueue;
	union {
		struct {
			struct taskthread_pool	*pool;
			struct taskthread	*thread;
		}		thread;
	}			tw_type_u;
	TAILQ_ENTRY(taskworker)	tw_entry;
	struct task_head	tw_tasks;
	struct task		*tw_current_task;
	unsigned int		tw_refcnt;
	int			tw_flags;
#define	TASKWORKER_DONE		0x01
#define	TASKWORKER_RESCHEDULED	0x02
};

TAILQ_HEAD(taskworker_head, taskworker);
TAILQ_HEAD(taskthread_head, taskthread);

struct taskthread {
	struct lwp		*tt_lwp;
	struct taskthread_pool	*tt_pool;
	struct taskworker	*tt_worker;
	TAILQ_ENTRY(taskthread)	tt_entry;
};

struct taskthread_pool {
	kmutex_t		ttp_lock;
	kcondvar_t		ttp_cv;
	struct taskthread	ttp_overseer;
	struct taskworker_head	ttp_workers;
	struct taskthread_head	ttp_idle_threads;
	pri_t			ttp_pri;
	unsigned int		ttp_refcnt;
	int			ttp_flags;
#define	TASKTHREAD_POOL_DYING	0x01
};
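/*
 * Draft locking notes (XXX double-check against the code):
 *
 *	tw_tasks, tw_current_task, tw_flags	locked by tw_lock
 *	tw_refcnt				atomic; tw_lock for last rele
 *	ttp_workers, ttp_idle_threads, ttp_flags	locked by ttp_lock
 *	ttp_refcnt				atomic; ttp_lock for last rele
 */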
/* Forward declarations */

static void	task_error(struct task *);

static int	_taskqueue_create(struct taskqueue **, const char *, pri_t,
		    int, int);

static void	taskworker_init(struct taskworker *, const char *, int,
		    struct taskqueue *);
static void	taskworker_destroy(struct taskworker *);
static int	taskworker_hold(struct taskworker *);
static void	taskworker_rele(struct taskworker *);

static int	taskthread_pool_percpu_get(struct percpu **, pri_t);
static void	taskthread_pool_percpu_put(pri_t);
static int	taskthread_pool_percpu_create(struct percpu **, pri_t);
static void	taskthread_pool_percpu_destroy(struct percpu *);

static int	taskthread_pool_create(struct taskthread_pool **,
		    struct cpu_info *, pri_t);
static void	taskthread_pool_destroy(struct taskthread_pool *);
static int	taskthread_pool_hold(struct taskthread_pool *);
static void	taskthread_pool_rele(struct taskthread_pool *);

static void	taskworker_schedule(struct taskworker *, struct task *);
static void	taskworker_assign_thread(struct taskworker *);
static void	taskworker_run_1(struct taskworker *, struct task_head *);
static void	taskworker_done(struct taskworker *, struct task *);

static void	taskworker_softintr(void *);
static void	taskworker_thread(void *) __dead;
static void	taskworker_overseer_thread(void *) __dead;

static task_fn_t	drain_task;
static void	taskqueue_drain_softint_xc(void *, void *);
static void	taskqueue_drain_thread_xc(void *, void *);

/* Global state and initialization */

static pool_cache_t	taskqueues_pc;
static pool_cache_t	taskworkers_pc;
static pool_cache_t	taskthread_pools_pc;
static pool_cache_t	taskthreads_pc;

static kmutex_t		taskqueues_lock __cacheline_aligned;
static struct {
	struct taskqueue	*taskqueue;
	unsigned int		refcnt;
}			taskqueues[PRI_COUNT + 1];

/* XXX Where should these definitions go? */
#define	MAXPRI_TASKQUEUE	MAXPRI_SOFTINT
#define	MAXPRI_SOFTINT		MAXPRI_KERNEL_RT
#define	MINPRI_SOFTINT		PRI_SOFTCLOCK
#define	MAXPRI_THREAD		(MINPRI_SOFTINT - 1)
#define	PRI_COUNT_THREAD	(MAXPRI_THREAD + 1)

/* XXX Arbitrary: how long an idle worker thread lingers.  */
#define	TASKWORKER_IDLE_TICKS	mstohz(100)

static kmutex_t		taskthread_pools_lock __cacheline_aligned;
static struct {
	struct percpu		*percpu;	/* struct taskthread_pool * */
	unsigned int		refcnt;
}			taskthread_pools[PRI_COUNT_THREAD + 1];

/*
 * tasks_init: Initialize the task subsystem.
 */
void
tasks_init(void)
{

	taskqueues_pc = pool_cache_init(sizeof(struct taskqueue), 0, 0, 0,
	    "taskqueues", NULL, IPL_NONE, NULL, NULL, NULL);
	taskworkers_pc = pool_cache_init(sizeof(struct taskworker), 0, 0, 0,
	    "taskworkers", NULL, IPL_NONE, NULL, NULL, NULL);
	taskthread_pools_pc = pool_cache_init(sizeof(struct taskthread_pool),
	    0, 0, 0, "taskthread_pools", NULL, IPL_NONE, NULL, NULL, NULL);
	taskthreads_pc = pool_cache_init(sizeof(struct taskthread), 0, 0, 0,
	    "taskthreads", NULL, IPL_NONE, NULL, NULL, NULL);

	mutex_init(&taskqueues_lock, MUTEX_DEFAULT, IPL_NONE);
	mutex_init(&taskthread_pools_lock, MUTEX_DEFAULT, IPL_NONE);
}

/* Tasks */

static void
task_error(struct task *task)
{

	panic("destroyed task got run: %p", task);
}

/*
 * task_destroy: Destroy a task initialized with task_init.  Must no
 * longer be scheduled.  Use task_cancel first if this is not already
 * guaranteed.  task_cancel_async is not enough.  If you want to call
 * this inside a task callback, you must use task_done first.
 *
 * Not strictly necessary, but this is part of the documented API in
 * case it ever becomes necessary.
 */
void
task_destroy(struct task *task)
{

	KASSERTMSG((task->task_fn != &task_error),
	    "task already destroyed: %p", task);
	KASSERTMSG((task->task_worker == NULL),
	    "task still scheduled: %p", task);

	task->task_fn = task_error;
}

/*
 * delayed_task_destroy: Destroy a delayed task initialized with
 * delayed_task_init.  Must no longer be scheduled.  Use
 * delayed_task_cancel first if this is not already guaranteed.
 * delayed_task_cancel_async is not enough.  If you want to call this
 * inside a task callback, you must use task_done first.
 */
void
delayed_task_destroy(struct delayed_task *dt)
{

	KASSERTMSG((!callout_pending(&dt->dt_callout)),
	    "delayed task still pending timeout: %p", dt);
	KASSERTMSG((dt->dt_task.task_worker == NULL),
	    "delayed task still scheduled: %p", dt);
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task already destroyed: %p", dt);

	callout_destroy(&dt->dt_callout);
	task_destroy(&dt->dt_task);
}
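/*
 * For example (a sketch -- sc_dt and sc_lock are hypothetical, and
 * nothing may reschedule the delayed task concurrently), a detach path
 * might tear down a delayed task like this:
 *
 *	mutex_enter(&sc->sc_lock);
 *	(void)delayed_task_cancel(&sc->sc_dt, &sc->sc_lock);
 *	mutex_exit(&sc->sc_lock);
 *	delayed_task_destroy(&sc->sc_dt);
 */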
/*
 * task_done: To be called in a task callback to notify the task worker
 * that the task is done and can be rescheduled.
 *
 * If you do not call this, the task worker will do the equivalent when
 * the task callback returns.  You MUST call this if the task callback
 * will free memory containing the struct task.
 */
void
task_done(struct task *task)
{
	struct taskworker *const worker = task->task_worker;

	KASSERTMSG((worker != NULL), "task not running: %p", task);
	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't be completing: %p", task);

	mutex_enter(&worker->tw_lock);
	KASSERTMSG((task->task_worker == worker),
	    "task %p changed workers: %p -> %p",
	    task, worker, task->task_worker);
	KASSERTMSG((worker->tw_current_task == task),
	    "task %p is not current on worker %p", task, worker);
	KASSERTMSG(!ISSET(worker->tw_flags, TASKWORKER_DONE),
	    "task %p is already done on worker %p", task, worker);
	taskworker_done(worker, task);
	mutex_exit(&worker->tw_lock);
}

/* Reference-counted shared system task queues */

/*
 * taskqueue_get: Obtain a reference to a shared system task queue
 * running at the specified priority.  The resulting task queue can be
 * used at any IPL up to and including IPL_VM.  It must be released
 * with taskqueue_put -- do not pass it to taskqueue_destroy.  May
 * sleep.
 */
int
taskqueue_get(struct taskqueue **taskqueue_ret, pri_t pri)
{
	unsigned int i;
	char buf[9];
	struct taskqueue *taskqueue, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	if (pri == PRI_NONE) {
		i = PRI_COUNT;
	} else {
		KASSERT(pri < PRI_COUNT);
		i = pri;
	}

	/* Try to get it.  */
	mutex_enter(&taskqueues_lock);
	taskqueue = taskqueues[i].taskqueue;
	if (taskqueue == NULL) {
		/* Not there.  Drop the lock to create a new one.  */
		KASSERT(taskqueues[i].refcnt == 0);
		mutex_exit(&taskqueues_lock);

		/* Three digits will fit in the buffer.  */
		(void)snprintf(buf, sizeof buf, "taskq%d", pri);
		error = _taskqueue_create(&tmp, buf, pri, IPL_VM,
		    TASKQUEUE_PERCPU);
		if (error)
			return error;

		/* Try again, but we may have raced.  */
		mutex_enter(&taskqueues_lock);
		taskqueue = taskqueues[i].taskqueue;
		if (taskqueue == NULL) {
			/* Nobody raced us.  Commit tmp.  */
			KASSERT(taskqueues[i].refcnt == 0);
			taskqueue = taskqueues[i].taskqueue = tmp;
			tmp = NULL;
		}
	}

	/* Bump the reference count.  */
	if (taskqueues[i].refcnt == UINT_MAX) {
		mutex_exit(&taskqueues_lock);
		if (tmp != NULL)
			taskqueue_destroy(tmp);
		return EBUSY;
	}
	taskqueues[i].refcnt++;
	mutex_exit(&taskqueues_lock);

	/* If we created tmp but didn't end up using it, destroy it.  */
	if (tmp != NULL)
		taskqueue_destroy(tmp);

	KASSERT(taskqueue != NULL);
	*taskqueue_ret = taskqueue;
	return 0;
}
/* Task queue creation */

/*
 * taskqueue_create: Create a task queue to run tasks at priority
 * task_pri which can be scheduled from IPL schedule_ipl.  The task
 * queue must be destroyed with taskqueue_destroy -- do not pass it to
 * taskqueue_put.  May sleep.
 *
 * For soft interrupt priorities, you must use taskqueue_get to get a
 * shared system task queue, not taskqueue_create.
 */
int
taskqueue_create(struct taskqueue **taskqueue_ret, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{

	KASSERT(task_pri <= MAXPRI_THREAD);
	return _taskqueue_create(taskqueue_ret, name, task_pri, schedule_ipl,
	    flags);
}

static int
_taskqueue_create(struct taskqueue **taskqueue_ret, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{
	struct taskqueue *taskqueue;
	struct percpu *pool_percpu;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;
	int task_ipl;
	int error;

	ASSERT_SLEEPABLE();

	taskqueue = pool_cache_get(taskqueues_pc, PR_WAITOK);
	taskqueue->tq_percpu = percpu_alloc(sizeof(struct taskworker *));
	if (taskqueue->tq_percpu == NULL) {
		error = ENOMEM;
		goto fail0;
	}

	taskqueue->tq_flags = flags;
	taskqueue->tq_ipl = schedule_ipl;
	taskqueue->tq_pri = task_pri;

	KASSERT(task_pri <= MAXPRI_TASKQUEUE);
	if (MINPRI_SOFTINT <= task_pri) {
		KASSERT(task_pri <= MAXPRI_SOFTINT);
		/* XXX This table should go somewhere else.  */
		if (PRI_SOFTSERIAL <= task_pri)
			task_ipl = SOFTINT_SERIAL;
		else if (PRI_SOFTNET <= task_pri)
			task_ipl = SOFTINT_NET;
		else if (PRI_SOFTBIO <= task_pri)
			task_ipl = SOFTINT_BIO;
		else
			task_ipl = SOFTINT_CLOCK;
		taskqueue->tq_type = TASKQUEUE_SOFTINT;
	} else {
		KASSERT(task_pri <= MAXPRI_THREAD);
		taskqueue->tq_type = TASKQUEUE_THREAD;
	}

	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		taskqueue->tq_type_u.softint.cookie = softint_establish(
		    (task_ipl | SOFTINT_MPSAFE), &taskworker_softintr,
		    taskqueue);
		if (taskqueue->tq_type_u.softint.cookie == NULL) {
			error = ENOMEM;
			goto fail1;
		}
		for (CPU_INFO_FOREACH(cii, ci)) {
			struct taskworker *const worker =
			    pool_cache_get(taskworkers_pc, PR_WAITOK);

			taskworker_init(worker, name, schedule_ipl, taskqueue);

			percpu_traverse_enter();
			struct taskworker **const workerp =
			    percpu_getptr_remote(taskqueue->tq_percpu, ci);
			*workerp = worker;
			percpu_traverse_exit();
		}
		break;
	case TASKQUEUE_THREAD:
		error = taskthread_pool_percpu_get(&pool_percpu, task_pri);
		if (error)
			goto fail1;
		for (CPU_INFO_FOREACH(cii, ci)) {
			struct taskworker *const worker =
			    pool_cache_get(taskworkers_pc, PR_WAITOK);

			taskworker_init(worker, name, schedule_ipl, taskqueue);

			percpu_traverse_enter();
			struct taskworker **const workerp =
			    percpu_getptr_remote(taskqueue->tq_percpu, ci);
			struct taskthread_pool **const poolp =
			    percpu_getptr_remote(pool_percpu, ci);
			*workerp = worker;
			worker->tw_type_u.thread.pool = *poolp;
			worker->tw_type_u.thread.thread = NULL;
			percpu_traverse_exit();
		}
		break;
	default:
		panic("taskqueue %p has invalid type: %d", taskqueue,
		    (int)taskqueue->tq_type);
	}

	/* Success!  */
	*taskqueue_ret = taskqueue;
	return 0;

fail1:	percpu_free(taskqueue->tq_percpu, sizeof(struct taskworker *));
fail0:	pool_cache_put(taskqueues_pc, taskqueue);
	return error;
}
/* Task queue destruction */

/*
 * taskqueue_destroy: Wait for all tasks on taskqueue to complete and
 * destroy it.  The taskqueue must have been created with
 * taskqueue_create.  May sleep.
 */
void
taskqueue_destroy(struct taskqueue *taskqueue)
{
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	ASSERT_SLEEPABLE();

	for (CPU_INFO_FOREACH(cii, ci)) {
		percpu_traverse_enter();
		struct taskworker **const workerp =
		    percpu_getptr_remote(taskqueue->tq_percpu, ci);
		struct taskworker *const worker = *workerp;
		percpu_traverse_exit();

		taskworker_destroy(worker);
		pool_cache_put(taskworkers_pc, worker);
	}

	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		softint_disestablish(taskqueue->tq_type_u.softint.cookie);
		break;
	case TASKQUEUE_THREAD:
		taskthread_pool_percpu_put(taskqueue->tq_pri);
		break;
	default:
		panic("taskqueue %p has invalid type: %d", taskqueue,
		    (int)taskqueue->tq_type);
	}

	percpu_free(taskqueue->tq_percpu, sizeof(struct taskworker *));
	pool_cache_put(taskqueues_pc, taskqueue);
}

/*
 * taskqueue_put: Release a reference to a shared system taskqueue
 * obtained with taskqueue_get.  If this is the last one, destroy it.
 * Do not pass a taskqueue created with taskqueue_create.  May sleep.
 */
void
taskqueue_put(struct taskqueue *taskqueue)
{
	unsigned int i;
	bool destroy = false;

	ASSERT_SLEEPABLE();

	if (taskqueue->tq_pri == PRI_NONE) {
		i = PRI_COUNT;
	} else {
		KASSERT(taskqueue->tq_pri < PRI_COUNT);
		i = taskqueue->tq_pri;
	}

	mutex_enter(&taskqueues_lock);
	KASSERT(taskqueues[i].taskqueue == taskqueue);
	if (--taskqueues[i].refcnt == 0) {
		taskqueues[i].taskqueue = NULL;
		destroy = true;
	}
	mutex_exit(&taskqueues_lock);

	if (destroy)
		taskqueue_destroy(taskqueue);
}

/* Task worker initialization and destruction */

static void
taskworker_init(struct taskworker *worker, const char *name, int ipl,
    struct taskqueue *taskqueue)
{

	mutex_init(&worker->tw_lock, MUTEX_DEFAULT, ipl);
	cv_init(&worker->tw_cv, name);
	cv_init(&worker->tw_done_cv, name);
	worker->tw_taskqueue = taskqueue;
	/* worker->tw_type_u is initialized separately in taskqueue_create.  */
	TAILQ_INIT(&worker->tw_tasks);
	worker->tw_current_task = NULL;
	worker->tw_refcnt = 0;
	worker->tw_flags = 0;
}
static void
taskworker_destroy(struct taskworker *worker)
{

	mutex_enter(&worker->tw_lock);
	while (0 < worker->tw_refcnt)
		cv_wait(&worker->tw_cv, &worker->tw_lock);
	mutex_exit(&worker->tw_lock);

	KASSERT(worker->tw_flags == 0);
	KASSERT(worker->tw_refcnt == 0);
	KASSERT(worker->tw_current_task == NULL);

	worker->tw_taskqueue = NULL;
	cv_destroy(&worker->tw_done_cv);
	cv_destroy(&worker->tw_cv);
	mutex_destroy(&worker->tw_lock);
}

static int
taskworker_hold(struct taskworker *worker)
{
	unsigned int refcnt;

	do {
		refcnt = worker->tw_refcnt;
		if (refcnt == UINT_MAX)
			return EBUSY;
	} while (atomic_cas_uint(&worker->tw_refcnt, refcnt, (refcnt + 1))
	    != refcnt);

	return 0;
}

static void
taskworker_rele(struct taskworker *worker)
{
	unsigned int refcnt;

	do {
		refcnt = worker->tw_refcnt;
		KASSERT(0 < refcnt);
		if (refcnt == 1) {
			mutex_enter(&worker->tw_lock);
			refcnt = atomic_dec_uint_nv(&worker->tw_refcnt);
			KASSERT(refcnt != UINT_MAX);
			if (refcnt == 0)
				cv_signal(&worker->tw_cv);
			mutex_exit(&worker->tw_lock);
			return;
		}
	} while (atomic_cas_uint(&worker->tw_refcnt, refcnt, (refcnt - 1))
	    != refcnt);
}

/* Task thread pool per-CPU state reference counting */

/*
 * taskthread_pool_percpu_get: Obtain a reference to the per-CPU thread
 * pools running at priority pri, creating them if necessary.
 */
static int
taskthread_pool_percpu_get(struct percpu **percpup, pri_t pri)
{
	unsigned int i;
	struct percpu *percpu, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	if (pri == PRI_NONE) {
		i = PRI_COUNT_THREAD;
	} else {
		KASSERT(pri < PRI_COUNT_THREAD);
		i = pri;
	}

	mutex_enter(&taskthread_pools_lock);
	percpu = taskthread_pools[i].percpu;
	if (percpu == NULL) {
		mutex_exit(&taskthread_pools_lock);

		error = taskthread_pool_percpu_create(&tmp, pri);
		if (error)
			return error;
		KASSERT(tmp != NULL);

		mutex_enter(&taskthread_pools_lock);
		percpu = taskthread_pools[i].percpu;
		if (percpu == NULL) {
			percpu = taskthread_pools[i].percpu = tmp;
			tmp = NULL;
		}
	}
	KASSERT(percpu != NULL);
	if (taskthread_pools[i].refcnt == UINT_MAX) {
		mutex_exit(&taskthread_pools_lock);
		if (tmp != NULL)
			taskthread_pool_percpu_destroy(tmp);
		return EBUSY;
	}
	taskthread_pools[i].refcnt++;
	mutex_exit(&taskthread_pools_lock);

	if (tmp != NULL)
		taskthread_pool_percpu_destroy(tmp);

	KASSERT(percpu != NULL);
	*percpup = percpu;
	return 0;
}

/*
 * taskthread_pool_percpu_put: Release a reference to the per-CPU thread
 * pools running at priority pri.  If it is the last one, destroy them.
 */
static void
taskthread_pool_percpu_put(pri_t pri)
{
	unsigned int i;
	struct percpu *percpu = NULL;
	bool destroy = false;

	ASSERT_SLEEPABLE();

	if (pri == PRI_NONE) {
		i = PRI_COUNT_THREAD;
	} else {
		KASSERT(pri < PRI_COUNT_THREAD);
		i = pri;
	}

	mutex_enter(&taskthread_pools_lock);
	KASSERT(0 < taskthread_pools[i].refcnt);
	if (--taskthread_pools[i].refcnt == 0) {
		percpu = taskthread_pools[i].percpu;
		taskthread_pools[i].percpu = NULL;
		destroy = true;
	}
	mutex_exit(&taskthread_pools_lock);

	if (destroy)
		taskthread_pool_percpu_destroy(percpu);
}

/* Task thread pool per-CPU state creation and destruction */
static int
taskthread_pool_percpu_create(struct percpu **percpup, pri_t pri)
{
	struct percpu *percpu;
	struct taskthread_pool **poolp, *pool;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;
	unsigned int i, j;
	int error;

	percpu = percpu_alloc(sizeof(struct taskthread_pool *));
	if (percpu == NULL)
		return ENOMEM;

	for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) {
		error = taskthread_pool_create(&pool, ci, pri);
		if (error)
			goto fail;
		percpu_traverse_enter();
		poolp = percpu_getptr_remote(percpu, ci);
		*poolp = pool;
		percpu_traverse_exit();
	}

	/* Success!  */
	*percpup = percpu;
	return 0;

fail:	for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) {
		if (i <= j)
			break;
		percpu_traverse_enter();
		poolp = percpu_getptr_remote(percpu, ci);
		pool = *poolp;
		percpu_traverse_exit();
		taskthread_pool_destroy(pool);
	}
	percpu_free(percpu, sizeof(struct taskthread_pool *));
	return error;
}

static void
taskthread_pool_percpu_destroy(struct percpu *percpu)
{
	struct taskthread_pool **poolp, *pool;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	for (CPU_INFO_FOREACH(cii, ci)) {
		percpu_traverse_enter();
		poolp = percpu_getptr_remote(percpu, ci);
		pool = *poolp;
		percpu_traverse_exit();
		taskthread_pool_destroy(pool);
	}
	percpu_free(percpu, sizeof(struct taskthread_pool *));
}

/* Task thread pool creation and destruction */

static int
taskthread_pool_create(struct taskthread_pool **poolp, struct cpu_info *ci,
    pri_t pri)
{
	struct taskthread_pool *pool;
	struct lwp *lwp;
	int error;

	pool = pool_cache_get(taskthread_pools_pc, PR_WAITOK);
	mutex_init(&pool->ttp_lock, MUTEX_DEFAULT, IPL_VM);
	cv_init(&pool->ttp_cv, "taskpool");
	TAILQ_INIT(&pool->ttp_workers);
	TAILQ_INIT(&pool->ttp_idle_threads);
	pool->ttp_pri = pri;
	pool->ttp_refcnt = 0;
	pool->ttp_flags = 0;

	/* This reference is released by the overseer when it exits.  */
	(void)taskthread_pool_hold(pool);

	pool->ttp_overseer.tt_lwp = NULL;
	pool->ttp_overseer.tt_pool = pool;
	pool->ttp_overseer.tt_worker = NULL;

	error = kthread_create(pri, KTHREAD_MPSAFE, ci,
	    &taskworker_overseer_thread, &pool->ttp_overseer, &lwp,
	    "taskoverseer/%u@%d", cpu_index(ci), pri);
	if (error)
		goto fail0;

	/* Notify the overseer that it can start running.  */
	mutex_enter(&pool->ttp_lock);
	pool->ttp_overseer.tt_lwp = lwp;
	cv_signal(&pool->ttp_cv);
	mutex_exit(&pool->ttp_lock);

	/* Success!  */
	*poolp = pool;
	return 0;

fail0:	cv_destroy(&pool->ttp_cv);
	mutex_destroy(&pool->ttp_lock);
	pool_cache_put(taskthread_pools_pc, pool);
	return error;
}

static void
taskthread_pool_destroy(struct taskthread_pool *pool)
{

	/* Mark the pool dying and wait for threads to commit suicide.  */
	mutex_enter(&pool->ttp_lock);
	KASSERT(TAILQ_EMPTY(&pool->ttp_workers));
	pool->ttp_flags |= TASKTHREAD_POOL_DYING;
	cv_broadcast(&pool->ttp_cv);
	while (0 < pool->ttp_refcnt)
		cv_wait(&pool->ttp_cv, &pool->ttp_lock);
	mutex_exit(&pool->ttp_lock);

	KASSERT(pool->ttp_overseer.tt_worker == NULL);
	KASSERT(pool->ttp_overseer.tt_pool == pool);
	KASSERT(pool->ttp_flags == TASKTHREAD_POOL_DYING);
	KASSERT(pool->ttp_refcnt == 0);
	KASSERT(TAILQ_EMPTY(&pool->ttp_idle_threads));
	KASSERT(TAILQ_EMPTY(&pool->ttp_workers));

	cv_destroy(&pool->ttp_cv);
	mutex_destroy(&pool->ttp_lock);

	pool_cache_put(taskthread_pools_pc, pool);
}

/* Task thread pool reference counting */

static int
taskthread_pool_hold(struct taskthread_pool *pool)
{
	unsigned int refcnt;

	do {
		refcnt = pool->ttp_refcnt;
		if (refcnt == UINT_MAX)
			return EBUSY;
	} while (atomic_cas_uint(&pool->ttp_refcnt, refcnt, (refcnt + 1))
	    != refcnt);

	return 0;
}

static void
taskthread_pool_rele(struct taskthread_pool *pool)
{
	unsigned int refcnt;

	do {
		refcnt = pool->ttp_refcnt;
		KASSERT(0 < refcnt);
		if (refcnt == 1) {
			mutex_enter(&pool->ttp_lock);
			refcnt = atomic_dec_uint_nv(&pool->ttp_refcnt);
			KASSERT(refcnt != UINT_MAX);
			if (refcnt == 0)
				cv_signal(&pool->ttp_cv);
			mutex_exit(&pool->ttp_lock);
			return;
		}
	} while (atomic_cas_uint(&pool->ttp_refcnt, refcnt, (refcnt - 1))
	    != refcnt);
}
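/*
 * Note on the hold/rele protocols above (for both workers and pools):
 * releases are lock-free except for the transition to zero, which takes
 * the lock so that the final wakeup cannot be lost while
 * taskworker_destroy or taskthread_pool_destroy is between checking the
 * count and sleeping on the condvar.
 */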
/* Task scheduling */

/*
 * task_schedule: Schedule task to run on taskqueue.  If task is
 * already scheduled, don't change it.
 */
void
task_schedule(struct taskqueue *taskqueue, struct task *task)
{
	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;
	struct taskworker *worker0;

	mutex_enter(&worker->tw_lock);

	/* Try to grab the task.  */
	if ((worker0 = atomic_cas_ptr(&task->task_worker, NULL, worker))
	    != NULL) {
		/* Someone else got in first.  */
		if (worker != worker0) {
			mutex_exit(&worker->tw_lock);
			mutex_enter(&worker0->tw_lock);
		}
		if (task->task_worker != worker0)
			/* worker0 already ran it.  */
			goto out_race;
		if (worker0->tw_current_task != task)
			/* worker0 is still scheduled to run it.  */
			goto out_race;
		if (ISSET(worker0->tw_flags, TASKWORKER_RESCHEDULED))
			/* It's running and already rescheduled.  */
			goto out_race;
		/*
		 * It's running.  Put it back on the queue and notify
		 * everyone else that it's been rescheduled.
		 */
		worker0->tw_flags |= TASKWORKER_RESCHEDULED;
		TAILQ_INSERT_TAIL(&worker0->tw_tasks, task, task_entry);
out_race:	mutex_exit(&worker0->tw_lock);
		percpu_putref(taskqueue->tq_percpu);
		return;
	}

	/* We grabbed it.  Put it on the queue.  */
	taskworker_schedule(worker, task);
	mutex_exit(&worker->tw_lock);

	percpu_putref(taskqueue->tq_percpu);
}

/* Delayed task scheduling */

/*
 * delayed_task_schedule_ticks: Schedule the delayed task dt to run on
 * taskqueue after the specified number of ticks.  If it is already
 * scheduled, don't change it.  taskqueue must run at a priority no
 * higher than PRI_SOFTCLOCK.  If ticks is zero, schedule the task to
 * run without delay.
 */
void
delayed_task_schedule_ticks(struct taskqueue *taskqueue,
    struct delayed_task *dt, int ticks)
{
	struct taskworker *worker0;

	KASSERTMSG((taskqueue->tq_pri <= PRI_SOFTCLOCK),
	    "taskqueue %p priority too high for delayed task %p: %"PRIdMAX,
	    taskqueue, dt, (intmax_t)taskqueue->tq_pri);
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't schedule: %p", dt);
	KASSERTMSG((0 <= ticks),
	    "can't schedule delayed task %p in the past, %d ticks from now!",
	    dt, ticks);

	if (ticks == 0) {
		task_schedule(taskqueue, &dt->dt_task);
		return;
	}

	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;

	mutex_enter(&worker->tw_lock);
	if ((worker0 = atomic_cas_ptr(&dt->dt_task.task_worker, NULL, worker))
	    != NULL) {
		if (worker != worker0) {
			mutex_exit(&worker->tw_lock);
			mutex_enter(&worker0->tw_lock);
		}
		if (dt->dt_task.task_worker != worker0)
			/* worker0 already ran it.  */
			goto out_race;
		if (worker0->tw_current_task != &dt->dt_task)
			/* worker0 is still scheduled to run it.  */
			goto out_race;
		if (ISSET(worker0->tw_flags, TASKWORKER_RESCHEDULED))
			/* It's running and already rescheduled.  */
			goto out_race;
		/*
		 * It's running.  Reschedule the callout and mark it as
		 * rescheduled in case anyone else tries scheduling it.
		 */
		worker0->tw_flags |= TASKWORKER_RESCHEDULED;
		callout_schedule(&dt->dt_callout, ticks);
out_race:	mutex_exit(&worker0->tw_lock);
		percpu_putref(taskqueue->tq_percpu);
		return;
	}

	/* We grabbed it.  Schedule the callout.  */
	callout_schedule(&dt->dt_callout, ticks);
	mutex_exit(&worker->tw_lock);

	percpu_putref(taskqueue->tq_percpu);
}
/* Task cancellation */

/*
 * task_cancel: Cancel task.  If it has already begun, wait for it to
 * complete (or to call task_done).  May drop interlock if it is
 * necessary to wait.  May sleep.
 *
 * Returns true if task was scheduled and we prevented it from running.
 * Returns false if the task was not scheduled or had already begun.
 */
bool
task_cancel(struct task *task, kmutex_t *interlock)
{
	struct taskworker *const worker = task->task_worker;

	ASSERT_SLEEPABLE();
	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't cancel: %p", task);

	/* If it's not scheduled, there's nothing to cancel.  */
	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);

	/* If it moved to another queue, it already ran so we can't cancel.  */
	if (task->task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}

	/* If it is running, it's too late to cancel.  Wait for completion.  */
	if (worker->tw_current_task == task) {
		mutex_exit(interlock);
		while ((worker->tw_current_task == task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_done_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/* Got it before it ran!  Remove it and disassociate it.  */
	TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);
	task->task_worker = NULL;
	mutex_exit(&worker->tw_lock);
	return true;
}

/*
 * task_cancel_async: Like task_cancel, but returns immediately even if
 * the task is in flight.  Hence there is no need for an interlock.
 */
bool
task_cancel_async(struct task *task)
{
	struct taskworker *worker = task->task_worker;

	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't cancel: %p", task);

	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);
	if (task->task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (worker->tw_current_task == task) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);
	task->task_worker = NULL;
	mutex_exit(&worker->tw_lock);
	return true;
}

/* Delayed task cancellation */

/*
 * delayed_task_cancel: Cancel the delayed task dt.  If it has already
 * begun, wait for it to complete (or to call task_done).  May drop
 * interlock if it is necessary to wait, either for the callout to
 * complete or for the task to complete.  May sleep.
 *
 * Returns true if dt was scheduled and we prevented it from running.
 * Returns false if dt was not scheduled or had already begun.
 */
bool
delayed_task_cancel(struct delayed_task *dt, kmutex_t *interlock)
{
	struct taskworker *worker = dt->dt_task.task_worker;

	ASSERT_SLEEPABLE();
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't cancel: %p", dt);

	/* If it's not scheduled, we can't cancel it.  */
	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);

	/* If it moved to another queue, it already ran so we can't cancel.  */
	if (dt->dt_task.task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}

	/* If it is running, it's too late to cancel.  Wait for completion.  */
	if (worker->tw_current_task == &dt->dt_task) {
		mutex_exit(interlock);
		while ((worker->tw_current_task == &dt->dt_task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_done_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/*
	 * Try to cancel it without dropping the interlock.  This is an
	 * optimization; we don't need to do it.
	 */
	if (!callout_stop(&dt->dt_callout)) {
		/* Callout had not fired, so we cancelled it.  */
		dt->dt_task.task_worker = NULL;
		mutex_exit(&worker->tw_lock);
		return true;
	}

	/*
	 * We have to drop the interlock here because we need to drop
	 * both the interlock and the worker lock in order to sleep if
	 * the callout has fired, but callout_halt can't drop two locks.
	 *
	 * XXX An alternative would be to have a caller-supplied
	 * interlock stored in the delayed task structure.  That would
	 * simplify the logic in scheduling delayed tasks, too.
	 * However, it would complicate the API for callers that don't
	 * need interlocking in task_cancel (XXX are there any,
	 * really?).
	 */
	mutex_exit(interlock);

	/* Try to cancel the callout.  */
	if (!callout_halt(&dt->dt_callout, &worker->tw_lock)) {
		/* Callout had not fired, so we cancelled it.  */
		dt->dt_task.task_worker = NULL;
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return true;
	}

	/* It already fired.  Try to cancel the task.  */
	if (worker->tw_current_task == &dt->dt_task) {
		/* The task is already running.  Wait for it to complete.  */
		while ((worker->tw_current_task == &dt->dt_task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_done_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/* The task has not yet run.  Don't let it.  */
	TAILQ_REMOVE(&worker->tw_tasks, &dt->dt_task, task_entry);
	dt->dt_task.task_worker = NULL;
	mutex_exit(&worker->tw_lock);
	mutex_enter(interlock);
	return true;
}
/*
 * delayed_task_cancel_async: Like delayed_task_cancel, but returns
 * immediately even if the task is in flight.  Hence there is no need
 * for an interlock.
 */
bool
delayed_task_cancel_async(struct delayed_task *dt)
{
	struct taskworker *worker = dt->dt_task.task_worker;

	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't cancel: %p", dt);

	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);
	if (dt->dt_task.task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (!callout_stop(&dt->dt_callout)) {
		/* Callout had not fired, so we cancelled it.  */
		dt->dt_task.task_worker = NULL;
		mutex_exit(&worker->tw_lock);
		return true;
	}
	if (worker->tw_current_task != &dt->dt_task) {
		/* It fired but the task has not yet run.  Don't let it.  */
		TAILQ_REMOVE(&worker->tw_tasks, &dt->dt_task, task_entry);
		dt->dt_task.task_worker = NULL;
		mutex_exit(&worker->tw_lock);
		return true;
	}
	mutex_exit(&worker->tw_lock);
	return false;
}

/* Worker scheduling */

/*
 * taskworker_schedule: Schedule worker to execute task, which has
 * already been assigned to worker.
 */
static void
taskworker_schedule(struct taskworker *worker, struct task *task)
{

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(task->task_worker == worker);

	/* Put it on the queue.  */
	TAILQ_INSERT_TAIL(&worker->tw_tasks, task, task_entry);

	/* Notify the softint or thread.  */
	switch (worker->tw_taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		/* XXX kassert the right queue */
		softint_schedule(worker->tw_taskqueue->tq_type_u.softint.cookie);
		break;
	case TASKQUEUE_THREAD:
		if (__predict_false(worker->tw_type_u.thread.thread == NULL))
			taskworker_assign_thread(worker);
		KASSERT(worker->tw_type_u.thread.thread != NULL);
		cv_signal(&worker->tw_cv);
		break;
	default:
		panic("taskqueue %p has invalid type: %d",
		    worker->tw_taskqueue, (int)worker->tw_taskqueue->tq_type);
	}
}

/*
 * taskworker_assign_thread: Assign a thread to worker, which currently
 * has none assigned to it.  If there are any idle threads, assign one
 * to it.  If there are no idle threads, assign the overseer to handle
 * it.
 */
static void
taskworker_assign_thread(struct taskworker *worker)
{
	struct taskthread_pool *const pool = worker->tw_type_u.thread.pool;

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(worker->tw_type_u.thread.thread == NULL);

	mutex_enter(&pool->ttp_lock);
	if (TAILQ_EMPTY(&pool->ttp_idle_threads)) {
		worker->tw_type_u.thread.thread = &pool->ttp_overseer;
		TAILQ_INSERT_TAIL(&pool->ttp_workers, worker, tw_entry);
	} else {
		worker->tw_type_u.thread.thread =
		    TAILQ_FIRST(&pool->ttp_idle_threads);
		TAILQ_REMOVE(&pool->ttp_idle_threads,
		    worker->tw_type_u.thread.thread, tt_entry);
		worker->tw_type_u.thread.thread->tt_worker = worker;
	}
	/* Broadcast: the overseer and idle threads share ttp_cv.  */
	cv_broadcast(&pool->ttp_cv);
	mutex_exit(&pool->ttp_lock);
}

/* Task execution */
/*
 * taskworker_run_1: Pick one task off queue and execute it.  To be
 * called from a worker thread or softint handler.
 */
static void
taskworker_run_1(struct taskworker *worker, struct task_head *queue)
{

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(!TAILQ_EMPTY(queue));

	/* Grab the task from the queue.  */
	struct task *const task = TAILQ_FIRST(queue);
	TAILQ_REMOVE(queue, task, task_entry);

	/* We had better not be working on anything yet.  */
	KASSERT(worker->tw_current_task == NULL);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_RESCHEDULED));

	/* Mark it current and run it.  */
	worker->tw_current_task = task;
	mutex_exit(&worker->tw_lock);
	(*task->task_fn)(task);
	mutex_enter(&worker->tw_lock);
	KASSERT(worker->tw_current_task == task);

	/* Mark it done if the action didn't already.  */
	if (!ISSET(worker->tw_flags, TASKWORKER_DONE))
		taskworker_done(worker, task);

	/* Can't touch task after this.  */
	KASSERT(ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_RESCHEDULED));

	/* Clear the flags related to this task.  */
	worker->tw_flags &= ~(TASKWORKER_DONE | TASKWORKER_RESCHEDULED);

	/* Notify anyone waiting to drain or cancel.  */
	worker->tw_current_task = NULL;
	cv_broadcast(&worker->tw_done_cv);
}

/*
 * taskworker_done: Mark task done.  The worker will not touch it after
 * this, and the task can be executed again if rescheduled.  If someone
 * had already asked to reschedule the task, acknowledge this;
 * otherwise, disassociate the task from the worker.
 */
static void
taskworker_done(struct taskworker *worker, struct task *task)
{

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(worker->tw_current_task == task);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(task->task_worker == worker);

	if (ISSET(worker->tw_flags, TASKWORKER_RESCHEDULED)) {
		worker->tw_flags &= ~TASKWORKER_RESCHEDULED;
	} else {
		struct taskworker *const worker0 __diagused =
		    atomic_swap_ptr(&task->task_worker, NULL);
		KASSERT(worker0 == worker);
		/* Can't touch task after this.  */
	}

	worker->tw_flags |= TASKWORKER_DONE;

	/* Notify anyone waiting for the task to be done, e.g. to cancel.  */
	cv_broadcast(&worker->tw_done_cv);
}

/* Worker soft interrupt handler and delayed task callout */

static void
taskworker_softintr(void *arg)
{
	struct taskqueue *const taskqueue = arg;
	struct task_head queue = TAILQ_HEAD_INITIALIZER(queue);
	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;

	percpu_putref(taskqueue->tq_percpu);

	mutex_enter(&worker->tw_lock);
	/* Grab everything currently on the queue in one go.  */
	TAILQ_CONCAT(&queue, &worker->tw_tasks, task_entry);
	while (!TAILQ_EMPTY(&queue))
		taskworker_run_1(worker, &queue);
	mutex_exit(&worker->tw_lock);
}

void
delayed_task_timeout(void *arg)
{
	struct delayed_task *const dt = arg;
	struct taskworker *const worker = dt->dt_task.task_worker;

	/* We had better have been assigned a worker.  */
	KASSERT(worker != NULL);

	mutex_enter(&worker->tw_lock);

	/* The worker had better not be already executing us.  */
	KASSERT(worker->tw_current_task != &dt->dt_task);

	/* We had better still be on this worker.  */
	KASSERT(dt->dt_task.task_worker == worker);

	/* Schedule us.  */
	taskworker_schedule(worker, &dt->dt_task);

	mutex_exit(&worker->tw_lock);
}

/* Worker thread */
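/*
 * taskworker_thread: Body of a worker thread.  Wait to be assigned a
 * worker, run that worker's tasks until none turn up for a while, then
 * go back to the idle list, and eventually exit if there is still
 * nothing to do or the pool is being destroyed.
 */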
static void __dead
taskworker_thread(void *arg)
{
	struct taskthread *const thread = arg;
	struct taskthread_pool *const pool = thread->tt_pool;

	/* Wait until we're initialized and on the queue.  */
	mutex_enter(&pool->ttp_lock);
	while (thread->tt_lwp == NULL)
		cv_wait(&pool->ttp_cv, &pool->ttp_lock);
	mutex_exit(&pool->ttp_lock);

	for (;;) {
		bool suicide = false;

		/*
		 * Wait until we are assigned a worker, or if there's
		 * no work for a while, commit suicide.
		 */
		mutex_enter(&pool->ttp_lock);
		while (thread->tt_worker == NULL) {
			if (ISSET(pool->ttp_flags, TASKTHREAD_POOL_DYING)) {
				suicide = true;
				break;
			}
			if (cv_timedwait(&pool->ttp_cv, &pool->ttp_lock,
				TASKWORKER_IDLE_TICKS)) {
				if (thread->tt_worker == NULL) {
					suicide = true;
					break;
				}
			}
		}
		if (suicide) {
			KASSERT(thread->tt_worker == NULL);
			TAILQ_REMOVE(&pool->ttp_idle_threads, thread,
			    tt_entry);
		}
		mutex_exit(&pool->ttp_lock);

		if (suicide) {
			KASSERT(thread->tt_worker == NULL);
			break;
		}
		KASSERT(thread->tt_worker != NULL);

		/* Do all the tasks the worker has.  */
		struct taskworker *const worker = thread->tt_worker;
		kmutex_t *const lock = &worker->tw_lock;

		mutex_enter(lock);
		for (;;) {
			while (!TAILQ_EMPTY(&worker->tw_tasks))
				taskworker_run_1(worker, &worker->tw_tasks);
			/*
			 * Wait for more work on this task queue for a
			 * little while, but if none turns up soon, go
			 * idle.
			 */
			if (cv_timedwait(&worker->tw_cv, lock,
				TASKWORKER_IDLE_TICKS) &&
			    TAILQ_EMPTY(&worker->tw_tasks))
				break;
		}

		/* Let the worker know it no longer has a thread.  */
		KASSERT(worker->tw_type_u.thread.thread == thread);
		worker->tw_type_u.thread.thread = NULL;

		mutex_enter(&pool->ttp_lock);
		thread->tt_worker = NULL;
		TAILQ_INSERT_TAIL(&pool->ttp_idle_threads, thread, tt_entry);
		mutex_exit(&pool->ttp_lock);
		mutex_exit(lock);
	}

	pool_cache_put(taskthreads_pc, thread);
	taskthread_pool_rele(pool);
	kthread_exit(0);
}

/* Overseer thread */
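/*
 * taskworker_overseer_thread: Body of the overseer thread for a pool.
 * Wait for a worker that needs a thread; hand it an idle thread if one
 * exists, otherwise create a new worker thread for it.
 */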
static void __dead
taskworker_overseer_thread(void *arg)
{
	struct taskthread *const overseer = arg;
	struct taskthread_pool *const pool = overseer->tt_pool;
	struct lwp *lwp = NULL;
	int error;

	mutex_enter(&pool->ttp_lock);
	while (overseer->tt_lwp == NULL)
		cv_wait(&pool->ttp_cv, &pool->ttp_lock);

	for (;;) {
		/* Wait until a worker wants a thread.  */
		while (TAILQ_EMPTY(&pool->ttp_workers) &&
		    !ISSET(pool->ttp_flags, TASKTHREAD_POOL_DYING))
			cv_wait(&pool->ttp_cv, &pool->ttp_lock);

		/* If the pool is going away, exit.  */
		if (ISSET(pool->ttp_flags, TASKTHREAD_POOL_DYING))
			break;

		/* If there are no threads, we'll have to try to start one.  */
		if (TAILQ_EMPTY(&pool->ttp_idle_threads)) {
			error = taskthread_pool_hold(pool);
			if (error) {
				(void)kpause("taskpool", false, hz,
				    &pool->ttp_lock);
				continue;
			}
			mutex_exit(&pool->ttp_lock);

			struct taskthread *const thread =
			    pool_cache_get(taskthreads_pc, PR_WAITOK);
			thread->tt_lwp = NULL;
			thread->tt_pool = pool;
			thread->tt_worker = NULL;

			error = kthread_create(pool->ttp_pri, KTHREAD_MPSAFE,
			    curcpu(), &taskworker_thread, thread, &lwp,
			    "taskw/%u@%d", cpu_index(curcpu()),
			    (int)pool->ttp_pri);
			if (error) {
				pool_cache_put(taskthreads_pc, thread);
				taskthread_pool_rele(pool);
				/* XXX What to do to wait for memory?  */
				(void)kpause("tasklwp", false, hz, NULL);
				mutex_enter(&pool->ttp_lock);
				continue;
			}
			KASSERT(lwp != NULL);

			mutex_enter(&pool->ttp_lock);
			TAILQ_INSERT_TAIL(&pool->ttp_idle_threads, thread,
			    tt_entry);
			thread->tt_lwp = lwp;
			lwp = NULL;
			cv_broadcast(&pool->ttp_cv);
			continue;
		}

		/* There are threads, so try giving one a worker.  */
		struct taskworker *const worker =
		    TAILQ_FIRST(&pool->ttp_workers);
		error = taskworker_hold(worker);
		if (error) {
			(void)kpause("taskwork", false, hz, &pool->ttp_lock);
			continue;
		}
		TAILQ_REMOVE(&pool->ttp_workers, worker, tw_entry);
		mutex_exit(&pool->ttp_lock);

		mutex_enter(&worker->tw_lock);
		/* If the worker drained, we'll no longer be its thread.  */
		if (worker->tw_type_u.thread.thread == overseer) {
			mutex_enter(&pool->ttp_lock);
			if (TAILQ_EMPTY(&pool->ttp_idle_threads)) {
				/*
				 * Someone else snagged the thread
				 * first.  We'll have to try again.
				 */
				TAILQ_INSERT_HEAD(&pool->ttp_workers, worker,
				    tw_entry);
			} else {
				/*
				 * Assign the thread to the worker and
				 * wake the thread so it starts work.
				 */
				struct taskthread *const thread =
				    TAILQ_FIRST(&pool->ttp_idle_threads);
				TAILQ_REMOVE(&pool->ttp_idle_threads, thread,
				    tt_entry);
				worker->tw_type_u.thread.thread = thread;
				thread->tt_worker = worker;
				cv_broadcast(&pool->ttp_cv);
			}
			mutex_exit(&pool->ttp_lock);
		}
		mutex_exit(&worker->tw_lock);
		taskworker_rele(worker);

		mutex_enter(&pool->ttp_lock);
	}
	mutex_exit(&pool->ttp_lock);

	taskthread_pool_rele(pool);
	kthread_exit(0);
}

/* Draining task queues */

struct drain;

struct drain_task {
	struct task		drt_task;
	struct drain		*drt_drain;
};

struct drain {
	unsigned int		d_count;	/* CPUs not yet drained */
	struct taskworker	d_worker;	/* borrows the caller's LWP */
	struct taskthread	d_thread;
	struct drain_task	*d_tasks;	/* one per CPU */
};

/*
 * taskqueue_drain: Wait for all tasks currently scheduled on taskqueue,
 * on every CPU, to complete.  May sleep.
 */
void
taskqueue_drain(struct taskqueue *taskqueue)
{
	xcfunc_t xcfunc;
	struct drain drain;

	/* We are going to sleep big-time.  */
	ASSERT_SLEEPABLE();

	/* Set up some temporary state for draining.  */
	drain.d_count = ncpu;
	taskworker_init(&drain.d_worker, "tqdrain", taskqueue->tq_ipl,
	    taskqueue);
	drain.d_worker.tw_type_u.thread.thread = &drain.d_thread;
	drain.d_thread.tt_lwp = curlwp;
	drain.d_thread.tt_pool = NULL;
	drain.d_thread.tt_worker = NULL;

	CTASSERT(MAXCPUS <= (SIZE_MAX / sizeof(struct drain_task)));
	/* XXX Allocation to destroy is sketchy...  */
	drain.d_tasks = kmem_alloc((sizeof(drain.d_tasks[0]) * ncpu),
	    KM_SLEEP);

	/* Do the per-CPU draining setup.  */
	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		xcfunc = &taskqueue_drain_softint_xc;
		break;
	case TASKQUEUE_THREAD:
		xcfunc = &taskqueue_drain_thread_xc;
		break;
	default:
		panic("taskqueue %p has invalid type: %d", taskqueue,
		    (int)taskqueue->tq_type);
	}
	xc_wait(xc_broadcast(0, xcfunc, taskqueue, &drain));

	mutex_enter(&drain.d_worker.tw_lock);
	/* Wait for the tasks or xcalls running on other CPUs to complete.  */
	while (0 < drain.d_count)
		cv_wait(&drain.d_worker.tw_cv, &drain.d_worker.tw_lock);
	/* Run all the tasks that couldn't run on other CPUs.  */
	while (!TAILQ_EMPTY(&drain.d_worker.tw_tasks))
		taskworker_run_1(&drain.d_worker, &drain.d_worker.tw_tasks);
	mutex_exit(&drain.d_worker.tw_lock);

	/* Nuke the temporary drain state.  */
	kmem_free(drain.d_tasks, (sizeof(drain.d_tasks[0]) * ncpu));
	taskworker_destroy(&drain.d_worker);
	KASSERT(drain.d_count == 0);
}

static void
drain_task(struct task *task)
{
	struct drain_task *const drt = container_of(task, struct drain_task,
	    drt_task);

	mutex_enter(&drt->drt_drain->d_worker.tw_lock);
	if (--drt->drt_drain->d_count == 0)
		cv_signal(&drt->drt_drain->d_worker.tw_cv);
	mutex_exit(&drt->drt_drain->d_worker.tw_lock);
}
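/*
 * taskqueue_drain_softint_xc, taskqueue_drain_thread_xc: Cross-call
 * handlers, run on each CPU, that either schedule the drain task after
 * everything already queued on that CPU or hand the CPU's queued tasks
 * to the drain worker, so that taskqueue_drain can wait for them all.
 */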
static void
taskqueue_drain_softint_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;
	struct drain_task *const drt = &drain->d_tasks[cpu_index(curcpu())];
	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;

	percpu_putref(taskqueue->tq_percpu);

	/*
	 * Just schedule the drain task -- nobody is allowed to sleep
	 * in softints, so we needn't worry about juggling workers.
	 */
	task_init(&drt->drt_task, &drain_task);
	drt->drt_drain = drain;

	mutex_enter(&worker->tw_lock);
	drt->drt_task.task_worker = worker;
	taskworker_schedule(worker, &drt->drt_task);
	mutex_exit(&worker->tw_lock);
}

static void
taskqueue_drain_thread_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;
	struct drain_task *const drt = &drain->d_tasks[cpu_index(curcpu())];
	struct task *task, *next;
	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;

	percpu_putref(taskqueue->tq_percpu);

	mutex_enter(&worker->tw_lock);
	if (worker->tw_type_u.thread.thread == NULL) {
		/* No tasks on this CPU.  All done on this CPU.  */
		mutex_enter(&drain->d_worker.tw_lock);
		if (--drain->d_count == 0)
			cv_signal(&drain->d_worker.tw_cv);
		mutex_exit(&drain->d_worker.tw_lock);
	} else if (worker->tw_type_u.thread.thread ==
	    &worker->tw_type_u.thread.pool->ttp_overseer) {
		struct taskthread_pool *const pool =
		    worker->tw_type_u.thread.pool;

		/* Tasks on this CPU but no thread to run them.  */
		worker->tw_type_u.thread.thread = NULL;

		/* Remove this worker from consideration.  */
		mutex_enter(&pool->ttp_lock);
		TAILQ_REMOVE(&pool->ttp_workers, worker, tw_entry);
		mutex_exit(&pool->ttp_lock);

		/* Move all the tasks to our drain worker.  */
		mutex_enter(&drain->d_worker.tw_lock);
		TAILQ_FOREACH_SAFE(task, &worker->tw_tasks, task_entry,
		    next) {
			TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);
			/* The drain worker runs it now.  */
			task->task_worker = &drain->d_worker;
			TAILQ_INSERT_TAIL(&drain->d_worker.tw_tasks, task,
			    task_entry);
		}

		/* All done on this CPU.  */
		if (--drain->d_count == 0)
			cv_signal(&drain->d_worker.tw_cv);
		mutex_exit(&drain->d_worker.tw_lock);
	} else {
		/* We have a thread.  Use it to schedule the drain task.  */
		task_init(&drt->drt_task, &drain_task);
		drt->drt_drain = drain;

		struct taskworker *const worker0 __diagused =
		    atomic_swap_ptr(&drt->drt_task.task_worker, worker);
		KASSERT(worker0 == NULL);
		KASSERT(worker->tw_current_task != &drt->drt_task);
		taskworker_schedule(worker, &drt->drt_task);
	}
	mutex_exit(&worker->tw_lock);
}