/*	$NetBSD$	*/

/*
 * XXX WARNING WARNING WARNING XXX
 *
 * This code is not tested!  It is a draft of an idea.  See below the
 * copyright notice for a summary.
 *
 * Fifth draft, using threadpool(9).
 */

/*-
 * Copyright (c) 2014, 2015, 2020 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * XXX Haven't (yet?) implemented single-threaded task queues.  Would
 * be more bookkeeping, but could reduce the footprint of seldom-used
 * task queues by a factor of ncpu.
 *
 * XXX Haven't (yet?) implemented unbound task queues.  Still more
 * bookkeeping, but maybe worthwhile.
 *
 * XXX Hmm...  Can the worker queues be managed without locks, by
 * restricting access to the local CPU?  That would make task_cancel
 * difficult, and cross-CPU requeueing logic would be hairy.
 *
 * XXX Offline and online CPUs?  Ugh...
 *
 * Lock order:
 *
 *	1. callers' locks, including task_cancel interlock
 *	2. struct taskworker::tw_lock for one normal worker at a time
 *	3. struct taskworker::tw_lock for one drain worker at a time
 */
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD$");

#include <sys/param.h>
#include <sys/types.h>

#include <sys/atomic.h>
#include <sys/callout.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/intr.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/lwp.h>
#include <sys/mutex.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/queue.h>
#include <sys/sdt.h>
#include <sys/systm.h>
#include <sys/task.h>
#include <sys/threadpool.h>
#include <sys/xcall.h>

#define	TASKQUEUE_IDLE_TICKS	mstohz(100)

/* SDT probes */

SDT_PROBE_DEFINE1(sdt, kernel, task, init,
    "struct task *"/*task*/);
SDT_PROBE_DEFINE1(sdt, kernel, task, done,
    "struct task *"/*task*/);
SDT_PROBE_DEFINE1(sdt, kernel, task, destroy,
    "struct task *"/*task*/);

SDT_PROBE_DEFINE1(sdt, kernel, delayed__task, init,
    "struct delayed_task *"/*dt*/);
SDT_PROBE_DEFINE1(sdt, kernel, delayed__task, done,
    "struct delayed_task *"/*dt*/);
SDT_PROBE_DEFINE1(sdt, kernel, delayed__task, destroy,
    "struct delayed_task *"/*dt*/);

SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, get,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, get__create,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, get__race,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, put,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, put__destroy,
    "pri_t"/*pri*/);

SDT_PROBE_DEFINE4(sdt, kernel, taskqueue, create,
    "const char *"/*name*/,
    "pri_t"/*pri*/,
    "int"/*ipl*/,
    "int"/*flags*/);
SDT_PROBE_DEFINE5(sdt, kernel, taskqueue, create__success,
    "const char *"/*name*/,
    "pri_t"/*pri*/,
    "int"/*ipl*/,
    "int"/*flags*/,
    "struct taskqueue *"/*taskqueue*/);
SDT_PROBE_DEFINE5(sdt, kernel, taskqueue, create__failure,
    "const char *"/*name*/,
    "pri_t"/*pri*/,
    "int"/*ipl*/,
    "int"/*flags*/,
    "int"/*error*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, destroy,
    "struct taskqueue *"/*taskqueue*/);

SDT_PROBE_DEFINE1(sdt, kernel, task, schedule,
    "struct task *"/*task*/);
SDT_PROBE_DEFINE3(sdt, kernel, delayed__task, schedule,
    "struct delayed_task *"/*dt*/,
    "nsec_t"/*nsec*/,
    "jitter_t"/*jitter*/);
SDT_PROBE_DEFINE3(sdt, kernel, delayed__task, reschedule,
    "struct delayed_task *"/*dt*/,
    "nsec_t"/*nsec*/,
    "jitter_t"/*jitter*/);
SDT_PROBE_DEFINE3(sdt, kernel, task, switch,
    "struct task *"/*task*/,
    "struct cpu_info *"/*from_cpu*/,
    "struct cpu_info *"/*to_cpu*/);

SDT_PROBE_DEFINE2(sdt, kernel, taskqueue, schedule,
    "struct taskqueue *"/*taskqueue*/,
    "struct task *"/*task*/);
SDT_PROBE_DEFINE2(sdt, kernel, taskqueue, schedule__race,
    "struct taskqueue *"/*taskqueue*/,
    "struct task *"/*task*/);
SDT_PROBE_DEFINE3(sdt, kernel, taskqueue, schedule__requeue,
    "struct taskqueue *"/*taskqueue*/,
    "struct task *"/*task*/,
    "struct cpu_info *"/*cpu*/);
SDT_PROBE_DEFINE3(sdt, kernel, taskqueue, schedule__enqueue,
    "struct taskqueue *"/*taskqueue*/,
    "struct task *"/*task*/,
    "struct cpu_info *"/*cpu*/);

SDT_PROBE_DEFINE4(sdt, kernel, taskqueue, schedule__delayed,
    "struct taskqueue *"/*taskqueue*/,
    "struct task *"/*task*/,
    "nsec_t"/*nsec*/,
    "jitter_t"/*jitter*/);
SDT_PROBE_DEFINE4(sdt, kernel, taskqueue, schedule__delayed__race,
    "struct taskqueue *"/*taskqueue*/,
    "struct task *"/*task*/,
    "nsec_t"/*nsec*/,
    "jitter_t"/*jitter*/);
SDT_PROBE_DEFINE5(sdt, kernel, taskqueue, schedule__delayed__requeue,
    "struct taskqueue *"/*taskqueue*/,
    "struct task *"/*task*/,
    "nsec_t"/*nsec*/,
    "jitter_t"/*jitter*/,
    "struct cpu_info *"/*cpu*/);
SDT_PROBE_DEFINE5(sdt, kernel, taskqueue, schedule__delayed__enqueue,
    "struct taskqueue *"/*taskqueue*/,
    "struct task *"/*task*/,
    "nsec_t"/*nsec*/,
    "jitter_t"/*jitter*/,
    "struct cpu_info *"/*cpu*/);

SDT_PROBE_DEFINE2(sdt, kernel, task, cancel,
    "struct task *"/*task*/,
    "kmutex_t *"/*interlock*/);
SDT_PROBE_DEFINE2(sdt, kernel, task, cancel__wait,
    "struct task *"/*task*/,
    "kmutex_t *"/*interlock*/);
SDT_PROBE_DEFINE3(sdt, kernel, task, cancelled,
    "struct task *"/*task*/,
    "kmutex_t *"/*interlock*/,
    "bool"/*success*/);
SDT_PROBE_DEFINE1(sdt, kernel, task, run,
    "struct task *"/*task*/);

SDT_PROBE_DEFINE1(sdt, kernel, delayed__task, timeout,
    "struct delayed_task *"/*dt*/);
SDT_PROBE_DEFINE2(sdt, kernel, delayed__task, timeout__enqueue,
    "struct delayed_task *"/*dt*/,
    "struct cpu_info *"/*ci*/);
SDT_PROBE_DEFINE2(sdt, kernel, delayed__task, timeout__requeue,
    "struct delayed_task *"/*dt*/,
    "struct cpu_info *"/*ci*/);

SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, drain__start,
    "struct taskqueue *"/*taskqueue*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, drain__done,
    "struct taskqueue *"/*taskqueue*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, drain__cpu,
    "struct taskqueue *"/*taskqueue*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, drain__cpu__migrate,
    "struct taskqueue *"/*taskqueue*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, drain__cpu__wait,
    "struct taskqueue *"/*taskqueue*/);
SDT_PROBE_DEFINE1(sdt, kernel, taskqueue, drain__cpu__done,
    "struct taskqueue *"/*taskqueue*/);

/* Data structures */

struct taskqueue {
	struct percpu	*tq_percpu;	/* struct taskworker * */
	union {
		struct {
			void	*cookie;
		} softint;
		struct {
			struct threadpool_percpu	*pool_percpu;
		} thread;
	} tq_u;
	enum taskqueue_type {
		TASKQUEUE_SOFTINT,
		TASKQUEUE_THREAD,
	} tq_type;
	int		tq_flags;
	pri_t		tq_pri;
	int		tq_ipl;
	const char	*tq_name;
};

struct taskworker {
	kmutex_t		tw_lock;
	kcondvar_t		tw_cv;
	struct taskqueue	*tw_taskqueue;
	TAILQ_HEAD(, task)	tw_tasks;
	struct task		*tw_current_task;
	struct cpu_info		*tw_cpu;
	int			tw_flags;
#define	TASKWORKER_DONE		0x01
#define	TASKWORKER_REQUEUED	0x02
#define	TASKWORKER_CALLOUT_ACK	0x04	/* XXX KLUDGE */
#define	TASKWORKER_DYING	0x08
};

struct taskworker_softint {
	struct taskworker	tws_worker;
};

struct taskworker_thread {
	struct taskworker	twt_worker;
	struct threadpool_job	twt_job;
};

/* Forward declarations */

static size_t	taskqueue_pri_index(pri_t);

static void	task_error(struct task *);

static int	_taskqueue_create(struct taskqueue **, const char *, pri_t,
		    int, int);
static void	_taskqueue_destroy(struct taskqueue *);
static int	taskqueue_init_softint(struct taskqueue *);
static int	taskqueue_init_thread(struct taskqueue *);
static void	taskqueue_cpu_init(void *, void *, struct cpu_info *);
static void	taskqueue_cancel_cpu(struct taskqueue *, struct cpu_info *);
static void	taskqueue_cpu_fini(void *, void *, struct cpu_info *);

static void	taskworker_init(struct taskworker *, const char *,
		    struct taskqueue *, struct cpu_info *);
static void	taskworker_destroy(struct taskworker *, struct cpu_info *);

static struct taskworker *
		current_taskworker(struct taskqueue *);

static int	nstohz(nsec_t, jitter_t);

static void	taskworker_enqueue(struct taskworker *, struct task *);
static void	taskworker_requeue(struct taskworker *, struct task *);
static void	taskworker_enqueue_delayed(struct taskworker *,
		    struct delayed_task *, nsec_t, jitter_t);
static void	taskworker_requeue_delayed(struct taskworker *,
		    struct delayed_task *, nsec_t, jitter_t);
static void	taskworker_maybe_switch(struct taskworker **, struct task *);
static void	taskworker_schedule(struct taskworker *, struct task *);
static void	taskworker_run_1(struct taskworker *);
static void	taskworker_done(struct taskworker *, struct task *);
static void	taskworker_softintr(void *);
static void	taskworker_job(struct threadpool_job *);

static task_fn_t	drain_task;
static void	taskqueue_drain_softint_xc(void *, void *);
static void	taskqueue_drain_thread_xc(void *, void *);
/* Global state and initialization */

static struct pool taskqueue_pool;
static struct pool taskworker_softint_pool;
static struct pool taskworker_thread_pool;

static kmutex_t taskqueues_lock __cacheline_aligned;
static struct {
	struct taskqueue	*taskqueue;
	uint64_t		refcnt;
} taskqueues[PRI_COUNT + 1];

/* XXX Where should these definitions go? */
#define	MAXPRI_TASK		MAXPRI_SOFTINT
#define	MAXPRI_SOFTINT		MAXPRI_KERNEL_RT
#define	MINPRI_SOFTINT		PRI_SOFTCLOCK
#define	PRI_COUNT_KTHREAD	(MAXPRI_KTHREAD + 1)

struct taskqueue *system_taskqueue __read_mostly;

/*
 * tasks_init: Initialize the task subsystem.
 */
void
tasks_init(void)
{
	int error;

	pool_init(&taskqueue_pool, sizeof(struct taskqueue), 0, 0, 0,
	    "taskq", NULL, IPL_NONE);
	pool_init(&taskworker_softint_pool, sizeof(struct taskworker_softint),
	    0, 0, 0, "taskwsih", NULL, IPL_NONE);
	pool_init(&taskworker_thread_pool, sizeof(struct taskworker_thread),
	    0, 0, 0, "taskwthr", NULL, IPL_NONE);

	mutex_init(&taskqueues_lock, MUTEX_DEFAULT, IPL_NONE);

	error = taskqueue_get(&system_taskqueue, PRI_NONE);
	if (error)
		panic("failed to create system task queue: %d", error);
}

static size_t
taskqueue_pri_index(pri_t pri)
{

	if (pri == PRI_NONE) {
		/* The PRI_NONE queue lives in the extra slot at the end. */
		return PRI_COUNT;
	} else {
		KASSERTMSG((0 <= pri), "negative priority: %d", (int)pri);
		KASSERTMSG((pri <= MAXPRI_TASK), "priority out of range: %d",
		    (int)pri);
		return pri;
	}
}

/* Tasks */

static void
task_error(struct task *task)
{

	panic("destroyed task got run: %p", task);
}

/*
 * task_destroy: Destroy a task initialized with task_init.  Must no
 * longer be scheduled.  Use task_cancel first if this is not already
 * guaranteed.  task_cancel_async is not enough.  If you want to call
 * this inside a task callback, you must use task_done first.
 *
 * Not strictly necessary, but this is part of the documented API in
 * case it ever becomes necessary.
 */
void
task_destroy(struct task *task)
{

	KASSERTMSG((task->task_fn != &task_error),
	    "task already destroyed: %p", task);
	KASSERTMSG((task->task_worker == NULL),
	    "task still scheduled or running: %p", task);

	SDT_PROBE1(sdt, kernel, task, destroy, task);

	task->task_fn = task_error;
}

/*
 * task_done: To be called in a task callback to notify the task worker
 * that the task is done and can be scheduled again.
 *
 * If you do not call this, the task worker will do the equivalent when
 * the task callback returns.  You MUST call this if the task callback
 * will free memory containing the struct task.
 */
void
task_done(struct task *task)
{
	struct taskworker *worker = task->task_worker;

	KASSERTMSG((worker != NULL), "task not running: %p", task);
	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't be completing: %p", task);

	mutex_enter(&worker->tw_lock);
	KASSERTMSG((task->task_worker == worker),
	    "task %p changed workers: %p -> %p",
	    task, worker, task->task_worker);
	KASSERTMSG((worker->tw_current_task == task),
	    "task %p is not current on worker %p", task, worker);
	KASSERTMSG(!ISSET(worker->tw_flags, TASKWORKER_DONE),
	    "task %p is already done on worker %p", task, worker);
	taskworker_done(worker, task);
	mutex_exit(&worker->tw_lock);
}
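/*
 * Example (an illustrative sketch, not part of this file): a task
 * callback that frees the memory holding its own struct task must call
 * task_done before freeing it.  The struct frob, frob_process, and
 * f_key names below are hypothetical.
 *
 *	struct frob {
 *		struct task	f_task;
 *		int		f_key;
 *	};
 *
 *	static void
 *	frob_task(struct task *task)
 *	{
 *		struct frob *f = container_of(task, struct frob, f_task);
 *		int key = f->f_key;
 *
 *		task_done(task);
 *		kmem_free(f, sizeof(*f));
 *		frob_process(key);
 *	}
 *
 * The caller would have set it up with something like
 *
 *	f = kmem_alloc(sizeof(*f), KM_SLEEP);
 *	f->f_key = key;
 *	task_init(&f->f_task, &frob_task);
 *	task_schedule(&f->f_task);
 */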
/* Delayed tasks */

/*
 * delayed_task_destroy: Destroy a delayed task initialized with
 * delayed_task_init.  Must no longer be scheduled.  Use
 * delayed_task_cancel first if this is not already guaranteed.
 * delayed_task_cancel_async is not enough.  If you want to call this
 * inside a task callback, you must use delayed_task_done first.
 */
void
delayed_task_destroy(struct delayed_task *dt)
{

	KASSERTMSG((!callout_pending(&dt->dt_callout)),
	    "delayed task still pending timeout: %p", dt);
	KASSERTMSG((dt->dt_task.task_worker == NULL),
	    "delayed task still scheduled or running: %p", dt);
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task already destroyed: %p", dt);

	callout_destroy(&dt->dt_callout);
	task_destroy(&dt->dt_task);
}

/*
 * delayed_task_done: To be called in a delayed task callback to notify
 * the task worker that the task is done and can be scheduled again.
 *
 * If you do not call this, the task worker will do the equivalent when
 * the task callback returns.  You MUST call this if the task callback
 * will free memory containing the struct delayed_task.
 */
void
delayed_task_done(struct delayed_task *dt)
{

	task_done(&dt->dt_task);
}

/*
 * delayed_task_timedout: Return true if dt ran after nonzero delay.
 */
bool
delayed_task_timedout(struct delayed_task *dt)
{

	KASSERTMSG((dt->dt_task.task_worker != NULL),
	    "delayed_task_timedout(%p) must be called only from task action",
	    dt);
	KASSERTMSG((dt->dt_task.task_worker->tw_current_task == &dt->dt_task),
	    "delayed_task_timedout(%p) must be called only from task action",
	    dt);

	return callout_invoking(&dt->dt_callout);
}

/* Reference-counted shared system task queues */

/*
 * taskqueue_get: Obtain a reference to a shared system task queue
 * running at the specified priority.  The resulting task queue can be
 * used at any IPL up to and including IPL_VM.  It must be released
 * with taskqueue_put -- do not pass it to taskqueue_destroy.  May
 * sleep.
 */
int
taskqueue_get(struct taskqueue **taskqueue_ret, pri_t pri)
{
	const size_t i = taskqueue_pri_index(pri);
	char buf[9];
	struct taskqueue *taskqueue, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, taskqueue, get, pri);

	/* Try to get it. */
	mutex_enter(&taskqueues_lock);
	taskqueue = taskqueues[i].taskqueue;
	if (taskqueue == NULL) {
		/* Not there.  Drop the lock to create a new one. */
		KASSERT(taskqueues[i].refcnt == 0);
		mutex_exit(&taskqueues_lock);

		SDT_PROBE1(sdt, kernel, taskqueue, get__create, pri);

		/* Three digits will fit in the buffer. */
		(void)snprintf(buf, sizeof buf, "taskq%d", (int)pri);
		error = _taskqueue_create(&tmp, buf, pri, IPL_VM,
		    TASKQUEUE_PERCPU);
		if (error)
			return error;

		/* Try again, but we may have raced. */
		mutex_enter(&taskqueues_lock);
		taskqueue = taskqueues[i].taskqueue;
		if (taskqueue == NULL) {
			/* Nobody raced us.  Commit tmp. */
			KASSERT(taskqueues[i].refcnt == 0);
			taskqueue = taskqueues[i].taskqueue = tmp;
			tmp = NULL;
		} else {
			SDT_PROBE1(sdt, kernel, taskqueue, get__race, pri);
		}
	}

	/* Bump the reference count. */
	taskqueues[i].refcnt++;
	mutex_exit(&taskqueues_lock);

	/* If we created tmp but didn't end up using it, destroy it. */
	if (tmp != NULL)
		_taskqueue_destroy(tmp);

	KASSERT(taskqueue != NULL);
	*taskqueue_ret = taskqueue;
	return 0;
}
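/*
 * Example (sketch, not part of this file): a driver that wants to run
 * its tasks at a soft-interrupt priority takes a reference to the
 * shared queue at attach time and releases it at detach time.  The
 * sc_taskqueue member is hypothetical.
 *
 *	error = taskqueue_get(&sc->sc_taskqueue, PRI_SOFTNET);
 *	if (error)
 *		return error;
 *	...
 *	taskqueue_put(sc->sc_taskqueue);
 *	sc->sc_taskqueue = NULL;
 */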
/*
 * taskqueue_put: Release a reference to a shared system taskqueue
 * obtained with taskqueue_get.  If this is the last one, destroy it.
 * Do not pass a taskqueue created with taskqueue_create.  May sleep.
 */
void
taskqueue_put(struct taskqueue *taskqueue)
{
	const size_t i = taskqueue_pri_index(taskqueue->tq_pri);

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, taskqueue, put, taskqueue->tq_pri);

	mutex_enter(&taskqueues_lock);
	KASSERT(taskqueues[i].taskqueue == taskqueue);
	KASSERT(0 < taskqueues[i].refcnt);
	if (--taskqueues[i].refcnt == 0)
		taskqueues[i].taskqueue = NULL;
	else
		taskqueue = NULL;
	mutex_exit(&taskqueues_lock);

	if (taskqueue != NULL) {
		SDT_PROBE1(sdt, kernel, taskqueue, put__destroy,
		    taskqueue->tq_pri);
		_taskqueue_destroy(taskqueue);
	}
}

/* Task queue creation */

/*
 * taskqueue_create: Create a task queue to run tasks at priority
 * task_pri which can be scheduled from IPL schedule_ipl.  The task
 * queue must be destroyed with taskqueue_destroy -- do not pass it to
 * taskqueue_put.  May sleep.
 *
 * For soft interrupt priorities, you must use taskqueue_get to get a
 * shared system task queue, not taskqueue_create.
 */
int
taskqueue_create(struct taskqueue **taskqueuep, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{

	KASSERTMSG(((task_pri == PRI_NONE) || (task_pri <= MAXPRI_KTHREAD)),
	    "priority too high for taskqueue_create(%s): %d",
	    name, (int)task_pri);

	return _taskqueue_create(taskqueuep, name, task_pri, schedule_ipl,
	    flags);
}

static int
_taskqueue_create(struct taskqueue **taskqueuep, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{
	struct taskqueue *taskqueue;
	int error;

	ASSERT_SLEEPABLE();

	SDT_PROBE4(sdt, kernel, taskqueue, create,
	    name, task_pri, schedule_ipl, flags);

	taskqueue = pool_get(&taskqueue_pool, PR_WAITOK);
	taskqueue->tq_flags = flags;
	taskqueue->tq_pri = task_pri;
	taskqueue->tq_ipl = schedule_ipl;
	taskqueue->tq_name = name;
	taskqueue->tq_percpu = percpu_create(sizeof(struct taskworker *),
	    taskqueue_cpu_init, taskqueue_cpu_fini, taskqueue);

	KASSERT(task_pri <= MAXPRI_TASK);
	if (MINPRI_SOFTINT <= task_pri) {
		KASSERT(task_pri <= MAXPRI_SOFTINT);
		taskqueue->tq_type = TASKQUEUE_SOFTINT;
		error = taskqueue_init_softint(taskqueue);
	} else {
		KASSERT(task_pri <= MAXPRI_KTHREAD);
		taskqueue->tq_type = TASKQUEUE_THREAD;
		error = taskqueue_init_thread(taskqueue);
	}
	if (error)
		goto fail;

	/* Success! */
	*taskqueuep = taskqueue;
	SDT_PROBE5(sdt, kernel, taskqueue, create__success,
	    name, task_pri, schedule_ipl, flags, taskqueue);
	return 0;

fail:	percpu_free(taskqueue->tq_percpu, sizeof(struct taskworker *));
	KASSERT(error);
	pool_put(&taskqueue_pool, taskqueue);
	SDT_PROBE5(sdt, kernel, taskqueue, create__failure,
	    name, task_pri, schedule_ipl, flags, error);
	return error;
}
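/*
 * Example (sketch, not part of this file): a subsystem that wants its
 * own thread-priority queue, schedulable from up to IPL_SOFTNET, might
 * create and destroy it like this.  The frob_taskqueue name is
 * hypothetical.
 *
 *	static struct taskqueue *frob_taskqueue;
 *
 *	error = taskqueue_create(&frob_taskqueue, "frobtq", PRI_NONE,
 *	    IPL_SOFTNET, TASKQUEUE_PERCPU);
 *	if (error)
 *		return error;
 *	...
 *	taskqueue_destroy(frob_taskqueue);
 *	frob_taskqueue = NULL;
 */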
static int
taskqueue_init_softint(struct taskqueue *taskqueue)
{
	int softint_flags = SOFTINT_MPSAFE;

	/* XXX This table should go somewhere else. */
	if (PRI_SOFTSERIAL <= taskqueue->tq_pri)
		softint_flags |= SOFTINT_SERIAL;
	else if (PRI_SOFTNET <= taskqueue->tq_pri)
		softint_flags |= SOFTINT_NET;
	else if (PRI_SOFTBIO <= taskqueue->tq_pri)
		softint_flags |= SOFTINT_BIO;
	else if (PRI_SOFTCLOCK <= taskqueue->tq_pri)
		softint_flags |= SOFTINT_CLOCK;
	else
		panic("invalid softint-level priority: %d",
		    (int)taskqueue->tq_pri);

	taskqueue->tq_u.softint.cookie = softint_establish(softint_flags,
	    &taskworker_softintr, taskqueue);
	if (taskqueue->tq_u.softint.cookie == NULL)
		return ENOMEM;

	return 0;
}

static int
taskqueue_init_thread(struct taskqueue *taskqueue)
{

	return threadpool_percpu_get(&taskqueue->tq_u.thread.pool_percpu,
	    taskqueue->tq_pri);
}

static void
taskqueue_cpu_init(void *vworkerp, void *vtaskqueue, struct cpu_info *ci)
{
	struct taskworker **workerp = vworkerp;
	struct taskworker *worker;
	struct taskqueue *taskqueue = vtaskqueue;

	if (MINPRI_SOFTINT <= taskqueue->tq_pri) {
		struct taskworker_softint *worker_softint =
		    pool_get(&taskworker_softint_pool, PR_WAITOK);

		worker = &worker_softint->tws_worker;
	} else {
		struct taskworker_thread *worker_thread =
		    pool_get(&taskworker_thread_pool, PR_WAITOK);

		threadpool_job_init(&worker_thread->twt_job, &taskworker_job,
		    &worker_thread->twt_worker.tw_lock,
		    "%s/%u", taskqueue->tq_name, cpu_index(ci));
		worker = &worker_thread->twt_worker;
	}

	taskworker_init(worker, taskqueue->tq_name, taskqueue, ci);
	*workerp = worker;
}

/* Task queue destruction */

/*
 * taskqueue_destroy: Wait for all tasks on taskqueue to complete and
 * destroy it.  The taskqueue must have been created with
 * taskqueue_create.  May sleep.
 */
void
taskqueue_destroy(struct taskqueue *taskqueue)
{

#ifdef DIAGNOSTIC
	{
		const size_t i = taskqueue_pri_index(taskqueue->tq_pri);

		mutex_enter(&taskqueues_lock);
		KASSERTMSG((taskqueues[i].taskqueue != taskqueue),
		    "taskqueue_destroy used with shared taskqueue at pri %d",
		    (int)taskqueue->tq_pri);
		mutex_exit(&taskqueues_lock);
	}
#endif

	_taskqueue_destroy(taskqueue);
}

static void
_taskqueue_destroy(struct taskqueue *taskqueue)
{
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, taskqueue, destroy, taskqueue);

	switch (taskqueue->tq_type) {
	case TASKQUEUE_THREAD:
		/* Cancel worker job on all CPUs and wait to complete. */
		for (CPU_INFO_FOREACH(cii, ci))
			taskqueue_cancel_cpu(taskqueue, ci);

		/* Release the thread pool. */
		threadpool_percpu_put(taskqueue->tq_u.thread.pool_percpu,
		    taskqueue->tq_pri);
		taskqueue->tq_u.thread.pool_percpu = NULL;	/* paranoia */
		break;
	case TASKQUEUE_SOFTINT:
		/* Wait for softint to complete on all CPUs and free it. */
		softint_disestablish(taskqueue->tq_u.softint.cookie);
		taskqueue->tq_u.softint.cookie = NULL;		/* paranoia */
		break;
	default:
		panic("taskqueue %p has invalid type: %d",
		    taskqueue, (int)taskqueue->tq_type);
	}

	percpu_free(taskqueue->tq_percpu, sizeof(struct taskworker *));
	pool_put(&taskqueue_pool, taskqueue);
}

static void
taskqueue_cancel_cpu(struct taskqueue *taskqueue, struct cpu_info *ci)
{
	struct taskworker **workerp, *worker;
	struct taskworker_thread *worker_thread;
	struct threadpool *pool;

	/* Get the task worker. */
	percpu_traverse_enter();
	workerp = percpu_getptr_remote(taskqueue->tq_percpu, ci);
	worker = *workerp;
	percpu_traverse_exit();

	/* Get the worker thread and thread pool for that CPU. */
	worker_thread = container_of(worker, struct taskworker_thread,
	    twt_worker);
	pool = threadpool_percpu_ref_remote(taskqueue->tq_u.thread.pool_percpu,
	    ci);

	/* Mark the worker dying and cancel the thread pool job. */
	mutex_enter(&worker->tw_lock);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DYING));
	worker->tw_flags |= TASKWORKER_DYING;
	cv_broadcast(&worker->tw_cv);
	threadpool_cancel_job(pool, &worker_thread->twt_job);
	KASSERT(ISSET(worker->tw_flags, TASKWORKER_DYING));
	worker->tw_flags &= ~TASKWORKER_DYING;
	mutex_exit(&worker->tw_lock);

	/* All done.  Destroy the job. */
	threadpool_job_destroy(&worker_thread->twt_job);
}

static void
taskqueue_cpu_fini(void *vworkerp, void *vtaskqueue, struct cpu_info *ci)
{
	struct taskworker *const *workerp = vworkerp;
	struct taskworker *worker = *workerp;
	struct taskqueue *taskqueue = vtaskqueue;

	taskworker_destroy(worker, ci);

	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		pool_put(&taskworker_softint_pool,
		    container_of(worker, struct taskworker_softint,
			tws_worker));
		break;
	case TASKQUEUE_THREAD:
		pool_put(&taskworker_thread_pool,
		    container_of(worker, struct taskworker_thread,
			twt_worker));
		break;
	default:
		panic("taskqueue %p has invalid type: %d",
		    taskqueue, (int)taskqueue->tq_type);
	}
}

/* Task worker initialization and destruction */

static void
taskworker_init(struct taskworker *worker, const char *name,
    struct taskqueue *taskqueue, struct cpu_info *ci)
{

	mutex_init(&worker->tw_lock, MUTEX_DEFAULT, taskqueue->tq_ipl);
	cv_init(&worker->tw_cv, name);
	worker->tw_taskqueue = taskqueue;
	TAILQ_INIT(&worker->tw_tasks);
	worker->tw_current_task = NULL;
	worker->tw_cpu = ci;
	worker->tw_flags = 0;
}

static void
taskworker_destroy(struct taskworker *worker, struct cpu_info *ci)
{

	KASSERT(worker->tw_flags == 0);
	KASSERT(worker->tw_cpu == ci);
	KASSERT(worker->tw_current_task == NULL);
	KASSERT(TAILQ_EMPTY(&worker->tw_tasks));

	worker->tw_taskqueue = NULL;
	cv_destroy(&worker->tw_cv);
	mutex_destroy(&worker->tw_lock);
}

static struct taskworker *
current_taskworker(struct taskqueue *taskqueue)
{
	struct taskworker **workerp, *worker;

	KASSERT(curlwp->l_pflag & LP_BOUND);

	workerp = percpu_getref(taskqueue->tq_percpu);
	worker = *workerp;
	percpu_putref(taskqueue->tq_percpu);

	return worker;
}

/* Task scheduling */

/*
 * task_schedule: Schedule task on the shared system low-priority task
 * queue.
 */
void
task_schedule(struct task *task)
{

	taskqueue_schedule(system_taskqueue, task);
}

/*
 * delayed_task_schedule: Schedule dt on the shared system low-priority
 * task queue with the specified delay.
 */
void
delayed_task_schedule(struct delayed_task *dt, nsec_t nsec, jitter_t jitter)
{

	taskqueue_schedule_delayed(system_taskqueue, dt, nsec, jitter);
}

/*
 * delayed_task_reschedule: Try to reschedule a delayed task to run the
 * specified time from now.  It may be too late: the timeout may have
 * already fired.  Return true if we successfully rescheduled, false if
 * it was too late.
 */
bool
delayed_task_reschedule(struct delayed_task *dt, nsec_t nsec, jitter_t jitter)
{
	struct taskworker *worker = dt->dt_task.task_worker;

	if (worker == NULL)
		/* It was not scheduled to begin with, so no reschedule. */
		return false;

	mutex_enter(&worker->tw_lock);
	if (dt->dt_task.task_worker != worker) {
		/* Its worker already executed it, so too late. */
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (worker->tw_current_task == &dt->dt_task) {
		/* The worker is already executing it, so too late. */
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (callout_stop(&dt->dt_callout)) {
		/* The callout already fired, so too late. */
		mutex_exit(&worker->tw_lock);
		return false;
	}

	/* We stopped the callout before it fired.  Reschedule it. */
	if (nsec == 0) {
		taskworker_maybe_switch(&worker, &dt->dt_task);
		taskworker_schedule(worker, &dt->dt_task);
	} else {
		callout_schedule(&dt->dt_callout, nstohz(nsec, jitter));
	}
	mutex_exit(&worker->tw_lock);
	return true;
}
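/*
 * Example (sketch, not part of this file): a watchdog that fires only
 * after five seconds of silence.  Arming uses delayed_task_schedule;
 * each sign of life from the device pushes the deadline back with
 * delayed_task_reschedule, which fails harmlessly if the timeout has
 * already fired.  The sc_watchdog member is hypothetical.
 *
 *	Arm the watchdog (a no-op if it is already scheduled):
 *
 *		delayed_task_schedule(&sc->sc_watchdog, 5000000000ull, 0);
 *
 *	On activity, push the deadline back five seconds:
 *
 *		if (!delayed_task_reschedule(&sc->sc_watchdog,
 *			5000000000ull, 0)) {
 *			... too late: the watchdog task is about to run
 *			    or is already running ...
 *		}
 */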
/*
 * taskworker_maybe_switch: If *workerp is not the current CPU's
 * worker, switch task, which must currently be assigned to *workerp,
 * to the current CPU's worker instead, and store the current CPU's
 * worker in *workerp.  Returns with *workerp's lock held, whether the
 * same worker as on input or the new worker.  In the latter case,
 * taskworker_maybe_switch will have dropped the old worker's lock and
 * acquired the new worker's lock.
 *
 * This is needed so that the task is on the queue that the softint
 * handler will examine when we softint_schedule, which triggers the
 * softint on the current CPU only.  The alternative would be to use an
 * xcall to call softint_schedule on the remote CPU, but that requires
 * sleeping, which delayed_task_timeout and delayed_task_reschedule
 * can't do.  Another alternative would be to implement callout_bind to
 * force the callout to run only on the correct CPU.
 *
 * This is necessary only before calling taskworker_schedule with a
 * worker that is for a softint task queue and is not guaranteed to be
 * the current CPU's worker.  This is the case only in
 * delayed_task_timeout and delayed_task_reschedule.  For thread task
 * queues or when the worker is guaranteed to be the current CPU's
 * worker, this is not needed.  We are bound to the CPU, so
 * taskqueue_schedule and taskqueue_schedule_delayed are in no danger
 * of switching CPUs while they're not looking.
 */
static void
taskworker_maybe_switch(struct taskworker **workerp, struct task *task)
{
	struct taskworker *cur_worker, *worker0 __diagused;

	if ((*workerp)->tw_taskqueue->tq_type != TASKQUEUE_SOFTINT)
		return;

	KASSERT(curlwp->l_pflag & LP_BOUND);
	KASSERT(mutex_owned(&(*workerp)->tw_lock));
	KASSERT(task->task_worker == *workerp);
	KASSERT((*workerp)->tw_current_task != task);

	/* Get the current CPU's worker. */
	cur_worker = current_taskworker((*workerp)->tw_taskqueue);
	KASSERT(cur_worker->tw_taskqueue == (*workerp)->tw_taskqueue);

	/* If *workerp is already it, we're good. */
	if (__predict_true(cur_worker == *workerp))
		return;

	/* Otherwise, atomically switch workers and change locks. */
	SDT_PROBE3(sdt, kernel, task, switch,
	    task, (*workerp)->tw_cpu, cur_worker->tw_cpu);
	worker0 = atomic_swap_ptr(&task->task_worker, cur_worker);
	KASSERT(worker0 == *workerp);
	mutex_exit(&(*workerp)->tw_lock);
	*workerp = cur_worker;
	mutex_enter(&(*workerp)->tw_lock);
	KASSERT(task->task_worker == *workerp);
}

/*
 * taskqueue_schedule: Schedule task to run on taskqueue.  If task is
 * already scheduled, don't change it.
 */
void
taskqueue_schedule(struct taskqueue *taskqueue, struct task *task)
{
	struct taskworker *worker, *worker0;
	int bound;

	SDT_PROBE2(sdt, kernel, taskqueue, schedule, taskqueue, task);

	/* Bind to the current CPU and get the CPU's worker. */
	bound = curlwp_bind();
	worker = current_taskworker(taskqueue);

	mutex_enter(&worker->tw_lock);

	/* Try to grab the task. */
	while (__predict_false((worker0 = atomic_cas_ptr(&task->task_worker,
			NULL, worker)) != NULL)) {
		/* Someone else got in first.  Switch locks. */
		if (worker0 != worker) {
			SDT_PROBE2(sdt, kernel, taskqueue, schedule__race,
			    taskqueue, task);
			mutex_exit(&worker->tw_lock);
			mutex_enter(&worker0->tw_lock);
		}
		if (__predict_true(task->task_worker == worker0)) {
			/*
			 * Nobody else got in while we were locking.
			 * The task is either on worker0's queue or
			 * being run by worker0.
			 */
			taskworker_requeue(worker0, task);
			mutex_exit(&worker0->tw_lock);
			goto out;
		}
		/* Someone got in again.  Switch locks back and retry. */
		if (worker0 != worker) {
			mutex_exit(&worker0->tw_lock);
			mutex_enter(&worker->tw_lock);
		}
	}

	/* We grabbed it.  Put it on the queue. */
	taskworker_enqueue(worker, task);
	mutex_exit(&worker->tw_lock);

out:	curlwp_bindx(bound);
}
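/*
 * Example (sketch, not part of this file): the typical use of
 * taskqueue_schedule is to defer work from a hardware interrupt
 * handler to task context.  The frob names are hypothetical; the queue
 * would have been obtained with taskqueue_get or taskqueue_create
 * using a schedule_ipl at least as high as the interrupt's IPL.
 *
 *	static int
 *	frob_intr(void *cookie)
 *	{
 *		struct frob_softc *sc = cookie;
 *
 *		frob_ack_interrupt(sc);
 *		taskqueue_schedule(sc->sc_taskqueue, &sc->sc_intr_task);
 *		return 1;
 *	}
 */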
static void
taskworker_enqueue(struct taskworker *worker, struct task *task)
{

	SDT_PROBE3(sdt, kernel, taskqueue, schedule__enqueue,
	    worker->tw_taskqueue, task, worker->tw_cpu);
	taskworker_schedule(worker, task);
}

static void
taskworker_requeue(struct taskworker *worker0, struct task *task)
{

	/* Is it still on the queue, or already on the queue again? */
	if (worker0->tw_current_task != task ||
	    ISSET(worker0->tw_flags, TASKWORKER_REQUEUED))
		return;

	/* It's already begun to run.  Queue it up to run again. */
	SDT_PROBE3(sdt, kernel, taskqueue, schedule__requeue,
	    worker0->tw_taskqueue, task, worker0->tw_cpu);
	worker0->tw_flags |= TASKWORKER_REQUEUED;
	taskworker_schedule(worker0, task);
}

/*
 * taskqueue_schedule_delayed: Schedule the delayed task dt to run on
 * taskqueue after the specified number of nanoseconds.  If it is
 * already scheduled, don't change it.  taskqueue must run at a
 * priority no higher than PRI_SOFTCLOCK.  If the timeout is zero,
 * schedule the task to run without delay.
 */
void
taskqueue_schedule_delayed(struct taskqueue *taskqueue,
    struct delayed_task *dt, nsec_t nsec, jitter_t jitter)
{
	struct taskworker *worker, *worker0;
	int bound;

	KASSERTMSG((taskqueue->tq_pri <= PRI_SOFTCLOCK),
	    "taskqueue %p priority too high for delayed task %p: %d",
	    taskqueue, dt, (int)taskqueue->tq_pri);
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't schedule: %p", dt);

	SDT_PROBE4(sdt, kernel, taskqueue, schedule__delayed,
	    taskqueue, dt, nsec, jitter);

	/* Bind to the current CPU and get the CPU's worker. */
	bound = curlwp_bind();
	worker = current_taskworker(taskqueue);

	mutex_enter(&worker->tw_lock);
	while (__predict_false((worker0 =
		    atomic_cas_ptr(&dt->dt_task.task_worker, NULL, worker))
		!= NULL)) {
		if (worker0 != worker) {
			SDT_PROBE4(sdt, kernel, taskqueue,
			    schedule__delayed__race,
			    taskqueue, dt, nsec, jitter);
			mutex_exit(&worker->tw_lock);
			mutex_enter(&worker0->tw_lock);
		}
		if (__predict_true(dt->dt_task.task_worker == worker0)) {
			taskworker_requeue_delayed(worker0, dt, nsec, jitter);
			mutex_exit(&worker0->tw_lock);
			goto out;
		}
		if (worker0 != worker) {
			mutex_exit(&worker0->tw_lock);
			mutex_enter(&worker->tw_lock);
		}
	}
	taskworker_enqueue_delayed(worker, dt, nsec, jitter);
	mutex_exit(&worker->tw_lock);

out:	curlwp_bindx(bound);
}
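/*
 * Example (sketch, not part of this file): a delayed task may
 * reschedule itself from its own callback, e.g. to poll hardware once
 * a second.  The frob names and frob_taskqueue are hypothetical; the
 * dt_task member name matches the layout used by this file.
 *
 *	static void
 *	frob_poll(struct task *task)
 *	{
 *		struct delayed_task *dt = container_of(task,
 *		    struct delayed_task, dt_task);
 *		struct frob_softc *sc = container_of(dt,
 *		    struct frob_softc, sc_poll_task);
 *
 *		frob_poll_hardware(sc);
 *		taskqueue_schedule_delayed(frob_taskqueue, dt,
 *		    1000000000ull, 0);
 *	}
 */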
static void
taskworker_enqueue_delayed(struct taskworker *worker, struct delayed_task *dt,
    nsec_t nsec, jitter_t jitter)
{

	SDT_PROBE5(sdt, kernel, taskqueue, schedule__delayed__enqueue,
	    worker->tw_taskqueue, dt, nsec, jitter, worker->tw_cpu);
	if (nsec == 0) {
		/*
		 * Clear any prior CALLOUT_INVOKING flag first so that
		 * delayed_task_timedout(dt) will return false while
		 * it's running.
		 */
		callout_ack(&dt->dt_callout);
		taskworker_schedule(worker, &dt->dt_task);
	} else {
		callout_schedule(&dt->dt_callout, nstohz(nsec, jitter));
	}
}

static void
taskworker_requeue_delayed(struct taskworker *worker0, struct delayed_task *dt,
    nsec_t nsec, jitter_t jitter)
{

	if (worker0->tw_current_task != &dt->dt_task ||
	    ISSET(worker0->tw_flags, TASKWORKER_REQUEUED))
		return;

	SDT_PROBE5(sdt, kernel, taskqueue, schedule__delayed__requeue,
	    worker0->tw_taskqueue, dt, nsec, jitter, worker0->tw_cpu);
	worker0->tw_flags |= TASKWORKER_REQUEUED;
	if (nsec == 0) {
		worker0->tw_flags |= TASKWORKER_CALLOUT_ACK;
		taskworker_schedule(worker0, &dt->dt_task);
	} else {
		callout_schedule(&dt->dt_callout, nstohz(nsec, jitter));
	}
}

static int
nstohz(nsec_t nsec, jitter_t jitter)
{

	/*
	 * XXX Temporary stub until we get proper nanosecond-resolution
	 * timer APIs.
	 */
	(void)jitter;
	return mstohz(nsec / 1000000u);
}

/* Task cancellation */

/*
 * task_cancel: Cancel task.  If it has already begun, wait for it to
 * complete (or to call task_done).  May drop interlock if it is
 * necessary to wait.  May sleep.
 *
 * Returns true if task was scheduled and we prevented it from running.
 * Returns false if the task was not scheduled or had already begun.
 */
bool
task_cancel(struct task *task, kmutex_t *interlock)
{
	struct taskworker *const worker =
	    atomic_load_acquire(&task->task_worker);
	bool cancelled;

	ASSERT_SLEEPABLE();
	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't cancel: %p", task);

	SDT_PROBE2(sdt, kernel, task, cancel, task, interlock);

	/* If it's not scheduled, there's nothing to cancel. */
	if (worker == NULL) {
		cancelled = false;
		goto out;
	}

	mutex_enter(&worker->tw_lock);

	/* If it moved to another queue, it already ran so we can't cancel. */
	if (task->task_worker != worker) {
		cancelled = false;
		goto out_unlock;
	}

	/* If it is running, it's too late to cancel.  Wait for completion. */
	if (worker->tw_current_task == task) {
		mutex_exit(interlock);
		SDT_PROBE2(sdt, kernel, task, cancel__wait, task, interlock);
		while ((worker->tw_current_task == task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		cancelled = false;
		goto out;
	}

	/* Got it before it ran!  Remove it from the queue. */
	TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);
	cancelled = true;

out_unlock:
	mutex_exit(&worker->tw_lock);
out:	SDT_PROBE3(sdt, kernel, task, cancelled, task, interlock, cancelled);
	return cancelled;
}
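/*
 * Example (sketch, not part of this file): the usual task_cancel idiom
 * on a detach or teardown path.  The caller's own lock -- the one the
 * task callback takes -- is passed as the interlock so that
 * task_cancel can drop it while waiting for a running callback,
 * avoiding deadlock.  The sc names are hypothetical.
 *
 *	mutex_enter(&sc->sc_lock);
 *	sc->sc_dying = true;
 *	(void)task_cancel(&sc->sc_task, &sc->sc_lock);
 *	mutex_exit(&sc->sc_lock);
 *
 * On return the callback is no longer running (or has called
 * task_done), but task_cancel may have released and reacquired
 * sc_lock, so state protected by it must be revalidated.
 */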
/*
 * task_cancel_async: Like task_cancel, but returns immediately even if
 * the task is in flight.  Hence there is no need for an interlock.
 */
bool
task_cancel_async(struct task *task)
{
	struct taskworker *worker = atomic_load_acquire(&task->task_worker);
	bool cancelled;

	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't cancel: %p", task);

	SDT_PROBE2(sdt, kernel, task, cancel, task, NULL);

	if (worker == NULL) {
		cancelled = false;
		goto out;
	}

	mutex_enter(&worker->tw_lock);
	if (task->task_worker != worker) {
		cancelled = false;
		goto out_unlock;
	}
	if (worker->tw_current_task == task) {
		cancelled = false;
		goto out_unlock;
	}
	TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);
	cancelled = true;

out_unlock:
	mutex_exit(&worker->tw_lock);
out:	SDT_PROBE3(sdt, kernel, task, cancelled, task, NULL, cancelled);
	return cancelled;
}

/* Delayed task cancellation */

/*
 * delayed_task_cancel: Cancel the delayed task dt.  If it has already
 * begun, wait for it to complete (or to call task_done).  May drop
 * interlock if it is necessary to wait, either for the callout to
 * complete or for the task to complete.  May sleep.
 *
 * Returns true if dt was scheduled and we prevented it from running.
 * Returns false if dt was not scheduled or had already begun.
 */
bool
delayed_task_cancel(struct delayed_task *dt, kmutex_t *interlock)
{
	struct taskworker *worker =
	    atomic_load_acquire(&dt->dt_task.task_worker);
	bool cancelled;

	ASSERT_SLEEPABLE();
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't cancel: %p", dt);

	SDT_PROBE2(sdt, kernel, task, cancel, &dt->dt_task, interlock);

	/* If it's not scheduled, we can't cancel it. */
	if (worker == NULL) {
		cancelled = false;
		goto out;
	}

	mutex_enter(&worker->tw_lock);

	/* If it moved to another queue, it already ran so we can't cancel. */
	if (dt->dt_task.task_worker != worker) {
		cancelled = false;
		goto out_unlock;
	}

	/* If it is running, it's too late to cancel.  Wait for completion. */
	if (worker->tw_current_task == &dt->dt_task) {
		mutex_exit(interlock);
		while ((worker->tw_current_task == &dt->dt_task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_cv, &worker->tw_lock);
		cancelled = false;
		goto out_relock;
	}

	/*
	 * Try to cancel it without dropping the interlock.  This is an
	 * optimization; we don't need to do it.
	 */
	if (!callout_stop(&dt->dt_callout)) {
		/* Callout had not fired, so we cancelled it. */
		cancelled = true;
		goto out_unlock;
	}

	/*
	 * We have to drop the interlock here because we need to drop
	 * the interlock and the worker lock in order to sleep if the
	 * callout has fired, but callout_halt can't drop two locks.
	 *
	 * XXX An alternative would be to have a caller-supplied
	 * interlock stored in the delayed task structure.  That would
	 * simplify the logic in scheduling delayed tasks, too.
	 * However, it would complicate the API for callers that don't
	 * need interlocking in task_cancel (XXX are there any,
	 * really?).
	 */
	mutex_exit(interlock);
	SDT_PROBE2(sdt, kernel, task, cancel__wait, &dt->dt_task, interlock);

	/* Cancel the callout and wait for it to complete. */
	if (!callout_halt(&dt->dt_callout, &worker->tw_lock)) {
		/* Callout had not fired, so we cancelled it. */
		/* XXX Is this case possible? */
		cancelled = true;
		goto out_relock;
	}

	/* It already fired.  Try to cancel the task on the queue. */
	if (worker->tw_current_task == &dt->dt_task) {
		/* The task is already running.  Wait to complete. */
		while ((worker->tw_current_task == &dt->dt_task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_cv, &worker->tw_lock);
		cancelled = false;
		goto out_relock;
	}

	/* The task has not yet run.  Don't let it. */
	TAILQ_REMOVE(&worker->tw_tasks, &dt->dt_task, task_entry);
	cancelled = true;
	goto out_relock;

out_unlock:
	mutex_exit(&worker->tw_lock);
	goto out;

out_relock:
	mutex_exit(&worker->tw_lock);
	mutex_enter(interlock);

out:	SDT_PROBE3(sdt, kernel, task, cancelled, &dt->dt_task, interlock,
	    cancelled);
	return cancelled;
}
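/*
 * Example (sketch, not part of this file): tearing down a delayed task
 * at detach time.  delayed_task_destroy requires that the task no
 * longer be scheduled, so a synchronous cancel comes first, and
 * nothing may be left that could reschedule it afterward.  The sc
 * names are hypothetical.
 *
 *	mutex_enter(&sc->sc_lock);
 *	sc->sc_dying = true;
 *	(void)delayed_task_cancel(&sc->sc_poll_task, &sc->sc_lock);
 *	mutex_exit(&sc->sc_lock);
 *
 *	delayed_task_destroy(&sc->sc_poll_task);
 */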
/*
 * delayed_task_cancel_async: Like delayed_task_cancel, but returns
 * immediately even if the task is in flight.  Hence there is no need
 * for an interlock.
 */
bool
delayed_task_cancel_async(struct delayed_task *dt)
{
	struct taskworker *worker =
	    atomic_load_acquire(&dt->dt_task.task_worker);
	bool cancelled;

	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't cancel: %p", dt);

	SDT_PROBE2(sdt, kernel, task, cancel, &dt->dt_task, NULL);

	if (worker == NULL) {
		cancelled = false;
		goto out;
	}

	mutex_enter(&worker->tw_lock);
	if (dt->dt_task.task_worker != worker) {
		cancelled = false;
		goto out_unlock;
	}
	if (!callout_stop(&dt->dt_callout)) {
		cancelled = true;
		goto out_unlock;
	}
	if (worker->tw_current_task != &dt->dt_task) {
		TAILQ_REMOVE(&worker->tw_tasks, &dt->dt_task, task_entry);
		cancelled = true;
		goto out_unlock;
	}
	cancelled = false;

out_unlock:
	mutex_exit(&worker->tw_lock);
out:	SDT_PROBE3(sdt, kernel, task, cancelled, &dt->dt_task, NULL,
	    cancelled);
	return cancelled;
}

/* Worker scheduling */

/*
 * taskworker_schedule: Put a task on worker's queue and make sure it's
 * scheduled to run.
 */
static void
taskworker_schedule(struct taskworker *worker, struct task *task)
{
	struct taskqueue *taskqueue = worker->tw_taskqueue;

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(curlwp->l_pflag & LP_BOUND);
	KASSERT(worker == current_taskworker(taskqueue));
	KASSERT(task->task_worker == worker);

	TAILQ_INSERT_TAIL(&worker->tw_tasks, task, task_entry);

	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		softint_schedule(taskqueue->tq_u.softint.cookie);
		break;
	case TASKQUEUE_THREAD: {
		struct taskworker_thread *worker_thread =
		    container_of(worker, struct taskworker_thread,
			twt_worker);
		struct threadpool *pool =
		    threadpool_percpu_ref(taskqueue->tq_u.thread.pool_percpu);

		threadpool_schedule_job(pool, &worker_thread->twt_job);
		break;
	}
	default:
		panic("taskqueue %p has invalid type: %d",
		    worker->tw_taskqueue, (int)worker->tw_taskqueue->tq_type);
	}
}

/* Task execution */

/*
 * taskworker_run_1: Pick one task off queue and execute it.  To be
 * called from a worker thread or softint handler.
 */
static void
taskworker_run_1(struct taskworker *worker)
{
	struct task *task;

	KASSERT(mutex_owned(&worker->tw_lock));

	/* Grab the task from the queue. */
	task = TAILQ_FIRST(&worker->tw_tasks);
	KASSERT(task != NULL);
	TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);

	/* We had better not be working on anything yet. */
	KASSERT(worker->tw_current_task == NULL);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_REQUEUED));

	/* Mark it current and run it. */
	worker->tw_current_task = task;
	mutex_exit(&worker->tw_lock);
	SDT_PROBE1(sdt, kernel, task, run, task);
	(*task->task_fn)(task);
	mutex_enter(&worker->tw_lock);
	KASSERT(worker->tw_current_task == task);

	/* Mark it done. */
	if (!ISSET(worker->tw_flags, TASKWORKER_DONE))
		taskworker_done(worker, task);
	/* Can't touch task after this. */

	KASSERT(ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_REQUEUED));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_CALLOUT_ACK));

	/* Clear the flags related to this task. */
	worker->tw_flags &= ~(TASKWORKER_DONE | TASKWORKER_REQUEUED |
	    TASKWORKER_CALLOUT_ACK);
}

/*
 * taskworker_done: Mark task done.  The worker will not touch it after
 * this, and the task can be executed again if scheduled afterward.  If
 * someone had already requeued the task, acknowledge this; otherwise,
 * disassociate the task from the worker.
 */
static void
taskworker_done(struct taskworker *worker, struct task *task)
{

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(worker->tw_current_task == task);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(task->task_worker == worker);

	SDT_PROBE1(sdt, kernel, task, done, task);

	/* XXX KLUDGE */
	if (ISSET(worker->tw_flags, TASKWORKER_CALLOUT_ACK)) {
		struct delayed_task *dt = container_of(task,
		    struct delayed_task, dt_task);

		/* Clear the CALLOUT_INVOKING flag before it runs again. */
		callout_ack(&dt->dt_callout);
		worker->tw_flags &= ~TASKWORKER_CALLOUT_ACK;
	}

	/* Was it requeued while running? */
	if (ISSET(worker->tw_flags, TASKWORKER_REQUEUED)) {
		/*
		 * Yes -- it's on the queue now, so just unmark it as
		 * requeued, as if it had just been put on the queue.
		 */
		worker->tw_flags &= ~TASKWORKER_REQUEUED;
	} else {
		/*
		 * No -- it no longer belongs to this taskworker, so
		 * take it off the queue.  Can't touch task after this.
		 * Use atomic_store_release to ensure all side effects
		 * of the task are visible to an attempt to cancel that
		 * witnesses the NULL.
		 */
		atomic_store_release(&task->task_worker, NULL);
	}

	/* Notify task_cancel that we're done. */
	worker->tw_current_task = NULL;
	cv_broadcast(&worker->tw_cv);

	/*
	 * Let taskworker_run_1 know that we're done, in case we got
	 * here via explicit task_done.
	 */
	worker->tw_flags |= TASKWORKER_DONE;
}

/* Worker soft interrupt handler and threadpool job */

static void
taskworker_softintr(void *cookie)
{
	struct taskqueue *taskqueue = cookie;
	struct taskworker *worker;
	struct task marker;

	/* Get the worker.  We're bound to the CPU while in a softint. */
	worker = current_taskworker(taskqueue);

	/*
	 * Mark where we were and run only to that point.  We block
	 * other softints at the same priority, so if any new tasks are
	 * scheduled, we'll handle them in the next batch, in order to
	 * let other softints get a chance to run.
	 */
	mutex_enter(&worker->tw_lock);
	TAILQ_INSERT_TAIL(&worker->tw_tasks, &marker, task_entry);
	while (TAILQ_FIRST(&worker->tw_tasks) != &marker) {
		KASSERT(!TAILQ_EMPTY(&worker->tw_tasks));
		taskworker_run_1(worker);
	}
	TAILQ_REMOVE(&worker->tw_tasks, &marker, task_entry);
	mutex_exit(&worker->tw_lock);
}

static void
taskworker_job(struct threadpool_job *job)
{
	struct taskworker_thread *worker_thread = container_of(job,
	    struct taskworker_thread, twt_job);
	struct taskworker *worker = &worker_thread->twt_worker;

	mutex_enter(&worker->tw_lock);
	for (;;) {
		/*
		 * Wait until we have a task, or give up if nobody's
		 * handed us any after a little while.
		 */
		while (__predict_false(TAILQ_EMPTY(&worker->tw_tasks))) {
			if (ISSET(worker->tw_flags, TASKWORKER_DYING))
				goto out;
			if (cv_timedwait(&worker->tw_cv, &worker->tw_lock,
				TASKQUEUE_IDLE_TICKS)) {
				if (TAILQ_EMPTY(&worker->tw_tasks))
					goto out;
			}
		}

		/*
		 * Run one task at a time.  Preemption is allowed while
		 * the task is running, so there's no need to process
		 * tasks in batches like for softints.
		 */
		while (!TAILQ_EMPTY(&worker->tw_tasks))
			taskworker_run_1(worker);
	}

out:	KASSERT(TAILQ_EMPTY(&worker->tw_tasks));
	threadpool_job_done(job);
	mutex_exit(&worker->tw_lock);
}

/* Delayed task callout handler */

void
delayed_task_timeout(void *cookie)
{
	struct delayed_task *dt = cookie;
	struct taskworker *worker = dt->dt_task.task_worker;

	/* We had better be bound to the CPU, in a callout. */
	KASSERT(curlwp->l_pflag & LP_BOUND);

	SDT_PROBE1(sdt, kernel, delayed__task, timeout, dt);

	/* We had better have been assigned a worker. */
	KASSERT(worker != NULL);

	mutex_enter(&worker->tw_lock);

	/* We had better still be on this worker. */
	KASSERT(dt->dt_task.task_worker == worker);

	/* Is a prior instance already running? */
	if (worker->tw_current_task == &dt->dt_task) {
		/*
		 * The task was already running and the timeout
		 * triggered because it was scheduled again.  Mark it
		 * requeued.
		 */
		SDT_PROBE2(sdt, kernel, delayed__task, timeout__requeue,
		    dt, worker->tw_cpu);
		worker->tw_flags |= TASKWORKER_REQUEUED;
	} else {
		/* Make sure we're putting it on the current worker. */
		SDT_PROBE2(sdt, kernel, delayed__task, timeout__enqueue,
		    dt, worker->tw_cpu);
		taskworker_maybe_switch(&worker, &dt->dt_task);
	}

	/*
	 * Put it at the end of the queue and make sure the worker is
	 * scheduled to run.
	 */
	taskworker_schedule(worker, &dt->dt_task);

	mutex_exit(&worker->tw_lock);
}

/* Draining task queues */

struct drain {
	struct taskqueue	*d_taskqueue;
	struct taskworker	d_worker;
	struct drain_task	*d_tasks;
	unsigned int		d_count;
};

struct drain_task {
	struct task		drt_task;
	struct drain		*drt_drain;
};

void
taskqueue_drain(struct taskqueue *taskqueue)
{
	xcfunc_t xcfunc;
	struct drain drain;

	/* We are going to sleep big-time. */
	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, taskqueue, drain__start, taskqueue);

	/* Set up some temporary state for draining. */
	drain.d_taskqueue = taskqueue;
	drain.d_count = ncpu;
	taskworker_init(&drain.d_worker, "tqdrain", taskqueue, NULL);

	/* XXX Allocation to destroy is sketchy... */
	CTASSERT(MAXCPUS <= SIZE_MAX/sizeof(drain.d_tasks[0]));
	drain.d_tasks = kmem_zalloc(ncpu*sizeof(drain.d_tasks[0]), KM_SLEEP);

	/* Do the per-CPU draining setup. */
	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		xcfunc = &taskqueue_drain_softint_xc;
		break;
	case TASKQUEUE_THREAD:
		xcfunc = &taskqueue_drain_thread_xc;
		break;
	default:
		panic("taskqueue %p has invalid type: %d",
		    taskqueue, (int)taskqueue->tq_type);
	}
	xc_wait(xc_broadcast(0, xcfunc, taskqueue, &drain));

	mutex_enter(&drain.d_worker.tw_lock);

	/* Wait for the tasks or xcalls running on other CPUs to complete. */
	while (0 < drain.d_count)
		cv_wait(&drain.d_worker.tw_cv, &drain.d_worker.tw_lock);

	/* Run all the tasks that couldn't run on other CPUs. */
	while (!TAILQ_EMPTY(&drain.d_worker.tw_tasks))
		taskworker_run_1(&drain.d_worker);

	mutex_exit(&drain.d_worker.tw_lock);

	/* Nuke the temporary drain state. */
	kmem_free(drain.d_tasks, ncpu*sizeof(drain.d_tasks[0]));
	taskworker_destroy(&drain.d_worker, NULL);

	KASSERT(drain.d_count == 0);

	SDT_PROBE1(sdt, kernel, taskqueue, drain__done, taskqueue);
}
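/*
 * Example (sketch, not part of this file): a module that scheduled
 * tasks on a shared queue cannot destroy that queue, but before
 * unloading it can stop generating new work and then drain the queue
 * so that none of its own tasks are still in flight.  frob_stop and
 * sc_taskqueue are hypothetical.
 *
 *	frob_stop(sc);				stop scheduling new tasks
 *	taskqueue_drain(sc->sc_taskqueue);	wait for scheduled tasks
 *	taskqueue_put(sc->sc_taskqueue);	release the shared queue
 */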
static void
drain_task(struct task *task)
{
	struct drain_task *drt = container_of(task, struct drain_task,
	    drt_task);

	SDT_PROBE1(sdt, kernel, taskqueue, drain__cpu__done,
	    drt->drt_drain->d_taskqueue);

	mutex_enter(&drt->drt_drain->d_worker.tw_lock);
	if (--drt->drt_drain->d_count == 0)
		cv_broadcast(&drt->drt_drain->d_worker.tw_cv);
	mutex_exit(&drt->drt_drain->d_worker.tw_lock);
}

static void
taskqueue_drain_softint_xc(void *vtaskqueue, void *vdrain)
{
	struct taskqueue *taskqueue = vtaskqueue;
	struct drain *drain = vdrain;
	struct drain_task *drt = &drain->d_tasks[cpu_index(curcpu())];
	struct taskworker *worker;

	/* We're in a cross-call so we're bound to the CPU. */
	KASSERT(curlwp->l_pflag & LP_BOUND);

	SDT_PROBE1(sdt, kernel, taskqueue, drain__cpu, drain->d_taskqueue);

	worker = current_taskworker(taskqueue);

	/*
	 * Just schedule the drain task -- softints are supposed to be
	 * quick, are not allowed to sleep, and are not preemptive with
	 * one another, so there is no point in pooling them.
	 */
	mutex_enter(&worker->tw_lock);
	task_init(&drt->drt_task, &drain_task);
	drt->drt_drain = drain;
	drt->drt_task.task_worker = worker;
	taskworker_schedule(worker, &drt->drt_task);
	mutex_exit(&worker->tw_lock);
}

static void
taskqueue_drain_thread_xc(void *vtaskqueue, void *vdrain)
{
	struct taskqueue *taskqueue = vtaskqueue;
	struct drain *drain = vdrain;
	struct taskworker *worker;
	struct taskworker_thread *worker_thread;
	struct threadpool *pool;
	struct task *task, *next;

	SDT_PROBE1(sdt, kernel, taskqueue, drain__cpu, drain->d_taskqueue);

	worker = current_taskworker(taskqueue);
	worker_thread = container_of(worker, struct taskworker_thread,
	    twt_worker);
	pool = threadpool_percpu_ref(taskqueue->tq_u.thread.pool_percpu);

	mutex_enter(&worker->tw_lock);
	if (threadpool_cancel_job_async(pool, &worker_thread->twt_job)) {
		/* Cancelled.  Move all the tasks to our drain worker. */
		SDT_PROBE1(sdt, kernel, taskqueue, drain__cpu__migrate,
		    drain->d_taskqueue);
		mutex_enter(&drain->d_worker.tw_lock);
		TAILQ_FOREACH_SAFE(task, &worker->tw_tasks, task_entry,
		    next) {
			TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);
			TAILQ_INSERT_TAIL(&drain->d_worker.tw_tasks, task,
			    task_entry);
		}
		if (--drain->d_count == 0)
			cv_broadcast(&drain->d_worker.tw_cv);
		mutex_exit(&drain->d_worker.tw_lock);
	} else {
		/* Job is running.  Let it run the drain task. */
		struct drain_task *drt = &drain->d_tasks[cpu_index(curcpu())];

		SDT_PROBE1(sdt, kernel, taskqueue, drain__cpu__wait,
		    drain->d_taskqueue);
		KASSERT(drt->drt_task.task_worker == NULL);
		KASSERT(worker->tw_current_task != &drt->drt_task);
		task_init(&drt->drt_task, &drain_task);
		drt->drt_drain = drain;
		drt->drt_task.task_worker = worker;
		TAILQ_INSERT_TAIL(&worker->tw_tasks, &drt->drt_task,
		    task_entry);
		/* No need to call taskworker_schedule -- already running. */
	}
	mutex_exit(&worker->tw_lock);
}