From d0e224a9bbcb92da2bb0fe9c3a1dc9a9352e0965 Mon Sep 17 00:00:00 2001 From: Taylor R Campbell Date: Sat, 8 Feb 2020 23:03:43 +0000 Subject: [PATCH 1/2] Teach threadpool(9) to use percpu_create, mostly. --- sys/kern/kern_threadpool.c | 110 ++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/sys/kern/kern_threadpool.c b/sys/kern/kern_threadpool.c index e1c9f342ba78..0a34875b2a48 100644 --- a/sys/kern/kern_threadpool.c +++ b/sys/kern/kern_threadpool.c @@ -132,6 +132,9 @@ static void threadpool_rele(struct threadpool *); static int threadpool_percpu_create(struct threadpool_percpu **, pri_t); static void threadpool_percpu_destroy(struct threadpool_percpu *); +static void threadpool_percpu_init(void *, void *, struct cpu_info *); +static void threadpool_percpu_ok(void *, void *, struct cpu_info *); +static void threadpool_percpu_fini(void *, void *, struct cpu_info *); static threadpool_job_fn_t threadpool_job_dead; @@ -576,80 +579,77 @@ static int threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri) { struct threadpool_percpu *pool_percpu; - struct cpu_info *ci; - CPU_INFO_ITERATOR cii; - unsigned int i, j; - int error; + bool ok = true; pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP); - if (pool_percpu == NULL) { - error = ENOMEM; - goto fail0; - } pool_percpu->tpp_pri = pri; + pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *), + threadpool_percpu_init, threadpool_percpu_fini, + (void *)(intptr_t)pri); - pool_percpu->tpp_percpu = percpu_alloc(sizeof(struct threadpool *)); - if (pool_percpu->tpp_percpu == NULL) { - error = ENOMEM; - goto fail1; - } - - for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) { - struct threadpool *pool; - - pool = kmem_zalloc(sizeof(*pool), KM_SLEEP); - error = threadpool_create(pool, ci, pri); - if (error) { - kmem_free(pool, sizeof(*pool)); - goto fail2; - } - percpu_traverse_enter(); - struct threadpool **const poolp = - percpu_getptr_remote(pool_percpu->tpp_percpu, ci); - *poolp = pool; - percpu_traverse_exit(); - } + /* + * Verify that all of the CPUs were initialized. + * + * XXX What to do if we add CPU hotplug? + */ + percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok); + if (!ok) + goto fail; /* Success! 
*/ *pool_percpup = (struct threadpool_percpu *)pool_percpu; return 0; -fail2: for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) { - if (i <= j) - break; - percpu_traverse_enter(); - struct threadpool **const poolp = - percpu_getptr_remote(pool_percpu->tpp_percpu, ci); - struct threadpool *const pool = *poolp; - percpu_traverse_exit(); - threadpool_destroy(pool); - kmem_free(pool, sizeof(*pool)); - } - percpu_free(pool_percpu->tpp_percpu, sizeof(struct taskthread_pool *)); -fail1: kmem_free(pool_percpu, sizeof(*pool_percpu)); -fail0: return error; +fail: percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *)); + kmem_free(pool_percpu, sizeof(*pool_percpu)); + return ENOMEM; } static void threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu) { - struct cpu_info *ci; - CPU_INFO_ITERATOR cii; - - for (CPU_INFO_FOREACH(cii, ci)) { - percpu_traverse_enter(); - struct threadpool **const poolp = - percpu_getptr_remote(pool_percpu->tpp_percpu, ci); - struct threadpool *const pool = *poolp; - percpu_traverse_exit(); - threadpool_destroy(pool); - kmem_free(pool, sizeof(*pool)); - } percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *)); kmem_free(pool_percpu, sizeof(*pool_percpu)); } +static void +threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci) +{ + struct threadpool **const poolp = vpoolp; + pri_t pri = (intptr_t)(void *)vpri; + int error; + + *poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP); + error = threadpool_create(*poolp, ci, pri); + if (error) { + KASSERT(error == ENOMEM); + kmem_free(*poolp, sizeof(**poolp)); + *poolp = NULL; + } +} + +static void +threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci) +{ + struct threadpool **const poolp = vpoolp; + bool *okp = vokp; + + if (*poolp == NULL) + atomic_store_relaxed(okp, false); +} + +static void +threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci) +{ + struct threadpool **const poolp = vpoolp; + + if (*poolp == NULL) /* initialization failed */ + return; + threadpool_destroy(*poolp); + kmem_free(*poolp, sizeof(**poolp)); +} + /* Thread pool jobs */ void __printflike(4,5) From b2c4fb953405b0ef983cd8810f251fd7b8b2f161 Mon Sep 17 00:00:00 2001 From: Taylor R Campbell Date: Sat, 8 Feb 2020 23:35:18 +0000 Subject: [PATCH 2/2] Switch from ad-hoc logging to dtrace probes. 
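These probes replace the TP_LOG() printfs, which were compiled out unless the kernel was built with THREADPOOL_VERBOSE; SDT probes can instead be enabled, filtered, and aggregated at run time. As an untested sketch, assuming the usual sdt provider naming of provider:module:function:name (dtrace conventionally renders the double underscores in names like get__create as dashes, i.e. get-create), pool creation could be watched with something like:

	# arg0 = struct cpu_info *ci, arg1 = pri_t pri
	dtrace -n 'sdt:kernel:threadpool:create { printf("pri=%d", (int)arg1); }'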
--- sys/kern/kern_threadpool.c | 168 +++++++++++++++++++++++++++---------- 1 file changed, 124 insertions(+), 44 deletions(-) diff --git a/sys/kern/kern_threadpool.c b/sys/kern/kern_threadpool.c index 0a34875b2a48..8f3e0dc96a03 100644 --- a/sys/kern/kern_threadpool.c +++ b/sys/kern/kern_threadpool.c @@ -97,10 +97,82 @@ __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.15 2019/01/17 10:18:52 hannken #include <sys/pool.h> #include <sys/proc.h> #include <sys/queue.h> -#include <sys/systm.h> +#include <sys/sdt.h> #include <sys/sysctl.h> +#include <sys/systm.h> #include <sys/threadpool.h> +/* Probes */ + +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get, + "pri_t"/*pri*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__create, + "pri_t"/*pri*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__race, + "pri_t"/*pri*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put, + "struct threadpool *"/*pool*/, "pri_t"/*pri*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put__destroy, + "struct threadpool *"/*pool*/, "pri_t"/*pri*/); + +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get, + "pri_t"/*pri*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__create, + "pri_t"/*pri*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__race, + "pri_t"/*pri*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put, + "struct threadpool_percpu *"/*pool_percpu*/, "pri_t"/*pri*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put__destroy, + "struct threadpool_percpu *"/*pool_percpu*/, "pri_t"/*pri*/); + +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, create, + "struct cpu_info *"/*ci*/, "pri_t"/*pri*/); +SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__success, + "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "struct threadpool *"/*pool*/); +SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__failure, + "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "int"/*error*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, destroy, + "struct threadpool *"/*pool*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, destroy__wait, + "struct threadpool *"/*pool*/, "uint64_t"/*refcnt*/); + +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job, + "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running, + "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__overseer, + "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/); +SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread, + "struct threadpool *"/*pool*/, + "struct threadpool_job *"/*job*/, + "struct lwp *"/*thread*/); + +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__start, + "struct threadpool *"/*pool*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__dying, + "struct threadpool *"/*pool*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__spawn, + "struct threadpool *"/*pool*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, overseer__race, + "struct threadpool *"/*pool*/, + "struct threadpool_job *"/*job*/); +SDT_PROBE_DEFINE3(sdt, kernel, threadpool, overseer__assign, + "struct threadpool *"/*pool*/, + "struct threadpool_job *"/*job*/, + "struct lwp *"/*thread*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__exit, + "struct threadpool *"/*pool*/); + +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start, + "struct threadpool *"/*pool*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__dying, + "struct threadpool *"/*pool*/); +SDT_PROBE_DEFINE2(sdt, kernel, threadpool, thread__job, + "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/); +SDT_PROBE_DEFINE1(sdt, kernel, threadpool, 
thread__exit, + "struct threadpool *"/*pool*/); + /* Data structures */ TAILQ_HEAD(job_head, threadpool_job); @@ -224,12 +296,6 @@ threadpool_remove_percpu(struct threadpool_percpu *tpp) LIST_REMOVE(tpp, tpp_link); } -#ifdef THREADPOOL_VERBOSE -#define TP_LOG(x) printf x -#else -#define TP_LOG(x) /* nothing */ -#endif /* THREADPOOL_VERBOSE */ - static int sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS) { @@ -308,6 +374,8 @@ threadpool_create(struct threadpool *const pool, struct cpu_info *ci, KASSERT(threadpool_pri_is_valid(pri)); + SDT_PROBE2(sdt, kernel, threadpool, create, ci, pri); + mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM); /* XXX overseer */ TAILQ_INIT(&pool->tp_jobs); @@ -337,6 +405,7 @@ threadpool_create(struct threadpool *const pool, struct cpu_info *ci, cv_broadcast(&pool->tp_overseer.tpt_cv); mutex_spin_exit(&pool->tp_lock); + SDT_PROBE3(sdt, kernel, threadpool, create__success, ci, pri, pool); return 0; fail0: KASSERT(error); @@ -349,6 +418,7 @@ fail0: KASSERT(error); KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv)); cv_destroy(&pool->tp_overseer.tpt_cv); mutex_destroy(&pool->tp_lock); + SDT_PROBE3(sdt, kernel, threadpool, create__failure, ci, pri, error); return error; } @@ -359,6 +429,8 @@ threadpool_destroy(struct threadpool *pool) { struct threadpool_thread *thread; + SDT_PROBE1(sdt, kernel, threadpool, destroy, pool); + /* Mark the pool dying and wait for threads to commit suicide. */ mutex_spin_enter(&pool->tp_lock); KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); @@ -367,8 +439,8 @@ threadpool_destroy(struct threadpool *pool) TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry) cv_broadcast(&thread->tpt_cv); while (0 < pool->tp_refcnt) { - TP_LOG(("%s: draining %" PRIu64 " references...\n", __func__, - pool->tp_refcnt)); + SDT_PROBE2(sdt, kernel, threadpool, destroy__wait, + pool, pool->tp_refcnt); cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock); } mutex_spin_exit(&pool->tp_lock); @@ -413,6 +485,8 @@ threadpool_get(struct threadpool **poolp, pri_t pri) ASSERT_SLEEPABLE(); + SDT_PROBE1(sdt, kernel, threadpool, get, pri); + if (! 
threadpool_pri_is_valid(pri)) return EINVAL; @@ -420,8 +494,7 @@ threadpool_get(struct threadpool **poolp, pri_t pri) tpu = threadpool_lookup_unbound(pri); if (tpu == NULL) { mutex_exit(&threadpools_lock); - TP_LOG(("%s: No pool for pri=%d, creating one.\n", - __func__, (int)pri)); + SDT_PROBE1(sdt, kernel, threadpool, get__create, pri); tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP); error = threadpool_create(&tmp->tpu_pool, NULL, pri); if (error) { @@ -431,11 +504,11 @@ threadpool_get(struct threadpool **poolp, pri_t pri) mutex_enter(&threadpools_lock); tpu = threadpool_lookup_unbound(pri); if (tpu == NULL) { - TP_LOG(("%s: Won the creation race for pri=%d.\n", - __func__, (int)pri)); tpu = tmp; tmp = NULL; threadpool_insert_unbound(tpu); + } else { + SDT_PROBE1(sdt, kernel, threadpool, get__race, pri); } } KASSERT(tpu != NULL); @@ -459,15 +532,15 @@ threadpool_put(struct threadpool *pool, pri_t pri) container_of(pool, struct threadpool_unbound, tpu_pool); ASSERT_SLEEPABLE(); - KASSERT(threadpool_pri_is_valid(pri)); + SDT_PROBE2(sdt, kernel, threadpool, put, pool, pri); + mutex_enter(&threadpools_lock); KASSERT(tpu == threadpool_lookup_unbound(pri)); KASSERT(0 < tpu->tpu_refcnt); if (--tpu->tpu_refcnt == 0) { - TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n", - __func__, (int)pri)); + SDT_PROBE2(sdt, kernel, threadpool, put__destroy, pool, pri); threadpool_remove_unbound(tpu); } else { tpu = NULL; @@ -490,6 +563,8 @@ threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri) ASSERT_SLEEPABLE(); + SDT_PROBE1(sdt, kernel, threadpool, percpu__get, pri); + if (! threadpool_pri_is_valid(pri)) return EINVAL; @@ -497,8 +572,7 @@ threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri) pool_percpu = threadpool_lookup_percpu(pri); if (pool_percpu == NULL) { mutex_exit(&threadpools_lock); - TP_LOG(("%s: No pool for pri=%d, creating one.\n", - __func__, (int)pri)); + SDT_PROBE1(sdt, kernel, threadpool, percpu__get__create, pri); error = threadpool_percpu_create(&tmp, pri); if (error) return error; @@ -506,11 +580,12 @@ threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri) mutex_enter(&threadpools_lock); pool_percpu = threadpool_lookup_percpu(pri); if (pool_percpu == NULL) { - TP_LOG(("%s: Won the creation race for pri=%d.\n", - __func__, (int)pri)); pool_percpu = tmp; tmp = NULL; threadpool_insert_percpu(pool_percpu); + } else { + SDT_PROBE1(sdt, kernel, threadpool, percpu__get__race, + pri); } } KASSERT(pool_percpu != NULL); @@ -533,12 +608,14 @@ threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri) KASSERT(threadpool_pri_is_valid(pri)); + SDT_PROBE2(sdt, kernel, threadpool, percpu__put, pool_percpu, pri); + mutex_enter(&threadpools_lock); KASSERT(pool_percpu == threadpool_lookup_percpu(pri)); KASSERT(0 < pool_percpu->tpp_refcnt); if (--pool_percpu->tpp_refcnt == 0) { - TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n", - __func__, (int)pri)); + SDT_PROBE2(sdt, kernel, threadpool, percpu__put__destroy, + pool_percpu, pri); threadpool_remove_percpu(pool_percpu); } else { pool_percpu = NULL; @@ -768,6 +845,8 @@ threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job) KASSERT(mutex_owned(job->job_lock)); + SDT_PROBE2(sdt, kernel, threadpool, schedule__job, pool, job); + /* * If the job's already running, let it keep running. 
The job * is guaranteed by the interlock not to end early -- if it had @@ -775,8 +854,8 @@ threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job) * to NULL under the interlock. */ if (__predict_true(job->job_thread != NULL)) { - TP_LOG(("%s: job '%s' already runnining.\n", - __func__, job->job_name)); + SDT_PROBE2(sdt, kernel, threadpool, schedule__job__running, + pool, job); return; } @@ -786,15 +865,15 @@ threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job) mutex_spin_enter(&pool->tp_lock); if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) { /* Nobody's idle. Give it to the overseer. */ - TP_LOG(("%s: giving job '%s' to overseer.\n", - __func__, job->job_name)); + SDT_PROBE2(sdt, kernel, threadpool, schedule__job__overseer, + pool, job); job->job_thread = &pool->tp_overseer; TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry); } else { /* Assign it to the first idle thread. */ job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads); - TP_LOG(("%s: giving job '%s' to idle thread %p.\n", - __func__, job->job_name, job->job_thread)); + SDT_PROBE3(sdt, kernel, threadpool, schedule__job__thread, + pool, job, job->job_thread->tpt_lwp); TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread, tpt_entry); job->job_thread->tpt_job = job; @@ -878,20 +957,21 @@ threadpool_overseer_thread(void *arg) int error; KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu())); + KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND)); /* Wait until we're initialized. */ mutex_spin_enter(&pool->tp_lock); while (overseer->tpt_lwp == NULL) cv_wait(&overseer->tpt_cv, &pool->tp_lock); - TP_LOG(("%s: starting.\n", __func__)); + SDT_PROBE1(sdt, kernel, threadpool, overseer__start, pool); for (;;) { /* Wait until there's a job. */ while (TAILQ_EMPTY(&pool->tp_jobs)) { if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { - TP_LOG(("%s: THREADPOOL_DYING\n", - __func__)); + SDT_PROBE1(sdt, kernel, threadpool, + overseer__dying, pool); break; } cv_wait(&overseer->tpt_cv, &pool->tp_lock); @@ -901,8 +981,8 @@ threadpool_overseer_thread(void *arg) /* If there are no threads, we'll have to try to start one. */ if (TAILQ_EMPTY(&pool->tp_idle_threads)) { - TP_LOG(("%s: Got a job, need to create a thread.\n", - __func__)); + SDT_PROBE1(sdt, kernel, threadpool, overseer__spawn, + pool); threadpool_hold(pool); mutex_spin_exit(&pool->tp_lock); @@ -965,8 +1045,8 @@ threadpool_overseer_thread(void *arg) * Someone else snagged the thread * first. We'll have to try again. 
*/ - TP_LOG(("%s: '%s' lost race to use idle thread.\n", - __func__, job->job_name)); + SDT_PROBE2(sdt, kernel, threadpool, + overseer__race, pool, job); TAILQ_INSERT_HEAD(&pool->tp_jobs, job, job_entry); } else { @@ -977,8 +1057,8 @@ threadpool_overseer_thread(void *arg) struct threadpool_thread *const thread = TAILQ_FIRST(&pool->tp_idle_threads); - TP_LOG(("%s: '%s' gets thread %p\n", - __func__, job->job_name, thread)); + SDT_PROBE3(sdt, kernel, threadpool, + overseer__assign, pool, job, thread->tpt_lwp); KASSERT(thread->tpt_job == NULL); TAILQ_REMOVE(&pool->tp_idle_threads, thread, tpt_entry); @@ -996,7 +1076,7 @@ threadpool_overseer_thread(void *arg) threadpool_rele(pool); mutex_spin_exit(&pool->tp_lock); - TP_LOG(("%s: exiting.\n", __func__)); + SDT_PROBE1(sdt, kernel, threadpool, overseer__exit, pool); kthread_exit(0); } @@ -1010,21 +1090,22 @@ threadpool_thread(void *arg) struct threadpool *const pool = thread->tpt_pool; KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu())); + KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND)); /* Wait until we're initialized and on the queue. */ mutex_spin_enter(&pool->tp_lock); while (thread->tpt_lwp == NULL) cv_wait(&thread->tpt_cv, &pool->tp_lock); - TP_LOG(("%s: starting.\n", __func__)); + SDT_PROBE1(sdt, kernel, threadpool, thread__start, pool); KASSERT(thread->tpt_lwp == curlwp); for (;;) { /* Wait until we are assigned a job. */ while (thread->tpt_job == NULL) { if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { - TP_LOG(("%s: THREADPOOL_DYING\n", - __func__)); + SDT_PROBE1(sdt, kernel, threadpool, + thread__dying, pool); break; } if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock, @@ -1049,8 +1130,7 @@ threadpool_thread(void *arg) mutex_spin_exit(&pool->tp_lock); - TP_LOG(("%s: running job '%s' on thread %p.\n", - __func__, job->job_name, thread)); + SDT_PROBE2(sdt, kernel, threadpool, thread__job, pool, job); /* Run the job. */ (*job->job_fn)(job); @@ -1073,7 +1153,7 @@ threadpool_thread(void *arg) threadpool_rele(pool); mutex_spin_exit(&pool->tp_lock); - TP_LOG(("%s: thread %p exiting.\n", __func__, thread)); + SDT_PROBE1(sdt, kernel, threadpool, thread__exit, pool); KASSERT(!cv_has_waiters(&thread->tpt_cv)); cv_destroy(&thread->tpt_cv);
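Note, re PATCH 1/2: the percpu_create() conversion leans on the percpu(9) callback contract as used here: the constructor is called for every CPU's slot before percpu_create() returns, the destructor is called for each slot by percpu_free(), and neither callback can report failure. A minimal sketch of that pattern, with hypothetical counter_* names that are not part of the patches:

	#include <sys/percpu.h>

	static void
	counter_init(void *ptr, void *cookie, struct cpu_info *ci)
	{

		*(uint64_t *)ptr = 0;	/* runs once per CPU slot */
	}

	static void
	counter_fini(void *ptr, void *cookie, struct cpu_info *ci)
	{

		/* per-CPU teardown; returns void, so it cannot fail */
	}

	static void
	counter_example(void)
	{
		percpu_t *pc;

		pc = percpu_create(sizeof(uint64_t), counter_init,
		    counter_fini, NULL);	/* all slots initialized here */
		percpu_free(pc, sizeof(uint64_t));	/* fini runs per slot */
	}

Because the constructor cannot report failure, the patch stores NULL into a slot when threadpool_create() fails and sweeps for NULL slots afterwards with percpu_foreach(); that is what threadpool_percpu_ok() is for.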