/*	$NetBSD$	*/

/*
 * XXX WARNING WARNING WARNING XXX
 *
 * This code is not tested!  It is a draft of an idea.  See below the
 * copyright notice for a summary.
 *
 * Fourth draft, using threadpool(9).
 */

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * XXX Haven't (yet?) implemented single-threaded task queues.  Would
 * be more bookkeeping, but could reduce the footprint of seldom-used
 * task queues by a factor of ncpu.
 *
 * XXX Haven't (yet?) implemented unbound task queues.  Still more
 * bookkeeping, but maybe worthwhile.
 *
 * XXX Hmm...  Can the worker queues be managed without locks, by
 * restricting access to the local CPU?  That would make task_cancel
 * difficult, and cross-CPU requeueing logic would be hairy.
 *
 * XXX Offline and online CPUs?  Ugh...
 *
 * Lock order:
 *
 *	1. callers' locks, including task_cancel interlock
 *	2. struct taskworker::tw_lock for one normal worker at a time
 *	3. struct taskworker::tw_lock for one drain worker at a time
 */
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD$");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/callout.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/errno.h>
#include <sys/intr.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/mutex.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/queue.h>
#include <sys/systm.h>
#include <sys/task.h>
#include <sys/threadpool.h>
#include <sys/xcall.h>

#define	container_of(PTR, TYPE, FIELD)					\
	((void)sizeof((PTR) -						\
	    &((TYPE *)(((char *)(PTR)) -				\
		offsetof(TYPE, FIELD)))->FIELD),			\
	    ((TYPE *)(((char *)(PTR)) - offsetof(TYPE, FIELD))))

#define	TASKQUEUE_IDLE_TICKS	mstohz(100)

/* Data structures */

TAILQ_HEAD(task_head, task);

struct taskqueue {
	struct percpu		*tq_percpu;	/* struct taskworker * */
	union {
		struct {
			void			*cookie;
		}			softint;
		struct {
			struct threadpool_percpu	*pool_percpu;
		}			thread;
	}			tq_u;
	enum taskqueue_type {
		TASKQUEUE_SOFTINT,
		TASKQUEUE_THREAD,
	}			tq_type;
	int			tq_flags;
	pri_t			tq_pri;
	int			tq_ipl;
	const char		*tq_name;
};

struct taskworker {
	kmutex_t		tw_lock;
	kcondvar_t		tw_cv;
	struct taskqueue	*tw_taskqueue;
	struct task_head	tw_tasks;
	struct task		*tw_current_task;
	int			tw_flags;
#define	TASKWORKER_DONE		0x01
#define	TASKWORKER_REQUEUED	0x02
#define	TASKWORKER_CALLOUT_ACK	0x04	/* XXX KLUDGE */
#define	TASKWORKER_DYING	0x08
};

struct taskworker_softint {
	struct taskworker	tws_worker;
};

struct taskworker_thread {
	struct taskworker	twt_worker;
	struct threadpool_job	twt_job;
};

/* Forward declarations */

static size_t	taskqueue_pri_index(pri_t);

static void	task_error(struct task *);

static int	_taskqueue_create(struct taskqueue **, const char *, pri_t,
		    int, int);
static void	_taskqueue_destroy(struct taskqueue *);
static int	taskqueue_init_softint(struct taskqueue *);
static int	taskqueue_init_thread(struct taskqueue *);

static void	taskworker_init(struct taskworker *, const char *,
		    struct taskqueue *);
static void	taskworker_destroy(struct taskworker *);

static void	taskworker_assign_cpu(struct taskworker *,
		    struct taskqueue *, struct cpu_info *);
static struct taskworker *
		taskworker_unassign_cpu(struct taskqueue *,
		    struct cpu_info *);

static int	nstohz(nsec_t, jitter_t);

static void	taskworker_maybe_switch(struct taskworker **, struct task *);

static void	taskworker_schedule(struct taskworker *);
static void	taskworker_run_1(struct taskworker *, struct task_head *);
static void	taskworker_done(struct taskworker *, struct task *);

static void	taskworker_softintr(void *);
static void	taskworker_job(struct threadpool_job *);

static task_fn_t	drain_task;
static void	taskqueue_drain_softint_xc(void *, void *);
static void	taskqueue_drain_thread_xc(void *, void *);

/* Global state and initialization */

static struct pool	taskqueue_pool;
static struct pool	taskworker_softint_pool;
static struct pool	taskworker_thread_pool;

static kmutex_t		taskqueues_lock __cacheline_aligned;
static struct {
	struct taskqueue	*taskqueue;
	unsigned int		refcnt;
}			taskqueues[PRI_COUNT + 1];

/* XXX Where should these definitions go? */
#define	MAXPRI_TASKQUEUE	MAXPRI_SOFTINT
#define	MAXPRI_SOFTINT		MAXPRI_KERNEL_RT
#define	MINPRI_SOFTINT		PRI_SOFTCLOCK
#define	PRI_COUNT_KTHREAD	(MAXPRI_KTHREAD + 1)

struct taskqueue	*system_taskqueue __read_mostly;
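/*
 * For reference while reading this file: struct task, struct
 * delayed_task, and task_fn_t live in <sys/task.h>, which is not part
 * of this draft.  A minimal sketch of what this code assumes they look
 * like (field names are taken from their uses below; the real header
 * may well differ):
 */
#if 0
typedef void task_fn_t(struct task *);

struct task {
	TAILQ_ENTRY(task)	task_entry;	/* worker queue linkage */
	struct taskworker	*task_worker;	/* owning worker, or NULL */
	task_fn_t		*task_fn;	/* callback */
};

struct delayed_task {
	struct task		dt_task;	/* embedded task */
	struct callout		dt_callout;	/* fires delayed_task_timeout */
};
#endif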
/*
 * tasks_init: Initialize the task subsystem.
 */
void
tasks_init(void)
{
	int error;

	pool_init(&taskqueue_pool, sizeof(struct taskqueue), 0, 0, 0,
	    "taskq", NULL, IPL_NONE);
	pool_init(&taskworker_softint_pool, sizeof(struct taskworker_softint),
	    0, 0, 0, "taskwsih", NULL, IPL_NONE);
	pool_init(&taskworker_thread_pool, sizeof(struct taskworker_thread),
	    0, 0, 0, "taskwthr", NULL, IPL_NONE);

	mutex_init(&taskqueues_lock, MUTEX_DEFAULT, IPL_NONE);

	error = taskqueue_get(&system_taskqueue, PRI_NONE);
	if (error)
		panic("failed to create system task queue: %d", error);
}

static size_t
taskqueue_pri_index(pri_t pri)
{

	if (pri == PRI_NONE) {
		/* PRI_NONE gets the last slot of the taskqueues array.  */
		return PRI_COUNT;
	} else {
		KASSERTMSG((0 <= pri), "negative priority: %d", (int)pri);
		KASSERTMSG((pri < PRI_COUNT), "priority out of range: %d",
		    (int)pri);
		return pri;
	}
}

/* Tasks */

static void
task_error(struct task *task)
{

	panic("destroyed task got run: %p", task);
}

/*
 * task_destroy: Destroy a task initialized with task_init.  Must no
 * longer be scheduled.  Use task_cancel first if this is not already
 * guaranteed.  task_cancel_async is not enough.  If you want to call
 * this inside a task callback, you must use task_done first.
 *
 * Not strictly necessary, but this is part of the documented API in
 * case it ever becomes necessary.
 */
void
task_destroy(struct task *task)
{

	KASSERTMSG((task->task_fn != &task_error),
	    "task already destroyed: %p", task);
	KASSERTMSG((task->task_worker == NULL),
	    "task still scheduled or running: %p", task);

	task->task_fn = task_error;
}

/*
 * task_done: To be called in a task callback to notify the task worker
 * that the task is done and can be scheduled again.
 *
 * If you do not call this, the task worker will do the equivalent when
 * the task callback returns.  You MUST call this if the task callback
 * will free memory containing the struct task.
 */
void
task_done(struct task *task)
{
	struct taskworker *const worker = task->task_worker;

	KASSERTMSG((worker != NULL), "task not running: %p", task);
	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't be completing: %p", task);

	mutex_enter(&worker->tw_lock);
	KASSERTMSG((task->task_worker == worker),
	    "task %p changed workers: %p -> %p",
	    task, worker, task->task_worker);
	KASSERTMSG((worker->tw_current_task == task),
	    "task %p is not current on worker %p", task, worker);
	KASSERTMSG(!ISSET(worker->tw_flags, TASKWORKER_DONE),
	    "task %p is already done on worker %p", task, worker);
	taskworker_done(worker, task);
	mutex_exit(&worker->tw_lock);
}

/* Delayed tasks */

/*
 * delayed_task_destroy: Destroy a delayed task initialized with
 * delayed_task_init.  Must no longer be scheduled.  Use
 * delayed_task_cancel first if this is not already guaranteed.
 * delayed_task_cancel_async is not enough.  If you want to call this
 * inside a task callback, you must use delayed_task_done first.
 */
void
delayed_task_destroy(struct delayed_task *dt)
{

	KASSERTMSG((!callout_pending(&dt->dt_callout)),
	    "delayed task still pending timeout: %p", dt);
	KASSERTMSG((dt->dt_task.task_worker == NULL),
	    "delayed task still scheduled or running: %p", dt);
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task already destroyed: %p", dt);

	callout_destroy(&dt->dt_callout);
	task_destroy(&dt->dt_task);
}

/*
 * delayed_task_done: To be called in a delayed task callback to notify
 * the task worker that the task is done and can be scheduled again.
 *
 * If you do not call this, the task worker will do the equivalent when
 * the task callback returns.  You MUST call this if the task callback
 * will free memory containing the struct delayed_task.
 */
void
delayed_task_done(struct delayed_task *dt)
{

	task_done(&dt->dt_task);
}
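/*
 * Illustrative sketch, not part of the draft: a task callback that
 * frees the memory containing its struct task must call task_done
 * first, as documented above.  "struct frobber" and frobber_task are
 * hypothetical.
 */
#if 0
struct frobber {
	struct task	f_task;
	/* ...driver state... */
};

static void
frobber_task(struct task *task)
{
	struct frobber *const f = container_of(task, struct frobber, f_task);

	/* ...do the deferred work... */

	task_done(task);		/* tell the worker we're finished... */
	kmem_free(f, sizeof(*f));	/* ...so this free is safe */
}
#endif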
/* Reference-counted shared system task queues */

/*
 * taskqueue_get: Obtain a reference to a shared system task queue
 * running at the specified priority.  The resulting task queue can be
 * used at any IPL up to and including IPL_VM.  It must be released
 * with taskqueue_put -- do not pass it to taskqueue_destroy.  May
 * sleep.
 */
int
taskqueue_get(struct taskqueue **taskqueue_ret, pri_t pri)
{
	const size_t i = taskqueue_pri_index(pri);
	char buf[9];
	struct taskqueue *taskqueue, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	/* Try to get it.  */
	mutex_enter(&taskqueues_lock);
	taskqueue = taskqueues[i].taskqueue;
	if (taskqueue == NULL) {
		/* Not there.  Drop the lock to create a new one.  */
		KASSERT(taskqueues[i].refcnt == 0);
		mutex_exit(&taskqueues_lock);

		/* Three digits will fit in the buffer.  */
		(void)snprintf(buf, sizeof buf, "taskq%d", (int)pri);
		error = _taskqueue_create(&tmp, buf, pri, IPL_VM,
		    TASKQUEUE_PERCPU);
		if (error)
			return error;

		/* Try again, but we may have raced.  */
		mutex_enter(&taskqueues_lock);
		taskqueue = taskqueues[i].taskqueue;
		if (taskqueue == NULL) {
			/* Nobody raced us.  Commit tmp.  */
			KASSERT(taskqueues[i].refcnt == 0);
			taskqueue = taskqueues[i].taskqueue = tmp;
			tmp = NULL;
		}
	}

	/* Bump the reference count.  */
	if (taskqueues[i].refcnt == UINT_MAX) {
		mutex_exit(&taskqueues_lock);
		if (tmp != NULL)
			_taskqueue_destroy(tmp);
		return EBUSY;
	}
	taskqueues[i].refcnt++;
	mutex_exit(&taskqueues_lock);

	/* If we created tmp but didn't end up using it, destroy it.  */
	if (tmp != NULL)
		_taskqueue_destroy(tmp);

	KASSERT(taskqueue != NULL);
	*taskqueue_ret = taskqueue;
	return 0;
}

/*
 * taskqueue_put: Release a reference to a shared system taskqueue
 * obtained with taskqueue_get.  If this is the last one, destroy it.
 * Do not pass a taskqueue created with taskqueue_create.  May sleep.
 */
void
taskqueue_put(struct taskqueue *taskqueue)
{
	const size_t i = taskqueue_pri_index(taskqueue->tq_pri);
	bool destroy = false;

	ASSERT_SLEEPABLE();

	mutex_enter(&taskqueues_lock);
	KASSERT(taskqueues[i].taskqueue == taskqueue);
	KASSERT(0 < taskqueues[i].refcnt);
	if (--taskqueues[i].refcnt == 0) {
		taskqueues[i].taskqueue = NULL;
		destroy = true;
	}
	mutex_exit(&taskqueues_lock);

	if (destroy)
		_taskqueue_destroy(taskqueue);
}
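/*
 * Illustrative sketch, not part of the draft: borrowing a shared system
 * task queue at PRI_SOFTNET for the lifetime of a hypothetical driver
 * module, and releasing it on unload.
 */
#if 0
static struct taskqueue *frobber_taskqueue;

static int
frobber_init(void)
{

	return taskqueue_get(&frobber_taskqueue, PRI_SOFTNET);
}

static void
frobber_fini(void)
{

	taskqueue_put(frobber_taskqueue);
	frobber_taskqueue = NULL;
}
#endif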
/* Task queue creation */

/*
 * taskqueue_create: Create a task queue to run tasks at priority
 * task_pri which can be scheduled from IPL schedule_ipl.  The task
 * queue must be destroyed with taskqueue_destroy -- do not pass it to
 * taskqueue_put.  May sleep.
 *
 * For soft interrupt priorities, you must use taskqueue_get to get a
 * shared system task queue, not taskqueue_create.
 */
int
taskqueue_create(struct taskqueue **taskqueuep, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{

	KASSERTMSG(((task_pri == PRI_NONE) || (task_pri <= MAXPRI_KTHREAD)),
	    "priority too high for taskqueue_create(%s): %d",
	    name, (int)task_pri);

	return _taskqueue_create(taskqueuep, name, task_pri, schedule_ipl,
	    flags);
}

static int
_taskqueue_create(struct taskqueue **taskqueuep, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{
	struct taskqueue *taskqueue;
	int error;

	ASSERT_SLEEPABLE();

	taskqueue = pool_get(&taskqueue_pool, PR_WAITOK);
	taskqueue->tq_flags = flags;
	taskqueue->tq_pri = task_pri;
	taskqueue->tq_ipl = schedule_ipl;
	taskqueue->tq_name = name;

	taskqueue->tq_percpu = percpu_alloc(sizeof(struct taskworker *));
	if (taskqueue->tq_percpu == NULL) {
		error = ENOMEM;
		goto fail0;
	}

	KASSERT(task_pri <= MAXPRI_TASKQUEUE);
	if (MINPRI_SOFTINT <= task_pri) {
		KASSERT(task_pri <= MAXPRI_SOFTINT);
		error = taskqueue_init_softint(taskqueue);
	} else {
		KASSERT(task_pri <= MAXPRI_KTHREAD);
		error = taskqueue_init_thread(taskqueue);
	}
	if (error)
		goto fail1;

	/* Success!  */
	*taskqueuep = taskqueue;
	return 0;

fail1:	percpu_free(taskqueue->tq_percpu, sizeof(struct taskworker *));
fail0:	KASSERT(error);
	pool_put(&taskqueue_pool, taskqueue);
	return error;
}

static int
taskqueue_init_softint(struct taskqueue *taskqueue)
{
	int task_ipl;
	int error;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	/* XXX This table should go somewhere else.  */
	if (PRI_SOFTSERIAL <= taskqueue->tq_pri)
		task_ipl = SOFTINT_SERIAL;
	else if (PRI_SOFTNET <= taskqueue->tq_pri)
		task_ipl = SOFTINT_NET;
	else if (PRI_SOFTBIO <= taskqueue->tq_pri)
		task_ipl = SOFTINT_BIO;
	else if (PRI_SOFTCLOCK <= taskqueue->tq_pri)
		task_ipl = SOFTINT_CLOCK;
	else
		panic("invalid softint-level priority: %d",
		    (int)taskqueue->tq_pri);

	taskqueue->tq_type = TASKQUEUE_SOFTINT;
	taskqueue->tq_u.softint.cookie = softint_establish(
	    (task_ipl | SOFTINT_MPSAFE), &taskworker_softintr, taskqueue);
	if (taskqueue->tq_u.softint.cookie == NULL) {
		error = ENOMEM;
		goto fail0;
	}

	for (CPU_INFO_FOREACH(cii, ci)) {
		struct taskworker_softint *const worker_softint =
		    pool_get(&taskworker_softint_pool, PR_WAITOK);
		struct taskworker *const worker = &worker_softint->tws_worker;

		taskworker_init(worker, taskqueue->tq_name, taskqueue);
		taskworker_assign_cpu(worker, taskqueue, ci);
	}

	return 0;

fail0:	KASSERT(error);
	return error;
}

static int
taskqueue_init_thread(struct taskqueue *taskqueue)
{
	int error;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	taskqueue->tq_type = TASKQUEUE_THREAD;
	error = threadpool_percpu_get(&taskqueue->tq_u.thread.pool_percpu,
	    taskqueue->tq_pri);
	if (error)
		goto fail0;

	for (CPU_INFO_FOREACH(cii, ci)) {
		struct taskworker_thread *const worker_thread =
		    pool_get(&taskworker_thread_pool, PR_WAITOK);
		struct taskworker *const worker = &worker_thread->twt_worker;

		taskworker_init(worker, taskqueue->tq_name, taskqueue);
		threadpool_job_init(&worker_thread->twt_job, &taskworker_job,
		    &worker_thread->twt_worker.tw_lock, "%s/%u",
		    taskqueue->tq_name, cpu_index(ci));
		taskworker_assign_cpu(worker, taskqueue, ci);
	}

	return 0;

fail0:	KASSERT(error);
	return error;
}
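/*
 * Illustrative sketch, not part of the draft: creating a private
 * thread-level task queue that can be scheduled from IPL_SOFTNET, and
 * destroying it on detach.  frobber_taskqueue is hypothetical;
 * TASKQUEUE_PERCPU is the flag the shared queues above pass.
 */
#if 0
static struct taskqueue *frobber_taskqueue;

static int
frobber_attach_queue(void)
{

	return taskqueue_create(&frobber_taskqueue, "frobberq",
	    PRI_NONE, IPL_SOFTNET, TASKQUEUE_PERCPU);
}

static void
frobber_detach_queue(void)
{

	taskqueue_destroy(frobber_taskqueue);
	frobber_taskqueue = NULL;
}
#endif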
/* Task queue destruction */

/*
 * taskqueue_destroy: Wait for all tasks on taskqueue to complete and
 * destroy it.  The taskqueue must have been created with
 * taskqueue_create.  May sleep.
 */
void
taskqueue_destroy(struct taskqueue *taskqueue)
{

#if DIAGNOSTIC
	{
		const size_t i = taskqueue_pri_index(taskqueue->tq_pri);

		mutex_enter(&taskqueues_lock);
		KASSERTMSG((taskqueues[i].taskqueue != taskqueue),
		    "taskqueue_destroy used with shared taskqueue at pri %d",
		    (int)taskqueue->tq_pri);
		mutex_exit(&taskqueues_lock);
	}
#endif

	_taskqueue_destroy(taskqueue);
}

static void
_taskqueue_destroy(struct taskqueue *taskqueue)
{
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	ASSERT_SLEEPABLE();

	/* Disestablish the softint first to ensure it has stopped running. */
	if (taskqueue->tq_type == TASKQUEUE_SOFTINT)
		softint_disestablish(taskqueue->tq_u.softint.cookie);

	for (CPU_INFO_FOREACH(cii, ci)) {
		struct taskworker *const worker =
		    taskworker_unassign_cpu(taskqueue, ci);

		/* Stop the thread job.  */
		if (taskqueue->tq_type == TASKQUEUE_THREAD) {
			struct taskworker_thread *const worker_thread =
			    container_of(worker, struct taskworker_thread,
				twt_worker);
			struct threadpool_percpu *const pool_percpu =
			    taskqueue->tq_u.thread.pool_percpu;
			struct threadpool *const pool =
			    threadpool_percpu_ref_remote(pool_percpu, ci);

			mutex_enter(&worker->tw_lock);
			KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DYING));
			worker->tw_flags |= TASKWORKER_DYING;
			cv_broadcast(&worker->tw_cv);
			threadpool_cancel_job(pool, &worker_thread->twt_job);
			KASSERT(ISSET(worker->tw_flags, TASKWORKER_DYING));
			worker->tw_flags &= ~TASKWORKER_DYING;
			mutex_exit(&worker->tw_lock);

			threadpool_job_destroy(&worker_thread->twt_job);
		}

		taskworker_destroy(worker);

		switch (taskqueue->tq_type) {
		case TASKQUEUE_SOFTINT:
			pool_put(&taskworker_softint_pool,
			    container_of(worker, struct taskworker_softint,
				tws_worker));
			break;

		case TASKQUEUE_THREAD:
			pool_put(&taskworker_thread_pool,
			    container_of(worker, struct taskworker_thread,
				twt_worker));
			break;

		default:
			panic("taskqueue %p has invalid type: %d",
			    taskqueue, (int)taskqueue->tq_type);
		}
	}

	/* Jobs are now done, so we can drop the threadpool_percpu.  */
	if (taskqueue->tq_type == TASKQUEUE_THREAD)
		threadpool_percpu_put(taskqueue->tq_u.thread.pool_percpu,
		    taskqueue->tq_pri);

	percpu_free(taskqueue->tq_percpu, sizeof(struct taskworker *));
	pool_put(&taskqueue_pool, taskqueue);
}
/* Task worker initialization and destruction */

static void
taskworker_init(struct taskworker *worker, const char *name,
    struct taskqueue *taskqueue)
{

	mutex_init(&worker->tw_lock, MUTEX_DEFAULT, taskqueue->tq_ipl);
	cv_init(&worker->tw_cv, name);
	worker->tw_taskqueue = taskqueue;
	TAILQ_INIT(&worker->tw_tasks);
	worker->tw_current_task = NULL;
	worker->tw_flags = 0;
}

static void
taskworker_destroy(struct taskworker *worker)
{

	KASSERT(worker->tw_flags == 0);
	KASSERT(worker->tw_current_task == NULL);
	KASSERT(TAILQ_EMPTY(&worker->tw_tasks));

	worker->tw_taskqueue = NULL;
	cv_destroy(&worker->tw_cv);
	mutex_destroy(&worker->tw_lock);
}

static void
taskworker_assign_cpu(struct taskworker *worker, struct taskqueue *taskqueue,
    struct cpu_info *ci)
{

	percpu_traverse_enter();
	struct taskworker **const workerp =
	    percpu_getptr_remote(taskqueue->tq_percpu, ci);
	KASSERT(*workerp == NULL);
	*workerp = worker;
	percpu_traverse_exit();
}

static struct taskworker *
taskworker_unassign_cpu(struct taskqueue *taskqueue, struct cpu_info *ci)
{

	percpu_traverse_enter();
	struct taskworker **const workerp =
	    percpu_getptr_remote(taskqueue->tq_percpu, ci);
	struct taskworker *const worker = *workerp;
	KASSERT(worker != NULL);
	*workerp = NULL;
	percpu_traverse_exit();

	return worker;
}

/* Task scheduling */

/*
 * task_schedule: Schedule task on the shared system low-priority task
 * queue.
 */
void
task_schedule(struct task *task)
{

	taskqueue_schedule(system_taskqueue, task);
}

/*
 * delayed_task_schedule: Schedule dt on the shared system low-priority
 * task queue with the specified delay.
 */
void
delayed_task_schedule(struct delayed_task *dt, nsec_t nsec, jitter_t jitter)
{

	taskqueue_schedule_delayed(system_taskqueue, dt, nsec, jitter);
}

/*
 * delayed_task_reschedule: Try to reschedule a delayed task to run the
 * specified time from now.  It may be too late: the timeout may have
 * already fired.  Return true if we successfully rescheduled, false if
 * it was too late.
 */
bool
delayed_task_reschedule(struct delayed_task *dt, nsec_t nsec,
    jitter_t jitter)
{
	struct taskworker *worker = dt->dt_task.task_worker;

	if (worker == NULL)
		/* It was not scheduled to begin with, so no reschedule.  */
		return false;

	mutex_enter(&worker->tw_lock);
	if (dt->dt_task.task_worker != worker) {
		/* Its worker already executed it, so too late.  */
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (worker->tw_current_task == &dt->dt_task) {
		/* The worker is already executing it, so too late.  */
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (callout_stop(&dt->dt_callout)) {
		/* The callout already fired, so too late.  */
		mutex_exit(&worker->tw_lock);
		return false;
	}

	/* We stopped the callout before it fired.  Reschedule it.  */
	if (nsec == 0) {
		taskworker_maybe_switch(&worker, &dt->dt_task);
		TAILQ_INSERT_TAIL(&worker->tw_tasks, &dt->dt_task,
		    task_entry);
		taskworker_schedule(worker);
	} else {
		callout_schedule(&dt->dt_callout, nstohz(nsec, jitter));
	}
	mutex_exit(&worker->tw_lock);
	return true;
}
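/*
 * Illustrative sketch, not part of the draft: scheduling work on the
 * shared system queue.  task_init is used by the drain code below;
 * delayed_task_init is assumed to exist alongside it in <sys/task.h>.
 * The frobber_* names are hypothetical.
 */
#if 0
static task_fn_t frobber_task_fn, frobber_dtask_fn;

static struct task		frobber_task;
static struct delayed_task	frobber_dtask;

static void
frobber_kick(void)
{

	/* Run frobber_task_fn soon, on the current CPU's worker.  */
	task_init(&frobber_task, &frobber_task_fn);
	task_schedule(&frobber_task);

	/* Run frobber_dtask_fn roughly 100 ms from now, no jitter.  */
	delayed_task_init(&frobber_dtask, &frobber_dtask_fn);
	delayed_task_schedule(&frobber_dtask, 100*1000*1000, 0);
}
#endif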
/*
 * taskworker_maybe_switch: If *workerp is not the current CPU's
 * worker, switch task, which must currently be assigned to *workerp,
 * to the current CPU's worker instead, and store the current CPU's
 * worker in *workerp.  Returns with *workerp's lock held, whether the
 * same worker as on input or the new worker.  In the latter case,
 * taskworker_maybe_switch will have dropped the old worker's lock and
 * acquired the new worker's lock.
 *
 * This is needed so that the task is on the queue that the softint
 * handler will examine when we softint_schedule, which triggers the
 * softint on the current CPU only.  The alternative would be to use an
 * xcall to call softint_schedule on the remote CPU, but that requires
 * sleeping, which delayed_task_timeout and delayed_task_reschedule
 * can't do.  Another alternative would be to implement callout_bind to
 * force the callout to run only on the correct CPU.
 *
 * This is necessary only before calling taskworker_schedule with a
 * worker that belongs to a softint task queue and is not guaranteed to
 * be the current CPU's worker.  This is the case only in
 * delayed_task_timeout and delayed_task_reschedule.  For thread task
 * queues, or when the worker is guaranteed to be the current CPU's
 * worker, this is not needed.  percpu_getref disables preemption, and
 * spin locks do not yield, so taskqueue_schedule and
 * taskqueue_schedule_delayed are in no danger of switching CPUs while
 * they're not looking.
 */
static void
taskworker_maybe_switch(struct taskworker **workerp, struct task *task)
{

	if ((*workerp)->tw_taskqueue->tq_type != TASKQUEUE_SOFTINT)
		return;

	KASSERT(kpreempt_disabled());
	KASSERT(mutex_owned(&(*workerp)->tw_lock));
	KASSERT(task->task_worker == *workerp);
	KASSERT((*workerp)->tw_current_task != task);

	/* Get the current CPU's worker.  */
	struct taskworker **const curcpu_workerp =
	    percpu_getref((*workerp)->tw_taskqueue->tq_percpu);
	struct taskworker *const curcpu_worker = *curcpu_workerp;
	percpu_putref((*workerp)->tw_taskqueue->tq_percpu);
	KASSERT(curcpu_worker->tw_taskqueue == (*workerp)->tw_taskqueue);

	/* If *workerp is already it, we're good.  */
	if (__predict_true(curcpu_worker == *workerp))
		return;

	/* Otherwise, atomically switch workers and change locks.  */
	struct taskworker *const worker0 __diagused =
	    atomic_swap_ptr(&task->task_worker, curcpu_worker);
	KASSERT(worker0 == *workerp);
	mutex_exit(&(*workerp)->tw_lock);
	*workerp = curcpu_worker;
	mutex_enter(&(*workerp)->tw_lock);
	KASSERT(task->task_worker == *workerp);
}

/*
 * delayed_task_timedout: Return true if dt ran after a nonzero delay.
 */
bool
delayed_task_timedout(struct delayed_task *dt)
{

	KASSERTMSG((dt->dt_task.task_worker != NULL),
	    "delayed_task_timedout(%p) must be called only from task action",
	    dt);
	KASSERTMSG((dt->dt_task.task_worker->tw_current_task == &dt->dt_task),
	    "delayed_task_timedout(%p) must be called only from task action",
	    dt);

	return callout_invoking(&dt->dt_callout);
}
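/*
 * Illustrative sketch, not part of the draft: a delayed task callback
 * can use delayed_task_timedout to distinguish expiry of its timeout
 * from an immediate (zero-delay) schedule.  frobber_dtask_fn is
 * hypothetical.
 */
#if 0
static void
frobber_dtask_fn(struct task *task)
{
	struct delayed_task *const dt =
	    container_of(task, struct delayed_task, dt_task);

	if (delayed_task_timedout(dt)) {
		/* The timeout actually fired; handle the timeout case. */
	} else {
		/* Scheduled with a zero delay, or rescheduled to now. */
	}
}
#endif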
/*
 * taskqueue_schedule: Schedule task to run on taskqueue.  If task is
 * already scheduled, don't change it.
 */
void
taskqueue_schedule(struct taskqueue *taskqueue, struct task *task)
{
	struct taskworker *worker0;

	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;

	mutex_enter(&worker->tw_lock);

	/* Try to grab the task.  */
	while (__predict_false((worker0 = atomic_cas_ptr(&task->task_worker,
		    NULL, worker)) != NULL)) {
		/*
		 * Someone else got in first.  Switch to the other
		 * worker's lock.
		 */
		if (worker0 != worker) {
			mutex_exit(&worker->tw_lock);
			mutex_enter(&worker0->tw_lock);
		}
		if (__predict_true(task->task_worker == worker0)) {
			/*
			 * Nobody else got in while we were locking.
			 * The task is either on worker0's queue or
			 * being run by worker0.
			 */
			if ((worker0->tw_current_task == task) &&
			    !ISSET(worker0->tw_flags, TASKWORKER_REQUEUED)) {
				/*
				 * It's already begun to run.  Queue it
				 * up to run again.
				 */
				worker0->tw_flags |= TASKWORKER_REQUEUED;
				TAILQ_INSERT_TAIL(&worker0->tw_tasks, task,
				    task_entry);
			}
			mutex_exit(&worker0->tw_lock);
			goto out;
		}
		if (worker0 != worker) {
			mutex_exit(&worker0->tw_lock);
			mutex_enter(&worker->tw_lock);
		}
	}

	/* We grabbed it.  Put it on the queue.  */
	TAILQ_INSERT_TAIL(&worker->tw_tasks, task, task_entry);
	taskworker_schedule(worker);
	mutex_exit(&worker->tw_lock);

out:	percpu_putref(taskqueue->tq_percpu);
}

/*
 * taskqueue_schedule_delayed: Schedule the delayed task dt to run on
 * taskqueue after the specified number of nanoseconds.  If it is
 * already scheduled, don't change it.  taskqueue must run at a
 * priority no higher than PRI_SOFTCLOCK.  If the timeout is zero,
 * schedule the task to run without delay.
 */
void
taskqueue_schedule_delayed(struct taskqueue *taskqueue,
    struct delayed_task *dt, nsec_t nsec, jitter_t jitter)
{
	struct taskworker *worker0;

	KASSERTMSG((taskqueue->tq_pri <= PRI_SOFTCLOCK),
	    "taskqueue %p priority too high for delayed task %p: %d",
	    taskqueue, dt, (int)taskqueue->tq_pri);
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't schedule: %p", dt);

	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;

	mutex_enter(&worker->tw_lock);
	while (__predict_false((worker0 =
		    atomic_cas_ptr(&dt->dt_task.task_worker, NULL, worker))
		!= NULL)) {
		if (worker != worker0) {
			mutex_exit(&worker->tw_lock);
			mutex_enter(&worker0->tw_lock);
		}
		if (__predict_true(dt->dt_task.task_worker == worker0)) {
			if ((worker0->tw_current_task == &dt->dt_task) &&
			    !ISSET(worker0->tw_flags, TASKWORKER_REQUEUED)) {
				worker0->tw_flags |= TASKWORKER_REQUEUED;
				if (nsec == 0) {
					worker0->tw_flags |=
					    TASKWORKER_CALLOUT_ACK;
					TAILQ_INSERT_TAIL(&worker0->tw_tasks,
					    &dt->dt_task, task_entry);
				} else {
					callout_schedule(&dt->dt_callout,
					    nstohz(nsec, jitter));
				}
			}
			mutex_exit(&worker0->tw_lock);
			goto out;
		}
		if (worker != worker0) {
			mutex_exit(&worker0->tw_lock);
			mutex_enter(&worker->tw_lock);
		}
	}

	if (nsec == 0) {
		/* Clear any prior CALLOUT_INVOKING flag first.  */
		callout_ack(&dt->dt_callout);
		TAILQ_INSERT_TAIL(&worker->tw_tasks, &dt->dt_task,
		    task_entry);
		taskworker_schedule(worker);
	} else {
		callout_schedule(&dt->dt_callout, nstohz(nsec, jitter));
	}
	mutex_exit(&worker->tw_lock);

out:	percpu_putref(taskqueue->tq_percpu);
}

static int
nstohz(nsec_t nsec, jitter_t jitter)
{

	/*
	 * XXX Temporary stub until we get proper nanosecond-resolution
	 * timer APIs.
	 */
	(void)jitter;
	return mstohz(nsec / 1000000u);
}
/* Task cancellation */

/*
 * task_cancel: Cancel task.  If it has already begun, wait for it to
 * complete (or to call task_done).  May drop interlock if it is
 * necessary to wait.  May sleep.
 *
 * Returns true if task was scheduled and we prevented it from running.
 * Returns false if the task was not scheduled or had already begun.
 */
bool
task_cancel(struct task *task, kmutex_t *interlock)
{
	struct taskworker *const worker = task->task_worker;

	ASSERT_SLEEPABLE();
	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't cancel: %p", task);

	/* If it's not scheduled, it never ran.  */
	if (worker == NULL)
		return true;

	mutex_enter(&worker->tw_lock);

	/* If it moved to another queue, it already ran so we can't cancel. */
	if (task->task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}

	/* If it is running, it's too late to cancel.  Wait for completion. */
	if (worker->tw_current_task == task) {
		mutex_exit(interlock);
		while ((worker->tw_current_task == task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/* Got it before it ran!  Remove it from the queue.  */
	TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);

	/* Disassociate it from the worker so it can be scheduled anew.  */
	task->task_worker = NULL;

	mutex_exit(&worker->tw_lock);
	return true;
}

/*
 * task_cancel_async: Like task_cancel, but returns immediately even if
 * the task is in flight.  Hence there is no need for an interlock.
 */
bool
task_cancel_async(struct task *task)
{
	struct taskworker *worker = task->task_worker;

	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't cancel: %p", task);

	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);
	if (task->task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (worker->tw_current_task == task) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);

	/* Disassociate it from the worker so it can be scheduled anew.  */
	task->task_worker = NULL;

	mutex_exit(&worker->tw_lock);
	return true;
}
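/*
 * Illustrative sketch, not part of the draft: the usual cancellation
 * pattern with a caller-supplied interlock.  struct frobber_softc and
 * its members are hypothetical.  Because task_cancel may drop the
 * interlock to wait for a running task, any state the interlock
 * protects must be re-examined after it returns.
 */
#if 0
struct frobber_softc {
	kmutex_t	sc_lock;
	struct task	sc_task;
	bool		sc_dying;
};

static void
frobber_stop(struct frobber_softc *sc)
{

	mutex_enter(&sc->sc_lock);
	sc->sc_dying = true;
	if (!task_cancel(&sc->sc_task, &sc->sc_lock)) {
		/*
		 * The task had already begun (and has now finished or
		 * called task_done); the interlock may have been
		 * dropped and reacquired in the meantime.
		 */
	}
	mutex_exit(&sc->sc_lock);
}
#endif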
/* Delayed task cancellation */

/*
 * delayed_task_cancel: Cancel the delayed task dt.  If it has already
 * begun, wait for it to complete (or to call task_done).  May drop
 * interlock if it is necessary to wait, either for the callout to
 * complete or for the task to complete.  May sleep.
 *
 * Returns true if dt was scheduled and we prevented it from running.
 * Returns false if dt was not scheduled or had already begun.
 */
bool
delayed_task_cancel(struct delayed_task *dt, kmutex_t *interlock)
{
	struct taskworker *worker = dt->dt_task.task_worker;

	ASSERT_SLEEPABLE();
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't cancel: %p", dt);

	/* If it's not scheduled, we can't cancel it.  */
	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);

	/* If it moved to another queue, it already ran so we can't cancel. */
	if (dt->dt_task.task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}

	/* If it is running, it's too late to cancel.  Wait for completion. */
	if (worker->tw_current_task == &dt->dt_task) {
		mutex_exit(interlock);
		while ((worker->tw_current_task == &dt->dt_task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/*
	 * Try to cancel it without dropping the interlock.  This is an
	 * optimization; we don't need to do it.
	 */
	if (!callout_stop(&dt->dt_callout)) {
		/* Callout had not fired, so we cancelled it.  */
		dt->dt_task.task_worker = NULL;
		mutex_exit(&worker->tw_lock);
		return true;
	}

	/*
	 * We have to drop the interlock here because we need to drop
	 * the interlock and the worker lock in order to sleep if the
	 * callout has fired, but callout_halt can't drop two locks.
	 *
	 * XXX An alternative would be to have a caller-supplied
	 * interlock stored in the delayed task structure.  That would
	 * simplify the logic in scheduling delayed tasks, too.
	 * However, it would complicate the API for callers that don't
	 * need interlocking in task_cancel (XXX are there any,
	 * really?).
	 */
	mutex_exit(interlock);

	/* Try to cancel the callout.  */
	if (!callout_halt(&dt->dt_callout, &worker->tw_lock)) {
		/* Callout had not fired, so we cancelled it.  */
		dt->dt_task.task_worker = NULL;
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return true;
	}

	/* It already fired.  Try to cancel the task.  */
	if (worker->tw_current_task == &dt->dt_task) {
		/* The task is already running.  Wait for it to complete.  */
		while ((worker->tw_current_task == &dt->dt_task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/* The task has not yet run.  Don't let it.  */
	TAILQ_REMOVE(&worker->tw_tasks, &dt->dt_task, task_entry);
	dt->dt_task.task_worker = NULL;
	mutex_exit(&worker->tw_lock);
	mutex_enter(interlock);
	return true;
}

/*
 * delayed_task_cancel_async: Like delayed_task_cancel, but returns
 * immediately even if the task is in flight.  Hence there is no need
 * for an interlock.
 */
bool
delayed_task_cancel_async(struct delayed_task *dt)
{
	struct taskworker *worker = dt->dt_task.task_worker;

	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't cancel: %p", dt);

	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);
	if (dt->dt_task.task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (!callout_stop(&dt->dt_callout)) {
		/* Callout had not fired, so we cancelled it.  */
		dt->dt_task.task_worker = NULL;
		mutex_exit(&worker->tw_lock);
		return true;
	}
	if (worker->tw_current_task != &dt->dt_task) {
		TAILQ_REMOVE(&worker->tw_tasks, &dt->dt_task, task_entry);
		dt->dt_task.task_worker = NULL;
		mutex_exit(&worker->tw_lock);
		return true;
	}
	mutex_exit(&worker->tw_lock);
	return false;
}

/* Worker scheduling */

/*
 * taskworker_schedule: Make sure worker is about to run any queued
 * tasks.
 */
static void
taskworker_schedule(struct taskworker *worker)
{
	struct taskqueue *const taskqueue = worker->tw_taskqueue;

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(kpreempt_disabled());

#if DIAGNOSTIC
	{
		struct taskworker **const workerp =
		    percpu_getref(taskqueue->tq_percpu);
		KASSERT(worker == *workerp);
		percpu_putref(taskqueue->tq_percpu);
	}
#endif

	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		softint_schedule(taskqueue->tq_u.softint.cookie);
		break;

	case TASKQUEUE_THREAD: {
		struct taskworker_thread *const worker_thread =
		    container_of(worker, struct taskworker_thread,
			twt_worker);
		struct threadpool *const pool =
		    threadpool_percpu_ref(taskqueue->tq_u.thread.pool_percpu);

		threadpool_schedule_job(pool, &worker_thread->twt_job);
		break;
	}

	default:
		panic("taskqueue %p has invalid type: %d",
		    worker->tw_taskqueue, (int)worker->tw_taskqueue->tq_type);
	}
}

/* Task execution */

/*
 * taskworker_run_1: Pick one task off queue and execute it.  To be
 * called from a worker thread or softint handler.
 */
static void
taskworker_run_1(struct taskworker *worker, struct task_head *queue)
{

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(!TAILQ_EMPTY(queue));

	/* Grab the task from the queue.  */
	struct task *const task = TAILQ_FIRST(queue);
	TAILQ_REMOVE(queue, task, task_entry);

	/* We had better not be working on anything yet.  */
	KASSERT(worker->tw_current_task == NULL);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_REQUEUED));

	/* Mark it current and run it.  */
	worker->tw_current_task = task;
	mutex_exit(&worker->tw_lock);
	(*task->task_fn)(task);
	mutex_enter(&worker->tw_lock);

	/* Mark it done, unless the callback already did so via task_done. */
	if (!ISSET(worker->tw_flags, TASKWORKER_DONE)) {
		KASSERT(worker->tw_current_task == task);
		taskworker_done(worker, task);
	}

	/* Can't touch task after this.  */
	KASSERT(ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_REQUEUED));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_CALLOUT_ACK));

	/* Clear the flags related to this task.  */
	worker->tw_flags &= ~(TASKWORKER_DONE | TASKWORKER_REQUEUED |
	    TASKWORKER_CALLOUT_ACK);
}
/*
 * taskworker_done: Mark task done.  The worker will not touch it after
 * this, and the task can be executed again if scheduled afterward.  If
 * someone had already requeued the task, acknowledge this; otherwise,
 * disassociate the task from the worker.
 */
static void
taskworker_done(struct taskworker *worker, struct task *task)
{

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(worker->tw_current_task == task);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(task->task_worker == worker);

	/* XXX KLUDGE */
	if (ISSET(worker->tw_flags, TASKWORKER_CALLOUT_ACK)) {
		struct delayed_task *const dt = container_of(task,
		    struct delayed_task, dt_task);

		/* Clear the CALLOUT_INVOKING flag before it runs again.  */
		callout_ack(&dt->dt_callout);
		worker->tw_flags &= ~TASKWORKER_CALLOUT_ACK;
	}

	if (ISSET(worker->tw_flags, TASKWORKER_REQUEUED)) {
		/* It is already queued to run again; keep it associated.  */
		worker->tw_flags &= ~TASKWORKER_REQUEUED;
	} else {
		struct taskworker *const worker0 __diagused =
		    atomic_swap_ptr(&task->task_worker, NULL);

		/* Can't touch task after this.  */
		KASSERT(worker0 == worker);
	}

	/* It is no longer current.  Notify task_cancel.  */
	worker->tw_current_task = NULL;
	worker->tw_flags |= TASKWORKER_DONE;
	cv_broadcast(&worker->tw_cv);
}

/* Worker soft interrupt handler and threadpool job */

static void
taskworker_softintr(void *arg)
{
	struct taskqueue *const taskqueue = arg;
	struct task_head queue = TAILQ_HEAD_INITIALIZER(queue);

	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;
	percpu_putref(taskqueue->tq_percpu);

	/*
	 * Grab a batch and run it.  We block other softints at the
	 * same priority, so if any new tasks are scheduled, we'll
	 * handle them in the next batch, in order to let other
	 * softints get a chance to run.
	 */
	mutex_enter(&worker->tw_lock);
	TAILQ_CONCAT(&queue, &worker->tw_tasks, task_entry);
	while (!TAILQ_EMPTY(&queue))
		taskworker_run_1(worker, &queue);
	mutex_exit(&worker->tw_lock);
}

static void
taskworker_job(struct threadpool_job *job)
{
	struct taskworker_thread *const worker_thread =
	    container_of(job, struct taskworker_thread, twt_job);
	struct taskworker *const worker = &worker_thread->twt_worker;

	mutex_enter(&worker->tw_lock);
	for (;;) {
		/*
		 * Wait until we have a task, or give up if nobody's
		 * handed us any after a little while.
		 */
		while (__predict_false(TAILQ_EMPTY(&worker->tw_tasks))) {
			if (ISSET(worker->tw_flags, TASKWORKER_DYING))
				goto out;
			if (cv_timedwait(&worker->tw_cv, &worker->tw_lock,
				TASKQUEUE_IDLE_TICKS)) {
				if (TAILQ_EMPTY(&worker->tw_tasks))
					goto out;
			}
		}

		/*
		 * Run one task at a time.  Preemption is allowed while
		 * the task is running, so there's no need to process
		 * tasks in batches like for softints.
		 */
		while (!TAILQ_EMPTY(&worker->tw_tasks))
			taskworker_run_1(worker, &worker->tw_tasks);
	}
out:	KASSERT(TAILQ_EMPTY(&worker->tw_tasks));
	threadpool_job_done(job);
	mutex_exit(&worker->tw_lock);
}
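/*
 * Illustrative sketch, not part of the draft: scheduling a task that is
 * currently running does not lose the request -- the worker marks it
 * REQUEUED and runs it exactly once more after the current invocation
 * finishes.  A callback can rely on this to coalesce bursts of work.
 * frobber_process_pending is hypothetical.
 */
#if 0
static void frobber_process_pending(void);

static void
frobber_task_fn(struct task *task)
{

	/*
	 * Drain whatever work accumulated before this callback ran.
	 * Work that arrives while we are here schedules the task
	 * again, which requeues it, so nothing is missed.
	 */
	frobber_process_pending();
}
#endif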
/* Delayed task callout handler */

void
delayed_task_timeout(void *arg)
{
	struct delayed_task *const dt = arg;
	struct taskworker *worker = dt->dt_task.task_worker;

	/* We had better have been assigned a worker.  */
	KASSERT(worker != NULL);

	mutex_enter(&worker->tw_lock);

	/* We had better still be on this worker.  */
	KASSERT(dt->dt_task.task_worker == worker);

	if (worker->tw_current_task == &dt->dt_task) {
		/*
		 * The task was already running and the timeout
		 * triggered because it was scheduled again.  Just put
		 * it on the end of the queue and mark it requeued.
		 */
		if (!ISSET(worker->tw_flags, TASKWORKER_REQUEUED))
			worker->tw_flags |= TASKWORKER_REQUEUED;
		TAILQ_INSERT_TAIL(&worker->tw_tasks, &dt->dt_task,
		    task_entry);
	} else {
		/*
		 * Make sure we're putting it on the current CPU's
		 * worker, and schedule it anew.
		 */
		taskworker_maybe_switch(&worker, &dt->dt_task);
		TAILQ_INSERT_TAIL(&worker->tw_tasks, &dt->dt_task,
		    task_entry);
		taskworker_schedule(worker);
	}

	mutex_exit(&worker->tw_lock);
}

/* Draining task queues */

struct drain {
	unsigned int		d_count;
	struct taskworker	d_worker;
	struct drain_task	*d_tasks;
};

struct drain_task {
	struct task		drt_task;
	struct drain		*drt_drain;
};

void
taskqueue_drain(struct taskqueue *taskqueue)
{
	xcfunc_t xcfunc;
	struct drain drain;

	/* We are going to sleep big-time.  */
	ASSERT_SLEEPABLE();

	/* Set up some temporary state for draining.  */
	drain.d_count = ncpu;
	taskworker_init(&drain.d_worker, "tqdrain", taskqueue);

	CTASSERT(MAXCPUS <= (SIZE_MAX / sizeof(drain.d_tasks[0])));
	/* XXX Allocation to destroy is sketchy...  */
	drain.d_tasks = kmem_alloc((sizeof(drain.d_tasks[0]) * ncpu),
	    KM_SLEEP);

	/* Do the per-CPU draining setup.  */
	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		xcfunc = &taskqueue_drain_softint_xc;
		break;

	case TASKQUEUE_THREAD:
		xcfunc = &taskqueue_drain_thread_xc;
		break;

	default:
		panic("taskqueue %p has invalid type: %d",
		    taskqueue, (int)taskqueue->tq_type);
	}
	xc_wait(xc_broadcast(0, xcfunc, taskqueue, &drain));

	mutex_enter(&drain.d_worker.tw_lock);

	/* Wait for the tasks or xcalls running on other CPUs to complete. */
	while (0 < drain.d_count)
		cv_wait(&drain.d_worker.tw_cv, &drain.d_worker.tw_lock);

	/* Run all the tasks that couldn't run on other CPUs.  */
	while (!TAILQ_EMPTY(&drain.d_worker.tw_tasks))
		taskworker_run_1(&drain.d_worker, &drain.d_worker.tw_tasks);

	mutex_exit(&drain.d_worker.tw_lock);

	/* Nuke the temporary drain state.  */
	kmem_free(drain.d_tasks, (sizeof(drain.d_tasks[0]) * ncpu));
	taskworker_destroy(&drain.d_worker);
	KASSERT(drain.d_count == 0);
}
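/*
 * Illustrative sketch, not part of the draft: draining a private queue
 * before tearing down state that its tasks may still reference.
 * struct frobber_softc and its members are hypothetical.
 */
#if 0
struct frobber_softc {
	struct taskqueue	*sc_taskqueue;
	/* ...driver state the tasks touch... */
};

static void
frobber_detach(struct frobber_softc *sc)
{

	/* Caller guarantees no new tasks are scheduled past this point. */
	taskqueue_drain(sc->sc_taskqueue);
	taskqueue_destroy(sc->sc_taskqueue);
	/* Now it is safe to free sc.  */
}
#endif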
static void
drain_task(struct task *task)
{
	struct drain_task *const drt = container_of(task, struct drain_task,
	    drt_task);

	mutex_enter(&drt->drt_drain->d_worker.tw_lock);
	if (--drt->drt_drain->d_count == 0)
		cv_broadcast(&drt->drt_drain->d_worker.tw_cv);
	mutex_exit(&drt->drt_drain->d_worker.tw_lock);
}

static void
taskqueue_drain_softint_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;
	struct drain_task *const drt = &drain->d_tasks[cpu_index(curcpu())];

	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;

	/*
	 * Just schedule the drain task -- softints are supposed to be
	 * quick, are not allowed to sleep, and are not preemptive with
	 * one another, so there is no point in pooling them.
	 */
	task_init(&drt->drt_task, &drain_task);
	drt->drt_drain = drain;

	mutex_enter(&worker->tw_lock);
	drt->drt_task.task_worker = worker;
	TAILQ_INSERT_TAIL(&worker->tw_tasks, &drt->drt_task, task_entry);
	taskworker_schedule(worker);
	mutex_exit(&worker->tw_lock);

	percpu_putref(taskqueue->tq_percpu);
}

static void
taskqueue_drain_thread_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;
	struct drain_task *const drt = &drain->d_tasks[cpu_index(curcpu())];
	struct task *task, *next;

	struct taskworker **const workerp =
	    percpu_getref(taskqueue->tq_percpu);
	struct taskworker *const worker = *workerp;
	percpu_putref(taskqueue->tq_percpu);

	struct taskworker_thread *const worker_thread =
	    container_of(worker, struct taskworker_thread, twt_worker);
	struct threadpool *const pool =
	    threadpool_percpu_ref(taskqueue->tq_u.thread.pool_percpu);

	mutex_enter(&worker->tw_lock);
	if (threadpool_cancel_job_async(pool, &worker_thread->twt_job)) {
		/* Cancelled.  Move all the tasks to our drain worker.  */
		mutex_enter(&drain->d_worker.tw_lock);
		TAILQ_FOREACH_SAFE(task, &worker->tw_tasks, task_entry,
		    next) {
			TAILQ_REMOVE(&worker->tw_tasks, task, task_entry);
			/* Hand it over so the done protocol matches.  */
			task->task_worker = &drain->d_worker;
			TAILQ_INSERT_TAIL(&drain->d_worker.tw_tasks, task,
			    task_entry);
		}
		if (--drain->d_count == 0)
			cv_broadcast(&drain->d_worker.tw_cv);
		mutex_exit(&drain->d_worker.tw_lock);
	} else {
		/* Job is running.  Let it run the drain task.  */
		task_init(&drt->drt_task, &drain_task);
		drt->drt_drain = drain;

		struct taskworker *const worker0 __diagused =
		    atomic_swap_ptr(&drt->drt_task.task_worker, worker);
		KASSERT(worker0 == NULL);
		KASSERT(worker->tw_current_task != &drt->drt_task);
		TAILQ_INSERT_TAIL(&worker->tw_tasks, &drt->drt_task,
		    task_entry);
		/* No need to call taskworker_schedule -- already running.  */
	}
	mutex_exit(&worker->tw_lock);
}