/*	$NetBSD$	*/

/*
 * XXX WARNING WARNING WARNING XXX
 *
 * This code does not run!  I have not even compile-tested it.  It is a
 * draft of an idea.  See below the copyright notice for a summary.
 */

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Tasks
 *
 * A /task/ is an action to be executed once asynchronously.  Tasks are
 * cheap: each one is four pointers long, initializing them is quick,
 * and scheduling them requires constant time and writing only to
 * per-CPU memory.
 *
 * Tasks are executed by putting them on /task queues/ to be executed
 * by workers, either in soft interrupt context or in thread context,
 * and at any priority level, depending on the task queue.  To choose a
 * task queue, either get one of the shared per-CPU system ones,
 *
 *	struct taskqueue *taskqueue;
 *
 *	error = taskqueue_get(&taskqueue, PRI_SOFTNET);
 *	if (error)
 *		goto fail;
 *	...
 *	taskqueue_put(taskqueue);
 *
 * or create your own, for interrupt handlers running at IPL_BIO to run
 * tasks at priority PRI_NONE in a per-CPU worker thread:
 *
 *	error = taskqueue_create(&taskqueue, "mytaskq", PRI_NONE, IPL_BIO,
 *	    TASKQUEUE_PERCPU);
 *	if (error)
 *		goto fail;
 *	...
 *	taskqueue_destroy(taskqueue);
 *
 * (If you pass 0 instead of TASKQUEUE_PERCPU, there will be only one
 * global worker thread, which is all that most drivers need, rather
 * than a set of per-CPU worker threads.)
 *
 * Once you have a task queue, and you have allocated storage for a
 * struct task, you can execute your task on the task queue by
 * initializing and scheduling the task:
 *
 *	task_init(task, &my_task_action);
 *	task_schedule(taskqueue, task);
 *
 *	static void
 *	my_task_action(struct task *task)
 *	{
 *		printf("my task ran!\n");
 *	}
 *
 * task_init and task_schedule never fail, and you can use them in any
 * context, including hard interrupt context, as long as the interrupt
 * priority level is not above the one used to create the task queue --
 * IPL_BIO in the taskqueue_create example above, or IPL_VM for all the
 * shared system task queues.
 *
 * If you schedule a task multiple times before it runs, it will be
 * executed only once.
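 *
 * For example, a hypothetical driver (the names mydev_intr, sc_rxtask,
 * and sc_taskqueue here are illustrative, not part of this API) might
 * schedule the same task from its hard interrupt handler on every
 * receive interrupt; however many interrupts arrive before the task
 * gets to run, its action runs just once for that batch:
 *
 *	static int
 *	mydev_intr(void *cookie)
 *	{
 *		struct mydev_softc *sc = cookie;
 *
 *		...
 *		// Coalesces with any earlier, still-pending schedule.
 *		task_schedule(sc->sc_taskqueue, &sc->sc_rxtask);
 *		return 1;
 *	}
 *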
 * If you schedule a task again after it has begun executing, it will
 * not begin executing again until its action has either returned or
 * chosen to call task_done.
 *
 * Once you are done with a task, and you have made sure it has
 * finished executing, you must destroy it with task_destroy:
 *
 *	task_destroy(task);
 *
 * If you're not sure whether a task is scheduled or not, or whether it
 * has finished executing or not, you can cancel it and wait for it to
 * complete with task_cancel.  There are two tricky details about task
 * cancellation:
 *
 * 1. The task might need a lock that the caller of task_cancel holds.
 *    In that case, you must pass the lock to task_cancel so that it
 *    can drop the lock before waiting for the task.
 *
 * 2. The task might be responsible for releasing a resource, even a
 *    resource such as the memory containing its struct task.  In that
 *    case, if the task was about to run but is cancelled, task_cancel
 *    returns true to indicate the caller must take responsibility for
 *    the resource.  Otherwise, task_cancel returns false.
 *
 * The following contrived example illustrates a pattern that might
 * arise in a device driver using tasks.  Since the task action frees
 * the struct task, it must first call task_done before returning;
 * otherwise the worker running the task would continue to use the
 * newly freed memory.
 *
 *	static void
 *	my_driver_action(struct task *task)
 *	{
 *		printf("my driver's task ran!\n");
 *		mutex_enter(&sc->sc_lock);
 *		KASSERT(sc->sc_curtask == task);
 *		sc->sc_curtask = NULL;
 *		mutex_exit(&sc->sc_lock);
 *		task_done(task);
 *		task_destroy(task);
 *		kmem_free(task, sizeof(*task));
 *	}
 *
 *	// Set up a task, if we need one.
 *	struct task *tmp = kmem_alloc(sizeof(*tmp), KM_SLEEP);
 *	mutex_enter(&sc->sc_lock);
 *	if (sc->sc_curtask == NULL) {
 *		sc->sc_curtask = tmp;
 *		tmp = NULL;
 *		task_init(sc->sc_curtask, &my_driver_action);
 *		task_schedule(taskqueue, sc->sc_curtask);
 *	}
 *	mutex_exit(&sc->sc_lock);
 *	if (tmp != NULL)
 *		kmem_free(tmp, sizeof(*tmp));
 *
 *	...
 *
 *	// Cancel the task, if there is one.
 *	struct task *task = NULL;
 *	mutex_enter(&sc->sc_lock);
 *	if (sc->sc_curtask != NULL) {
 *		if (task_cancel(sc->sc_curtask, &sc->sc_lock)) {
 *			// We cancelled it, so we have to clean it up.
 *			task = sc->sc_curtask;
 *			sc->sc_curtask = NULL;
 *		}
 *	}
 *	mutex_exit(&sc->sc_lock);
 *	if (task != NULL) {
 *		task_destroy(task);
 *		kmem_free(task, sizeof(*task));
 *	}
 *
 * If you haven't kept track of all your driver's tasks, but your
 * device is detaching and you used a shared system task queue, instead
 * of cancelling them all you can wait for them to complete by draining
 * the task queue with taskqueue_drain.
 *
 * Draining means waiting not just for your own tasks, but for any
 * tasks that anyone else sharing the task queue has scheduled on it,
 * so tasks on the shared system task queues are not allowed long
 * sleeps!  Nevertheless, taskqueue_drain may take a long time simply
 * because it must wait for all tasks on the queue on every CPU to
 * complete.
 *
 *	static void
 *	mydev_attach(device_t parent, device_t self, void *aux)
 *	{
 *		struct mydev_softc *sc = device_private(self);
 *		int error;
 *
 *		...
 *		error = taskqueue_get(&sc->sc_taskqueue, PRI_SOFTNET);
 *		if (error) {
 *			sc->sc_taskqueue = NULL;
 *			aprint_error_dev(self,
 *			    "unable to get softnet taskqueue: %d\n", error);
 *			goto fail;
 *		}
 *		...
 *	}
 *
 *	static int
 *	mydev_detach(device_t self, int flags)
 *	{
 *		struct mydev_softc *sc = device_private(self);
 *
 *		...
 *		if (sc->sc_taskqueue != NULL) {
 *			taskqueue_drain(sc->sc_taskqueue);
 *			taskqueue_put(sc->sc_taskqueue);
 *			sc->sc_taskqueue = NULL;
 *		}
 *		...
 *		return 0;
 *	}
 *
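 * There are also /delayed tasks/, which run only after a delay given
 * in ticks.  As a rough sketch -- sc_tick_task is an illustrative
 * name, the initialization step is elided, and the task queue must
 * have a priority that admits delayed tasks (see
 * delayed_task_schedule_ticks below) -- a driver might use one like
 * this:
 *
 *	// initialize sc->sc_tick_task (a struct delayed_task)
 *	delayed_task_schedule_ticks(taskqueue, &sc->sc_tick_task, hz);
 *	...
 *	// Later, e.g. before detaching:
 *	if (!delayed_task_cancel(&sc->sc_tick_task, &sc->sc_lock)) {
 *		// Too late to cancel; it already ran, and
 *		// delayed_task_cancel waited for it to finish.
 *	}
 *	delayed_task_destroy(&sc->sc_tick_task);
 *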
 * === Appendix
 *
 * Tasks are meant to replace most applications of workqueues,
 * callouts, and softints.  However, that doesn't mean that those
 * abstractions will go away.  This code itself is an application of
 * callouts and softints, and while scheduling a task is more flexible
 * than queueing work, the latter is even lighter-weight, so
 * applications not needing the extra flexibility may reasonably stay
 * with workqueues.
 *
 * The task abstraction was influenced by Linux workqueues, FreeBSD
 * taskqueues, NetBSD workqueues/callouts/softints, and design notes
 * for kconts (`kernel continuations').  There are a few more
 * operations not mentioned in this little tutorial, including delayed
 * tasks; for details, see the comments below, above each routine.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD$");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/callout.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/intr.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/kthread.h>
#include <sys/mutex.h>
#include <sys/pool.h>
#include <sys/queue.h>
#include <sys/systm.h>
#include <sys/task.h>
#include <sys/xcall.h>

TAILQ_HEAD(task_head, task);

struct taskqueue {
	void			*tq_worker_array;
	unsigned int		tq_nworkers;
	union {
		struct {
			void	*cookie;
		}		softint;
	}			tq_u;
	enum taskqueue_type {
		TASKQUEUE_THREAD,
		TASKQUEUE_SOFTINT,
	}			tq_type;
	int			tq_flags;
	pri_t			tq_pri;
};

struct taskworker {
	kmutex_t		tw_lock;
	struct taskqueue	*tw_taskqueue;
	union {
		struct {
			kcondvar_t	run_cv;
			struct lwp	*lwp;
		}		thread;
	}			tw_u;
	kcondvar_t		tw_done_cv;
	struct task_head	tw_queue;
	struct task		*tw_current_task;
	int			tw_flags;
#define	TASKWORKER_DONE		0x01
#define	TASKWORKER_RESCHEDULED	0x02
#define	TASKWORKER_EXIT		0x04
};

static int	taskqueue_create_softint(struct taskqueue **, const char *,
		    pri_t, int, int, int);
static int	taskqueue_create_thread(struct taskqueue **, const char *,
		    pri_t, int, int);

static int	taskqueue_alloc_workers(struct taskqueue *, unsigned int);
static void	taskqueue_free_workers(struct taskqueue *);

static struct taskworker *
		taskqueue_worker(struct taskqueue *, unsigned int);
static struct taskworker *
		taskqueue_current_worker(struct taskqueue *);

static void	taskworker_init(struct taskworker *, struct taskqueue *,
		    const char *, int);
static void	taskworker_destroy(struct taskworker *);

static void	taskworker_schedule(struct taskworker *, struct task *);
static void	taskworker_enqueue(struct taskworker *, struct task *);
static void	taskworker_schedule_after_ticks(struct taskworker *,
		    struct delayed_task *, int);

static void	taskworker_run_1(struct taskworker *, struct task_head *);
static void	taskworker_done(struct taskworker *, struct task *);

static void	taskworker_thread(void *) __dead;
static void	taskworker_softintr(void *);

static void	task_error(struct task *);

/* Initialization and shared system task queues */

static pool_cache_t taskqueues_pool;

static kmutex_t system_taskqueues_lock __cacheline_aligned;
static struct {
	struct taskqueue	*tq;
	unsigned int		refcnt;
} system_taskqueues[PRI_COUNT + 1];

/*
 * tasks_init: Initialize the task subsystem.
 */
void
tasks_init(void)
{

	taskqueues_pool = pool_cache_init(sizeof(struct taskqueue), 0, 0, 0,
	    "taskqueues", NULL, IPL_NONE, NULL, NULL, NULL);
	mutex_init(&system_taskqueues_lock, MUTEX_DEFAULT, IPL_NONE);
}
/*
 * taskqueue_put: Release a reference to a shared system taskqueue
 * obtained with taskqueue_get.  If this is the last one, destroy it.
 * Do not pass a taskqueue created with taskqueue_create.  May sleep.
 */
void
taskqueue_put(struct taskqueue *taskqueue)
{
	const unsigned int i = (taskqueue->tq_pri == PRI_NONE? PRI_COUNT :
	    taskqueue->tq_pri);
	bool destroy = false;

	ASSERT_SLEEPABLE();

	mutex_enter(&system_taskqueues_lock);
	KASSERT(system_taskqueues[i].tq == taskqueue);
	if (--system_taskqueues[i].refcnt == 0) {
		system_taskqueues[i].tq = NULL;
		destroy = true;
	}
	mutex_exit(&system_taskqueues_lock);

	if (destroy)
		taskqueue_destroy(taskqueue);
}

/*
 * taskqueue_get: Obtain a reference to a shared system task queue
 * running at the specified priority.  The resulting task queue can be
 * used at any IPL up to and including IPL_VM.  It must be released
 * with taskqueue_put -- do not pass it to taskqueue_destroy.  May
 * sleep.
 */
int
taskqueue_get(struct taskqueue **taskqueue_ret, pri_t pri)
{
	unsigned int i;
	char buf[9];
	struct taskqueue *taskqueue, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	if (pri == PRI_NONE)
		i = PRI_COUNT;
	else if ((pri < 0) || (PRI_COUNT <= pri))
		return EINVAL;
	else
		i = pri;

	/* Try to get it.  */
	mutex_enter(&system_taskqueues_lock);
	taskqueue = system_taskqueues[i].tq;
	if (taskqueue == NULL) {
		/* Not there.  Drop the lock to create a new one.  */
		KASSERT(system_taskqueues[i].refcnt == 0);
		mutex_exit(&system_taskqueues_lock);

		/* Three digits will fit in the buffer.  */
		(void)snprintf(buf, sizeof buf, "taskq%u", i);
		error = taskqueue_create(&tmp, buf, pri, IPL_VM,
		    TASKQUEUE_PERCPU);
		if (error)
			return error;

		/* Try again, but we may have raced.  */
		mutex_enter(&system_taskqueues_lock);
		taskqueue = system_taskqueues[i].tq;
		if (taskqueue == NULL) {
			/* Nobody raced us.  Commit tmp.  */
			KASSERT(system_taskqueues[i].refcnt == 0);
			taskqueue = system_taskqueues[i].tq = tmp;
			tmp = NULL;
		}
	}

	/* Bump the reference count.  */
	if (system_taskqueues[i].refcnt == UINT_MAX) {
		mutex_exit(&system_taskqueues_lock);
		if (tmp != NULL)
			taskqueue_destroy(tmp);
		return EBUSY;
	}
	system_taskqueues[i].refcnt++;
	mutex_exit(&system_taskqueues_lock);

	/* If we created tmp but didn't end up using it, destroy it.  */
	if (tmp != NULL)
		taskqueue_destroy(tmp);

	KASSERT(taskqueue != NULL);
	*taskqueue_ret = taskqueue;
	return 0;
}

/* Taskqueue creation and destruction */

/*
 * taskqueue_create: Create a taskqueue to run tasks at priority
 * task_pri which can be scheduled from IPL schedule_ipl.  If task_pri
 * is a softint priority, flags must have TASKQUEUE_PERCPU -- softints
 * are always per-CPU.  Must be destroyed with taskqueue_destroy -- do
 * not pass this to taskqueue_put.  May sleep.
 */
int
taskqueue_create(struct taskqueue **taskqueue_ret, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{
	int task_ipl;

	ASSERT_SLEEPABLE();

	/* XXX Kludge!  This table should go elsewhere.  */
	if (MAXPRI_KERNEL_RT < task_pri) {
		return EINVAL;
	} else if (PRI_SOFTSERIAL <= task_pri) {
		task_ipl = SOFTINT_SERIAL;
	} else if (PRI_SOFTNET <= task_pri) {
		task_ipl = SOFTINT_NET;
	} else if (PRI_SOFTBIO <= task_pri) {
		task_ipl = SOFTINT_BIO;
	} else {
		/* Not softint at all.  Use a thread.  */
		return taskqueue_create_thread(taskqueue_ret, name, task_pri,
		    schedule_ipl, flags);
	}

	return taskqueue_create_softint(taskqueue_ret, name, task_pri,
	    task_ipl, schedule_ipl, flags);
}
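/*
 * For example, a hypothetical caller wanting its tasks run in softint
 * context at PRI_SOFTBIO, schedulable from IPL_BIO, might do (sketch
 * only; "mysoftbioq" is an arbitrary name):
 *
 *	error = taskqueue_create(&taskqueue, "mysoftbioq", PRI_SOFTBIO,
 *	    IPL_BIO, TASKQUEUE_PERCPU);
 *
 * Passing 0 instead of TASKQUEUE_PERCPU with a softint priority would
 * trip the assertions in taskqueue_create_softint, since softints are
 * inherently per-CPU.
 */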
/*
 * taskqueue_destroy: Wait for all tasks on taskqueue to complete and
 * destroy it.  The taskqueue must have been created with
 * taskqueue_create.  May sleep.
 */
void
taskqueue_destroy(struct taskqueue *taskqueue)
{
	unsigned int i;

	ASSERT_SLEEPABLE();

	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		softint_disestablish(taskqueue->tq_u.softint.cookie);
		break;
	case TASKQUEUE_THREAD:
		break;
	default:
		panic("taskqueue %p has invalid type: %d", taskqueue,
		    (int)taskqueue->tq_type);
	}

	for (i = 0; i < taskqueue->tq_nworkers; i++)
		taskworker_destroy(taskqueue_worker(taskqueue, i));

	taskqueue_free_workers(taskqueue);
	pool_cache_put(taskqueues_pool, taskqueue);
}

/*
 * taskqueue_create_softint: Create a taskqueue to run tasks in softint
 * context.  Tasks can be scheduled at IPL schedule_ipl and will be run
 * at softint IPL task_ipl, which must be one of the softint priority
 * levels SOFTINT_CLOCK, SOFTINT_BIO, SOFTINT_NET, or SOFTINT_SERIAL.
 * Flags must be TASKQUEUE_PERCPU -- softints are always per-CPU.
 */
static int
taskqueue_create_softint(struct taskqueue **taskqueue_ret, const char *name,
    pri_t task_pri, int task_ipl, int schedule_ipl, int flags)
{
	struct taskqueue *taskqueue;
	const unsigned int nworkers = ncpu;
	unsigned int i;
	int error;

	KASSERT(ISSET(flags, TASKQUEUE_PERCPU));
	KASSERT(!ISSET(flags, ~TASKQUEUE_PERCPU));

	taskqueue = pool_cache_get(taskqueues_pool, PR_WAITOK);
	taskqueue->tq_type = TASKQUEUE_SOFTINT;
	taskqueue->tq_flags = flags;
	taskqueue->tq_pri = task_pri;

	error = taskqueue_alloc_workers(taskqueue, nworkers);
	if (error)
		goto fail0;

	taskqueue->tq_u.softint.cookie = softint_establish(
	    (task_ipl | SOFTINT_MPSAFE), &taskworker_softintr, taskqueue);
	if (taskqueue->tq_u.softint.cookie == NULL) {
		error = ENOMEM;
		goto fail1;
	}

	for (i = 0; i < nworkers; i++)
		taskworker_init(taskqueue_worker(taskqueue, i), taskqueue,
		    name, schedule_ipl);

	/* Success!  */
	*taskqueue_ret = taskqueue;
	return 0;

fail1:	taskqueue_free_workers(taskqueue);
fail0:	pool_cache_put(taskqueues_pool, taskqueue);
	return error;
}

/*
 * taskqueue_create_thread: Create a taskqueue to run tasks in thread
 * context.  Tasks can be scheduled at IPL schedule_ipl and will be run
 * at priority task_pri.  If flags has TASKQUEUE_PERCPU set, one thread
 * will be created for each CPU and bound to that CPU; otherwise, only
 * one thread will be created, not bound to any CPU.
 */
static int
taskqueue_create_thread(struct taskqueue **taskqueue_ret, const char *name,
    pri_t task_pri, int schedule_ipl, int flags)
{
	struct taskqueue *taskqueue;
	const unsigned int nworkers = ISSET(flags, TASKQUEUE_PERCPU)? ncpu : 1;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;
	unsigned int i, j;
	int error;

	KASSERT(!ISSET(flags, ~TASKQUEUE_PERCPU));

	taskqueue = pool_cache_get(taskqueues_pool, PR_WAITOK);
	taskqueue->tq_type = TASKQUEUE_THREAD;
	taskqueue->tq_flags = flags;
	taskqueue->tq_pri = task_pri;

	error = taskqueue_alloc_workers(taskqueue, nworkers);
	if (error)
		goto fail0;

	if (ISSET(flags, TASKQUEUE_PERCPU)) {
		i = 0;
		for (CPU_INFO_FOREACH(cii, ci)) {
			struct taskworker *const worker =
			    taskqueue_worker(taskqueue, cpu_index(ci));

			taskworker_init(worker, taskqueue, name, schedule_ipl);
			cv_init(&worker->tw_u.thread.run_cv, name);
			worker->tw_u.thread.lwp = NULL;
			/* XXX KTHREAD_TS?  Workqueue(9) uses it.  */
			error = kthread_create(task_pri, KTHREAD_MPSAFE, ci,
			    &taskworker_thread, worker,
			    &worker->tw_u.thread.lwp,
			    "%s/%u", name, cpu_index(ci));
			if (error) {
				taskworker_destroy(worker);
				goto fail2;
			}
			i++;
		}
	} else {
		struct taskworker *const worker =
		    taskqueue_worker(taskqueue, 0);

		taskworker_init(worker, taskqueue, name, schedule_ipl);
		cv_init(&worker->tw_u.thread.run_cv, name);
		worker->tw_u.thread.lwp = NULL;
		/* XXX KTHREAD_TS?  Workqueue(9) uses it.  */
		error = kthread_create(task_pri, KTHREAD_MPSAFE, NULL,
		    &taskworker_thread, worker, &worker->tw_u.thread.lwp,
		    "%s", name);
		if (error) {
			taskworker_destroy(worker);
			goto fail1;
		}
	}

	/* Success!  */
	*taskqueue_ret = taskqueue;
	return 0;

fail2:	j = 0;
	for (CPU_INFO_FOREACH(cii, ci)) {
		if (i <= j)
			break;
		taskworker_destroy(taskqueue_worker(taskqueue,
		    cpu_index(ci)));
		j++;
	}
fail1:	taskqueue_free_workers(taskqueue);
fail0:	pool_cache_put(taskqueues_pool, taskqueue);
	return error;
}

/* Draining task queues */

struct drain {
	kmutex_t	d_lock;
	kcondvar_t	d_cv;
	unsigned int	d_count;
};

struct drain_task {
	struct task	dt_task;
	struct drain	*dt_drain;
};

static void	taskqueue_drain_xc(void *, void *);
static void	taskworker_drain(struct taskworker *, struct drain *);
static void	drain_task(struct task *);

/*
 * taskqueue_drain: Wait for all tasks currently scheduled on taskqueue
 * to complete.  This excludes delayed tasks -- you are responsible for
 * cancelling them.  May sleep.
 */
void
taskqueue_drain(struct taskqueue *taskqueue)
{
	struct drain drain;

	ASSERT_SLEEPABLE();

	mutex_init(&drain.d_lock, MUTEX_DEFAULT, IPL_VM);
	cv_init(&drain.d_cv, "tqdrain");
	drain.d_count = taskqueue->tq_nworkers;

	switch (taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		KASSERT(taskqueue->tq_nworkers == ncpu);
		/*
		 * Send a low-priority xcall to schedule a task on
		 * every CPU's worker.
		 */
		xc_wait(xc_broadcast(0, &taskqueue_drain_xc, taskqueue,
		    &drain));
		break;

	case TASKQUEUE_THREAD: {
		unsigned int i;

		for (i = 0; i < taskqueue->tq_nworkers; i++)
			taskworker_drain(taskqueue_worker(taskqueue, i),
			    &drain);
		break;
	}

	default:
		panic("taskqueue %p has invalid type: %d", taskqueue,
		    (int)taskqueue->tq_type);
	}

	mutex_spin_enter(&drain.d_lock);
	while (0 < drain.d_count)
		cv_wait(&drain.d_cv, &drain.d_lock);
	mutex_spin_exit(&drain.d_lock);

	KASSERT(drain.d_count == 0);
	cv_destroy(&drain.d_cv);
	mutex_destroy(&drain.d_lock);
}

/*
 * taskqueue_drain_xc: Drain the current CPU's worker for taskqueue,
 * reporting to drain.
 */
static void
taskqueue_drain_xc(void *taskqueue_v, void *drain_v)
{
	struct taskqueue *const taskqueue = taskqueue_v;
	struct drain *const drain = drain_v;

	/*
	 * We kpreempt_disable to appease taskqueue_current_worker,
	 * which kasserts that preemption is disabled so that curcpu()
	 * is stable.  This is not necessary: the xcall mechanism
	 * guarantees that this code runs bound to a CPU, so curcpu()
	 * is stable even though we may be preempted.  I don't know a
	 * clearer way to kassert that curcpu() is stable, though.
	 */
	kpreempt_disable();
	taskworker_drain(taskqueue_current_worker(taskqueue), drain);
	kpreempt_enable();
}

/*
 * taskworker_drain: Schedule a task for worker to execute to report to
 * drain that all tasks queued before it have drained.
 */
static void
taskworker_drain(struct taskworker *worker, struct drain *drain)
{
	struct drain_task *dt;

	/*
	 * The drain task must outlive this function -- it runs later,
	 * in the worker -- so allocate it rather than putting it on
	 * the stack; drain_task frees it.
	 */
	dt = kmem_alloc(sizeof(*dt), KM_SLEEP);
	task_init(&dt->dt_task, &drain_task);
	dt->dt_drain = drain;

	mutex_enter(&worker->tw_lock);
	taskworker_enqueue(worker, &dt->dt_task);
	mutex_exit(&worker->tw_lock);
}

/*
 * drain_task: Count down and signal to taskqueue_drain if we hit zero.
 */
static void
drain_task(struct task *task)
{
	struct drain_task *const dt = container_of(task, struct drain_task,
	    dt_task);
	struct drain *const drain = dt->dt_drain;

	/* We free the memory containing the task, so mark it done first.  */
	task_done(task);
	task_destroy(task);
	kmem_free(dt, sizeof(*dt));

	mutex_enter(&drain->d_lock);
	if (--drain->d_count == 0)
		cv_signal(&drain->d_cv);
	mutex_exit(&drain->d_lock);
}

/* Taskqueue worker array */

/*
 * We store a task queue's workers in an array aligned to cache lines,
 * so that each worker's state lives in a separate cache line.  The
 * taskworker structure may span multiple cache lines (and it will on
 * 64-bit systems), but no cache line will be shared by two workers.
 */
/*
 * taskqueue_alloc_workers: Allocate taskqueue's array of workers.
 */
static int
taskqueue_alloc_workers(struct taskqueue *taskqueue, unsigned int nworkers)
{

	if (nworkers > (SIZE_MAX / roundup2(sizeof(struct taskworker),
		    coherency_unit)))
		return ENOMEM;
	taskqueue->tq_worker_array = kmem_alloc((nworkers *
		roundup2(sizeof(struct taskworker), coherency_unit)),
	    KM_SLEEP);
	taskqueue->tq_nworkers = nworkers;

	return 0;
}

/*
 * taskqueue_free_workers: Free taskqueue's array of workers.
 */
static void
taskqueue_free_workers(struct taskqueue *taskqueue)
{

	kmem_free(taskqueue->tq_worker_array, (taskqueue->tq_nworkers *
		roundup2(sizeof(struct taskworker), coherency_unit)));
	taskqueue->tq_worker_array = NULL;
	taskqueue->tq_nworkers = 0;
}

/*
 * taskqueue_worker: Return the ith worker for taskqueue.
 */
static struct taskworker *
taskqueue_worker(struct taskqueue *taskqueue, unsigned int i)
{
	char *array = taskqueue->tq_worker_array;

	KASSERT(i < taskqueue->tq_nworkers);
	return (void *)&array[i * roundup2(sizeof(struct taskworker),
	    coherency_unit)];
}

/*
 * taskqueue_current_worker: Return the worker on the current CPU.
 */
static struct taskworker *
taskqueue_current_worker(struct taskqueue *taskqueue)
{
	unsigned int i;

	KASSERT(kpreempt_disabled());
	KASSERT(ISSET(taskqueue->tq_flags, TASKQUEUE_PERCPU));

	i = cpu_index(curcpu());
	if (__predict_false(taskqueue->tq_nworkers <= i))
		/* XXX Hare-brained fallback should cpu_index go bonkers.  */
		i %= taskqueue->tq_nworkers;

	return taskqueue_worker(taskqueue, i);
}

/* Taskqueue workers */

/*
 * taskworker_init: Initialize a task worker.
 */
static void
taskworker_init(struct taskworker *worker, struct taskqueue *taskqueue,
    const char *name, int schedule_ipl)
{

	mutex_init(&worker->tw_lock, MUTEX_DEFAULT, schedule_ipl);
	worker->tw_taskqueue = taskqueue;
	cv_init(&worker->tw_done_cv, name);
	TAILQ_INIT(&worker->tw_queue);
	worker->tw_current_task = NULL;
	worker->tw_flags = 0;
}

/*
 * taskworker_destroy: Wait for all worker's tasks to complete and
 * destroy the resources used by it.  May sleep.
 *
 * XXX This currently halts delayed tasks.  Is that sensible?
 */
static void
taskworker_destroy(struct taskworker *worker)
{

	switch (worker->tw_taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		/* Softint cookie is global to the queue, not per-worker.  */
		break;
	case TASKQUEUE_THREAD:
		if (worker->tw_u.thread.lwp != NULL) {
			mutex_enter(&worker->tw_lock);
			worker->tw_flags |= TASKWORKER_EXIT;
			cv_signal(&worker->tw_u.thread.run_cv);
			mutex_exit(&worker->tw_lock);
			(void)kthread_join(worker->tw_u.thread.lwp);
			worker->tw_u.thread.lwp = NULL;
		}
		cv_destroy(&worker->tw_u.thread.run_cv);
		break;
	default:
		panic("taskqueue %p has invalid type: %d",
		    worker->tw_taskqueue, (int)worker->tw_taskqueue->tq_type);
	}

	KASSERT(worker->tw_current_task == NULL);
	KASSERT(TAILQ_EMPTY(&worker->tw_queue));

	cv_destroy(&worker->tw_done_cv);
	mutex_destroy(&worker->tw_lock);
}

/*
 * taskworker_schedule: Schedule task to run on worker, or if task is
 * currently being run by another worker, ask that worker to reschedule
 * it.  If task is already on a queue, do nothing.
 */
static void
taskworker_schedule(struct taskworker *worker, struct task *task)
{
	struct taskworker *worker0;

	mutex_enter(&worker->tw_lock);

	/* Try to grab the task.  */
	if ((worker0 = atomic_cas_ptr(&task->task_worker, NULL, worker))
	    != NULL) {
		/* Someone else got in first.  */
		if (worker != worker0) {
			mutex_exit(&worker->tw_lock);
			mutex_enter(&worker0->tw_lock);
		}
		if (task->task_worker != worker0) {
			/* worker0 already ran it.  */
			mutex_exit(&worker0->tw_lock);
			return;
		}
		if (worker0->tw_current_task != task) {
			/* worker0 is still scheduled to run it.  */
			mutex_exit(&worker0->tw_lock);
			return;
		}
		if (ISSET(worker0->tw_flags, TASKWORKER_RESCHEDULED)) {
			/* It's running and already rescheduled.  */
			mutex_exit(&worker0->tw_lock);
			return;
		}
		/*
		 * It's running.  Put it back on the queue and notify
		 * everyone else that it's been rescheduled.
		 */
		worker0->tw_flags |= TASKWORKER_RESCHEDULED;
		TAILQ_INSERT_TAIL(&worker0->tw_queue, task, task_entry);
		mutex_exit(&worker0->tw_lock);
		return;
	}

	/* Task is grabbed.  Put it on the queue and notify the worker.  */
	taskworker_enqueue(worker, task);
	mutex_exit(&worker->tw_lock);
}

/*
 * taskworker_enqueue: Put task on worker's queue and cause the worker
 * to run.  Caller must hold worker's lock.
 */
static void
taskworker_enqueue(struct taskworker *worker, struct task *task)
{

	KASSERT(mutex_owned(&worker->tw_lock));

	TAILQ_INSERT_TAIL(&worker->tw_queue, task, task_entry);
	switch (worker->tw_taskqueue->tq_type) {
	case TASKQUEUE_SOFTINT:
		/*
		 * We had better have just put it on the right queue.
		 * Otherwise, the softint handler won't get it!
		 */
		KASSERT(taskqueue_current_worker(worker->tw_taskqueue) ==
		    worker);
		softint_schedule(worker->tw_taskqueue->tq_u.softint.cookie);
		break;
	case TASKQUEUE_THREAD:
		cv_signal(&worker->tw_u.thread.run_cv);
		break;
	default:
		panic("taskqueue %p has invalid type: %d",
		    worker->tw_taskqueue, (int)worker->tw_taskqueue->tq_type);
	}
}

/*
 * taskworker_schedule_after_ticks: Schedule dt to run on worker after
 * the specified number of ticks.  If it is already running, schedule
 * it again.  If it is already scheduled on another worker, leave it be
 * -- don't change the time at which it is scheduled to run.
 *
 * XXX It's not clear that refusing to change the timeout is a good
 * idea.  On the other hand, users can do that explicitly by calling
 * delayed_task_cancel_async first.
 */
static void
taskworker_schedule_after_ticks(struct taskworker *worker,
    struct delayed_task *dt, int ticks)
{
	struct task *const task = &dt->dt_task;
	struct taskworker *worker0;

	if (__predict_false(ticks <= 0)) {
		/* No delay -- just schedule it to run now.  */
		taskworker_schedule(worker, task);
		return;
	}

	mutex_enter(&worker->tw_lock);

	/* Try to grab the task.  */
	if ((worker0 = atomic_cas_ptr(&task->task_worker, NULL, worker))
	    != NULL) {
		/* Someone else got in first.  */
		if (worker != worker0) {
			mutex_exit(&worker->tw_lock);
			mutex_enter(&worker0->tw_lock);
		}
		if (task->task_worker != worker0) {
			/* worker0 already ran it.  */
			mutex_exit(&worker0->tw_lock);
			return;
		}
		if (worker0->tw_current_task != task) {
			/* worker0 is still scheduled to run it.  */
			mutex_exit(&worker0->tw_lock);
			return;
		}
		if (ISSET(worker0->tw_flags, TASKWORKER_RESCHEDULED)) {
			/* It's running and already rescheduled.  */
			mutex_exit(&worker0->tw_lock);
			return;
		}
		/*
		 * It's running.  Reschedule the callout and notify
		 * everyone that it's been rescheduled.
		 */
		worker0->tw_flags |= TASKWORKER_RESCHEDULED;
		callout_schedule(&dt->dt_callout, ticks);
		mutex_exit(&worker0->tw_lock);
		return;
	}

	/* Task is grabbed.  Schedule it.  */
	callout_schedule(&dt->dt_callout, ticks);
	mutex_exit(&worker->tw_lock);
}

/*
 * delayed_task_timeout: Callout handler for a delayed task whose time
 * has come: hand the task to the worker that grabbed it.
 */
void
delayed_task_timeout(void *arg)
{
	struct delayed_task *const dt = arg;
	struct task *const task = &dt->dt_task;
	struct taskworker *const worker = task->task_worker;

	KASSERT(worker != NULL);

	mutex_enter(&worker->tw_lock);
	KASSERT(task->task_worker == worker);
	taskworker_enqueue(worker, task);
	mutex_exit(&worker->tw_lock);
}
/*
 * taskworker_run_1: Pick one task off queue and execute it.  To be
 * called from a worker thread or softint handler.
 */
static void
taskworker_run_1(struct taskworker *worker, struct task_head *queue)
{

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(!TAILQ_EMPTY(queue));

	/* Grab the task from the queue.  */
	struct task *const task = TAILQ_FIRST(queue);
	TAILQ_REMOVE(queue, task, task_entry);

	/* We had better not be working on anything yet.  */
	KASSERT(worker->tw_current_task == NULL);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_RESCHEDULED));

	/* Mark it current and run it.  */
	worker->tw_current_task = task;
	mutex_exit(&worker->tw_lock);
	(*task->task_fn)(task);
	mutex_enter(&worker->tw_lock);
	KASSERT(worker->tw_current_task == task);

	/* Mark it done, unless the action already did.  */
	if (!ISSET(worker->tw_flags, TASKWORKER_DONE))
		taskworker_done(worker, task);

	/* Can't touch task after this.  */
	KASSERT(ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_RESCHEDULED));

	/* Clear the flags related to this task.  */
	worker->tw_flags &= ~(TASKWORKER_DONE | TASKWORKER_RESCHEDULED);

	/* Notify anyone waiting to drain.  */
	worker->tw_current_task = NULL;
	cv_broadcast(&worker->tw_done_cv);
}

/*
 * taskworker_done: Mark task done.  The worker will not touch it after
 * this, and the task can be executed again if rescheduled.  If someone
 * had already asked to reschedule the task, acknowledge this;
 * otherwise, disassociate the task from the worker.
 */
static void
taskworker_done(struct taskworker *worker, struct task *task)
{

	KASSERT(mutex_owned(&worker->tw_lock));
	KASSERT(worker->tw_current_task == task);
	KASSERT(!ISSET(worker->tw_flags, TASKWORKER_DONE));
	KASSERT(task->task_worker == worker);

	if (ISSET(worker->tw_flags, TASKWORKER_RESCHEDULED)) {
		worker->tw_flags &= ~TASKWORKER_RESCHEDULED;
	} else {
		struct taskworker *const worker0 __diagused =
		    atomic_swap_ptr(&task->task_worker, NULL);
		KASSERT(worker0 == worker);
		/* Can't touch task after this.  */
	}

	worker->tw_flags |= TASKWORKER_DONE;
}

/*
 * taskworker_thread: Execute tasks until we're told to exit.  To be
 * called as the start routine of a kthread.
 */
static void __dead
taskworker_thread(void *arg)
{
	struct taskworker *const worker = arg;

	mutex_enter(&worker->tw_lock);
	for (;;) {
		while (TAILQ_EMPTY(&worker->tw_queue) &&
		    !ISSET(worker->tw_flags, TASKWORKER_EXIT))
			cv_wait(&worker->tw_u.thread.run_cv, &worker->tw_lock);
		if (ISSET(worker->tw_flags, TASKWORKER_EXIT))
			break;
		taskworker_run_1(worker, &worker->tw_queue);
	}
	mutex_exit(&worker->tw_lock);

	kthread_exit(0);
}

/*
 * taskworker_softintr: Gather all the tasks queued so far and execute
 * them.  This does not execute any tasks that get queued after it
 * begins.
 */
static void
taskworker_softintr(void *arg)
{
	struct taskqueue *const taskqueue = arg;
	struct taskworker *const worker =
	    taskqueue_current_worker(taskqueue);
	struct task_head queue = TAILQ_HEAD_INITIALIZER(queue);

	mutex_enter(&worker->tw_lock);
	/* Steal everything currently on the worker's queue.  */
	TAILQ_CONCAT(&queue, &worker->tw_queue, task_entry);
	while (!TAILQ_EMPTY(&queue))
		taskworker_run_1(worker, &queue);
	mutex_exit(&worker->tw_lock);
}

/* Tasks */

static void
task_error(struct task *task)
{

	panic("destroyed task got run: %p", task);
}
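/*
 * A task is usually embedded in some larger structure of the caller's.
 * As a sketch (mydev_softc and sc_task are hypothetical names, not
 * part of this API), an action can recover the enclosing structure
 * with container_of, the same way drain_task does above:
 *
 *	struct mydev_softc {
 *		...
 *		struct task	sc_task;
 *	};
 *
 *	static void
 *	mydev_task_action(struct task *task)
 *	{
 *		struct mydev_softc *const sc = container_of(task,
 *		    struct mydev_softc, sc_task);
 *
 *		...
 *	}
 */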
/*
 * task_schedule: Schedule task to run on taskqueue.
 */
void
task_schedule(struct taskqueue *taskqueue, struct task *task)
{

	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't schedule: %p", task);

	if (ISSET(taskqueue->tq_flags, TASKQUEUE_PERCPU)) {
		kpreempt_disable();
		taskworker_schedule(taskqueue_current_worker(taskqueue), task);
		kpreempt_enable();
	} else {
		taskworker_schedule(taskqueue_worker(taskqueue, 0), task);
	}
}

/*
 * task_destroy: Destroy a task initialized with task_init.  Must no
 * longer be scheduled.  Use task_cancel first if this is not already
 * guaranteed.  task_cancel_async is not enough.  If you want to call
 * this inside a task callback, you must use task_done first.
 *
 * Not strictly necessary, but this is part of the documented API in
 * case it ever becomes necessary.
 */
void
task_destroy(struct task *task)
{

	KASSERTMSG((task->task_fn != &task_error),
	    "task already destroyed: %p", task);
	KASSERTMSG((task->task_worker == NULL),
	    "task still scheduled: %p", task);

	task->task_fn = task_error;
}

/*
 * task_done: To be called in a task callback to notify the task worker
 * that the task is done and can be rescheduled.
 *
 * If you do not call this, the task worker will do the equivalent when
 * the task callback returns.  You MUST call this if the task callback
 * will free memory containing the struct task.
 */
void
task_done(struct task *task)
{
	struct taskworker *const worker = task->task_worker;

	KASSERTMSG((worker != NULL), "task not running: %p", task);
	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't be completing: %p", task);

	mutex_enter(&worker->tw_lock);
	KASSERTMSG((task->task_worker == worker),
	    "task %p changed workers: %p -> %p",
	    task, worker, task->task_worker);
	KASSERTMSG((worker->tw_current_task == task),
	    "task %p is not current on worker %p", task, worker);
	KASSERTMSG(!ISSET(worker->tw_flags, TASKWORKER_DONE),
	    "task %p is already done on worker %p", task, worker);
	taskworker_done(worker, task);
	mutex_exit(&worker->tw_lock);
}

/*
 * task_cancel: Cancel task.  If it has already begun, wait for it to
 * complete (or to call task_done).  May drop interlock if it is
 * necessary to wait.  May sleep.
 *
 * Returns true if task was scheduled and we prevented it from running.
 * Returns false if the task was not scheduled or had already begun.
 */
bool
task_cancel(struct task *task, kmutex_t *interlock)
{
	struct taskworker *worker = task->task_worker;

	ASSERT_SLEEPABLE();
	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't cancel: %p", task);

	/* If it's not scheduled, there is nothing to prevent from running. */
	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);

	/* If it moved to another queue, it already ran so we can't cancel. */
	if (task->task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}

	/* If it is running, it's too late to cancel.  Wait for completion. */
	if (worker->tw_current_task == task) {
		mutex_exit(interlock);
		while ((worker->tw_current_task == task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_done_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/* Got it before it ran!  Remove it from the queue and release it.  */
	TAILQ_REMOVE(&worker->tw_queue, task, task_entry);
	(void)atomic_swap_ptr(&task->task_worker, NULL);
	mutex_exit(&worker->tw_lock);
	return true;
}

/*
 * task_cancel_async: Like task_cancel, but returns immediately even if
 * the task is in flight.  Hence there is no need for an interlock.
 */
bool
task_cancel_async(struct task *task)
{
	struct taskworker *worker = task->task_worker;

	KASSERTMSG((task->task_fn != &task_error),
	    "task destroyed, can't cancel: %p", task);

	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);
	if (task->task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (worker->tw_current_task == task) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	TAILQ_REMOVE(&worker->tw_queue, task, task_entry);
	(void)atomic_swap_ptr(&task->task_worker, NULL);
	mutex_exit(&worker->tw_lock);
	return true;
}

/* Delayed tasks */

/*
 * delayed_task_destroy: Destroy the delayed task dt.  Must no longer
 * be scheduled.  Use delayed_task_cancel first if this is not already
 * guaranteed.  delayed_task_cancel_async is not enough.
 */
void
delayed_task_destroy(struct delayed_task *dt)
{

	KASSERTMSG((!callout_pending(&dt->dt_callout)),
	    "delayed task still pending timeout: %p", dt);
	KASSERTMSG((dt->dt_task.task_worker == NULL),
	    "delayed task still scheduled: %p", dt);
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task already destroyed: %p", dt);

	callout_destroy(&dt->dt_callout);
	task_destroy(&dt->dt_task);
}

/*
 * delayed_task_schedule_ticks: Schedule dt to run on taskqueue after
 * the specified number of ticks.
 */
void
delayed_task_schedule_ticks(struct taskqueue *taskqueue,
    struct delayed_task *dt, int ticks)
{

	KASSERTMSG((taskqueue->tq_pri <= PRI_SOFTCLOCK),
	    "taskqueue %p priority too high for delayed task %p: %"PRIdMAX,
	    taskqueue, dt, (intmax_t)taskqueue->tq_pri);
	KASSERTMSG((dt->dt_task.task_fn != &task_error),
	    "delayed task destroyed, can't schedule: %p", dt);
	KASSERTMSG((0 <= ticks),
	    "can't schedule delayed task %p in the past, %d ticks from now!",
	    dt, ticks);

	if (ISSET(taskqueue->tq_flags, TASKQUEUE_PERCPU)) {
		kpreempt_disable();
		taskworker_schedule_after_ticks(
		    taskqueue_current_worker(taskqueue), dt, ticks);
		kpreempt_enable();
	} else {
		taskworker_schedule_after_ticks(taskqueue_worker(taskqueue, 0),
		    dt, ticks);
	}
}

/*
 * delayed_task_cancel_async: Like delayed_task_cancel, but returns
 * immediately even if the task is in flight.  Hence there is no need
 * for an interlock.
 */
bool
delayed_task_cancel_async(struct delayed_task *dt)
{
	struct task *const task = &dt->dt_task;
	struct taskworker *worker = task->task_worker;

	KASSERTMSG((task->task_fn != &task_error),
	    "delayed task destroyed, can't cancel: %p", dt);

	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);
	if (task->task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}
	if (!callout_stop(&dt->dt_callout)) {
		/* Callout had not fired, so we cancelled it.  */
		(void)atomic_swap_ptr(&task->task_worker, NULL);
		mutex_exit(&worker->tw_lock);
		return true;
	}
	if (worker->tw_current_task != task) {
		TAILQ_REMOVE(&worker->tw_queue, task, task_entry);
		(void)atomic_swap_ptr(&task->task_worker, NULL);
		mutex_exit(&worker->tw_lock);
		return true;
	}
	mutex_exit(&worker->tw_lock);
	return false;
}

/*
 * delayed_task_cancel: Cancel the delayed task dt.  If it has already
 * begun, wait for it to complete (or to call task_done).  May drop
 * interlock if it is necessary to wait, either for the callout to
 * complete or for the task to complete.  May sleep.
 *
 * Returns true if dt was scheduled and we prevented it from running.
 * Returns false if dt was not scheduled or had already begun.
 */
bool
delayed_task_cancel(struct delayed_task *dt, kmutex_t *interlock)
{
	struct task *const task = &dt->dt_task;
	struct taskworker *worker = task->task_worker;

	ASSERT_SLEEPABLE();
	KASSERTMSG((task->task_fn != &task_error),
	    "delayed task destroyed, can't cancel: %p", dt);

	/* If it's not scheduled, we can't cancel it.  */
	if (worker == NULL)
		return false;

	mutex_enter(&worker->tw_lock);

	/* If it moved to another queue, it already ran so we can't cancel. */
	if (task->task_worker != worker) {
		mutex_exit(&worker->tw_lock);
		return false;
	}

	/* If it is running, it's too late to cancel.  Wait for completion. */
	if (worker->tw_current_task == task) {
		mutex_exit(interlock);
		while ((worker->tw_current_task == task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_done_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/*
	 * Try to cancel it without dropping the interlock.  This is an
	 * optimization; we don't need to do it.
	 */
	if (!callout_stop(&dt->dt_callout)) {
		/* Callout had not fired, so we cancelled it.  */
		(void)atomic_swap_ptr(&task->task_worker, NULL);
		mutex_exit(&worker->tw_lock);
		return true;
	}

	/*
	 * We have to drop the interlock here because we need to drop
	 * the interlock and the worker lock in order to sleep if the
	 * callout has fired, but callout_halt can't drop two locks.
	 *
	 * XXX An alternative would be to have a caller-supplied
	 * interlock stored in the delayed task structure.  That would
	 * simplify the logic in scheduling delayed tasks, too.
	 * However, it would complicate the API for callers that don't
	 * need interlocking in task_cancel (XXX are there any,
	 * really?).
	 */
	mutex_exit(interlock);

	/* Try to cancel the callout.  */
	if (!callout_halt(&dt->dt_callout, &worker->tw_lock)) {
		/* Callout had not fired, so we cancelled it.  */
		(void)atomic_swap_ptr(&task->task_worker, NULL);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return true;
	}

	/* It already fired.  Try to cancel the task.  */
	if (worker->tw_current_task == task) {
		/* The task is already running.  Wait for it to complete.  */
		while ((worker->tw_current_task == task) &&
		    !ISSET(worker->tw_flags, TASKWORKER_DONE))
			cv_wait(&worker->tw_done_cv, &worker->tw_lock);
		mutex_exit(&worker->tw_lock);
		mutex_enter(interlock);
		return false;
	}

	/* The task has not yet run.  Don't let it.  */
	TAILQ_REMOVE(&worker->tw_queue, task, task_entry);
	(void)atomic_swap_ptr(&task->task_worker, NULL);
	mutex_exit(&worker->tw_lock);
	mutex_enter(interlock);
	return true;
}