commit 8ca468c83c643be0f6ec8c92d459db91430b8b81 Author: Ryota Ozaki Date: Wed Dec 20 15:08:25 2017 +0900 Add workqueue_drain that waits for all pending works to finish It is useful when you want to safely call workqueue_destroy that requires that there remains no work in the queue. Without workqueue_drain the caller had to satisfy the contract by itself somehow. workqueue_drain has a contract that the caller must ensure that no new work is enqueued to the workqueue during workqueue_drain. diff --git a/sys/kern/subr_workqueue.c b/sys/kern/subr_workqueue.c index 7e651d1ebda..e88be2ea2b5 100644 --- a/sys/kern/subr_workqueue.c +++ b/sys/kern/subr_workqueue.c @@ -49,7 +49,8 @@ SIMPLEQ_HEAD(workqhead, work_impl); struct workqueue_queue { kmutex_t q_mutex; kcondvar_t q_cv; - struct workqhead q_queue; + struct workqhead q_queue_pending; + struct workqhead q_queue_running; lwp_t *q_worker; }; @@ -57,6 +58,8 @@ struct workqueue { void (*wq_func)(struct work *, void *); void *wq_arg; int wq_flags; + /* Internal flags */ +#define WQ_DRAINING 0x1000 /* Draining. No new work is allowed. */ char wq_name[MAXCOMLEN]; pri_t wq_prio; @@ -115,24 +118,29 @@ workqueue_worker(void *cookie) q = workqueue_queue_lookup(wq, curlwp->l_cpu); for (;;) { - struct workqhead tmp; - /* * we violate abstraction of SIMPLEQ. 
*/ -#if defined(DIAGNOSTIC) - tmp.sqh_last = (void *)POISON; -#endif /* defined(DIAGNOSTIC) */ - mutex_enter(&q->q_mutex); - while (SIMPLEQ_EMPTY(&q->q_queue)) + while (SIMPLEQ_EMPTY(&q->q_queue_pending)) cv_wait(&q->q_cv, &q->q_mutex); - tmp.sqh_first = q->q_queue.sqh_first; /* XXX */ - SIMPLEQ_INIT(&q->q_queue); + KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running)); + q->q_queue_running.sqh_first = + q->q_queue_pending.sqh_first; /* XXX */ + SIMPLEQ_INIT(&q->q_queue_pending); mutex_exit(&q->q_mutex); - workqueue_runlist(wq, &tmp); + workqueue_runlist(wq, &q->q_queue_running); + + mutex_enter(&q->q_mutex); + KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running)); + SIMPLEQ_INIT(&q->q_queue_running); + if (__predict_false(ISSET(wq->wq_flags, WQ_DRAINING))) { + /* Wake up workqueue_drain */ + cv_signal(&q->q_cv); + } + mutex_exit(&q->q_mutex); } } @@ -159,7 +167,8 @@ workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q, mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl); cv_init(&q->q_cv, wq->wq_name); - SIMPLEQ_INIT(&q->q_queue); + SIMPLEQ_INIT(&q->q_queue_pending); + SIMPLEQ_INIT(&q->q_queue_running); ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? 
KTHREAD_MPSAFE : 0); if (wq->wq_prio < PRI_KERNEL) ktf |= KTHREAD_TS; @@ -194,7 +203,7 @@ workqueue_exit(struct work *wk, void *arg) */ KASSERT(q->q_worker == curlwp); - KASSERT(SIMPLEQ_EMPTY(&q->q_queue)); + KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); mutex_enter(&q->q_mutex); q->q_worker = NULL; cv_signal(&q->q_cv); @@ -210,10 +219,10 @@ workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q) KASSERT(wq->wq_func == workqueue_exit); wqe.wqe_q = q; - KASSERT(SIMPLEQ_EMPTY(&q->q_queue)); + KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); KASSERT(q->q_worker != NULL); mutex_enter(&q->q_mutex); - SIMPLEQ_INSERT_TAIL(&q->q_queue, &wqe.wqe_wk, wk_entry); + SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry); cv_signal(&q->q_cv); while (q->q_worker != NULL) { cv_wait(&q->q_cv, &q->q_mutex); @@ -271,6 +280,51 @@ workqueue_create(struct workqueue **wqp, const char *name, return error; } +static void +workqueue_drain_wait(struct workqueue_queue *q) +{ + + KASSERT(q->q_worker != NULL); + + mutex_enter(&q->q_mutex); + while (!SIMPLEQ_EMPTY(&q->q_queue_pending) || + !SIMPLEQ_EMPTY(&q->q_queue_running)) { + cv_wait(&q->q_cv, &q->q_mutex); + } + mutex_exit(&q->q_mutex); +} + +/* + * Wait for all pending works to finish. The caller must ensure that no new + * work will be enqueued before calling workqueue_drain. 
+ */ +void +workqueue_drain(struct workqueue *wq) +{ + struct workqueue_queue *q; + + wq->wq_flags |= WQ_DRAINING; + + if (ISSET(wq->wq_flags, WQ_PERCPU)) { + struct cpu_info *ci; + CPU_INFO_ITERATOR cii; + for (CPU_INFO_FOREACH(cii, ci)) { + q = workqueue_queue_lookup(wq, ci); + workqueue_drain_wait(q); + } + } else { + q = workqueue_queue_lookup(wq, NULL); + workqueue_drain_wait(q); + } +} + +void +workqueue_activate(struct workqueue *wq) +{ + + wq->wq_flags &= ~WQ_DRAINING; +} + void workqueue_destroy(struct workqueue *wq) { @@ -298,7 +352,9 @@ workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci) q = workqueue_queue_lookup(wq, ci); mutex_enter(&q->q_mutex); - SIMPLEQ_INSERT_TAIL(&q->q_queue, wk, wk_entry); + KASSERTMSG((wq->wq_flags & WQ_DRAINING) == 0, + "adding work to a draining workqueue"); + SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry); cv_signal(&q->q_cv); mutex_exit(&q->q_mutex); } diff --git a/sys/sys/workqueue.h b/sys/sys/workqueue.h index 26d974bced2..f305c5c1664 100644 --- a/sys/sys/workqueue.h +++ b/sys/sys/workqueue.h @@ -51,6 +51,8 @@ struct workqueue; int workqueue_create(struct workqueue **, const char *, void (*)(struct work *, void *), void *, pri_t, int, int); void workqueue_destroy(struct workqueue *); +void workqueue_drain(struct workqueue *); +void workqueue_activate(struct workqueue *); void workqueue_enqueue(struct workqueue *, struct work *, struct cpu_info *);