commit d3744f59385f7bec0a3007545ca4782dd9818a55 Author: Ryota Ozaki Date: Wed Dec 20 15:08:25 2017 +0900 Add workqueue_wait that waits for a specific work to finish The caller must ensure that no new work is enqueued before calling workqueue_wait. Note that Note that if the workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue other than the waiting queue. diff --git a/sys/kern/subr_workqueue.c b/sys/kern/subr_workqueue.c index 7e651d1ebda..bdba92c1a55 100644 --- a/sys/kern/subr_workqueue.c +++ b/sys/kern/subr_workqueue.c @@ -49,8 +49,10 @@ SIMPLEQ_HEAD(workqhead, work_impl); struct workqueue_queue { kmutex_t q_mutex; kcondvar_t q_cv; - struct workqhead q_queue; + struct workqhead q_queue_pending; + struct workqhead q_queue_running; lwp_t *q_worker; + work_impl_t *q_waiter; }; struct workqueue { @@ -115,24 +117,29 @@ workqueue_worker(void *cookie) q = workqueue_queue_lookup(wq, curlwp->l_cpu); for (;;) { - struct workqhead tmp; - /* * we violate abstraction of SIMPLEQ. */ -#if defined(DIAGNOSTIC) - tmp.sqh_last = (void *)POISON; -#endif /* defined(DIAGNOSTIC) */ - mutex_enter(&q->q_mutex); - while (SIMPLEQ_EMPTY(&q->q_queue)) + while (SIMPLEQ_EMPTY(&q->q_queue_pending)) cv_wait(&q->q_cv, &q->q_mutex); - tmp.sqh_first = q->q_queue.sqh_first; /* XXX */ - SIMPLEQ_INIT(&q->q_queue); + KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running)); + q->q_queue_running.sqh_first = + q->q_queue_pending.sqh_first; /* XXX */ + SIMPLEQ_INIT(&q->q_queue_pending); mutex_exit(&q->q_mutex); - workqueue_runlist(wq, &tmp); + workqueue_runlist(wq, &q->q_queue_running); + + mutex_enter(&q->q_mutex); + KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running)); + SIMPLEQ_INIT(&q->q_queue_running); + if (__predict_false(q->q_waiter != NULL)) { + /* Wake up workqueue_wait */ + cv_signal(&q->q_cv); + } + mutex_exit(&q->q_mutex); } } @@ -159,7 +166,8 @@ workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q, mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl); cv_init(&q->q_cv, wq->wq_name); - SIMPLEQ_INIT(&q->q_queue); + SIMPLEQ_INIT(&q->q_queue_pending); + SIMPLEQ_INIT(&q->q_queue_running); ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0); if (wq->wq_prio < PRI_KERNEL) ktf |= KTHREAD_TS; @@ -194,7 +202,7 @@ workqueue_exit(struct work *wk, void *arg) */ KASSERT(q->q_worker == curlwp); - KASSERT(SIMPLEQ_EMPTY(&q->q_queue)); + KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); mutex_enter(&q->q_mutex); q->q_worker = NULL; cv_signal(&q->q_cv); @@ -210,10 +218,10 @@ workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q) KASSERT(wq->wq_func == workqueue_exit); wqe.wqe_q = q; - KASSERT(SIMPLEQ_EMPTY(&q->q_queue)); + KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); KASSERT(q->q_worker != NULL); mutex_enter(&q->q_mutex); - SIMPLEQ_INSERT_TAIL(&q->q_queue, &wqe.wqe_wk, wk_entry); + SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry); cv_signal(&q->q_cv); while (q->q_worker != NULL) { cv_wait(&q->q_cv, &q->q_mutex); @@ -271,6 +279,64 @@ workqueue_create(struct workqueue **wqp, const char *name, return error; } +static bool +workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target) +{ + work_impl_t *wk; + bool found = false; + + mutex_enter(&q->q_mutex); + again: + SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) { + if (wk == wk_target) + goto found; + } + SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) { + if (wk == wk_target) + goto found; + } + found: + if (wk != NULL) { + found = true; + KASSERT(q->q_waiter == NULL); + q->q_waiter = wk; + cv_wait(&q->q_cv, &q->q_mutex); + goto again; + } + if (q->q_waiter != NULL) + q->q_waiter = NULL; + mutex_exit(&q->q_mutex); + + return found; +} + +/* + * Wait for a specified work to finish. The caller must ensure that no new + * work will be enqueued before calling workqueue_wait. Note that if the + * workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue + * other than the waiting queue. + */ +void +workqueue_wait(struct workqueue *wq, struct work *wk) +{ + struct workqueue_queue *q; + bool found; + + if (ISSET(wq->wq_flags, WQ_PERCPU)) { + struct cpu_info *ci; + CPU_INFO_ITERATOR cii; + for (CPU_INFO_FOREACH(cii, ci)) { + q = workqueue_queue_lookup(wq, ci); + found = workqueue_q_wait(q, (work_impl_t *)wk); + if (found) + break; + } + } else { + q = workqueue_queue_lookup(wq, NULL); + (void) workqueue_q_wait(q, (work_impl_t *)wk); + } +} + void workqueue_destroy(struct workqueue *wq) { @@ -298,7 +364,8 @@ workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci) q = workqueue_queue_lookup(wq, ci); mutex_enter(&q->q_mutex); - SIMPLEQ_INSERT_TAIL(&q->q_queue, wk, wk_entry); + KASSERT(q->q_waiter == NULL); + SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry); cv_signal(&q->q_cv); mutex_exit(&q->q_mutex); } diff --git a/sys/sys/workqueue.h b/sys/sys/workqueue.h index 26d974bced2..b9685bf1dca 100644 --- a/sys/sys/workqueue.h +++ b/sys/sys/workqueue.h @@ -51,6 +51,7 @@ struct workqueue; int workqueue_create(struct workqueue **, const char *, void (*)(struct work *, void *), void *, pri_t, int, int); void workqueue_destroy(struct workqueue *); +void workqueue_wait(struct workqueue *, struct work *); void workqueue_enqueue(struct workqueue *, struct work *, struct cpu_info *);