struct taskqueue { kmutex_t tq_lock; kcondvar_t tq_cv; struct task_head tq_tasks; struct task *tq_current_task; struct lwp *tq_lwp; int tq_flags; int tq_ipl; const char *tq_name; }; static void task_error(struct task *task) { panic("destroyed task got run: %p", task); } void task_destroy(struct task *task) { task->task_fn = task_error; } void task_done(struct task *task) { struct taskqueue *const taskqueue = (struct taskqueue *)task->task_worker; KASSERT(taskqueue != NULL); mutex_enter(&taskqueue->tq_lock); KASSERT(task->task_worker == (struct taskworker *)taskqueue); KASSERT(taskqueue->tq_current_task == task); KASSERT(!ISSET(taskqueue->tq_flags, TASKQUEUE_DONE)); taskqueue_done(taskqueue, task); mutex_exit(&taskqueue->tq_lock); } void task_schedule(struct task *task) { taskqueue_schedule(system_taskqueue, task); } bool task_cancel(struct task *task, kmutex_t *interlock) { struct taskqueue *const taskqueue = (struct taskqueue *)task->task_worker; ASSERT_SLEEPABLE(); if (taskqueue == NULL) return true; mutex_enter(&taskqueue->tq_lock); KASSERT(task->task_worker == (struct taskworker *)taskqueue); if (taskqueue->tq_current_task == task) { mutex_exit(interlock); while ((taskqueue->tq_current_task == task) && !ISSET(taskqueue->tq_flags, TASKQUEUE_DONE)) cv_wait(&taskqueue->tq_cv, &taskqueue->tq_lock); mutex_exit(&taskqueue->tq_lock); mutex_enter(interlock); return false; } TAILQ_REMOVE(&taskqueue->tq_tasks, task, task_entry); mutex_exit(&taskqueue->tq_lock); return true; } bool task_cancel_async(struct task *task) { struct taskqueue *const taskqueue = (struct taskqueue *)task->task_worker; if (taskqueue == NULL) return true; mutex_enter(&taskqueue->tq_lock); KASSERT(task->task_worker == (struct taskworker *)taskqueue); if (taskqueue->tq_current_task == task) { mutex_exit(&taskqueue->tq_lock); return false; } TAILQ_REMOVE(&taskqueue->tq_tasks, task, task_entry); mutex_exit(&taskqueue->tq_lock); return true; } void delayed_task_destroy(struct delayed_task *dt) { callout_destroy(&dt->dt_callout); task_destroy(&dt->dt_task); } bool delayed_task_timedout(struct delayed_task *dt) { return callout_invoking(&dt->dt_callout); } void delayed_task_schedule(struct delayed_task *dt, nsec_t nsec, jitter_t jitter) { taskqueue_schedule_delayed(system_taskqueue, dt, nsec, jitter); } bool delayed_task_reschedule(struct delayed_task *dt, nsec_t nsec, jitter_t jitter) { struct taskqueue *const taskqueue = (struct taskqueue *)dt->dt_task.task_worker; if (taskqueue == NULL) return false; mutex_enter(&taskqueue->tq_lock); if (dt->dt_task.task_worker != (struct taskworker *)taskqueue) { mutex_exit(&taskqueue->tq_lock); return false; } if (taskqueue->tq_current_task != &dt->dt_task) { mutex_exit(&taskqueue->tq_lock); return false; } if (callout_stop(&dt->dt_callout)) { mutex_exit(&taskqueue->tq_lock); return false; } if (nsec == 0) { TAILQ_INSERT_TAIL(&taskqueue->tq_tasks, &dt->dt_task, task_entry); cv_broadcast(&taskqueue->tq_cv); } else { callout_schedule(&dt->dt_callout, nstohz(nsec, jitter)); } mutex_exit(&taskqueue->tq_lock); return true; } void delayed_task_cancel(struct delayed_task *dt, kmutex_t *interlock) { struct taskqueue *const taskqueue = (struct taskqueue *)dt->dt_task.task_worker; if (taskqueue == NULL) return false; mutex_enter(&taskqueue->tq_lock); if (dt->dt_task.task_worker != (struct taskqueue *)worker) { mutex_exit(&taskqueue->tq_lock); return false; } if (taskqueue->tq_current_task == &dt->dt_task) { mutex_exit(interlock); do { if (ISSET(taskqueue->tq_flags, TASKQUEUE_DONE)) break; cv_wait(&taskqueue->tq_cv, &taskqueue->tq_lock); } while (taskqueue->tq_current_task == &dt->dt_task); mutex_exit(&taskqueue->tq_lock); mutex_enter(interlock); return false; } if (!callout_stop(&dt->dt_callout)) { mutex_exit(&taskqueue->tq_lock); return false; } mutex_exit(interlock); if (!callout_halt(&dt->dt_callout, &taskqueue->tq_lock)) { mutex_exit(&taskqueue->tq_lock); mutex_enter(interlock); return true; } if (taskqueue->tq_current_task == &dt->dt_task) { do { if (ISSET(taskqueue->tq_flags, TASKQUEUE_DONE)) break; cv_wait(&taskqueue->tq_cv, &taskqueue->tq_lock); } while (taskqueue->tq_current_task == &dt->dt_task); mutex_exit(&taskqueue->tq_lock); mutex_enter(interlock); return false; } TAILQ_REMOVE(&taskqueue->tq_tasks, &dt->dt_task, task_entry); mutex_exit(&taskqueue->tq_lock); mutex_enter(interlock); return true; } void delayed_task_cancel_async(struct delayed_task *dt) { struct taskqueue *const taskqueue = (struct taskqueue *)dt->dt_task.task_worker; if (taskqueue == NULL) return false; mutex_enter(&taskqueue->tq_lock); if (dt->dt_task.task_worker != (struct taskqueue *)worker) { mutex_exit(&taskqueue->tq_lock); return false; } if (!callout_stop(&dt->dt_callout)) { mutex_exit(&taskqueue->tq_lock); return true; } if (taskqueue->tq_current_task != &dt->dt_task) { TAILQ_REMOVE(&taskqueue->tq_tasks, &dt->dt_task, task_entry); mutex_exit(&taskqueue->tq_lock); return true; } mutex_exit(&taskqueue->tq_lock); return false; } int taskqueue_create(struct taskqueue **taskqueuep, const char *name, pri_t task_pri, int schedule_ipl, int flags) { struct taskqueue *taskqueue; int error; ASSERT_SLEEPABLE(); KASSERTMSG(((task_pri == PRI_NONE) || (task_pri <= MAXPRI_KTHREAD)), "priority too high for taskqueue_create(%s): %d", name, (int)task_pri); taskqueue = pool_get(&taskqueue_pool, PR_WAITOK); taskqueue->tq_flags = flags; taskqueue->tq_pri = task_pri; taskqueue->tq_ipl = schedule_ipl; taskqueue->tq_name = name; mutex_init(&taskqueue->tq_lock, MUTEX_DEFAULT, schedule_ipl); cv_init(&taskqueue->tq_cv, name); TAILQ_INIT(&taskqueue->tq_tasks); taskqueue->tq_current_task = NULL; error = kthread_create(task_pri, (KTHREAD_MPSAFE | KTHREAD_TS), NULL, &taskqueue_thread, taskqueue, &taskqueue->tq_lwp, "%s", name); if (error) goto fail0; *taskqueuep = taskqueue; return 0; fail0: KASSERT(error); KASSERT(taskqueue->tq_current_task == NULL); KASSERT(TAILQ_EMPTY(&taskqueue->tq_tasks)); KASSERT(!cv_has_waiters(&taskqueue->tq_cv)); cv_destroy(&taskqueue->tq_cv); mutex_destroy(&taskqueue->tq_lock); pool_put(&taskqueue_pool, taskqueue); return error; } void taskqueue_destroy(struct taskqueue *taskqueue) { mutex_enter(&taskqueue->tq_lock); taskqueue->tq_flags |= TASKQUEUE_DYING; cv_broadcast(&taskqueue->tq_cv); mutex_exit(&taskqueue->tq_lock); (void)kthread_join(taskqueue->tq_lwp); KASSERT(taskqueue->tq_current_task == NULL); KASSERT(TAILQ_EMPTY(&taskqueue->tq_tasks)); KASSERT(!cv_has_waiters(&taskqueue->tq_cv)); cv_destroy(&taskqueue->tq_cv); mutex_destroy(&taskqueue->tq_lock); pool_put(&taskqueue_pool, taskqueue); } void taskqueue_schedule(struct taskqueue *taskqueue, struct task *task) { mutex_enter(&taskqueue->tq_lock); struct taskqueue *const taskqueue0 = atomic_cas_ptr(&task->task_worker, NULL, (struct taskworker *)taskqueue); if (taskqueue0 != NULL) { KASSERT(taskqueue0 == taskqueue); if ((taskqueue->tq_current_task == task) && !ISSET(taskqueue->tq_flags, TASKQUEUE_REQUEUED)) { taskqueue->tq_flags |= TASKQUEUE_REQUEUED; TAILQ_INSERT_TAIL(&taskqueue->tq_tasks, task, task_entry); } goto out; } TAILQ_INSERT_TAIL(&taskqueue->tq_tasks, task, task_entry); cv_broadcast(&taskqueue->tq_cv); out: mutex_exit(&taskqueue->tq_lock); } void taskqueue_schedule_delayed(struct taskqueue *taskqueue, struct delayed_task *dt) { mutex_enter(&taskqueue->tq_lock); struct taskqueue *const taskqueue0 = atomic_cas_ptr(&dt->dt_task.task_worker, NULL, (struct taskworker *)taskqueue); if (taskqueue0 != NULL) { KASSERT(taskqueue0 == taskqueue); if ((taskqueue->tq_current_task == &dt->dt_task) && !ISSET(taskqueue->tq_flags, TASKQUEUE_REQUEUED)) { taskqueue->tq_flags |= TASKQUEUE_REQUEUED; if (nsec == 0) { taskqueue->tq_flags |= TASKQUEUE_CALLOUT_ACK; TAILQ_INSERT_TAIL(&taskqueue->tq_tasks, &dt->dt_task, task_entry); } else { callout_schedule(&dt->dt_callout, nstohz(nsec, jitter)); } } goto out; } if (nsec == 0) { callout_ack(&dt->dt_callout); TAILQ_INSERT_TAIL(&taskqueue->tq_tasks, &dt->dt_task, task_entry); cv_broadcast(&taskqueue->tq_cv); } else { callout_schedule(&dt->dt_callout, nstohz(nsec, jitter)); } out: mutex_exit(&taskqueue->tq_lock); } static void __dead taskqueue_thread(void *arg) { struct taskqueue *const taskqueue = arg; mutex_enter(&taskqueue->tq_lock); for (;;) { while (TAILQ_EMPTY(&taskqueue->tq_tasks)) { if (ISSET(taskqueue->tq_flags, TASKQUEUE_DYING)) break; cv_wait(&taskqueue->tq_cv, &taskqueue->tq_lock); } if (TAILQ_EMPTY(&taskqueue->tq_tasks)) break; struct task *const task = TAILQ_FIRST(&taskqueue->tq_tasks); TAILQ_REMOVE(&taskqueue->tq_tasks, task, task_entry); KASSERT(taskqueue->tq_current_task == NULL); KASSERT(!ISSET(taskqueue->tq_flags, TASKQUEUE_DONE)); KASSERT(!ISSET(taskqueue->tq_flags, TASKQUEUE_REQUEUED)); taskqueue->tq_current_task = task; mutex_exit(&taskqueue->tq_lock); (*task->task_fn)(task); mutex_enter(&taskqueue->tq_lock); KASSERT(taskqueue->tq_current_task == task); if (!ISSET(taskqueue->tq_flags, TASKQUEUE_DONE)) taskqueue_done(taskqueue, task); KASSERT(ISSET(taskqueue->tq_flags, TASKQUEUE_DONE)); KASSERT(!ISSET(taskqueue->tq_flags, TASKQUEUE_REQUEUED)); KASSERT(!ISSET(worker->tw_flags, TASKWORKER_CALLOUT_ACK)); taskqueue->tq_flags &= ~(TASKQUEUE_DONE | TASKQUEUE_REQUEUED | TASKQUEUE_CALLOUT_ACK); } mutex_exit(&taskqueue->tq_lock); kthread_exit(0); } static void taskqueue_done(struct taskqueue *taskqueue, struct task *task) { KASSERT(mutex_owned(&taskqueue->tq_lock)); KASSERT(taskqueue->tq_current_task == task); KASSERT(!ISSET(taskqueue->tq_flags, TASKQUEUE_DONE)); KASSERT(task->task_worker == (struct taskworker *)taskqueue); /* XXX KLUDGE */ if (ISSET(taskqueue->tq_flags, TASKQUEUE_CALLOUT_ACK)) { struct delayed_task *const dt = container_of(task, struct delayed_task, dt_task); /* Clear the CALLOUT_INVOKING flag before it runs again. */ callout_ack(&dt->dt_callout); taskqueue->tq_flags &= ~TASKQUEUE_CALLOUT_ACK; } if (ISSET(taskqueue->tq_flags, TASKQUEUE_REQUEUED)) { taskqueue->tq_flags &= ~TASKQUEUE_REQUEUED; } else { struct taskqueue *const taskqueue0 __diagused = atomic_swap_ptr(&task->task_worker, taskqueue); KASSERT(taskqueue0 == taskqueue); taskqueue->tq_current_task = NULL; cv_broadcast(&taskqueue->tq_cv); } taskqueue->tq_flags |= TASKQUEUE_DONE; } struct drain_task { struct task drt_task; kmutex_t drt_lock; kcondvar_t drt_cv; bool drt_done; }; void taskqueue_drain(struct taskqueue *taskqueue) { struct drain_task drt; task_init(&drt->drt_task, &drain_task); mutex_init(&drt->drt_lock, MUTEX_DEFAULT, IPL_NONE); cv_init(&drt->drt_cv, "tqdrain"); drt->drt_done = false; taskqueue_schedule(taskqueue, &drt->drt_task); mutex_enter(&drt->drt_lock); while (!drt->drt_done) cv_wait(&drt->drt_cv, &drt->drt_lock); mutex_exit(&drt->drt_lock); cv_destroy(&drt->drt_cv); mutex_destroy(&drt->drt_lock); task_destroy(&drt->drt_task); } static void drain_task(struct task *task) { struct drain_task *const drt = container_of(task, struct drain_task, drt_task); mutex_enter(&drt->drt_lock); drt->drt_done = true; cv_broadcast(&drt->drt_lock); mutex_exit(&drt->drt_lock); }