/*	$NetBSD$	*/

/*
 * XXX WARNING WARNING WARNING XXX
 *
 * This code does not work!  I have not even compile-tested it.  It is
 * a draft of an idea.
 *
 * This implementation of workqueue(9) has the following features:
 *
 * - Softint priority levels, not just thread priority levels.
 * - Shared pool of threads -- no extra threads for idle workqueues.
 * - No interprocessor synchronization for oft-used workqueues.
 * - workqueue_drain operation to wait for all work to complete.
 *
 * Disadvantages:
 *
 * - More code than the current subr_workqueue.c.
 * - Cross-CPU workqueue_enqueue may sleep.
 * - No unbound (non-per-CPU) workqueues (yet?).
 */

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
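/*
 * Illustrative usage sketch (added commentary, not part of the original
 * draft; the hypothetical "foo" driver names are invented here): a
 * consumer creates a workqueue, embeds a struct work in its own state,
 * and schedules, drains, and destroys it roughly as follows.
 *
 *	struct foo_softc {
 *		struct workqueue	*sc_wq;
 *		struct work		sc_work;
 *	};
 *
 *	static void
 *	foo_work(struct work *wk, void *arg)
 *	{
 *		struct foo_softc *sc = arg;
 *
 *		... do the deferred work ...
 *	}
 *
 *	attach:		error = workqueue_create(&sc->sc_wq, "foowq",
 *				    &foo_work, sc, PRI_SOFTNET, IPL_NET, 0);
 *	interrupt:	workqueue_schedule(sc->sc_wq, &sc->sc_work);
 *	detach:		workqueue_drain(sc->sc_wq);
 *			workqueue_destroy(sc->sc_wq);
 */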
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD$");

#include <sys/param.h>
#include <sys/types.h>

#include <sys/atomic.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/intr.h>
#include <sys/kernel.h>
#include <sys/mutex.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/queue.h>
#include <sys/systm.h>
#include <sys/threadpool.h>
#include <sys/workqueue.h>
#include <sys/xcall.h>

#define	container_of(PTR, TYPE, FIELD)					      \
	((void)sizeof((PTR) -						      \
	    &((TYPE *)(((char *)(PTR)) -				      \
		offsetof(TYPE, FIELD)))->FIELD),			      \
	    ((TYPE *)(((char *)(PTR)) - offsetof(TYPE, FIELD))))

#define	WORKQUEUE_IDLE_TICKS	mstohz(100)

/* Data structures */

SIMPLEQ_HEAD(work_head, work);

struct workqueue {
	struct percpu		*wq_percpu;	/* struct workqueue_cpu * */
	union {
		struct {
			void				*cookie;
		}		softint;
		struct {
			struct threadpool_percpu	*pool_percpu;
		}		thread;
	}			wq_u;
	enum workqueue_type {
		WORKQUEUE_SOFTINT,
		WORKQUEUE_THREAD,
	}			wq_type;
	work_fn_t		wq_fn;
	void			*wq_arg;
	pri_t			wq_pri;
	int			wq_ipl;
	ipl_cookie_t		wq_ipl_cookie;
	int			wq_flags;
#define	WORKQUEUE_DYING	0x0100
	kmutex_t		wq_drain_lock;
	kcondvar_t		wq_drain_cv;
	volatile unsigned int	wq_draining;
};

struct workqueue_cpu {
	struct work_head	wqc_queue;
};

struct workqueue_softint {
	struct workqueue_cpu	wqs_cpu;
};

struct workqueue_thread {
	struct workqueue_cpu	wqt_cpu;
	kmutex_t		wqt_lock;
	kcondvar_t		wqt_cv;
	struct threadpool_job	wqt_job;
	bool			wqt_running;
	struct workqueue	*wqt_workqueue;
};
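/*
 * Layout sketch (added commentary, not in the original draft): each
 * workqueue keeps a percpu array of pointers to a per-CPU queue head,
 * and the queue head is embedded in a type-specific wrapper, so the
 * wrapper can be recovered with container_of.  For a thread-backed
 * workqueue wq on the current CPU, roughly:
 *
 *	struct workqueue_cpu **wqcp = percpu_getref(wq->wq_percpu);
 *	struct workqueue_cpu *wqc = *wqcp;
 *	struct workqueue_thread *wqt =
 *	    container_of(wqc, struct workqueue_thread, wqt_cpu);
 *	...
 *	percpu_putref(wq->wq_percpu);
 */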
static void	workqueue_cpu_init(struct workqueue_cpu *, struct workqueue *,
		    struct cpu_info *);
static int	workqueue_init_softint(struct workqueue *, pri_t);
static int	workqueue_init_thread(struct workqueue *, const char *, pri_t,
		    int);

static void	workqueue_cpu_destroy(struct workqueue_cpu *);
static void	workqueue_destroy_softint(struct workqueue *);
static void	workqueue_destroy_thread(struct workqueue *);

static void	workqueue_enqueue_xc(void *, void *);

static void	workqueue_drain_notify(struct workqueue *);
static void	workqueue_drain_notify_outofline(struct workqueue *);
static void	workqueue_drain_xc(void *, void *);

static int	workqueue_cpu_lock(struct workqueue *);
static void	workqueue_cpu_unlock(struct workqueue *, int);

static void	workqueue_softintr(void *);
static void	workqueue_job(struct threadpool_job *);

static struct pool workqueue_pool;
static struct pool workqueue_softint_pool;
static struct pool workqueue_thread_pool;

void
workqueues_init(void)
{

	pool_init(&workqueue_pool, sizeof(struct workqueue), 0, 0, 0,
	    "workq", NULL, IPL_NONE);
	pool_init(&workqueue_softint_pool, sizeof(struct workqueue_softint),
	    0, 0, 0, "workqsih", NULL, IPL_NONE);
	pool_init(&workqueue_thread_pool, sizeof(struct workqueue_thread),
	    0, 0, 0, "workqthr", NULL, IPL_NONE);
}

/* Creating workqueues */

int
workqueue_create(struct workqueue **workqueuep, const char *name,
    work_fn_t fn, void *arg, pri_t work_pri, int schedule_ipl, int flags)
{
	struct workqueue *workqueue;
	int error;

	ASSERT_SLEEPABLE();

	workqueue = pool_get(&workqueue_pool, PR_WAITOK);
	workqueue->wq_fn = fn;
	workqueue->wq_arg = arg;
	workqueue->wq_pri = work_pri;
	workqueue->wq_ipl = schedule_ipl;
	workqueue->wq_ipl_cookie = makeiplcookie(schedule_ipl);
	workqueue->wq_flags = flags;
	mutex_init(&workqueue->wq_drain_lock, MUTEX_DEFAULT, schedule_ipl);
	cv_init(&workqueue->wq_drain_cv, "wqdrain");
	workqueue->wq_draining = 0;

	workqueue->wq_percpu = percpu_alloc(sizeof(struct workqueue_cpu *));
	if (workqueue->wq_percpu == NULL) {
		error = ENOMEM;
		goto fail0;
	}

	KASSERT(work_pri <= MAXPRI_WORKQUEUE);
	if (MINPRI_SOFTINT <= work_pri) {
		error = workqueue_init_softint(workqueue, work_pri);
	} else {
		KASSERT(work_pri <= MAXPRI_KTHREAD);
		error = workqueue_init_thread(workqueue, name, work_pri,
		    schedule_ipl);
	}
	if (error)
		goto fail1;

	/* Success!  */
	*workqueuep = workqueue;
	return 0;

fail1:	percpu_free(workqueue->wq_percpu, sizeof(struct workqueue_cpu *));
fail0:	pool_put(&workqueue_pool, workqueue);
	return error;
}

static void
workqueue_cpu_init(struct workqueue_cpu *wqc, struct workqueue *workqueue,
    struct cpu_info *ci)
{

	SIMPLEQ_INIT(&wqc->wqc_queue);

	percpu_traverse_enter();
	struct workqueue_cpu **const wqcp =
	    percpu_getptr_remote(workqueue->wq_percpu, ci);
	*wqcp = wqc;
	percpu_traverse_exit();
}

static int
workqueue_init_softint(struct workqueue *workqueue, pri_t work_pri)
{
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;
	int work_softpri;
	int error;

	workqueue->wq_type = WORKQUEUE_SOFTINT;

	/* XXX This table should go somewhere else.  */
	if (PRI_SOFTSERIAL <= work_pri)
		work_softpri = SOFTINT_SERIAL;
	else if (PRI_SOFTNET <= work_pri)
		work_softpri = SOFTINT_NET;
	else if (PRI_SOFTBIO <= work_pri)
		work_softpri = SOFTINT_BIO;
	else if (PRI_SOFTCLOCK <= work_pri)
		work_softpri = SOFTINT_CLOCK;
	else
		panic("invalid softint-level priority: %d", (int)work_pri);

	workqueue->wq_u.softint.cookie =
	    softint_establish((work_softpri | SOFTINT_MPSAFE),
		&workqueue_softintr, workqueue);
	if (workqueue->wq_u.softint.cookie == NULL) {
		error = ENOMEM;
		goto fail0;
	}

	for (CPU_INFO_FOREACH(cii, ci)) {
		struct workqueue_softint *const wqs =
		    pool_get(&workqueue_softint_pool, PR_WAITOK);
		workqueue_cpu_init(&wqs->wqs_cpu, workqueue, ci);
	}

	/* Success!  */
	return 0;

fail0:	return error;
}

static int
workqueue_init_thread(struct workqueue *workqueue, const char *name,
    pri_t work_pri, int schedule_ipl)
{
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;
	int error;

	workqueue->wq_type = WORKQUEUE_THREAD;

	error = threadpool_percpu_get(&workqueue->wq_u.thread.pool_percpu,
	    work_pri);
	if (error)
		goto fail0;

	for (CPU_INFO_FOREACH(cii, ci)) {
		struct workqueue_thread *const wqt =
		    pool_get(&workqueue_thread_pool, PR_WAITOK);
		mutex_init(&wqt->wqt_lock, MUTEX_DEFAULT, schedule_ipl);
		cv_init(&wqt->wqt_cv, name);
		threadpool_job_init(&wqt->wqt_job, &workqueue_job,
		    &wqt->wqt_lock, "%s/%u", name, cpu_index(ci));
		wqt->wqt_running = false;
		wqt->wqt_workqueue = workqueue;
		workqueue_cpu_init(&wqt->wqt_cpu, workqueue, ci);
	}

	/* Success!  */
	return 0;

fail0:	return error;
}

/* Destroying workqueues */

void
workqueue_destroy(struct workqueue *workqueue)
{

	ASSERT_SLEEPABLE();

	workqueue->wq_flags |= WORKQUEUE_DYING;

	/*
	 * Guarantee the flag is visible before cv_broadcast.  The
	 * mutex_enter on the reader's side provides the matching read
	 * barrier.
	 */
	membar_producer();

	switch (workqueue->wq_type) {
	case WORKQUEUE_SOFTINT:
		workqueue_destroy_softint(workqueue);
		break;

	case WORKQUEUE_THREAD:
		workqueue_destroy_thread(workqueue);
		break;

	default:
		panic("workqueue %p has invalid type: %d", workqueue,
		    (int)workqueue->wq_type);
	}

	KASSERT(workqueue->wq_draining == 0);
	KASSERT(!cv_has_waiters(&workqueue->wq_drain_cv));
	cv_destroy(&workqueue->wq_drain_cv);
	mutex_destroy(&workqueue->wq_drain_lock);
	percpu_free(workqueue->wq_percpu, sizeof(struct workqueue_cpu *));
	pool_put(&workqueue_pool, workqueue);
}

static void
workqueue_cpu_destroy(struct workqueue_cpu *wqc __diagused)
{

	KASSERT(SIMPLEQ_EMPTY(&wqc->wqc_queue));
}

static void
workqueue_destroy_softint(struct workqueue *workqueue)
{
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	/* First wait for pending calls to the softint to complete.  */
	softint_disestablish(workqueue->wq_u.softint.cookie);

	/* Then free the per-CPU data.  */
	for (CPU_INFO_FOREACH(cii, ci)) {
		percpu_traverse_enter();
		struct workqueue_cpu **const wqcp =
		    percpu_getptr_remote(workqueue->wq_percpu, ci);
		struct workqueue_cpu *const wqc = *wqcp;
		*wqcp = NULL;
		percpu_traverse_exit();

		struct workqueue_softint *const wqs =
		    container_of(wqc, struct workqueue_softint, wqs_cpu);
		workqueue_cpu_destroy(&wqs->wqs_cpu);
		pool_put(&workqueue_softint_pool, wqs);
	}
}

static void
workqueue_destroy_thread(struct workqueue *workqueue)
{
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	for (CPU_INFO_FOREACH(cii, ci)) {
		percpu_traverse_enter();
		struct workqueue_cpu **const wqcp =
		    percpu_getptr_remote(workqueue->wq_percpu, ci);
		struct workqueue_cpu *const wqc = *wqcp;
		*wqcp = NULL;
		percpu_traverse_exit();

		struct workqueue_thread *const wqt =
		    container_of(wqc, struct workqueue_thread, wqt_cpu);

		/*
		 * Cancel the job.  It is the caller's responsibility
		 * to process any outstanding work, e.g. by draining
		 * first or maintaining a separate list of all work.
		 */
		struct threadpool *const pool = threadpool_percpu_ref_remote(
		    workqueue->wq_u.thread.pool_percpu, ci);
		mutex_enter(&wqt->wqt_lock);
		/* We set WORKQUEUE_DYING earlier; wake the job if waiting.  */
		cv_broadcast(&wqt->wqt_cv);
		/* Cancel it if not.  */
		threadpool_cancel_job(pool, &wqt->wqt_job);
		mutex_exit(&wqt->wqt_lock);

		KASSERT(wqt->wqt_workqueue == workqueue);
		KASSERT(!wqt->wqt_running);
		threadpool_job_destroy(&wqt->wqt_job);
		KASSERT(!cv_has_waiters(&wqt->wqt_cv));
		cv_destroy(&wqt->wqt_cv);
		mutex_destroy(&wqt->wqt_lock);
		workqueue_cpu_destroy(&wqt->wqt_cpu);
		pool_put(&workqueue_thread_pool, wqt);
	}

	threadpool_percpu_put(workqueue->wq_u.thread.pool_percpu,
	    workqueue->wq_pri);
}

/* Scheduling work */

void
workqueue_schedule(struct workqueue *workqueue, struct work *work)
{
	int s;

	struct workqueue_cpu **const wqcp =
	    percpu_getref(workqueue->wq_percpu);
	struct workqueue_cpu *const wqc = *wqcp;

	s = workqueue_cpu_lock(workqueue);
	SIMPLEQ_INSERT_TAIL(&wqc->wqc_queue, work, w_entry);
	switch (workqueue->wq_type) {
	case WORKQUEUE_SOFTINT:
		softint_schedule(workqueue->wq_u.softint.cookie);
		break;

	case WORKQUEUE_THREAD: {
		struct workqueue_thread *const wqt =
		    container_of(wqc, struct workqueue_thread, wqt_cpu);

		/* If it's not already running, kick the thread.  */
		if (__predict_false(!wqt->wqt_running)) {
			struct threadpool *const pool = threadpool_percpu_ref(
			    workqueue->wq_u.thread.pool_percpu);

			workqueue_cpu_unlock(workqueue, s);
			percpu_putref(workqueue->wq_percpu);

			mutex_enter(&wqt->wqt_lock);
			threadpool_schedule_job(pool, &wqt->wqt_job);
			wqt->wqt_running = true;
			cv_signal(&wqt->wqt_cv);
			mutex_exit(&wqt->wqt_lock);
			return;
		}
		break;
	}
	}
	workqueue_cpu_unlock(workqueue, s);
	percpu_putref(workqueue->wq_percpu);
}
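/*
 * Illustrative note (added commentary, not in the original draft):
 * workqueue_schedule queues work on the current CPU only, so the
 * expected call site is something like an interrupt handler running
 * at or below the workqueue's schedule_ipl.  Hypothetical driver code:
 *
 *	static int
 *	foo_intr(void *cookie)
 *	{
 *		struct foo_softc *sc = cookie;
 *
 *		... acknowledge the interrupt ...
 *		workqueue_schedule(sc->sc_wq, &sc->sc_work);
 *		return 1;
 *	}
 *
 * Nothing here detects scheduling the same struct work twice before it
 * has run; a caller that needs that must keep its own flag.
 */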
/* Scheduling work on another CPU */

void
workqueue_enqueue(struct workqueue *workqueue, struct work *work,
    struct cpu_info *ci)
{
	int flags;

	/* If the caller doesn't care, use the current CPU.  */
	if (ci == NULL) {
		workqueue_schedule(workqueue, work);
		goto out0;
	}

	/* Otherwise, we may need to do an xcall, which requires sleeping.  */
	ASSERT_SLEEPABLE();

	/* Don't switch CPUs between calling curcpu and workqueue_schedule.  */
	kpreempt_disable();

	/* If the CPU the caller wants is the current one, easy.  */
	if (ci == curcpu()) {
		workqueue_schedule(workqueue, work);
		goto out1;
	}

	/* Otherwise, do an xcall: highpri if softint, lowpri if thread.  */
	switch (workqueue->wq_type) {
	case WORKQUEUE_SOFTINT:
		flags = XC_HIGHPRI;
		break;

	case WORKQUEUE_THREAD:
		flags = 0;
		break;

	default:
		panic("workqueue %p has invalid type: %d", workqueue,
		    (int)workqueue->wq_type);
	}
	xc_wait(xc_unicast(flags, &workqueue_enqueue_xc, workqueue, work, ci));

out1:	kpreempt_enable();
out0:	return;
}

static void
workqueue_enqueue_xc(void *workqueue_v, void *work_v)
{
	struct workqueue *const workqueue = workqueue_v;
	struct work *const work = work_v;

	workqueue_schedule(workqueue, work);
}

/* Draining work */

void
workqueue_drain(struct workqueue *workqueue)
{

	ASSERT_SLEEPABLE();

	mutex_enter(&workqueue->wq_drain_lock);

	/* Wait for any pending drain to complete.  */
	while (workqueue->wq_draining)
		cv_wait(&workqueue->wq_drain_cv, &workqueue->wq_drain_lock);

	/* Initiate a drain.  */
	workqueue->wq_draining = ncpu;

	mutex_exit(&workqueue->wq_drain_lock);

	/* Force the drain to happen on each CPU.  */
	xc_wait(xc_broadcast(0, &workqueue_drain_xc, workqueue, NULL));

	mutex_enter(&workqueue->wq_drain_lock);

	/* Wait for the drain to complete.  */
	while (workqueue->wq_draining)
		cv_wait(&workqueue->wq_drain_cv, &workqueue->wq_drain_lock);

	mutex_exit(&workqueue->wq_drain_lock);
}

static inline void
workqueue_drain_notify(struct workqueue *workqueue)
{

	if (__predict_false(workqueue->wq_draining))
		workqueue_drain_notify_outofline(workqueue);
}

static void __noinline
workqueue_drain_notify_outofline(struct workqueue *workqueue)
{
	unsigned int draining;

	do {
		draining = workqueue->wq_draining;
		KASSERT(0 < draining);
		if (draining == 1) {
			mutex_enter(&workqueue->wq_drain_lock);
			draining =
			    atomic_dec_uint_nv(&workqueue->wq_draining);
			KASSERT(draining != UINT_MAX);
			if (draining == 0)
				cv_broadcast(&workqueue->wq_drain_cv);
			mutex_exit(&workqueue->wq_drain_lock);
			return;
		}
	} while (atomic_cas_uint(&workqueue->wq_draining, draining,
		(draining - 1)) != draining);
}

static void
workqueue_drain_xc(void *workqueue_v, void *dummy __unused)
{
	struct workqueue *const workqueue = workqueue_v;

	switch (workqueue->wq_type) {
	case WORKQUEUE_SOFTINT:
		softint_schedule(workqueue->wq_u.softint.cookie);
		break;

	case WORKQUEUE_THREAD: {
		struct threadpool *const pool =
		    threadpool_percpu_ref(workqueue->wq_u.thread.pool_percpu);

		struct workqueue_cpu **const wqcp =
		    percpu_getref(workqueue->wq_percpu);
		struct workqueue_cpu *const wqc = *wqcp;
		percpu_putref(workqueue->wq_percpu);

		struct workqueue_thread *const wqt =
		    container_of(wqc, struct workqueue_thread, wqt_cpu);

		mutex_enter(&wqt->wqt_lock);
		threadpool_schedule_job(pool, &wqt->wqt_job);
		wqt->wqt_running = true;
		cv_signal(&wqt->wqt_cv);
		mutex_exit(&wqt->wqt_lock);
		break;
	}

	default:
		panic("workqueue %p has invalid type: %d", workqueue,
		    (int)workqueue->wq_type);
	}
}
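/*
 * Added commentary (not in the original draft): the drain protocol
 * counts CPUs rather than work items.  workqueue_drain sets
 * wq_draining to ncpu and cross-calls every CPU to flush its local
 * queue; whenever a worker finishes a batch while a drain is in
 * progress, workqueue_drain_notify atomically decrements the count,
 * and whoever brings it to zero wakes the drainer under
 * wq_drain_lock.  Roughly:
 *
 *	drainer				worker on each CPU
 *	-------				------------------
 *	wq_draining = ncpu;
 *	xc_broadcast(drain_xc);		run the local batch;
 *	cv_wait while wq_draining;	workqueue_drain_notify():
 *					    wq_draining--;
 *					    if now 0, cv_broadcast.
 */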
/* Workqueue per-CPU locking */

/*
 * In order to avoid interprocessor synchronization, scheduling work
 * and doing work -- which are guaranteed to happen only on the same
 * CPU -- are mutually excluded by disabling kernel preemption (if
 * scheduling work happens only in thread context) or by raising the
 * IPL (if scheduling work can happen in an interrupt handler), rather
 * than by mutexes or atomics.
 *
 * All access to struct workqueue_cpu::wqc_queue is serialized by the
 * workqueue CPU lock and limited to the CPU to which it is assigned.
 */

static inline int
workqueue_cpu_lock(struct workqueue *workqueue)
{
	int s;

	if (workqueue->wq_ipl == IPL_NONE) {
		kpreempt_disable();
		s = 0x1eafcafe;
	} else {
		s = splraiseipl(workqueue->wq_ipl_cookie);
	}

	return s;
}

static inline void
workqueue_cpu_unlock(struct workqueue *workqueue, int s)
{

	if (workqueue->wq_ipl == IPL_NONE) {
		KASSERT(s == 0x1eafcafe);
		kpreempt_enable();
	} else {
		splx(s);
	}
}

/* Doing work */

/*
 * When work is scheduled, we either schedule a softint or kick a
 * pooled thread.  In both cases, the party responsible for doing the
 * work -- the softint or the thread -- takes the CPU lock for a
 * constant duration to grab a batch of work items, and then executes
 * them with the CPU lock dropped.
 */

/*
 * Scheduling softints is cheap, so if the softint is running and
 * someone wants to schedule more work, it will just go in the next
 * batch and the softint will be scheduled again.
 */
static void
workqueue_softintr(void *arg)
{
	struct workqueue *const workqueue = arg;
	struct work_head tmp = SIMPLEQ_HEAD_INITIALIZER(tmp);
	struct work *work;
	int s;

	struct workqueue_cpu **const wqcp =
	    percpu_getref(workqueue->wq_percpu);
	struct workqueue_cpu *const wqc = *wqcp;
	percpu_putref(workqueue->wq_percpu);

	s = workqueue_cpu_lock(workqueue);
	SIMPLEQ_CONCAT(&tmp, &wqc->wqc_queue);
	workqueue_cpu_unlock(workqueue, s);

	SIMPLEQ_FOREACH(work, &tmp, w_entry)
		(*workqueue->wq_fn)(work, workqueue->wq_arg);

	workqueue_drain_notify(workqueue);
}

/*
 * Scheduling thread pool jobs is not cheap, so we try to keep the
 * thread assigned to our job for a little while, and use the
 * wqt->wqt_running flag to avoid interprocessor synchronization.
 *
 * Transition of wqt->wqt_running from false to true happens only under
 * wqt->wqt_lock and is signalled by wqt->wqt_cv when scheduling work.
 * Transition of wqt->wqt_running from true to false happens only under
 * the workqueue CPU lock, when the thread has found that the queue is
 * empty and is about to wait for work.
 *
 * If you are on the same CPU, and if, under the workqueue CPU lock,
 * you read that wqt->wqt_running is true, then you are guaranteed that
 * after you drop the workqueue CPU lock, the thread will re-examine
 * the queue.  Otherwise, it is necessary to schedule the job, set
 * wqt->wqt_running to true, and signal wqt->wqt_cv, all under
 * wqt->wqt_lock.
 */
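/*
 * Added summary (not in the original draft) of the two wqt_running
 * transitions and the lock covering each, for reference while reading
 * workqueue_job below:
 *
 *	transition	where				held
 *	----------	-----				----
 *	false -> true	workqueue_schedule,		wqt->wqt_lock
 *			workqueue_drain_xc
 *	true -> false	workqueue_job, on finding	workqueue CPU lock
 *			the queue empty
 */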
static void
workqueue_job(struct threadpool_job *job)
{
	struct workqueue_thread *const wqt =
	    container_of(job, struct workqueue_thread, wqt_job);
	struct workqueue *const workqueue = wqt->wqt_workqueue;
	struct work_head *queue = &wqt->wqt_cpu.wqc_queue;
	struct work_head tmp = SIMPLEQ_HEAD_INITIALIZER(tmp);
	struct work *work;
	int s;

	for (;;) {
		s = workqueue_cpu_lock(workqueue);

		/* If there's no work to be done, wait for it.  */
		while (__predict_false(SIMPLEQ_EMPTY(queue))) {
			wqt->wqt_running = false;
			workqueue_cpu_unlock(workqueue, s);

			mutex_enter(&wqt->wqt_lock);
			while (!wqt->wqt_running) {
				/* Before we wait, report no work to drain.  */
				workqueue_drain_notify(workqueue);
				if (ISSET(workqueue->wq_flags,
					WORKQUEUE_DYING))
					goto out;
				if (cv_timedwait(&wqt->wqt_cv, &wqt->wqt_lock,
					WORKQUEUE_IDLE_TICKS)) {
					if (!wqt->wqt_running)
						goto out;
				}
			}
			mutex_exit(&wqt->wqt_lock);

			s = workqueue_cpu_lock(workqueue);
		}
		KASSERT(!SIMPLEQ_EMPTY(queue));
		KASSERT(wqt->wqt_running);

		/* Grab a batch of work off the queue.  */
		SIMPLEQ_CONCAT(&tmp, queue);
		workqueue_cpu_unlock(workqueue, s);

		/* Run it all.  */
		SIMPLEQ_FOREACH(work, &tmp, w_entry)
			(*workqueue->wq_fn)(work, workqueue->wq_arg);
		SIMPLEQ_INIT(&tmp);	/* reset */

		/* Report that all work scheduled so far is done.  */
		workqueue_drain_notify(workqueue);
	}

out:	threadpool_job_done(job);
	mutex_exit(&wqt->wqt_lock);
}