/*	$NetBSD$	*/

/*
 * XXX WARNING WARNING WARNING XXX
 *
 * This code does not work!  I have not even compile-tested it.  It is
 * a draft of an idea.
 *
 * This implementation of workqueue(9) has the following features:
 *
 * - Shared pool of threads -- no extra threads for idle workqueues.
 * - No interprocessor synchronization for oft-used workqueues.
 * - workqueue_drain operation to wait for all work to complete.
 *
 * Disadvantages:
 *
 * - More code than the current subr_workqueue.c.
 * - Cross-CPU workqueue_enqueue may sleep.
 * - No unbound (non-per-CPU) workqueues (yet?).
 */

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD$");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/kernel.h>
#include <sys/mutex.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/queue.h>
#include <sys/systm.h>
#include <sys/threadpool.h>
#include <sys/workqueue.h>
#include <sys/xcall.h>

#define	container_of(PTR, TYPE, FIELD)					\
	((void)sizeof((PTR) -						\
		&((TYPE *)(((char *)(PTR)) -				\
		    offsetof(TYPE, FIELD)))->FIELD),			\
	    ((TYPE *)(((char *)(PTR)) - offsetof(TYPE, FIELD))))

#define	WORKQUEUE_IDLE_TICKS	mstohz(100)

/* Data structures */

SIMPLEQ_HEAD(work_head, work);

struct workqueue {
	struct percpu		*wq_percpu;	/* struct workqueue_cpu * */
	struct threadpool_percpu *wq_pool_percpu;
	work_fn_t		wq_fn;
	void			*wq_arg;
	pri_t			wq_pri;
	int			wq_flags;
#define	WORKQUEUE_DYING	0x0100
	kmutex_t		wq_drain_lock;
	kcondvar_t		wq_drain_cv;
	volatile unsigned int	wq_draining;
};

struct workqueue_cpu {
	struct work_head	wqc_queue;
	kmutex_t		wqc_lock;
	kcondvar_t		wqc_cv;
	struct threadpool_job	wqc_job;
	bool			wqc_running;
	struct workqueue	*wqc_workqueue;
};

static void	workqueue_enqueue_xc(void *, void *);
static void	workqueue_drain_notify(struct workqueue *);
static void	workqueue_drain_notify_outofline(struct workqueue *);
static void	workqueue_drain_xc(void *, void *);
static void	workqueue_job(struct threadpool_job *);

static struct pool workqueue_pool;
static struct pool workqueue_cpu_pool;

void
workqueues_init(void)
{

	pool_init(&workqueue_pool, sizeof(struct workqueue), 0, 0, 0,
	    "workq", NULL, IPL_NONE);
	pool_init(&workqueue_cpu_pool, sizeof(struct workqueue_cpu), 0, 0, 0,
	    "workqcpu", NULL, IPL_NONE);
}

/* Creating workqueues */

int
workqueue_create(struct workqueue **workqueuep, const char *name,
    work_fn_t fn, void *arg, pri_t work_pri, int schedule_ipl, int flags)
{
	struct workqueue *workqueue;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;
	int error;

	ASSERT_SLEEPABLE();

	workqueue = pool_get(&workqueue_pool, PR_WAITOK);
	workqueue->wq_fn = fn;
	workqueue->wq_arg = arg;
	workqueue->wq_pri = work_pri;
	workqueue->wq_flags = flags;
	mutex_init(&workqueue->wq_drain_lock, MUTEX_DEFAULT, schedule_ipl);
	cv_init(&workqueue->wq_drain_cv, "wqdrain");
	workqueue->wq_draining = 0;

	workqueue->wq_percpu = percpu_alloc(sizeof(struct workqueue_cpu *));
	if (workqueue->wq_percpu == NULL) {
		error = ENOMEM;
		goto fail0;
	}

	KASSERT(work_pri <= MAXPRI_WORKQUEUE);
	error = threadpool_percpu_get(&workqueue->wq_pool_percpu, work_pri);
	if (error)
		goto fail1;

	for (CPU_INFO_FOREACH(cii, ci)) {
		struct workqueue_cpu *const wqc =
		    pool_get(&workqueue_cpu_pool, PR_WAITOK);

		SIMPLEQ_INIT(&wqc->wqc_queue);
		mutex_init(&wqc->wqc_lock, MUTEX_DEFAULT, schedule_ipl);
		cv_init(&wqc->wqc_cv, name);
		threadpool_job_init(&wqc->wqc_job, &workqueue_job,
		    &wqc->wqc_lock, "%s/%u", name, cpu_index(ci));
		wqc->wqc_running = false;
		wqc->wqc_workqueue = workqueue;

		/* Publish this CPU's state in the per-CPU pointer.  */
		percpu_traverse_enter();
		struct workqueue_cpu **const wqcp =
		    percpu_getptr_remote(workqueue->wq_percpu, ci);
		*wqcp = wqc;
		percpu_traverse_exit();
	}

	/* Success!  */
	*workqueuep = workqueue;
	return 0;

fail1:	percpu_free(workqueue->wq_percpu, sizeof(struct workqueue_cpu *));
fail0:	cv_destroy(&workqueue->wq_drain_cv);
	mutex_destroy(&workqueue->wq_drain_lock);
	pool_put(&workqueue_pool, workqueue);
	return error;
}
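/*
 * Sketch, not part of the draft: the percpu_traverse_enter/
 * percpu_getptr_remote pattern used in workqueue_create above (and
 * again in workqueue_destroy below) could be captured in a helper
 * along these lines, which looks up another CPU's workqueue_cpu.  The
 * name workqueue_cpu_remote is made up for illustration only.
 */
#if 0
static struct workqueue_cpu *
workqueue_cpu_remote(struct workqueue *workqueue, struct cpu_info *ci)
{
	struct workqueue_cpu **wqcp, *wqc;

	/* Serialize against percpu_alloc/percpu_free of wq_percpu.  */
	percpu_traverse_enter();
	wqcp = percpu_getptr_remote(workqueue->wq_percpu, ci);
	wqc = *wqcp;
	percpu_traverse_exit();

	return wqc;
}
#endif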
/* Destroying workqueues */

void
workqueue_destroy(struct workqueue *workqueue)
{
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	ASSERT_SLEEPABLE();

	workqueue->wq_flags |= WORKQUEUE_DYING;

	/*
	 * Guarantee the DYING flag is visible to the worker threads
	 * before the cv_broadcast below; the workers' mutex_enter
	 * provides the matching read barrier.
	 */
	membar_producer();

	for (CPU_INFO_FOREACH(cii, ci)) {
		percpu_traverse_enter();
		struct workqueue_cpu **const wqcp =
		    percpu_getptr_remote(workqueue->wq_percpu, ci);
		struct workqueue_cpu *const wqc = *wqcp;
		*wqcp = NULL;
		percpu_traverse_exit();

		/*
		 * Cancel the job.  It is the caller's responsibility
		 * to process any outstanding work, e.g. by draining
		 * first or maintaining a separate list of all work.
		 */
		struct threadpool *const pool = threadpool_percpu_ref_remote(
		    workqueue->wq_pool_percpu, ci);
		mutex_enter(&wqc->wqc_lock);
		/* We set WORKQUEUE_DYING earlier; wake the job if waiting. */
		cv_broadcast(&wqc->wqc_cv);
		/* Cancel it if not.  */
		threadpool_cancel_job(pool, &wqc->wqc_job);
		mutex_exit(&wqc->wqc_lock);

		KASSERT(wqc->wqc_workqueue == workqueue);
		KASSERT(!wqc->wqc_running);
		threadpool_job_destroy(&wqc->wqc_job);
		KASSERT(!cv_has_waiters(&wqc->wqc_cv));
		cv_destroy(&wqc->wqc_cv);
		mutex_destroy(&wqc->wqc_lock);
		KASSERT(SIMPLEQ_EMPTY(&wqc->wqc_queue));
		pool_put(&workqueue_cpu_pool, wqc);
	}

	threadpool_percpu_put(workqueue->wq_pool_percpu, workqueue->wq_pri);

	KASSERT(workqueue->wq_draining == 0);
	KASSERT(!cv_has_waiters(&workqueue->wq_drain_cv));
	cv_destroy(&workqueue->wq_drain_cv);
	mutex_destroy(&workqueue->wq_drain_lock);
	percpu_free(workqueue->wq_percpu, sizeof(struct workqueue_cpu *));
	pool_put(&workqueue_pool, workqueue);
}

/* Scheduling work */

void
workqueue_schedule(struct workqueue *workqueue, struct work *work)
{
	struct workqueue_cpu **const wqcp =
	    percpu_getref(workqueue->wq_percpu);
	struct workqueue_cpu *const wqc = *wqcp;

	SIMPLEQ_INSERT_TAIL(&wqc->wqc_queue, work, w_entry);

	/* If it's not already running, kick the thread.  */
	if (__predict_false(!wqc->wqc_running)) {
		struct threadpool *const pool =
		    threadpool_percpu_ref(workqueue->wq_pool_percpu);

		percpu_putref(workqueue->wq_percpu);
		mutex_enter(&wqc->wqc_lock);
		threadpool_schedule_job(pool, &wqc->wqc_job);
		wqc->wqc_running = true;
		cv_signal(&wqc->wqc_cv);
		mutex_exit(&wqc->wqc_lock);
		return;
	}

	percpu_putref(workqueue->wq_percpu);
}

/* Scheduling work on another CPU */

void
workqueue_enqueue(struct workqueue *workqueue, struct work *work,
    struct cpu_info *ci)
{

	/* If the caller doesn't care, use the current CPU.  */
	if (ci == NULL) {
		workqueue_schedule(workqueue, work);
		goto out0;
	}

	/* Otherwise, we may need to do an xcall, which requires sleeping.  */
	ASSERT_SLEEPABLE();

	/* Don't switch CPUs between calling curcpu and workqueue_schedule. */
	kpreempt_disable();

	/* If the CPU the caller wants is the current one, easy.  */
	if (ci == curcpu()) {
		workqueue_schedule(workqueue, work);
		goto out1;
	}

	xc_wait(xc_unicast(0, &workqueue_enqueue_xc, workqueue, work, ci));

out1:	kpreempt_enable();
out0:	return;
}

static void
workqueue_enqueue_xc(void *workqueue_v, void *work_v)
{
	struct workqueue *const workqueue = workqueue_v;
	struct work *const work = work_v;

	workqueue_schedule(workqueue, work);
}
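/*
 * Sketch, not part of the draft: how a caller might use the ci
 * argument of workqueue_enqueue to put one work item on each CPU's
 * queue.  The struct frob and its f_work member are hypothetical; a
 * real caller embeds struct work in its own per-item structure.
 */
#if 0
struct frob {
	struct work	f_work;
	/* caller-specific data ... */
};

static void
frob_enqueue_percpu(struct workqueue *workqueue, struct frob *frobs)
{
	CPU_INFO_ITERATOR cii;
	struct cpu_info *ci;
	unsigned int i = 0;

	/* frobs[] is assumed to have at least ncpu entries.  */
	for (CPU_INFO_FOREACH(cii, ci))
		workqueue_enqueue(workqueue, &frobs[i++].f_work, ci);
}
#endif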
/* Draining work */

void
workqueue_drain(struct workqueue *workqueue)
{

	ASSERT_SLEEPABLE();

	mutex_enter(&workqueue->wq_drain_lock);

	/* Wait for any pending drain to complete.  */
	while (workqueue->wq_draining)
		cv_wait(&workqueue->wq_drain_cv, &workqueue->wq_drain_lock);

	/* Initiate a drain.  */
	workqueue->wq_draining = ncpu;
	mutex_exit(&workqueue->wq_drain_lock);

	/* Force the drain to happen on each CPU.  */
	xc_wait(xc_broadcast(0, &workqueue_drain_xc, workqueue, NULL));

	mutex_enter(&workqueue->wq_drain_lock);

	/* Wait for the drain to complete.  */
	while (workqueue->wq_draining)
		cv_wait(&workqueue->wq_drain_cv, &workqueue->wq_drain_lock);

	mutex_exit(&workqueue->wq_drain_lock);
}

static inline void
workqueue_drain_notify(struct workqueue *workqueue)
{

	if (__predict_false(workqueue->wq_draining))
		workqueue_drain_notify_outofline(workqueue);
}

static void __noinline
workqueue_drain_notify_outofline(struct workqueue *workqueue)
{
	unsigned int draining;

	do {
		draining = workqueue->wq_draining;
		KASSERT(0 < draining);
		if (draining == 1) {
			/* Last CPU to report: wake the drainer.  */
			mutex_enter(&workqueue->wq_drain_lock);
			draining =
			    atomic_dec_uint_nv(&workqueue->wq_draining);
			KASSERT(draining != UINT_MAX);
			if (draining == 0)
				cv_broadcast(&workqueue->wq_drain_cv);
			mutex_exit(&workqueue->wq_drain_lock);
			return;
		}
	} while (atomic_cas_uint(&workqueue->wq_draining, draining,
		(draining - 1)) != draining);
}

static void
workqueue_drain_xc(void *workqueue_v, void *dummy __unused)
{
	struct workqueue *const workqueue = workqueue_v;
	struct threadpool *const pool =
	    threadpool_percpu_ref(workqueue->wq_pool_percpu);
	struct workqueue_cpu **const wqcp =
	    percpu_getref(workqueue->wq_percpu);
	struct workqueue_cpu *const wqc = *wqcp;

	percpu_putref(workqueue->wq_percpu);

	mutex_enter(&wqc->wqc_lock);
	threadpool_schedule_job(pool, &wqc->wqc_job);
	wqc->wqc_running = true;
	cv_signal(&wqc->wqc_cv);
	mutex_exit(&wqc->wqc_lock);
}

/*
 * Scheduling thread pool jobs is not cheap, so we try to keep the
 * thread assigned to our job for a little while, and use the
 * wqc->wqc_running flag to avoid interprocessor synchronization.
 *
 * Transition of wqc->wqc_running from false to true happens only under
 * wqc->wqc_lock and is signalled by wqc->wqc_cv when scheduling work.
 * Transition of wqc->wqc_running from true to false happens only under
 * the workqueue CPU lock when the thread has found that the queue is
 * empty and is about to wait for work.
 *
 * If you are on the same CPU, and if, under the workqueue CPU lock,
 * you read that wqc->wqc_running is true, then you are guaranteed that
 * after you drop the workqueue CPU lock, the thread will re-examine
 * the queue.  Otherwise, it is necessary to schedule the job, set
 * wqc->wqc_running to true, and signal wqc->wqc_cv, all under
 * wqc->wqc_lock.
 */
static void
workqueue_job(struct threadpool_job *job)
{
	struct workqueue_cpu *const wqc =
	    container_of(job, struct workqueue_cpu, wqc_job);
	struct workqueue *const workqueue = wqc->wqc_workqueue;
	struct work_head *const queue = &wqc->wqc_queue;
	struct work_head tmp = SIMPLEQ_HEAD_INITIALIZER(tmp);
	struct work *work;

	for (;;) {
		kpreempt_disable();

		/* If there's no work to be done, wait for it.  */
		while (__predict_false(SIMPLEQ_EMPTY(queue))) {
			wqc->wqc_running = false;
			kpreempt_enable();
			mutex_enter(&wqc->wqc_lock);
			while (!wqc->wqc_running) {
				/* Before we wait, report no work to drain. */
				workqueue_drain_notify(workqueue);
				if (ISSET(workqueue->wq_flags,
					WORKQUEUE_DYING))
					goto out;
				if (cv_timedwait(&wqc->wqc_cv, &wqc->wqc_lock,
					WORKQUEUE_IDLE_TICKS)) {
					/* Idled out; give up the job.  */
					if (!wqc->wqc_running)
						goto out;
				}
			}
			mutex_exit(&wqc->wqc_lock);
			kpreempt_disable();
		}

		KASSERT(!SIMPLEQ_EMPTY(queue));
		KASSERT(wqc->wqc_running);

		/* Grab a batch of work off the queue.  */
		SIMPLEQ_CONCAT(&tmp, queue);
		kpreempt_enable();

		/* Run it all.  */
		SIMPLEQ_FOREACH(work, &tmp, w_entry)
			(*workqueue->wq_fn)(work, workqueue->wq_arg);
		SIMPLEQ_INIT(&tmp);	/* reset */

		/* Report that all work scheduled so far is done.  */
		workqueue_drain_notify(workqueue);
	}

out:	threadpool_job_done(job);
	mutex_exit(&wqc->wqc_lock);
}
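/*
 * Sketch, not part of the draft: end-to-end use of the interface --
 * create a workqueue, schedule work, drain it, and destroy the
 * workqueue.  The names frob_handler, frob_init, frob_kick, and
 * frob_fini are hypothetical; only the workqueue_* calls come from
 * this file.  Draining before destroying matches the caveat in
 * workqueue_destroy that outstanding work is the caller's
 * responsibility.
 */
#if 0
static struct workqueue *frob_wq;
static struct work frob_w;

static void
frob_handler(struct work *work, void *arg)
{

	KASSERT(work == &frob_w);
	/* ... do the deferred frobnication here ... */
}

static int
frob_init(void)
{

	return workqueue_create(&frob_wq, "frob", &frob_handler, NULL,
	    PRI_NONE, IPL_NONE, 0);
}

static void
frob_kick(void)
{

	/* NULL cpu_info: run on whichever CPU we are on now.  */
	workqueue_enqueue(frob_wq, &frob_w, NULL);
}

static void
frob_fini(void)
{

	/* Wait for all scheduled work to finish, then tear down.  */
	workqueue_drain(frob_wq);
	workqueue_destroy(frob_wq);
}
#endif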