/*	$NetBSD$	*/

/*
 * XXX WARNING WARNING WARNING XXX
 *
 * This code does not work!  I have not even compile-tested it.  It is
 * a draft of an idea.
 */

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Thread pools.
 *
 * A thread pool is a collection of worker threads idle or running
 * jobs, together with an overseer thread that does not run jobs but
 * can be given jobs to assign to a worker thread.  Scheduling a job in
 * a thread pool does not allocate or even sleep at all, except perhaps
 * on an adaptive lock, unlike kthread_create.  Jobs reuse threads, so
 * they do not incur the expense of creating and destroying kthreads
 * unless there is not much work to be done.
 *
 * A per-CPU thread pool (threadpool_percpu) is a collection of thread
 * pools, one per CPU bound to that CPU.  For each priority level in
 * use, there is one shared unbound thread pool (i.e., pool of threads
 * not bound to any CPU) and one shared per-CPU thread pool.
 *
 * To use the unbound thread pool at priority pri, call
 * threadpool_get(&pool, pri).  When you're done, call
 * threadpool_put(pool, pri).
 *
 * To use the per-CPU thread pools at priority pri, call
 * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
 * pool returned by threadpool_percpu_ref(pool_percpu) for the current
 * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
 * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
 * pri).
 *
 * +--MACHINE-----------------------------------------------+
 * | +--CPU 0-------+ +--CPU 1-------+     +--CPU n-------+ |
 * | |              | |              | ... |              | |
 * | |              | |              | ... |              | |
 * | |              | |              | ... |              | |
 * | | .            | | .            | ... | .            | |
 * | | .            | | .            | ... | .            | |
 * | | .            | | .            | ... | .            | |
 * | +--------------+ +--------------+     +--------------+ |
 * |            +--unbound---------+                        |
 * |            |                  |                        |
 * |            |                  |                        |
 * |            |                  |                        |
 * |            +------------------+                        |
 * +--------------------------------------------------------+
 *
 * XXX Why one overseer per CPU?  I did that originally to avoid
 * touching remote CPUs' memory when scheduling a job, but that still
 * requires interprocessor synchronization.  Perhaps we could get by
 * with a single overseer thread, at the expense of another pointer in
 * struct threadpool_job to identify the CPU on which it must run in
 * order for the overseer to schedule it correctly.
 */
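
/*
 * Example usage (an illustrative sketch only, not part of this file's
 * interface: the frob_* names and frobnicate() are hypothetical, and
 * the scheduling entry point is spelled _threadpool_schedule_job as
 * defined below -- callers may have a wrapper in the header):
 *
 *	static kmutex_t frob_lock;
 *	static struct threadpool *frob_pool;
 *	static struct threadpool_job frob_job;
 *
 *	static void
 *	frob_job_fn(struct threadpool_job *job)
 *	{
 *		frobnicate();
 *		mutex_enter(&frob_lock);
 *		threadpool_job_done(job);
 *		mutex_exit(&frob_lock);
 *	}
 *
 *	Setup:
 *		mutex_init(&frob_lock, MUTEX_DEFAULT, IPL_NONE);
 *		threadpool_job_init(&frob_job, &frob_job_fn, &frob_lock,
 *		    "frob");
 *		error = threadpool_get(&frob_pool, PRI_NONE);
 *
 *	Scheduling (does not sleep beyond the adaptive job lock):
 *		mutex_enter(&frob_lock);
 *		_threadpool_schedule_job(frob_pool, &frob_job);
 *		mutex_exit(&frob_lock);
 *
 *	Teardown:
 *		mutex_enter(&frob_lock);
 *		threadpool_cancel_job(frob_pool, &frob_job);
 *		mutex_exit(&frob_lock);
 *		threadpool_job_destroy(&frob_job);
 *		threadpool_put(frob_pool, PRI_NONE);
 *		mutex_destroy(&frob_lock);
 */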

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD$");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lwp.h>
#include <sys/mutex.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/queue.h>
#include <sys/stdarg.h>
#include <sys/systm.h>
#include <sys/threadpool.h>	/* XXX presumed interface header: struct
				   threadpool_job, threadpool_job_fn_t,
				   THREADPOOL_IDLE_TICKS */

/* Data structures */

TAILQ_HEAD(job_head, threadpool_job);
TAILQ_HEAD(thread_head, threadpool_thread);

struct threadpool_thread {
	struct lwp			*tpt_lwp;
	struct threadpool		*tpt_pool;
	struct threadpool_job		*tpt_job;
	kcondvar_t			tpt_cv;
	TAILQ_ENTRY(threadpool_thread)	tpt_entry;
};

struct threadpool {
	kmutex_t			tp_lock;
	struct threadpool_thread	tp_overseer;
	struct job_head			tp_jobs;
	struct thread_head		tp_idle_threads;
	unsigned int			tp_refcnt;
	int				tp_flags;
#define	THREADPOOL_DYING	0x01
	struct cpu_info			*tp_cpu;
	pri_t				tp_pri;
};

static int	threadpool_hold(struct threadpool *);
static void	threadpool_rele(struct threadpool *);

static int	threadpool_percpu_create(struct threadpool_percpu **, pri_t);
static void	threadpool_percpu_destroy(struct threadpool_percpu *);

static threadpool_job_fn_t threadpool_job_dead;

static int	threadpool_job_hold(struct threadpool_job *);
static void	threadpool_job_rele(struct threadpool_job *);

static void	threadpool_overseer_thread(void *) __dead;
static void	threadpool_thread(void *) __dead;

static struct pool threadpool_pool;
static pool_cache_t threadpool_thread_pc __read_mostly;

static kmutex_t threadpools_lock __cacheline_aligned;

static struct {
	struct threadpool	*pool;
	unsigned int		refcnt;
} unbound_threadpools[PRI_COUNT_KTHREAD + 1];

static struct {
	struct threadpool_percpu	*pool_percpu;
	unsigned int			refcnt;
} threadpool_percpus[PRI_COUNT_KTHREAD + 1];

void
threadpools_init(void)
{

	pool_init(&threadpool_pool, sizeof(struct threadpool), 0, 0, 0,
	    "thrdpool", NULL, IPL_NONE);
	threadpool_thread_pc =
	    pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
		"thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);
	mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
}

static size_t
threadpool_pri_index(pri_t pri)
{

	if (pri == PRI_NONE) {
		/* PRI_NONE gets the extra slot at the end of the arrays.  */
		return PRI_COUNT_KTHREAD;
	} else {
		KASSERTMSG((0 <= pri), "negative priority: %d", (int)pri);
		KASSERTMSG((pri < PRI_COUNT_KTHREAD),
		    "priority out of range: %d", (int)pri);
		return pri;
	}
}

/* Thread pool creation */

static int
threadpool_create(struct threadpool **poolp, struct cpu_info *ci, pri_t pri)
{
	struct threadpool *const pool = pool_get(&threadpool_pool, PR_WAITOK);
	struct lwp *lwp;
	int ktflags;
	int error;

	mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
	/* XXX overseer */
	TAILQ_INIT(&pool->tp_jobs);
	TAILQ_INIT(&pool->tp_idle_threads);
	pool->tp_refcnt = 0;
	pool->tp_flags = 0;
	pool->tp_cpu = ci;
	pool->tp_pri = pri;

	/* Take the reference that the overseer thread will release.  */
	error = threadpool_hold(pool);
	KASSERT(error == 0);

	pool->tp_overseer.tpt_lwp = NULL;
	pool->tp_overseer.tpt_pool = pool;
	pool->tp_overseer.tpt_job = NULL;
	cv_init(&pool->tp_overseer.tpt_cv, "poolover");

	ktflags = 0;
	ktflags |= KTHREAD_MPSAFE;
	if (pri < PRI_KERNEL)
		ktflags |= KTHREAD_TS;
	error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread,
	    &pool->tp_overseer, &lwp,
	    "pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri);
	if (error)
		goto fail0;

	mutex_spin_enter(&pool->tp_lock);
	pool->tp_overseer.tpt_lwp = lwp;
	cv_broadcast(&pool->tp_overseer.tpt_cv);
	mutex_spin_exit(&pool->tp_lock);

	*poolp = pool;
	return 0;

fail0:	KASSERT(error);
	/* The overseer never ran, so drop the reference held for it.  */
	threadpool_rele(pool);
	KASSERT(pool->tp_overseer.tpt_job == NULL);
	KASSERT(pool->tp_overseer.tpt_pool == pool);
	KASSERT(pool->tp_flags == 0);
	KASSERT(pool->tp_refcnt == 0);
	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
	cv_destroy(&pool->tp_overseer.tpt_cv);
	mutex_destroy(&pool->tp_lock);
	pool_put(&threadpool_pool, pool);
	return error;
}

/* Thread pool destruction */

static void
threadpool_destroy(struct threadpool *pool)
{
	struct threadpool_thread *thread;

	/* Mark the pool dying and wait for threads to commit suicide.  */
	mutex_spin_enter(&pool->tp_lock);
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	pool->tp_flags |= THREADPOOL_DYING;
	TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
		cv_broadcast(&thread->tpt_cv);
	while (0 < pool->tp_refcnt)
		cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock);
	mutex_spin_exit(&pool->tp_lock);

	KASSERT(pool->tp_overseer.tpt_job == NULL);
	KASSERT(pool->tp_overseer.tpt_pool == pool);
	KASSERT(pool->tp_flags == THREADPOOL_DYING);
	KASSERT(pool->tp_refcnt == 0);
	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
	cv_destroy(&pool->tp_overseer.tpt_cv);
	mutex_destroy(&pool->tp_lock);
	pool_put(&threadpool_pool, pool);
}

static int
threadpool_hold(struct threadpool *pool)
{
	unsigned int refcnt;

	do {
		refcnt = pool->tp_refcnt;
		if (refcnt == UINT_MAX)
			return EBUSY;
	} while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt + 1))
	    != refcnt);

	return 0;
}

static void
threadpool_rele(struct threadpool *pool)
{
	unsigned int refcnt;

	do {
		refcnt = pool->tp_refcnt;
		KASSERT(0 < refcnt);
		if (refcnt == 1) {
			mutex_spin_enter(&pool->tp_lock);
			refcnt = atomic_dec_uint_nv(&pool->tp_refcnt);
			KASSERT(refcnt != UINT_MAX);
			if (refcnt == 0)
				cv_broadcast(&pool->tp_overseer.tpt_cv);
			mutex_spin_exit(&pool->tp_lock);
			return;
		}
	} while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt - 1))
	    != refcnt);
}

/* Unbound thread pools */

int
threadpool_get(struct threadpool **poolp, pri_t pri)
{
	const size_t i = threadpool_pri_index(pri);
	struct threadpool *pool, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	mutex_enter(&threadpools_lock);
	pool = unbound_threadpools[i].pool;
	if (pool == NULL) {
		mutex_exit(&threadpools_lock);
		error = threadpool_create(&tmp, NULL, pri);
		if (error)
			return error;
		KASSERT(tmp != NULL);
		mutex_enter(&threadpools_lock);
		pool = unbound_threadpools[i].pool;
		if (pool == NULL) {
			pool = unbound_threadpools[i].pool = tmp;
			tmp = NULL;
		}
	}
	KASSERT(pool != NULL);
	if (unbound_threadpools[i].refcnt == UINT_MAX) {
		mutex_exit(&threadpools_lock);
		if (tmp != NULL)
			threadpool_destroy(tmp);
		return EBUSY;
	}
	unbound_threadpools[i].refcnt++;
	mutex_exit(&threadpools_lock);

	if (tmp != NULL)
		threadpool_destroy(tmp);
	KASSERT(pool != NULL);
	*poolp = pool;
	return 0;
}

void
threadpool_put(struct threadpool *pool, pri_t pri)
{
	const size_t i = threadpool_pri_index(pri);

	ASSERT_SLEEPABLE();

	mutex_enter(&threadpools_lock);
	KASSERT(pool == unbound_threadpools[i].pool);
	KASSERT(0 < unbound_threadpools[i].refcnt);
	if (--unbound_threadpools[i].refcnt == 0)
		unbound_threadpools[i].pool = NULL;
	else
		pool = NULL;
	mutex_exit(&threadpools_lock);

	if (pool)
		threadpool_destroy(pool);
}

/* Per-CPU thread pools */

int
threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
{
	const size_t i = threadpool_pri_index(pri);
	struct threadpool_percpu *pool_percpu, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	mutex_enter(&threadpools_lock);
	pool_percpu = threadpool_percpus[i].pool_percpu;
	if (pool_percpu == NULL) {
		mutex_exit(&threadpools_lock);
		error = threadpool_percpu_create(&tmp, pri);
		if (error)
			return error;
		KASSERT(tmp != NULL);
		mutex_enter(&threadpools_lock);
		pool_percpu = threadpool_percpus[i].pool_percpu;
		if (pool_percpu == NULL) {
			pool_percpu = threadpool_percpus[i].pool_percpu = tmp;
			tmp = NULL;
		}
	}
	KASSERT(pool_percpu != NULL);
	if (threadpool_percpus[i].refcnt == UINT_MAX) {
		mutex_exit(&threadpools_lock);
		if (tmp != NULL)
			threadpool_percpu_destroy(tmp);
		return EBUSY;
	}
	threadpool_percpus[i].refcnt++;
	mutex_exit(&threadpools_lock);

	if (tmp != NULL)
		threadpool_percpu_destroy(tmp);
	KASSERT(pool_percpu != NULL);
	*pool_percpup = pool_percpu;
	return 0;
}

void
threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
{
	const size_t i = threadpool_pri_index(pri);

	ASSERT_SLEEPABLE();

	mutex_enter(&threadpools_lock);
	KASSERT(pool_percpu == threadpool_percpus[i].pool_percpu);
	KASSERT(0 < threadpool_percpus[i].refcnt);
	if (--threadpool_percpus[i].refcnt == 0)
		threadpool_percpus[i].pool_percpu = NULL;
	else
		pool_percpu = NULL;
	mutex_exit(&threadpools_lock);

	if (pool_percpu)
		threadpool_percpu_destroy(pool_percpu);
}

struct threadpool *
threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
{
	struct percpu *const percpu = (struct percpu *)pool_percpu;
	struct threadpool **poolp, *pool;

	poolp = percpu_getref(percpu);
	pool = *poolp;
	percpu_putref(percpu);

	return pool;
}

struct threadpool *
threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
    struct cpu_info *ci)
{
	struct percpu *const percpu = (struct percpu *)pool_percpu;
	struct threadpool **poolp, *pool;

	percpu_traverse_enter();
	poolp = percpu_getptr_remote(percpu, ci);
	pool = *poolp;
	percpu_traverse_exit();

	return pool;
}
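
/*
 * Example (an illustrative sketch only; the frob_* names are
 * hypothetical and assume a job and lock set up as in the earlier
 * example): a threadpool_percpu is represented as a percpu(9) array of
 * struct threadpool pointers, so taking the current CPU's pool
 * reference is cheap and does not sleep, which makes it suitable right
 * before scheduling a job that should stay on this CPU:
 *
 *	struct threadpool_percpu *frob_percpu;
 *	struct threadpool *pool;
 *	int error;
 *
 *	error = threadpool_percpu_get(&frob_percpu, PRI_NONE);
 *	if (error)
 *		return error;
 *
 *	pool = threadpool_percpu_ref(frob_percpu);	(current CPU's pool)
 *	mutex_enter(&frob_lock);
 *	_threadpool_schedule_job(pool, &frob_job);
 *	mutex_exit(&frob_lock);
 *
 *	...or, for a particular CPU ci:
 *
 *	pool = threadpool_percpu_ref_remote(frob_percpu, ci);
 *
 *	When no more jobs will be scheduled:
 *
 *	threadpool_percpu_put(frob_percpu, PRI_NONE);
 */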

static int
threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
{
	struct percpu *percpu;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;
	unsigned int i, j;
	int error;

	percpu = percpu_alloc(sizeof(struct threadpool *));
	if (percpu == NULL) {
		error = ENOMEM;
		goto fail0;
	}

	for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) {
		struct threadpool *pool;

		error = threadpool_create(&pool, ci, pri);
		if (error)
			goto fail1;
		percpu_traverse_enter();
		struct threadpool **const poolp =
		    percpu_getptr_remote(percpu, ci);
		*poolp = pool;
		percpu_traverse_exit();
	}

	/* Success!  */
	*pool_percpup = (struct threadpool_percpu *)percpu;
	return 0;

fail1:	for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) {
		if (i <= j)
			break;
		percpu_traverse_enter();
		struct threadpool **const poolp =
		    percpu_getptr_remote(percpu, ci);
		struct threadpool *const pool = *poolp;
		percpu_traverse_exit();
		threadpool_destroy(pool);
	}
	percpu_free(percpu, sizeof(struct threadpool *));
fail0:	return error;
}

static void
threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
{
	struct percpu *const percpu = (struct percpu *)pool_percpu;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	for (CPU_INFO_FOREACH(cii, ci)) {
		percpu_traverse_enter();
		struct threadpool **const poolp =
		    percpu_getptr_remote(percpu, ci);
		struct threadpool *const pool = *poolp;
		percpu_traverse_exit();
		threadpool_destroy(pool);
	}

	percpu_free(percpu, sizeof(struct threadpool *));
}

/* Thread pool jobs */

void __printflike(4,5)
threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
    kmutex_t *lock, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	(void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
	va_end(ap);

	job->job_lock = lock;
	job->job_thread = NULL;
	job->job_refcnt = 0;
	cv_init(&job->job_cv, job->job_name);
	job->job_fn = fn;
}

static void
threadpool_job_dead(struct threadpool_job *job)
{

	panic("threadpool job %p ran after destruction", job);
}

void
threadpool_job_destroy(struct threadpool_job *job)
{

	ASSERT_SLEEPABLE();

	KASSERTMSG((job->job_thread == NULL), "job %p still running", job);

	mutex_enter(job->job_lock);
	while (0 < job->job_refcnt)
		cv_wait(&job->job_cv, job->job_lock);
	mutex_exit(job->job_lock);

	job->job_lock = NULL;
	KASSERT(job->job_thread == NULL);
	KASSERT(job->job_refcnt == 0);
	KASSERT(!cv_has_waiters(&job->job_cv));
	cv_destroy(&job->job_cv);
	job->job_fn = &threadpool_job_dead;
	(void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
}

static int
threadpool_job_hold(struct threadpool_job *job)
{
	unsigned int refcnt;

	do {
		refcnt = job->job_refcnt;
		if (refcnt == UINT_MAX)
			return EBUSY;
	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
	    != refcnt);

	return 0;
}

static void
threadpool_job_rele(struct threadpool_job *job)
{
	unsigned int refcnt;

	do {
		refcnt = job->job_refcnt;
		KASSERT(0 < refcnt);
		if (refcnt == 1) {
			mutex_enter(job->job_lock);
			refcnt = atomic_dec_uint_nv(&job->job_refcnt);
			KASSERT(refcnt != UINT_MAX);
			if (refcnt == 0)
				cv_broadcast(&job->job_cv);
			mutex_exit(job->job_lock);
			return;
		}
	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt - 1))
	    != refcnt);
}

void
threadpool_job_done(struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));
	KASSERT(job->job_thread != NULL);
	KASSERT(job->job_thread->tpt_lwp == curlwp);

	cv_broadcast(&job->job_cv);
	job->job_thread = NULL;
}
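
/*
 * Example (an illustrative sketch; the frob_* names are hypothetical):
 * a job function must call threadpool_job_done exactly once per run,
 * with the job lock held, before it returns.  Because
 * threadpool_job_done clears job_thread under the job lock, a job may
 * hand itself back to the pool in the same critical section if it has
 * more work to do:
 *
 *	static void
 *	frob_job_fn(struct threadpool_job *job)
 *	{
 *		bool more;
 *
 *		more = frobnicate_a_little();
 *		mutex_enter(&frob_lock);
 *		threadpool_job_done(job);
 *		if (more)
 *			_threadpool_schedule_job(frob_pool, job);
 *		mutex_exit(&frob_lock);
 *	}
 */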

void
_threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));

	/*
	 * If the job's already running, let it keep running.  The job
	 * is guaranteed by the interlock not to end early -- if it had
	 * ended early, threadpool_job_done would have set job_thread
	 * to NULL under the interlock.
	 */
	if (job->job_thread != NULL)
		return;

	/* Otherwise, try to assign a thread to the job.  */
	mutex_spin_enter(&pool->tp_lock);
	if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads)) ||
	    threadpool_job_hold(job) != 0) {
		/*
		 * Nobody's idle, or we can't take a reference to hand
		 * to an idle thread.  Give it to the overseer, which
		 * takes its own reference when it dequeues the job.
		 */
		job->job_thread = &pool->tp_overseer;
		TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
	} else {
		/*
		 * Assign it to the first idle thread.  The thread
		 * inherits the job reference we just took and releases
		 * it when the job has run.
		 */
		job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
		TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
		    tpt_entry);
		job->job_thread->tpt_job = job;
	}

	/* Notify whomever we gave it to, overseer or idle thread.  */
	KASSERT(job->job_thread != NULL);
	cv_broadcast(&job->job_thread->tpt_cv);
	mutex_spin_exit(&pool->tp_lock);
}

bool
threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));

	if (job->job_thread == NULL) {
		/* Nothing to do.  Guaranteed not running.  */
		return true;
	} else if (job->job_thread == &pool->tp_overseer) {
		/* Take it off the list to guarantee it won't run.  */
		job->job_thread = NULL;
		mutex_spin_enter(&pool->tp_lock);
		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
		mutex_spin_exit(&pool->tp_lock);
		return true;
	} else {
		/* Too late -- already running.  */
		return false;
	}
}

void
threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
{

	ASSERT_SLEEPABLE();

	KASSERT(mutex_owned(job->job_lock));

	if (threadpool_cancel_job_async(pool, job))
		return;

	/* Already running.  Wait for it to complete.  */
	while (job->job_thread != NULL)
		cv_wait(&job->job_cv, job->job_lock);
}

/* Thread pool overseer thread */

static void __dead
threadpool_overseer_thread(void *arg)
{
	struct threadpool_thread *const overseer = arg;
	struct threadpool *const pool = overseer->tpt_pool;
	struct lwp *lwp = NULL;
	int ktflags;
	int error;

	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));

	/* Wait until we're initialized.  */
	mutex_spin_enter(&pool->tp_lock);
	while (overseer->tpt_lwp == NULL)
		cv_wait(&overseer->tpt_cv, &pool->tp_lock);

	for (;;) {
		/* Wait until there's a job.  */
		while (TAILQ_EMPTY(&pool->tp_jobs)) {
			if (ISSET(pool->tp_flags, THREADPOOL_DYING))
				break;
			cv_wait(&overseer->tpt_cv, &pool->tp_lock);
		}
		if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
			break;

		/* If there are no threads, we'll have to try to start one.  */
		if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
			error = threadpool_hold(pool);
			if (error) {
				(void)kpause("thrdplrf", false, hz,
				    &pool->tp_lock);
				continue;
			}
			mutex_spin_exit(&pool->tp_lock);

			struct threadpool_thread *const thread =
			    pool_cache_get(threadpool_thread_pc, PR_WAITOK);
			thread->tpt_lwp = NULL;
			thread->tpt_pool = pool;
			thread->tpt_job = NULL;
			cv_init(&thread->tpt_cv, "poolthrd");

			ktflags = 0;
			ktflags |= KTHREAD_MPSAFE;
			if (pool->tp_pri < PRI_KERNEL)
				ktflags |= KTHREAD_TS;
			error = kthread_create(pool->tp_pri, ktflags,
			    pool->tp_cpu, &threadpool_thread, thread, &lwp,
			    "poolthread/%d@%d",
			    (pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1),
			    (int)pool->tp_pri);

			mutex_spin_enter(&pool->tp_lock);
			if (error) {
				pool_cache_put(threadpool_thread_pc, thread);
				threadpool_rele(pool);
				/* XXX What to do to wait for memory?  */
				(void)kpause("thrdplcr", false, hz,
				    &pool->tp_lock);
				continue;
			}
			KASSERT(lwp != NULL);
			TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
			    tpt_entry);
			thread->tpt_lwp = lwp;
			lwp = NULL;
			cv_broadcast(&thread->tpt_cv);
			continue;
		}

		/* There are idle threads, so try giving one a job.  */
		bool rele_job = true;
		struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
		error = threadpool_job_hold(job);
		if (error) {
			TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
			(void)kpause("pooljob", false, hz, &pool->tp_lock);
			continue;
		}
		mutex_spin_exit(&pool->tp_lock);

		mutex_enter(job->job_lock);
		/* If the job was cancelled, we'll no longer be its thread.  */
		if (__predict_true(job->job_thread == overseer)) {
			mutex_spin_enter(&pool->tp_lock);
			if (__predict_false(
				    TAILQ_EMPTY(&pool->tp_idle_threads))) {
				/*
				 * Someone else snagged the thread
				 * first.  We'll have to try again.
				 */
				TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
				    job_entry);
			} else {
				/*
				 * Assign the job to the thread and
				 * wake the thread so it starts work.
				 */
				struct threadpool_thread *const thread =
				    TAILQ_FIRST(&pool->tp_idle_threads);

				KASSERT(thread->tpt_job == NULL);
				TAILQ_REMOVE(&pool->tp_idle_threads, thread,
				    tpt_entry);
				thread->tpt_job = job;
				job->job_thread = thread;
				cv_broadcast(&thread->tpt_cv);
				/* Gave the thread our job reference.  */
				rele_job = false;
			}
			mutex_spin_exit(&pool->tp_lock);
		}
		mutex_exit(job->job_lock);
		if (__predict_false(rele_job))
			threadpool_job_rele(job);

		mutex_spin_enter(&pool->tp_lock);
	}
	mutex_spin_exit(&pool->tp_lock);

	threadpool_rele(pool);
	kthread_exit(0);
}

/* Thread pool thread */

static void __dead
threadpool_thread(void *arg)
{
	struct threadpool_thread *const thread = arg;
	struct threadpool *const pool = thread->tpt_pool;

	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));

	/* Wait until we're initialized and on the queue.  */
	mutex_spin_enter(&pool->tp_lock);
	while (thread->tpt_lwp == NULL)
		cv_wait(&thread->tpt_cv, &pool->tp_lock);

	KASSERT(thread->tpt_lwp == curlwp);
	for (;;) {
		/* Wait until we are assigned a job.  */
		while (thread->tpt_job == NULL) {
			if (ISSET(pool->tp_flags, THREADPOOL_DYING))
				break;
			if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
				THREADPOOL_IDLE_TICKS))
				break;
		}
		if (__predict_false(thread->tpt_job == NULL)) {
			TAILQ_REMOVE(&pool->tp_idle_threads, thread,
			    tpt_entry);
			break;
		}

		struct threadpool_job *const job = thread->tpt_job;
		KASSERT(job != NULL);
		mutex_spin_exit(&pool->tp_lock);

		/* Set our lwp name to reflect what job we're doing.  */
		lwp_lock(curlwp);
		char *const lwp_name = curlwp->l_name;
		curlwp->l_name = job->job_name;
		lwp_unlock(curlwp);

		/* Run the job.  */
		(*job->job_fn)(job);

		/* Restore our lwp name.  */
		lwp_lock(curlwp);
		curlwp->l_name = lwp_name;
		lwp_unlock(curlwp);

		/* Job is done and its name is unreferenced.  Release it.  */
		threadpool_job_rele(job);

		mutex_spin_enter(&pool->tp_lock);
		KASSERT(thread->tpt_job == job);
		thread->tpt_job = NULL;
		TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
	}
	mutex_spin_exit(&pool->tp_lock);

	KASSERT(!cv_has_waiters(&thread->tpt_cv));
	cv_destroy(&thread->tpt_cv);
	pool_cache_put(threadpool_thread_pc, thread);
	threadpool_rele(pool);
	kthread_exit(0);
}