diff --git a/distrib/sets/lists/tests/mi b/distrib/sets/lists/tests/mi
index 08776336bdc3f..0651c502e2526 100644
--- a/distrib/sets/lists/tests/mi
+++ b/distrib/sets/lists/tests/mi
@@ -8198,3 +8198,7 @@
 ./usr/tests/util/xlint/lint1/d_zero_sized_arrays.c tests-obsolete obsolete
 ./usr/tests/util/xlint/lint1/t_integration tests-obsolete obsolete
 ./var/db/obsolete/tests base-sys-root atf
+./usr/tests/lib/libc/sys/t_aio_cancel tests
+./usr/tests/lib/libc/sys/t_aio_suspend tests
+./usr/tests/lib/libc/sys/t_aio_rw tests
+./usr/tests/lib/libc/sys/t_aio_lio tests
diff --git a/sys/conf/files b/sys/conf/files
index dcafeefce5bcc..7be7bf073fb07 100644
--- a/sys/conf/files
+++ b/sys/conf/files
@@ -51,6 +51,7 @@ defparam RTC_OFFSET
 defflag opt_pipe.h PIPE_SOCKETPAIR PIPE_NODIRECT
 defflag AIO
+defflag AIOSP
 defflag MQUEUE
 defflag SEMAPHORE
diff --git a/sys/kern/sys_aio.c b/sys/kern/sys_aio.c
index 92832b19ad388..055510fb02e97 100644
--- a/sys/kern/sys_aio.c
+++ b/sys/kern/sys_aio.c
@@ -1,17 +1,17 @@
-/* $NetBSD: sys_aio.c,v 1.50 2024/12/07 02:38:51 riastradh Exp $ */
+/* $NetBSD: sys_aio.c,v 0.00 2025/08/15 12:00:00 ethan4984 Exp $ */

 /*
- * Copyright (c) 2007 Mindaugas Rasiukevicius
+ * Copyright (c) 2025 The NetBSD Foundation, Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
  * are met:
  * 1. Redistributions of source code must retain the above copyright
- *    notice, this list of conditions and the following disclaimer.
+ *    notice, this list of conditions and the following disclaimer.
  * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in the
- *    documentation and/or other materials provided with the distribution.
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
  *
  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
@@ -27,8 +27,41 @@
  */

 /*
- * Implementation of POSIX asynchronous I/O.
- * Defined in the Base Definitions volume of IEEE Std 1003.1-2001.
+ * NetBSD asynchronous I/O service pool implementation
+ *
+ * Design overview
+ *
+ * Thread pool architecture:
+ * Each process owns an aiosp (service pool) with work threads (aiost).
+ * Workers are reused via freelist/active lists to avoid churn.
+ * Workers sleep on service_cv until a job is assigned.
+ * On process teardown, outstanding work is quiesced and threads are destroyed.
+ *
+ * Job distribution:
+ * Jobs are appended to aiosp->jobs and then distributed to a worker thread.
+ * Regular files: Jobs are grouped together by file handle to allow for future
+ * optimisation.
+ * Non-regular files: No grouping. Each job is handled directly by a discrete
+ * worker thread.
+ * Only regular files are candidates for non-blocking operation; however, the
+ * non-blocking path is not implemented yet. Everything currently falls back
+ * to blocking I/O.
+ * Distribution is triggered by aiosp_distribute_jobs().
+ *
+ * Job tracking:
+ * A hash table (by userspace aiocb pointer) maps aiocb -> kernel job.
+ * This gives O(1)ish lookup for aio_error/aio_return/aio_suspend.
+ * Resubmission of the same aiocb updates the mapping, allowing userspace to
+ * reuse aiocb storage liberally.
+ *
+ * File group management:
+ * RB tree (aiost_file_tree) maintains active file groups.
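+ * Lookups use a stack key carrying only the fp; a sketch of the
+ * pattern (mirroring aiosp_distribute_jobs() below, which orders
+ * nodes by fp address via aiost_file_group_cmp()):
+ *
+ *	struct aiost_file_group key = { .fp = fp };
+ *	fg = RB_FIND(aiost_file_tree, sp->fg_root, &key);
+ *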
+ * Groups are created on demand when regular file jobs are distributed.
+ * Groups are destroyed when all jobs for that fp complete.
+ * Enables future enhancements like dynamic job appending during processing.
+ *
+ * Implementation notes:
+ * io_read/io_write currently use fallback implementations.
 */

 #include
@@ -40,6 +73,9 @@ __KERNEL_RCSID(0, "$NetBSD: sys_aio.c,v 1.50 2024/12/07 02:38:51 riastradh Exp $
 #include
 #include
+#include
+#include
+#include
 #include
 #include
@@ -51,6 +87,7 @@ __KERNEL_RCSID(0, "$NetBSD: sys_aio.c,v 1.50 2024/12/07 02:38:51 riastradh Exp $
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -80,15 +117,40 @@ static struct pool aio_job_pool;
 static struct pool aio_lio_pool;
 static void * aio_ehook;

-static void aio_worker(void *);
-static void aio_process(struct aio_job *);
-static void aio_sendsig(struct proc *, struct sigevent *);
 static int aio_enqueue_job(int, void *, struct lio_req *);
 static void aio_exit(proc_t *, void *);
 static int sysctl_aio_listio_max(SYSCTLFN_PROTO);
 static int sysctl_aio_max(SYSCTLFN_PROTO);

+/* Service pool functions */
+static int aiost_create(struct aiosp *, struct aiost **);
+static int aiost_terminate(struct aiost *);
+static void aiost_entry(void *);
+static void aiost_sigsend(struct proc *, struct sigevent *);
+static int aiosp_worker_extract(struct aiosp *, struct aiost **);
+
+static int io_write(struct aio_job *);
+static int io_read(struct aio_job *);
+static int io_sync(struct aio_job *);
+static int uio_construct(struct aio_job *, struct file **,
+	struct iovec *, struct uio *);
+static int io_write_fallback(struct aio_job *);
+static int io_read_fallback(struct aio_job *);
+
+static void aio_job_fini(struct aio_job *);
+static void aio_job_mark_complete(struct aio_job *);
+static void aio_file_hold(struct file *);
+static void aio_file_release(struct file *);
+
+static void aiocbp_destroy(struct aiosp *);
+static int aiocbp_init(struct aiosp *, u_int);
+static int aiocbp_insert(struct aiosp *, struct aiocbp *);
+static int aiocbp_lookup_job(struct aiosp *, const void *,
+	struct aio_job **);
+static int aiocbp_remove_job(struct aiosp *, const void *,
+	struct aio_job **, struct aiocbp **);
+
 static const struct syscall_package aio_syscalls[] = {
 	{ SYS_aio_cancel, 0, (sy_call_t *)sys_aio_cancel },
 	{ SYS_aio_error, 0, (sy_call_t *)sys_aio_error },
@@ -101,6 +163,26 @@ static const struct syscall_package aio_syscalls[] = {
 	{ 0, 0, NULL },
 };

+/*
+ * Order RB with respect to fp.
+ */
+static int
+aiost_file_group_cmp(struct aiost_file_group *a, struct aiost_file_group *b)
+{
+	if (a == NULL || b == NULL) {
+		return (a == b) ? 0 : (a ? 1 : -1);
+	}
+
+	uintptr_t ap = (uintptr_t)a->fp;
+	uintptr_t bp = (uintptr_t)b->fp;
+
+	return (ap < bp) ? -1 : (ap > bp) ? 1 : 0;
+}
+
+RB_HEAD(aiost_file_tree, aiost_file_group);
+RB_PROTOTYPE(aiost_file_tree, aiost_file_group, tree, aiost_file_group_cmp);
+RB_GENERATE(aiost_file_tree, aiost_file_group, tree, aiost_file_group_cmp);
+
 /*
  * Tear down all AIO state.
*/ @@ -145,14 +227,15 @@ aio_init(void) int error; pool_init(&aio_job_pool, sizeof(struct aio_job), 0, 0, 0, - "aio_jobs_pool", &pool_allocator_nointr, IPL_NONE); + "aio_jobs_pool", &pool_allocator_nointr, IPL_NONE); pool_init(&aio_lio_pool, sizeof(struct lio_req), 0, 0, 0, - "aio_lio_pool", &pool_allocator_nointr, IPL_NONE); + "aio_lio_pool", &pool_allocator_nointr, IPL_NONE); aio_ehook = exithook_establish(aio_exit, NULL); error = syscall_establish(NULL, aio_syscalls); - if (error != 0) - (void)aio_fini(false); + if (error != 0) { + aio_fini(false); + } return error; } @@ -162,7 +245,6 @@ aio_init(void) static int aio_modcmd(modcmd_t cmd, void *arg) { - switch (cmd) { case MODULE_CMD_INIT: return aio_init(); @@ -180,52 +262,36 @@ static int aio_procinit(struct proc *p) { struct aioproc *aio; - struct lwp *l; int error; - vaddr_t uaddr; /* Allocate and initialize AIO structure */ - aio = kmem_zalloc(sizeof(struct aioproc), KM_SLEEP); - - /* Initialize queue and their synchronization structures */ - mutex_init(&aio->aio_mtx, MUTEX_DEFAULT, IPL_NONE); - cv_init(&aio->aio_worker_cv, "aiowork"); - cv_init(&aio->done_cv, "aiodone"); - TAILQ_INIT(&aio->jobs_queue); + aio = kmem_zalloc(sizeof(*aio), KM_SLEEP); - /* - * Create an AIO worker thread. - * XXX: Currently, AIO thread is not protected against user's actions. - */ - uaddr = uvm_uarea_alloc(); - if (uaddr == 0) { - aio_exit(p, aio); - return SET_ERROR(EAGAIN); + /* Initialize the service pool */ + error = aiosp_initialize(&aio->aiosp); + if (error) { + kmem_free(aio, sizeof(*aio)); + return error; } - error = lwp_create(curlwp, p, uaddr, 0, NULL, 0, aio_worker, - NULL, &l, curlwp->l_class, &curlwp->l_sigmask, &curlwp->l_sigstk); - if (error != 0) { - uvm_uarea_free(uaddr); - aio_exit(p, aio); + + error = aiocbp_init(&aio->aiosp, 256); + if (error) { + aiosp_destroy(&aio->aiosp, NULL); + kmem_free(aio, sizeof(*aio)); return error; } + /* Initialize queue and their synchronization structures */ + mutex_init(&aio->aio_mtx, MUTEX_DEFAULT, IPL_NONE); + /* Recheck if we are really first */ mutex_enter(p->p_lock); if (p->p_aio) { mutex_exit(p->p_lock); aio_exit(p, aio); - lwp_exit(l); return 0; } p->p_aio = aio; - - /* Complete the initialization of thread, and run it */ - aio->aio_worker = l; - lwp_lock(l); - lwp_changepri(l, MAXPRI_USER); - setrunnable(l); - /* LWP now unlocked */ mutex_exit(p->p_lock); return 0; @@ -237,240 +303,1259 @@ aio_procinit(struct proc *p) static void aio_exit(struct proc *p, void *cookie) { - struct aio_job *a_job; struct aioproc *aio; - if (cookie != NULL) + if (cookie != NULL) { aio = cookie; - else if ((aio = p->p_aio) == NULL) + } else if ((aio = p->p_aio) == NULL) { return; - - /* Free AIO queue */ - while (!TAILQ_EMPTY(&aio->jobs_queue)) { - a_job = TAILQ_FIRST(&aio->jobs_queue); - TAILQ_REMOVE(&aio->jobs_queue, a_job, list); - pool_put(&aio_job_pool, a_job); - atomic_dec_uint(&aio_jobs_count); } - /* Destroy and free the entire AIO data structure */ - cv_destroy(&aio->aio_worker_cv); - cv_destroy(&aio->done_cv); + aiocbp_destroy(&aio->aiosp); + aiosp_destroy(&aio->aiosp, NULL); mutex_destroy(&aio->aio_mtx); - kmem_free(aio, sizeof(struct aioproc)); + kmem_free(aio, sizeof(*aio)); } /* - * AIO worker thread and processor. 
+ * Destroy job structure */ static void -aio_worker(void *arg) +aio_job_fini(struct aio_job *job) { - struct proc *p = curlwp->l_proc; + mutex_enter(&job->mtx); + aiowaitgrouplk_fini(&job->lk); + mutex_exit(&job->mtx); + mutex_destroy(&job->mtx); +} + +/* + * Mark job as complete + */ +static void +aio_job_mark_complete(struct aio_job *job) +{ + mutex_enter(&job->mtx); + job->completed = true; + aio_file_release(job->fp); + job->fp = NULL; + + aiowaitgrouplk_flush(&job->lk); + mutex_exit(&job->mtx); + + aiost_sigsend(job->p, &job->aiocbp.aio_sigevent); +} + +/* + * Acquire a file reference for async ops + */ +static void +aio_file_hold(struct file *fp) +{ + mutex_enter(&fp->f_lock); + fp->f_count++; + mutex_exit(&fp->f_lock); +} + +/* + * Release a file reference for async ops + */ +static void +aio_file_release(struct file *fp) +{ + mutex_enter(&fp->f_lock); + fp->f_count--; + if (!fp->f_count) { + mutex_exit(&fp->f_lock); + closef(fp); + return; + } + mutex_exit(&fp->f_lock); +} + +/* + * Release a job back to the pool + */ +static inline void +aio_job_release(struct aio_job *job) +{ + if (job->fp) { + aio_file_release(job->fp); + job->fp = NULL; + } + + aio_job_fini(job); + pool_put(&aio_job_pool, job); + atomic_dec_uint(&aio_jobs_count); +} + +/* + * Cancel a job pending on aiosp->jobs + */ +static inline void +aio_job_cancel(struct aiosp *aiosp, struct aio_job *job) +{ + mutex_enter(&job->mtx); + TAILQ_REMOVE(&aiosp->jobs, job, list); + aiosp->jobs_pending--; + job->on_queue = false; + job->aiocbp._errno = ECANCELED; + mutex_exit(&job->mtx); +} + +/* + * Remove file group from tree locked + */ +static inline void +aiosp_fg_teardown_locked(struct aiosp *sp, struct aiost_file_group *fg) +{ + if (fg == NULL) { + return; + } + + RB_REMOVE(aiost_file_tree, sp->fg_root, fg); + mutex_destroy(&fg->mtx); + kmem_free(fg, sizeof(*fg)); +} + +/* + * Remove file group from tree + */ +static inline void +aiosp_fg_teardown(struct aiosp *sp, struct aiost_file_group *fg) +{ + if (fg == NULL) { + return; + } + + mutex_enter(&sp->mtx); + aiosp_fg_teardown_locked(sp, fg); + mutex_exit(&sp->mtx); +} + +/* + * Group jobs by file descriptor and distribute to service threads. + * Regular files are coalesced per-fp, others get individual threads. 
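+ *
+ * Illustrative distribution (not taken from the code): two reads
+ * against the same regular-file fp share one worker's file-group
+ * queue, while a job on a non-regular fp gets a dedicated worker:
+ *
+ *	fp1 (VREG):  job A, job B  ->  one aiost, fg->queue = { A, B }
+ *	fp2 (other): job C         ->  own aiost, st->job = C
+ *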
+ * Must be called with jobs queued in sp->jobs + */ +int +aiosp_distribute_jobs(struct aiosp *sp) +{ + struct aio_job *job, *tmp; + struct file *fp; + int error = 0; + + mutex_enter(&sp->mtx); + if (!sp->jobs_pending) { + mutex_exit(&sp->mtx); + return 0; + } + + TAILQ_FOREACH_SAFE(job, &sp->jobs, list, tmp) { + fp = job->fp; + KASSERT(fp); + + struct aiost_file_group *fg = NULL; + struct aiost *aiost = NULL; + + if (fp->f_vnode != NULL && fp->f_vnode->v_type == VREG) { + struct aiost_file_group key = { .fp = fp }; + fg = RB_FIND(aiost_file_tree, sp->fg_root, &key); + + if (fg == NULL) { + fg = kmem_zalloc(sizeof(*fg), KM_SLEEP); + fg->fp = fp; + fg->queue_size = 0; + mutex_init(&fg->mtx, MUTEX_DEFAULT, IPL_NONE); + TAILQ_INIT(&fg->queue); + + error = aiosp_worker_extract(sp, &aiost); + if (error) { + kmem_free(fg, sizeof(*fg)); + mutex_exit(&sp->mtx); + return error; + } + RB_INSERT(aiost_file_tree, sp->fg_root, fg); + fg->aiost = aiost; + + aiost->fg = fg; + aiost->job = NULL; + } else { + aiost = fg->aiost; + } + } else { + error = aiosp_worker_extract(sp, &aiost); + if (error) { + mutex_exit(&sp->mtx); + return error; + } + aiost->fg = NULL; + aiost->job = job; + } + + TAILQ_REMOVE(&sp->jobs, job, list); + sp->jobs_pending--; + job->on_queue = false; + + if (fg) { + mutex_enter(&fg->mtx); + TAILQ_INSERT_TAIL(&fg->queue, job, list); + fg->queue_size++; + mutex_exit(&fg->mtx); + } + + mutex_enter(&aiost->mtx); + aiost->freelist = false; + aiost->state = AIOST_STATE_OPERATION; + mutex_exit(&aiost->mtx); + cv_signal(&aiost->service_cv); + } + + mutex_exit(&sp->mtx); + return error; +} + +/* + * Wait for specified AIO operations to complete + * Create a waitgroup to monitor the specified aiocb list. + * Returns when timeout expires or completion criteria met + * + * AIOSP_SUSPEND_ANY return when any job completes + * AIOSP_SUSPEND_ALL return when all jobs complete + */ +int +aiosp_suspend(struct aiosp *aiosp, struct aiocb **aiocbp_list, int nent, + struct timespec *ts, int flags) +{ + struct aio_job *job; + struct aiowaitgroup *wg; + int error = 0, timo = 0; + size_t joined = 0; + + if (ts) { + timo = tstohz(ts); + if (timo <= 0) { + error = SET_ERROR(EAGAIN); + return error; + } + } + + wg = kmem_zalloc(sizeof(*wg), KM_SLEEP); + aiowaitgroup_init(wg); + + for (int i = 0; i < nent; i++) { + if (aiocbp_list[i] == NULL) { + continue; + } + + error = aiocbp_lookup_job(aiosp, aiocbp_list[i], &job); + if (error) { + goto done; + } + if (job == NULL) { + continue; + } + + if (job->completed) { + mutex_enter(&wg->mtx); + wg->completed++; + wg->total++; + mutex_exit(&wg->mtx); + mutex_exit(&job->mtx); + continue; + } + + aiowaitgroup_join(wg, &job->lk); + joined++; + mutex_exit(&job->mtx); + } + + if (!joined) { + goto done; + } + + mutex_enter(&wg->mtx); + const size_t target = (flags & AIOSP_SUSPEND_ANY) ? 
1 : wg->total;
+	while (wg->completed < target) {
+		error = aiowaitgroup_wait(wg, timo);
+		if (error) {
+			break;
+		}
+	}
+	mutex_exit(&wg->mtx);
+done:
+	mutex_enter(&wg->mtx);
+	wg->active = false;
+	if (--wg->refcnt == 0) {
+		mutex_exit(&wg->mtx);
+		aiowaitgroup_fini(wg);
+	} else {
+		mutex_exit(&wg->mtx);
+	}
+	return error;
+}
+
+int
+aio_suspend1(struct lwp *l, struct aiocb **aiocbp_list, int nent,
+    struct timespec *ts)
+{
+	struct proc *p = l->l_proc;
 	struct aioproc *aio = p->p_aio;
-	struct aio_job *a_job;
-	struct lio_req *lio;
-	sigset_t oss, nss;
-	int error __diagused, refcnt;
+	struct aiosp *aiosp = &aio->aiosp;

-	/*
-	 * Make an empty signal mask, so it
-	 * handles only SIGKILL and SIGSTOP.
-	 */
-	sigfillset(&nss);
-	mutex_enter(p->p_lock);
-	error = sigprocmask1(curlwp, SIG_SETMASK, &nss, &oss);
-	mutex_exit(p->p_lock);
-	KASSERT(error == 0);
+	return aiosp_suspend(aiosp, aiocbp_list, nent, ts, AIOSP_SUSPEND_ANY);
+}
+
+/*
+ * Initialize a servicing pool.
+ */
+int
+aiosp_initialize(struct aiosp *sp)
+{
+	mutex_init(&sp->mtx, MUTEX_DEFAULT, IPL_NONE);
+	TAILQ_INIT(&sp->freelist);
+	TAILQ_INIT(&sp->active);
+	TAILQ_INIT(&sp->jobs);
+	sp->fg_root = kmem_zalloc(sizeof(*sp->fg_root), KM_SLEEP);
+	RB_INIT(sp->fg_root);
+
+	return 0;
+}
+
+/*
+ * Extract an available worker thread from the pool, or create a new one.
+ */
+static int
+aiosp_worker_extract(struct aiosp *sp, struct aiost **aiost)
+{
+	int error;
+
+	if (sp->nthreads_free == 0) {
+		error = aiost_create(sp, aiost);
+		if (error) {
+			return error;
+		}
+	} else {
+		*aiost = TAILQ_LAST(&sp->freelist, aiost_list);
+	}
+
+	TAILQ_REMOVE(&sp->freelist, *aiost, list);
+	sp->nthreads_free--;
+	TAILQ_INSERT_TAIL(&sp->active, *aiost, list);
+	sp->nthreads_active++;
+
+	return 0;
+}
+
+/*
+ * Each process keeps track of all the service threads instantiated to service
+ * an asynchronous operation by the process. When a process is terminated we
+ * must also terminate all of its active and pending asynchronous operations.
+ */
+int
+aiosp_destroy(struct aiosp *sp, int *cn)
+{
+	struct aiost *st;
+	int error, cnt = 0;

 	for (;;) {
 		/*
-		 * Loop for each job in the queue. If there
-		 * are no jobs then sleep.
+		 * Peek one worker under sp->mtx.
 		 */
-		mutex_enter(&aio->aio_mtx);
-		while ((a_job = TAILQ_FIRST(&aio->jobs_queue)) == NULL) {
-			if (cv_wait_sig(&aio->aio_worker_cv, &aio->aio_mtx)) {
-				/*
-				 * Thread was interrupted - check for
-				 * pending exit or suspend.
-				 */
-				mutex_exit(&aio->aio_mtx);
-				lwp_userret(curlwp);
-				mutex_enter(&aio->aio_mtx);
-			}
+		mutex_enter(&sp->mtx);
+		st = TAILQ_FIRST(&sp->freelist);
+		if (st == NULL) {
+			st = TAILQ_FIRST(&sp->active);
 		}
+		mutex_exit(&sp->mtx);

-		/* Take the job from the queue */
-		aio->curjob = a_job;
-		TAILQ_REMOVE(&aio->jobs_queue, a_job, list);
+		if (st == NULL)
+			break;

-		atomic_dec_uint(&aio_jobs_count);
-		aio->jobs_count--;
+		error = aiost_terminate(st);
+		if (error) {
+			return error;
+		}
+		st->lwp = NULL;

-		mutex_exit(&aio->aio_mtx);
+		kmem_free(st, sizeof(*st));
+		cnt++;
+	}

-		/* Process an AIO operation */
-		aio_process(a_job);
+	if (cn) {
+		*cn = cnt;
+	}

-		/* Copy data structure back to the user-space */
-		(void)copyout(&a_job->aiocbp, a_job->aiocb_uptr,
-		    sizeof(struct aiocb));
+	mutex_destroy(&sp->mtx);
+	return 0;
+}

-		mutex_enter(&aio->aio_mtx);
-		KASSERT(aio->curjob == a_job);
-		aio->curjob = NULL;
+/*
+ * Enqueue a job for processing by the process's servicing pool.
+ */
+int
+aiosp_enqueue_job(struct aiosp *aiosp, struct aio_job *job)
+{
+	mutex_enter(&aiosp->mtx);

-		/* Decrease a reference counter, if there is a LIO structure */
-		lio = a_job->lio;
-		refcnt = (lio != NULL ? --lio->refcnt : -1);
+	TAILQ_INSERT_TAIL(&aiosp->jobs, job, list);
+	aiosp->jobs_pending++;
+	job->on_queue = true;

-		/* Notify all suspenders */
-		cv_broadcast(&aio->done_cv);
-		mutex_exit(&aio->aio_mtx);
+	mutex_exit(&aiosp->mtx);
+
+	return 0;
+}
+
+/*
+ * Create and initialise a new servicing thread and append it to the freelist.
+ */
+static int
+aiost_create(struct aiosp *sp, struct aiost **ret)
+{
+	struct proc *p = curlwp->l_proc;
+	struct aiost *st;
+
+	st = kmem_zalloc(sizeof(*st), KM_SLEEP);
+
+	mutex_init(&st->mtx, MUTEX_DEFAULT, IPL_NONE);
+	cv_init(&st->service_cv, "aioservice");

-		/* Send a signal, if any */
-		aio_sendsig(p, &a_job->aiocbp.aio_sigevent);
+	st->job = NULL;
+	st->state = AIOST_STATE_NONE;
+	st->aiosp = sp;
+	st->freelist = true;

-		/* Destroy the LIO structure */
-		if (refcnt == 0) {
-			aio_sendsig(p, &lio->sig);
-			pool_put(&aio_lio_pool, lio);
+	TAILQ_INSERT_TAIL(&sp->freelist, st, list);
+	sp->nthreads_free++;
+	sp->nthreads_total++;
+
+	int error = kthread_create(PRI_USER, KTHREAD_MUSTJOIN, NULL, aiost_entry,
+	    st, &st->lwp, "aio_%d_%ld", p->p_pid, sp->nthreads_total);
+	if (error) {
+		return error;
+	}
+
+	if (ret) {
+		*ret = st;
+	}
+
+	return 0;
+}
+
+/*
+ * Process a single job without coalescing.
+ */
+static void
+aiost_process_singleton(struct aio_job *job)
+{
+	if ((job->aio_op & AIO_READ) == AIO_READ) {
+		io_read(job);
+	} else if ((job->aio_op & AIO_WRITE) == AIO_WRITE) {
+		io_write(job);
+	} else if ((job->aio_op & AIO_SYNC) == AIO_SYNC) {
+		io_sync(job);
+	} else {
+		panic("%s: invalid operation code {%x}\n", __func__,
+		    job->aio_op);
+	}
+
+	aio_job_mark_complete(job);
+}
+
+/*
+ * Process all jobs in a file group.
+ */
+static void
+aiost_process_fg(struct aiosp *sp, struct aiost_file_group *fg)
+{
+	for (struct aio_job *job;;) {
+		mutex_enter(&fg->mtx);
+		job = TAILQ_FIRST(&fg->queue);
+		if (job) {
+			TAILQ_REMOVE(&fg->queue, job, list);
+			fg->queue_size--;
+		}
+		mutex_exit(&fg->mtx);
+		if (job == NULL) {
+			break;
 		}

-		/* Destroy the job */
-		pool_put(&aio_job_pool, a_job);
+		aiost_process_singleton(job);
 	}
+}
+
+/*
+ * Service thread entry point. Processes assigned jobs until termination.
+ * Handles both singleton jobs and file-grouped job batches.
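+ *
+ * Worker state machine, simplified (states are the AIOST_STATE_*
+ * values used in the body below):
+ *
+ *	NONE --assign job/fg--> OPERATION --job done--> NONE
+ *	any state --aiost_terminate()--> TERMINATE --> kthread_exit()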
+ */
+static void
+aiost_entry(void *arg)
+{
+	struct aiost *st = arg;
+	struct aiosp *sp = st->aiosp;
+	int error;

-	/* NOTREACHED */
+	/*
+	 * We want to handle abrupt process terminations effectively. We use
+	 * st->state to indicate that the thread must exit. When a thread is
+	 * terminated, aiost_terminate(st) unblocks those sleeping on
+	 * st->service_cv.
+	 */
+	mutex_enter(&st->mtx);
+	for (;;) {
+		while (st->state == AIOST_STATE_NONE) {
+			error = cv_wait_sig(&st->service_cv, &st->mtx);
+			if (error) {
+				/*
+				 * Thread was interrupted. Check for pending
+				 * exit or suspension.
+				 */
+				mutex_exit(&st->mtx);
+				lwp_userret(curlwp);
+				mutex_enter(&st->mtx);
+			}
+		}
+
+		if (st->state == AIOST_STATE_TERMINATE) {
+			break;
+		}
+
+		if (st->state != AIOST_STATE_OPERATION) {
+			panic("%s: invalid aiost state {%x}\n", __func__,
+			    st->state);
+		}
+
+		if (st->fg) {
+			struct aiost_file_group *fg = st->fg;
+			st->fg = NULL;
+
+			mutex_exit(&st->mtx);
+			aiost_process_fg(sp, fg);
+			mutex_enter(&st->mtx);
+
+			aiosp_fg_teardown(sp, fg);
+		} else if (st->job) {
+			struct aio_job *job = st->job;
+
+			mutex_exit(&st->mtx);
+			aiost_process_singleton(job);
+			mutex_enter(&st->mtx);
+		} else {
+			KASSERT(0);
+		}
+
+		/*
+		 * Check whether a termination was queued while handling
+		 * a job.
+		 */
+		if (st->state == AIOST_STATE_TERMINATE) {
+			break;
+		}
+
+		st->state = AIOST_STATE_NONE;
+		st->job = NULL;
+		st->fg = NULL;
+
+		/*
+		 * Remove st from the list of active service threads, append
+		 * it to the freelist, dance around locks, then iterate the
+		 * loop and block on st->service_cv.
+		 */
+		mutex_exit(&st->mtx);
+		mutex_enter(&sp->mtx);
+		mutex_enter(&st->mtx);
+
+		st->freelist = true;
+
+		TAILQ_REMOVE(&sp->active, st, list);
+		sp->nthreads_active--;
+
+		TAILQ_INSERT_TAIL(&sp->freelist, st, list);
+		sp->nthreads_free++;
+
+		mutex_exit(&sp->mtx);
+	}
+
+	if (st->job) {
+		aio_job_release(st->job);
+	} else if (st->fg) {
+		struct aiost_file_group *fg = st->fg;
+		st->fg = NULL;
+
+		for (struct aio_job *job;;) {
+			mutex_enter(&fg->mtx);
+			job = TAILQ_FIRST(&fg->queue);
+			if (job) {
+				TAILQ_REMOVE(&fg->queue, job, list);
+				fg->queue_size--;
+			}
+			mutex_exit(&fg->mtx);
+			if (job == NULL) {
+				break;
+			}
+
+			aio_job_release(job);
+		}
+
+		aiosp_fg_teardown(sp, fg);
+	}
+
+	mutex_exit(&st->mtx);
+	mutex_enter(&sp->mtx);
+
+	if (st->freelist) {
+		TAILQ_REMOVE(&sp->freelist, st, list);
+		sp->nthreads_free--;
+	} else {
+		TAILQ_REMOVE(&sp->active, st, list);
+		sp->nthreads_active--;
+	}
+	sp->nthreads_total--;
+
+	mutex_exit(&sp->mtx);
+	kthread_exit(0);
+}
+
+/*
+ * Send AIO signal.
+ */
+static void
+aiost_sigsend(struct proc *p, struct sigevent *sig)
+{
+	ksiginfo_t ksi;
+
+	if (sig->sigev_signo == 0 || sig->sigev_notify == SIGEV_NONE)
+		return;
+
+	KSI_INIT(&ksi);
+	ksi.ksi_signo = sig->sigev_signo;
+	ksi.ksi_code = SI_ASYNCIO;
+	ksi.ksi_value = sig->sigev_value;
+
+	mutex_enter(&proc_lock);
+	kpsignal(p, &ksi, NULL);
+	mutex_exit(&proc_lock);
+}
+
+/*
+ * Process write operation for non-blocking jobs.
+ */
+static int
+io_write(struct aio_job *job)
+{
+	return io_write_fallback(job);
+}
+
+/*
+ * Process read operation for non-blocking jobs.
+ */
+static int
+io_read(struct aio_job *job)
+{
+	return io_read_fallback(job);
+}
+
+/*
+ * Initialize UIO structure for I/O operation.
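+ *
+ * The result is a single-segment uio aimed at the submitting
+ * process' address space; setting uio_vmspace to job->p->p_vmspace is
+ * what lets a service thread move data for another process. In
+ * effect (see the body below):
+ *
+ *	aiov->iov_base    = aiocbp->aio_buf;
+ *	auio->uio_resid   = aiocbp->aio_nbytes;
+ *	auio->uio_vmspace = job->p->p_vmspace;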
+ */ +static int +uio_construct(struct aio_job *job, struct file **fp, struct iovec *aiov, + struct uio *auio) +{ + struct aiocb *aiocbp = &job->aiocbp; + + if (aiocbp->aio_nbytes > SSIZE_MAX) + return SET_ERROR(EINVAL); + + *fp = job->fp; + if (*fp == NULL) { + return SET_ERROR(EBADF); + } + + aiov->iov_base = aiocbp->aio_buf; + aiov->iov_len = aiocbp->aio_nbytes; + + auio->uio_iov = aiov; + auio->uio_iovcnt = 1; + auio->uio_resid = aiocbp->aio_nbytes; + auio->uio_offset = aiocbp->aio_offset; + auio->uio_vmspace = job->p->p_vmspace; + + return 0; +} + +/* + * Perform synchronous write via file operations. + */ +static int +io_write_fallback(struct aio_job *job) +{ + struct file *fp = NULL; + struct iovec aiov; + struct uio auio; + struct aiocb *aiocbp = &job->aiocbp; + int error; + + error = uio_construct(job, &fp, &aiov, &auio); + if (error) { + goto done; + } + + /* Write using pinned file */ + if ((fp->f_flag & FWRITE) == 0) { + error = SET_ERROR(EBADF); + goto done; + } + + auio.uio_rw = UIO_WRITE; + error = (*fp->f_ops->fo_write)(fp, &aiocbp->aio_offset, + &auio, fp->f_cred, FOF_UPDATE_OFFSET); + + /* result */ + job->aiocbp.aio_nbytes -= auio.uio_resid; + job->aiocbp._retval = (error == 0) ? job->aiocbp.aio_nbytes : -1; +done: + job->aiocbp._errno = error; + job->aiocbp._state = JOB_DONE; + return 0; +} + +/* + * Perform synchronous read via file operations. + */ +static int +io_read_fallback(struct aio_job *job) +{ + struct file *fp = NULL; + struct iovec aiov; + struct uio auio; + struct aiocb *aiocbp = &job->aiocbp; + int error; + + error = uio_construct(job, &fp, &aiov, &auio); + if (error) + goto done; + + /* Read using pinned file */ + if ((fp->f_flag & FREAD) == 0) { + error = SET_ERROR(EBADF); + goto done; + } + + auio.uio_rw = UIO_READ; + error = (*fp->f_ops->fo_read)(fp, &aiocbp->aio_offset, + &auio, fp->f_cred, FOF_UPDATE_OFFSET); + + job->aiocbp.aio_nbytes -= auio.uio_resid; + job->aiocbp._retval = (error == 0) ? job->aiocbp.aio_nbytes : -1; +done: + job->aiocbp._errno = error; + job->aiocbp._state = JOB_DONE; + return 0; +} + +/* + * Perform sync via file operations + */ +static int +io_sync(struct aio_job *job) +{ + struct file *fp = job->fp; + int error = 0; + + if (fp == NULL) { + error = SET_ERROR(EBADF); + goto done; + } + + if ((fp->f_flag & FWRITE) == 0) { + error = SET_ERROR(EBADF); + goto done; + } + + struct vnode *vp = fp->f_vnode; + vn_lock(vp, LK_EXCLUSIVE | LK_RETRY); + if (vp->v_type == VREG) { + if (job->aio_op & AIO_DSYNC) { + error = VOP_FSYNC(vp, fp->f_cred, + FSYNC_WAIT | FSYNC_DATAONLY, 0, 0); + } else { + error = VOP_FSYNC(vp, fp->f_cred, FSYNC_WAIT, 0, 0); + } + } + VOP_UNLOCK(vp); + + job->aiocbp._retval = (error == 0) ? 0 : -1; +done: + job->aiocbp._errno = error; + job->aiocbp._state = JOB_DONE; + + copyout(&job->aiocbp, job->aiocb_uptr, sizeof(job->aiocbp)); + + return 0; +} + +/* + * Destroy a servicing thread. Set st->exit high such that when we unblock the + * thread blocking on st->service_cv it will invoke an exit routine within + * aiost_entry. + */ +static int +aiost_terminate(struct aiost *st) +{ + int error = 0; + + mutex_enter(&st->mtx); + + st->state = AIOST_STATE_TERMINATE; + + mutex_exit(&st->mtx); + + cv_signal(&st->service_cv); + kthread_join(st->lwp); + + cv_destroy(&st->service_cv); + mutex_destroy(&st->mtx); + + return error; +} + +/* + * Ensure that the same job can not be enqueued twice. 
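+ *
+ * POSIX leaves resubmission of an in-flight aiocb undefined, so it is
+ * rejected here. Illustrative userspace view:
+ *
+ *	aio_read(&cb);		first submission, accepted
+ *	aio_read(&cb);		cb still in flight, fails with EINVAL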
+ */ +int +aiosp_validate_conflicts(struct aiosp *aiosp, const void *uptr) +{ + struct aiost *st; + struct aio_job *job; + + mutex_enter(&aiosp->mtx); + + /* check active threads */ + TAILQ_FOREACH(st, &aiosp->active, list) { + job = st->job; + if (job && st->job->aiocb_uptr == uptr) { + mutex_exit(&aiosp->mtx); + return EINVAL; + } else if (st->fg) { + mutex_enter(&st->fg->mtx); + TAILQ_FOREACH(job, &st->fg->queue, list) { + if (job->aiocb_uptr == uptr) { + mutex_exit(&st->fg->mtx); + mutex_exit(&aiosp->mtx); + return EINVAL; + } + } + mutex_exit(&st->fg->mtx); + } + } + + /* no need to check freelist threads as they have no jobs */ + + mutex_exit(&aiosp->mtx); + return 0; +} + +/* + * Get error status of async I/O operation + */ +int +aiosp_error(struct aiosp *aiosp, const void *uptr, register_t *retval) +{ + struct aio_job *job; + int error = 0; + + error = aiocbp_lookup_job(aiosp, uptr, &job); + if (error || job == NULL) { + return error; + } + + if (job->aiocbp._state == JOB_NONE) { + mutex_exit(&job->mtx); + return SET_ERROR(EINVAL); + } + + *retval = job->aiocbp._errno; + mutex_exit(&job->mtx); + + return error; +} + +/* + * Get return value of completed async I/O operation + */ +int +aiosp_return(struct aiosp *aiosp, const void *uptr, register_t *retval) +{ + struct aiocbp *handle = NULL; + struct aio_job *job = NULL; + int error; + + error = aiocbp_remove_job(aiosp, uptr, &job, &handle); + if (error) { + return error; + } + + if (job == NULL) { + if (handle) { + kmem_free(handle, sizeof(*handle)); + } + return SET_ERROR(ENOENT); + } + + if (job->aiocbp._state != JOB_DONE) { + mutex_exit(&job->mtx); + if (handle) { + kmem_free(handle, sizeof(*handle)); + } + return SET_ERROR(EINVAL); + } + + *retval = job->aiocbp._retval; + + if (job->fp) { + aio_file_release(job->fp); + job->fp = NULL; + } + + job->aiocbp._errno = 0; + job->aiocbp._retval = -1; + job->aiocbp._state = JOB_NONE; + + mutex_exit(&job->mtx); + if (handle) { + kmem_free(handle, sizeof(*handle)); + } + + aio_job_fini(job); + pool_put(&aio_job_pool, job); + atomic_dec_uint(&aio_jobs_count); + + return 0; +} + +/* + * Hash function for aiocb user pointers. + */ +static inline u_int +aiocbp_hash(const void *uptr) +{ + return hash32_buf(&uptr, sizeof(uptr), HASH32_BUF_INIT); +} + +/* + * Find aiocb entry by user pointer and locks. 
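+ *
+ * On success *jobp is returned with job->mtx held (or NULL if the
+ * entry carries no job); the caller must drop the lock, as
+ * aiosp_error() below does:
+ *
+ *	error = aiocbp_lookup_job(aiosp, uptr, &job);
+ *	if (error == 0 && job != NULL) {
+ *		... inspect job->aiocbp ...
+ *		mutex_exit(&job->mtx);
+ *	}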
+ */ +static int +aiocbp_lookup_job(struct aiosp *aiosp, const void *uptr, + struct aio_job **jobp) +{ + struct aiocbp *aiocbp; + struct aio_job *job = NULL; + u_int hash; + + *jobp = NULL; + hash = aiocbp_hash(uptr) & aiosp->aio_hash_mask; + + mutex_enter(&aiosp->aio_hash_mtx); + TAILQ_FOREACH(aiocbp, &aiosp->aio_hash[hash], list) { + if (aiocbp->uptr == uptr) { + job = aiocbp->job; + if (job) { + mutex_enter(&job->mtx); + } + + mutex_exit(&aiosp->aio_hash_mtx); + *jobp = job; + return 0; + } + } + mutex_exit(&aiosp->aio_hash_mtx); + + *jobp = NULL; + return SET_ERROR(ENOENT); +} + +/* + * Detach job and return job with job->mtx held + */ +static int +aiocbp_remove_job(struct aiosp *aiosp, const void *uptr, + struct aio_job **jobp, struct aiocbp **handlep) +{ + struct aiocbp *aiocbp; + struct aio_job *job = NULL; + u_int hash; + + *jobp = NULL; + if (handlep) { + *handlep = NULL; + } + hash = aiocbp_hash(uptr) & aiosp->aio_hash_mask; + + mutex_enter(&aiosp->aio_hash_mtx); + TAILQ_FOREACH(aiocbp, &aiosp->aio_hash[hash], list) { + if (aiocbp->uptr == uptr) { + job = aiocbp->job; + if (job) { + mutex_enter(&job->mtx); + } + + TAILQ_REMOVE(&aiosp->aio_hash[hash], aiocbp, list); + mutex_exit(&aiosp->aio_hash_mtx); + if (handlep) { + *handlep = aiocbp; + } + *jobp = job; + + return 0; + } + } + mutex_exit(&aiosp->aio_hash_mtx); + + return SET_ERROR(ENOENT); +} + +/* + * Insert aiocb entry into hash table. + */ +int +aiocbp_insert(struct aiosp *aiosp, struct aiocbp *aiocbp) +{ + struct aiocbp *found; + const void *uptr; + u_int hash; + + uptr = aiocbp->uptr; + hash = aiocbp_hash(uptr) & aiosp->aio_hash_mask; + + mutex_enter(&aiosp->aio_hash_mtx); + TAILQ_FOREACH(found, &aiosp->aio_hash[hash], list) { + if (found->uptr == uptr) { + found->job = aiocbp->job; + mutex_exit(&aiosp->aio_hash_mtx); + return EEXIST; + } + } + + TAILQ_INSERT_HEAD(&aiosp->aio_hash[hash], aiocbp, list); + mutex_exit(&aiosp->aio_hash_mtx); + + return 0; +} + +/* + * Initialize aiocb hash table. + */ +int +aiocbp_init(struct aiosp *aiosp, u_int hashsize) +{ + if (!powerof2(hashsize)) { + return EINVAL; + } + + aiosp->aio_hash = kmem_zalloc(hashsize * sizeof(*aiosp->aio_hash), + KM_SLEEP); + + aiosp->aio_hash_mask = hashsize - 1; + aiosp->aio_hash_size = hashsize; + + mutex_init(&aiosp->aio_hash_mtx, MUTEX_DEFAULT, IPL_NONE); + + for (size_t i = 0; i < hashsize; i++) { + TAILQ_INIT(&aiosp->aio_hash[i]); + } + + return 0; +} + +/* + * Destroy aiocb hash table and free entries. + */ +void +aiocbp_destroy(struct aiosp *aiosp) +{ + if (aiosp->aio_hash == NULL) { + return; + } + + struct aiocbp *aiocbp; + + mutex_enter(&aiosp->aio_hash_mtx); + for (size_t i = 0; i < aiosp->aio_hash_size; i++) { + struct aiocbp *tmp; + TAILQ_FOREACH_SAFE(aiocbp, &aiosp->aio_hash[i], list, tmp) { + TAILQ_REMOVE(&aiosp->aio_hash[i], aiocbp, list); + kmem_free(aiocbp, sizeof(*aiocbp)); + } + } + mutex_exit(&aiosp->aio_hash_mtx); + + kmem_free(aiosp->aio_hash, + aiosp->aio_hash_size * sizeof(*aiosp->aio_hash)); + aiosp->aio_hash = NULL; + aiosp->aio_hash_mask = 0; + aiosp->aio_hash_size = 0; + mutex_destroy(&aiosp->aio_hash_mtx); +} + +/* + * Initialize wait group for suspend operations. + */ +void +aiowaitgroup_init(struct aiowaitgroup *wg) +{ + wg->completed = 0; + wg->total = 0; + wg->refcnt = 1; + wg->active = true; + cv_init(&wg->done_cv, "aiodone"); + mutex_init(&wg->mtx, MUTEX_DEFAULT, IPL_NONE); +} + +/* + * Clean up wait group resources. 
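+ *
+ * Only the last reference holder frees the group; both the suspender
+ * and aiowaitgrouplk_flush() use the same drop pattern:
+ *
+ *	if (--wg->refcnt == 0) {
+ *		mutex_exit(&wg->mtx);
+ *		aiowaitgroup_fini(wg);
+ *	}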
+ */ +void +aiowaitgroup_fini(struct aiowaitgroup *wg) +{ + cv_destroy(&wg->done_cv); + mutex_destroy(&wg->mtx); + kmem_free(wg, sizeof(*wg)); +} + +/* + * Block until wait group signals completion. + */ +int +aiowaitgroup_wait(struct aiowaitgroup *wg, int timo) +{ + int error; + + error = cv_timedwait_sig(&wg->done_cv, &wg->mtx, timo); + if (error) { + if (error == EWOULDBLOCK) { + error = SET_ERROR(EAGAIN); + } + return error; + } + + return 0; +} + +/* + * Initialize wait group link for job tracking. + */ +void +aiowaitgrouplk_init(struct aiowaitgrouplk *lk) +{ + mutex_init(&lk->mtx, MUTEX_DEFAULT, IPL_NONE); + lk->n = 0; + lk->s = 2; + lk->wgs = kmem_alloc(sizeof(*lk->wgs) * lk->s, KM_SLEEP); } -static void -aio_process(struct aio_job *a_job) +/* + * Clean up wait group link resources. + * Caller must hold job->mtx + */ +void +aiowaitgrouplk_fini(struct aiowaitgrouplk *lk) { - struct proc *p = curlwp->l_proc; - struct aiocb *aiocbp = &a_job->aiocbp; - struct file *fp; - int fd = aiocbp->aio_fildes; - int error = 0; + mutex_enter(&lk->mtx); - KASSERT(a_job->aio_op != 0); - - if ((a_job->aio_op & (AIO_READ | AIO_WRITE)) != 0) { - struct iovec aiov; - struct uio auio; - - if (aiocbp->aio_nbytes > SSIZE_MAX) { - error = SET_ERROR(EINVAL); - goto done; + for (size_t i = 0; i < lk->n; i++) { + struct aiowaitgroup *wg = lk->wgs[i]; + if (!wg) { + continue; } - fp = fd_getfile(fd); - if (fp == NULL) { - error = SET_ERROR(EBADF); - goto done; - } + lk->wgs[i] = NULL; - aiov.iov_base = (void *)(uintptr_t)aiocbp->aio_buf; - aiov.iov_len = aiocbp->aio_nbytes; - auio.uio_iov = &aiov; - auio.uio_iovcnt = 1; - auio.uio_resid = aiocbp->aio_nbytes; - auio.uio_vmspace = p->p_vmspace; - - if (a_job->aio_op & AIO_READ) { - /* - * Perform a Read operation - */ - KASSERT((a_job->aio_op & AIO_WRITE) == 0); - - if ((fp->f_flag & FREAD) == 0) { - fd_putfile(fd); - error = SET_ERROR(EBADF); - goto done; - } - auio.uio_rw = UIO_READ; - error = (*fp->f_ops->fo_read)(fp, &aiocbp->aio_offset, - &auio, fp->f_cred, FOF_UPDATE_OFFSET); + mutex_enter(&wg->mtx); + if (--wg->refcnt == 0) { + mutex_exit(&wg->mtx); + aiowaitgroup_fini(wg); } else { - /* - * Perform a Write operation - */ - KASSERT(a_job->aio_op & AIO_WRITE); - - if ((fp->f_flag & FWRITE) == 0) { - fd_putfile(fd); - error = SET_ERROR(EBADF); - goto done; - } - auio.uio_rw = UIO_WRITE; - error = (*fp->f_ops->fo_write)(fp, &aiocbp->aio_offset, - &auio, fp->f_cred, FOF_UPDATE_OFFSET); + mutex_exit(&wg->mtx); } - fd_putfile(fd); + } - /* Store the result value */ - a_job->aiocbp.aio_nbytes -= auio.uio_resid; - a_job->aiocbp._retval = (error == 0) ? - a_job->aiocbp.aio_nbytes : -1; + if (lk->wgs) { + kmem_free(lk->wgs, lk->s * sizeof(*lk->wgs)); + } + lk->wgs = NULL; + lk->n = 0; + lk->s = 0; - } else if ((a_job->aio_op & (AIO_SYNC | AIO_DSYNC)) != 0) { - /* - * Perform a file Sync operation - */ - struct vnode *vp; + mutex_exit(&lk->mtx); + mutex_destroy(&lk->mtx); +} - if ((error = fd_getvnode(fd, &fp)) != 0) - goto done; +/* + * Notify all wait groups of job completion. 
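+ *
+ * Runs from aio_job_mark_complete() with job->mtx held, so a
+ * suspender either observes job->completed or gets this wakeup,
+ * never neither:
+ *
+ *	wg->completed++;
+ *	cv_signal(&wg->done_cv);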
+ */ +void +aiowaitgrouplk_flush(struct aiowaitgrouplk *lk) +{ + mutex_enter(&lk->mtx); + for (int i = 0; i < lk->n; i++) { + struct aiowaitgroup *wg = lk->wgs[i]; + if (wg == NULL) { + continue; + } - if ((fp->f_flag & FWRITE) == 0) { - fd_putfile(fd); - error = SET_ERROR(EBADF); - goto done; + mutex_enter(&wg->mtx); + + if (wg->active) { + wg->completed++; + cv_signal(&wg->done_cv); } - vp = fp->f_vnode; - vn_lock(vp, LK_EXCLUSIVE | LK_RETRY); - if (a_job->aio_op & AIO_DSYNC) { - error = VOP_FSYNC(vp, fp->f_cred, - FSYNC_WAIT | FSYNC_DATAONLY, 0, 0); - } else if (a_job->aio_op & AIO_SYNC) { - error = VOP_FSYNC(vp, fp->f_cred, - FSYNC_WAIT, 0, 0); + if (--wg->refcnt == 0) { + mutex_exit(&wg->mtx); + aiowaitgroup_fini(wg); + } else { + mutex_exit(&wg->mtx); } - VOP_UNLOCK(vp); - fd_putfile(fd); + } - /* Store the result value */ - a_job->aiocbp._retval = (error == 0) ? 0 : -1; + if (lk->n) { + kmem_free(lk->wgs, sizeof(*lk->wgs) * lk->s); - } else - panic("aio_process: invalid operation code\n"); + lk->n = 0; + lk->s = 2; + lk->wgs = kmem_alloc(sizeof(*lk->wgs) * lk->s, KM_SLEEP); + } -done: - /* Job is done, set the error, if any */ - a_job->aiocbp._errno = error; - a_job->aiocbp._state = JOB_DONE; + mutex_exit(&lk->mtx); } /* - * Send AIO signal. + * Attach wait group to jobs notification list. */ -static void -aio_sendsig(struct proc *p, struct sigevent *sig) +void +aiowaitgroup_join(struct aiowaitgroup *wg, struct aiowaitgrouplk *lk) { - ksiginfo_t ksi; + mutex_enter(&lk->mtx); + if (lk->n == lk->s) { + size_t new_size = lk->s * lk->s; - if (sig->sigev_signo == 0 || sig->sigev_notify == SIGEV_NONE) - return; + void **new_wgs = kmem_zalloc(new_size * + sizeof(*new_wgs), KM_SLEEP); - KSI_INIT(&ksi); - ksi.ksi_signo = sig->sigev_signo; - ksi.ksi_code = SI_ASYNCIO; - ksi.ksi_value = sig->sigev_value; - mutex_enter(&proc_lock); - kpsignal(p, &ksi, NULL); - mutex_exit(&proc_lock); + memcpy(new_wgs, lk->wgs, lk->n * sizeof(*lk->wgs)); + kmem_free(lk->wgs, lk->s * sizeof(*lk->wgs)); + + lk->s = new_size; + lk->wgs = new_wgs; + } + lk->wgs[lk->n] = wg; + lk->n++; + wg->total++; + wg->refcnt++; + mutex_exit(&lk->mtx); } /* @@ -482,154 +1567,189 @@ aio_enqueue_job(int op, void *aiocb_uptr, struct lio_req *lio) struct proc *p = curlwp->l_proc; struct aioproc *aio; struct aio_job *a_job; - struct aiocb aiocbp; + struct aiocb aiocb; struct sigevent *sig; int error; - /* Non-accurate check for the limit */ - if (aio_jobs_count + 1 > aio_max) - return SET_ERROR(EAGAIN); - /* Get the data structure from user-space */ - error = copyin(aiocb_uptr, &aiocbp, sizeof(struct aiocb)); - if (error) + error = copyin(aiocb_uptr, &aiocb, sizeof(aiocb)); + if (error) { return error; + } /* Check if signal is set, and validate it */ - sig = &aiocbp.aio_sigevent; + sig = &aiocb.aio_sigevent; if (sig->sigev_signo < 0 || sig->sigev_signo >= NSIG || - sig->sigev_notify < SIGEV_NONE || sig->sigev_notify > SIGEV_SA) + sig->sigev_notify < SIGEV_NONE || sig->sigev_notify > SIGEV_SA) { return SET_ERROR(EINVAL); + } /* Buffer and byte count */ if (((AIO_SYNC | AIO_DSYNC) & op) == 0) - if (aiocbp.aio_buf == NULL || aiocbp.aio_nbytes > SSIZE_MAX) + if (aiocb.aio_buf == NULL || aiocb.aio_nbytes > SSIZE_MAX) return SET_ERROR(EINVAL); /* Check the opcode, if LIO_NOP - simply ignore */ if (op == AIO_LIO) { KASSERT(lio != NULL); - if (aiocbp.aio_lio_opcode == LIO_WRITE) + if (aiocb.aio_lio_opcode == LIO_WRITE) { op = AIO_WRITE; - else if (aiocbp.aio_lio_opcode == LIO_READ) + } else if (aiocb.aio_lio_opcode == LIO_READ) { op = 
AIO_READ; - else - return (aiocbp.aio_lio_opcode == LIO_NOP) ? 0 : - SET_ERROR(EINVAL); + } else { + if (aiocb.aio_lio_opcode == LIO_NOP) { + return 0; + } else { + return SET_ERROR(EINVAL); + } + } } else { KASSERT(lio == NULL); } /* - * Look for already existing job. If found - the job is in-progress. + * Look for already existing job. If found the job is in-progress. * According to POSIX this is invalid, so return the error. */ aio = p->p_aio; if (aio) { - mutex_enter(&aio->aio_mtx); - TAILQ_FOREACH(a_job, &aio->jobs_queue, list) { - if (a_job->aiocb_uptr != aiocb_uptr) - continue; - mutex_exit(&aio->aio_mtx); - return SET_ERROR(EINVAL); + error = aiosp_validate_conflicts(&aio->aiosp, aiocb_uptr); + if (error) { + return SET_ERROR(error); } - mutex_exit(&aio->aio_mtx); } /* - * Check if AIO structure is initialized, if not - initialize it. - * In LIO case, we did that already. We will recheck this with - * the lock in aio_procinit(). + * Check if AIO structure is initialized, if not initialize it */ - if (lio == NULL && p->p_aio == NULL) - if (aio_procinit(p)) + if (p->p_aio == NULL) { + if (aio_procinit(p)) { return SET_ERROR(EAGAIN); + } + } aio = p->p_aio; /* * Set the state with errno, and copy data * structure back to the user-space. */ - aiocbp._state = JOB_WIP; - aiocbp._errno = SET_ERROR(EINPROGRESS); - aiocbp._retval = -1; - error = copyout(&aiocbp, aiocb_uptr, sizeof(struct aiocb)); - if (error) + aiocb._state = JOB_WIP; + aiocb._errno = SET_ERROR(EINPROGRESS); + aiocb._retval = -1; + error = copyout(&aiocb, aiocb_uptr, sizeof(aiocb)); + if (error) { return error; + } /* Allocate and initialize a new AIO job */ a_job = pool_get(&aio_job_pool, PR_WAITOK | PR_ZERO); - /* - * Set the data. - * Store the user-space pointer for searching. Since we - * are storing only per proc pointers - it is safe. - */ - memcpy(&a_job->aiocbp, &aiocbp, sizeof(struct aiocb)); + memcpy(&a_job->aiocbp, &aiocb, sizeof(aiocb)); a_job->aiocb_uptr = aiocb_uptr; a_job->aio_op |= op; a_job->lio = lio; + mutex_init(&a_job->mtx, MUTEX_DEFAULT, IPL_NONE); + aiowaitgrouplk_init(&a_job->lk); + a_job->p = p; + a_job->on_queue = false; + a_job->completed = false; + a_job->fp = NULL; + + const int fd = aiocb.aio_fildes; + struct file *fp = fd_getfile2(p, fd); + if (fp == NULL) { + aio_job_fini(a_job); + pool_put(&aio_job_pool, a_job); + return SET_ERROR(EBADF); + } + + aio_file_hold(fp); + a_job->fp = fp; + + struct aiocbp *aiocbp = kmem_zalloc(sizeof(*aiocbp), KM_SLEEP); + aiocbp->job = a_job; + aiocbp->uptr = aiocb_uptr; + error = aiocbp_insert(&aio->aiosp, aiocbp); + if (error) { + aio_file_release(a_job->fp); + a_job->fp = NULL; + kmem_free(aiocbp, sizeof(*aiocbp)); + aio_job_fini(a_job); + pool_put(&aio_job_pool, a_job); + return SET_ERROR(error); + } /* * Add the job to the queue, update the counters, and * notify the AIO worker thread to handle the job. 
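+ *
+ * In the service-pool model this only appends to aiosp->jobs; the
+ * syscall layer then hands the queue to a worker, as sys_aio_read()
+ * below does:
+ *
+ *	error = aio_enqueue_job(AIO_READ, SCARG(uap, aiocbp), NULL);
+ *	if (error == 0)
+ *		error = aiosp_distribute_jobs(&aio->aiosp);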
*/ mutex_enter(&aio->aio_mtx); - - /* Fail, if the limit was reached */ if (atomic_inc_uint_nv(&aio_jobs_count) > aio_max || - aio->jobs_count >= aio_listio_max) { - atomic_dec_uint(&aio_jobs_count); + aio->jobs_count >= aio_listio_max) { mutex_exit(&aio->aio_mtx); - pool_put(&aio_job_pool, a_job); - return SET_ERROR(EAGAIN); + error = SET_ERROR(EAGAIN); + goto error; + } + + mutex_exit(&aio->aio_mtx); + + error = aiosp_enqueue_job(&aio->aiosp, a_job); + if (error) { + error = SET_ERROR(EAGAIN); + goto error; } - TAILQ_INSERT_TAIL(&aio->jobs_queue, a_job, list); + mutex_enter(&aio->aio_mtx); aio->jobs_count++; - if (lio) + if (lio) { lio->refcnt++; - cv_signal(&aio->aio_worker_cv); - + } mutex_exit(&aio->aio_mtx); - /* - * One would handle the errors only with aio_error() function. - * This way is appropriate according to POSIX. - */ return 0; +error: + aiocbp_remove_job(&aio->aiosp, aiocb_uptr, &a_job, NULL); + kmem_free(aiocbp, sizeof(*aiocbp)); + + aio_file_release(a_job->fp); + a_job->fp = NULL; + + aio_job_fini(a_job); + atomic_dec_uint(&aio_jobs_count); + pool_put(&aio_job_pool, a_job); + + return SET_ERROR(error); } /* * Syscall functions. */ - int sys_aio_cancel(struct lwp *l, const struct sys_aio_cancel_args *uap, - register_t *retval) + register_t *retval) { - /* { - syscallarg(int) fildes; - syscallarg(struct aiocb *) aiocbp; - } */ struct proc *p = l->l_proc; struct aioproc *aio; - struct aio_job *a_job; - struct aiocb *aiocbp_ptr; - struct lio_req *lio; + struct aiocb *aiocbp_uptr; struct filedesc *fdp = p->p_fd; - unsigned int cn, errcnt, fildes; + struct aiosp *aiosp; + struct aio_job *job; + struct file *fp; + struct aiost_file_group find = { 0 }, *fg; + unsigned int fildes, canceled = 0; + bool have_active = false; fdtab_t *dt; + int error = 0; - TAILQ_HEAD(, aio_job) tmp_jobs_list; - - /* Check for invalid file descriptor */ fildes = (unsigned int)SCARG(uap, fildes); dt = atomic_load_consume(&fdp->fd_dt); - if (fildes >= dt->dt_nfiles) + if (fildes >= dt->dt_nfiles) { return SET_ERROR(EBADF); - if (dt->dt_ff[fildes] == NULL || dt->dt_ff[fildes]->ff_file == NULL) + } + if (dt->dt_ff[fildes] == NULL || dt->dt_ff[fildes]->ff_file == NULL) { return SET_ERROR(EBADF); + } + fp = dt->dt_ff[fildes]->ff_file; /* Check if AIO structure is initialized */ if (p->p_aio == NULL) { @@ -638,316 +1758,224 @@ sys_aio_cancel(struct lwp *l, const struct sys_aio_cancel_args *uap, } aio = p->p_aio; - aiocbp_ptr = (struct aiocb *)SCARG(uap, aiocbp); + aiocbp_uptr = (struct aiocb *)SCARG(uap, aiocbp); + aiosp = &aio->aiosp; mutex_enter(&aio->aio_mtx); + mutex_enter(&aiosp->mtx); - /* Cancel the jobs, and remove them from the queue */ - cn = 0; - TAILQ_INIT(&tmp_jobs_list); - TAILQ_FOREACH(a_job, &aio->jobs_queue, list) { - if (aiocbp_ptr) { - if (aiocbp_ptr != a_job->aiocb_uptr) - continue; - if (fildes != a_job->aiocbp.aio_fildes) { - mutex_exit(&aio->aio_mtx); - return SET_ERROR(EBADF); - } - } else if (a_job->aiocbp.aio_fildes != fildes) - continue; - - TAILQ_REMOVE(&aio->jobs_queue, a_job, list); - TAILQ_INSERT_TAIL(&tmp_jobs_list, a_job, list); - - /* Decrease the counters */ - atomic_dec_uint(&aio_jobs_count); - aio->jobs_count--; - lio = a_job->lio; - if (lio != NULL && --lio->refcnt != 0) - a_job->lio = NULL; - - cn++; - if (aiocbp_ptr) - break; + /* + * If there is a live file-group for this fp, then some requests + * are active and could not be canceled. + */ + find.fp = fp; + fg = RB_FIND(aiost_file_tree, aiosp->fg_root, &find); + if (fg) { + have_active = fg->queue_size ? 
true : false;
 	}

-	/* There are canceled jobs */
-	if (cn)
-		*retval = AIO_CANCELED;
-
-	/* We cannot cancel current job */
-	a_job = aio->curjob;
-	if (a_job && ((a_job->aiocbp.aio_fildes == fildes) ||
-	    (a_job->aiocb_uptr == aiocbp_ptr)))
-		*retval = AIO_NOTCANCELED;
-
-	mutex_exit(&aio->aio_mtx);
+	/*
+	 * If aiocbp_uptr != NULL, cancel only the job associated with that
+	 * uptr.
+	 * If aiocbp_uptr == NULL, cancel all jobs associated with fildes.
+	 */
+	if (aiocbp_uptr) {
+		error = aiocbp_lookup_job(aiosp, aiocbp_uptr, &job);
+		if (error || job == NULL) {
+			*retval = AIO_ALLDONE;
+			goto finish;
+		}

-	/* Free the jobs after the lock */
-	errcnt = 0;
-	while (!TAILQ_EMPTY(&tmp_jobs_list)) {
-		a_job = TAILQ_FIRST(&tmp_jobs_list);
-		TAILQ_REMOVE(&tmp_jobs_list, a_job, list);
-		/* Set the errno and copy structures back to the user-space */
-		a_job->aiocbp._errno = SET_ERROR(ECANCELED);
-		a_job->aiocbp._state = JOB_DONE;
-		if (copyout(&a_job->aiocbp, a_job->aiocb_uptr,
-		    sizeof(struct aiocb)))
-			errcnt++;
-		/* Send a signal if any */
-		aio_sendsig(p, &a_job->aiocbp.aio_sigevent);
-		if (a_job->lio) {
-			lio = a_job->lio;
-			aio_sendsig(p, &lio->sig);
-			pool_put(&aio_lio_pool, lio);
+		if (job->completed) {
+			*retval = AIO_ALLDONE;
+		} else {
+			*retval = AIO_NOTCANCELED;
 		}
-		pool_put(&aio_job_pool, a_job);
-	}
-	if (errcnt)
-		return SET_ERROR(EFAULT);

+		/*
+		 * If the job is on sp->jobs (signified by job->on_queue),
+		 * it has not been distributed yet. If it is not on the
+		 * queue, it is currently being processed.
+		 */
+		if (job->on_queue) {
+			aio_job_cancel(aiosp, job);
+			aio_job_mark_complete(job);
+			*retval = AIO_CANCELED;
+		}

-	/* Set a correct return value */
-	if (*retval == 0)
-		*retval = AIO_ALLDONE;
+		mutex_exit(&job->mtx);
+	} else {
+		/*
+		 * Cancel all queued jobs associated with this file
+		 * descriptor.
+		 */
+		struct aio_job *tmp;
+		TAILQ_FOREACH_SAFE(job, &aiosp->jobs, list, tmp) {
+			if (job->aiocbp.aio_fildes == (int)fildes) {
+				aio_job_cancel(aiosp, job);
+				aio_job_mark_complete(job);
+				canceled++;
+			}
+		}
+		if (canceled && !have_active) {
+			*retval = AIO_CANCELED;
+		} else if (!canceled) {
+			*retval = have_active ? AIO_NOTCANCELED : AIO_ALLDONE;
+		} else {
+			*retval = AIO_NOTCANCELED;
+		}
+	}
+finish:
+	mutex_exit(&aiosp->mtx);
+	mutex_exit(&aio->aio_mtx);
+	return 0;
 }

 int
 sys_aio_error(struct lwp *l, const struct sys_aio_error_args *uap,
-    register_t *retval)
+    register_t *retval)
 {
-	/* {
-		syscallarg(const struct aiocb *) aiocbp;
-	} */
 	struct proc *p = l->l_proc;
 	struct aioproc *aio = p->p_aio;
-	struct aiocb aiocbp;
-	int error;

-	if (aio == NULL)
-		return SET_ERROR(EINVAL);
-
-	error = copyin(SCARG(uap, aiocbp), &aiocbp, sizeof(struct aiocb));
-	if (error)
-		return error;
-
-	if (aiocbp._state == JOB_NONE)
+	if (aio == NULL) {
 		return SET_ERROR(EINVAL);
+	}

-	*retval = aiocbp._errno;
-
-	return 0;
+	const void *uptr = SCARG(uap, aiocbp);
+	return aiosp_error(&aio->aiosp, uptr, retval);
 }

 int
 sys_aio_fsync(struct lwp *l, const struct sys_aio_fsync_args *uap,
-    register_t *retval)
+    register_t *retval)
 {
-	/* {
-		syscallarg(int) op;
-		syscallarg(struct aiocb *) aiocbp;
-	} */
 	int op = SCARG(uap, op);

-	if ((op != O_DSYNC) && (op != O_SYNC))
+	if ((op != O_DSYNC) && (op != O_SYNC)) {
 		return SET_ERROR(EINVAL);
+	}

-	op = O_DSYNC ? AIO_DSYNC : AIO_SYNC;
+	op = (op == O_DSYNC) ?
AIO_DSYNC : AIO_SYNC; return aio_enqueue_job(op, SCARG(uap, aiocbp), NULL); } int sys_aio_read(struct lwp *l, const struct sys_aio_read_args *uap, - register_t *retval) + register_t *retval) { - /* { - syscallarg(struct aiocb *) aiocbp; - } */ + int error; + + error = aio_enqueue_job(AIO_READ, SCARG(uap, aiocbp), NULL); + if (error) { + return error; + } - return aio_enqueue_job(AIO_READ, SCARG(uap, aiocbp), NULL); + struct proc *p = l->l_proc; + struct aioproc *aio = p->p_aio; + KASSERT(aio); + return aiosp_distribute_jobs(&aio->aiosp); } int sys_aio_return(struct lwp *l, const struct sys_aio_return_args *uap, - register_t *retval) + register_t *retval) { - /* { - syscallarg(struct aiocb *) aiocbp; - } */ struct proc *p = l->l_proc; struct aioproc *aio = p->p_aio; - struct aiocb aiocbp; - int error; - - if (aio == NULL) - return SET_ERROR(EINVAL); - error = copyin(SCARG(uap, aiocbp), &aiocbp, sizeof(struct aiocb)); - if (error) - return error; - - if (aiocbp._errno == EINPROGRESS || aiocbp._state != JOB_DONE) + if (aio == NULL) { return SET_ERROR(EINVAL); + } - *retval = aiocbp._retval; - - /* Reset the internal variables */ - aiocbp._errno = 0; - aiocbp._retval = -1; - aiocbp._state = JOB_NONE; - error = copyout(&aiocbp, SCARG(uap, aiocbp), sizeof(struct aiocb)); - - return error; + const void *uptr = SCARG(uap, aiocbp); + return aiosp_return(&aio->aiosp, uptr, retval); } int sys___aio_suspend50(struct lwp *l, const struct sys___aio_suspend50_args *uap, - register_t *retval) + register_t *retval) { - /* { - syscallarg(const struct aiocb *const[]) list; - syscallarg(int) nent; - syscallarg(const struct timespec *) timeout; - } */ + struct proc *p = l->l_proc; + struct aioproc *aio = p->p_aio; struct aiocb **list; struct timespec ts; int error, nent; nent = SCARG(uap, nent); - if (nent <= 0 || nent > aio_listio_max) + if (nent <= 0 || nent > aio_listio_max) { return SET_ERROR(EAGAIN); + } + + if (aio == NULL) { + return SET_ERROR(EINVAL); + } if (SCARG(uap, timeout)) { /* Convert timespec to ticks */ error = copyin(SCARG(uap, timeout), &ts, - sizeof(struct timespec)); + sizeof(ts)); if (error) return error; } list = kmem_alloc(nent * sizeof(*list), KM_SLEEP); error = copyin(SCARG(uap, list), list, nent * sizeof(*list)); - if (error) + if (error) { goto out; - error = aio_suspend1(l, list, nent, SCARG(uap, timeout) ? &ts : NULL); + } + + error = aiosp_suspend(&aio->aiosp, list, nent, SCARG(uap, timeout) ? 
+ &ts : NULL, AIOSP_SUSPEND_ANY); out: kmem_free(list, nent * sizeof(*list)); return error; } int -aio_suspend1(struct lwp *l, struct aiocb **aiocbp_list, int nent, - struct timespec *ts) +sys_aio_write(struct lwp *l, const struct sys_aio_write_args *uap, + register_t *retval) { - struct proc *p = l->l_proc; - struct aioproc *aio; - struct aio_job *a_job; - int i, error, timo; - - if (p->p_aio == NULL) - return SET_ERROR(EAGAIN); - aio = p->p_aio; - - if (ts) { - timo = mstohz((ts->tv_sec * 1000) + (ts->tv_nsec / 1000000)); - if (timo == 0 && ts->tv_sec == 0 && ts->tv_nsec > 0) - timo = 1; - if (timo <= 0) - return SET_ERROR(EAGAIN); - } else - timo = 0; - - mutex_enter(&aio->aio_mtx); - for (;;) { - for (i = 0; i < nent; i++) { - - /* Skip NULL entries */ - if (aiocbp_list[i] == NULL) - continue; - - /* Skip current job */ - if (aio->curjob) { - a_job = aio->curjob; - if (a_job->aiocb_uptr == aiocbp_list[i]) - continue; - } - - /* Look for a job in the queue */ - TAILQ_FOREACH(a_job, &aio->jobs_queue, list) - if (a_job->aiocb_uptr == aiocbp_list[i]) - break; - - if (a_job == NULL) { - struct aiocb aiocbp; - - mutex_exit(&aio->aio_mtx); - - /* Check if the job is done. */ - error = copyin(aiocbp_list[i], &aiocbp, - sizeof(struct aiocb)); - if (error == 0 && aiocbp._state != JOB_DONE) { - mutex_enter(&aio->aio_mtx); - continue; - } - return error; - } - } + int error; - /* Wait for a signal or when timeout occurs */ - error = cv_timedwait_sig(&aio->done_cv, &aio->aio_mtx, timo); - if (error) { - if (error == EWOULDBLOCK) - error = SET_ERROR(EAGAIN); - break; - } + error = aio_enqueue_job(AIO_WRITE, SCARG(uap, aiocbp), NULL); + if (error) { + return error; } - mutex_exit(&aio->aio_mtx); - return error; -} - -int -sys_aio_write(struct lwp *l, const struct sys_aio_write_args *uap, - register_t *retval) -{ - /* { - syscallarg(struct aiocb *) aiocbp; - } */ - return aio_enqueue_job(AIO_WRITE, SCARG(uap, aiocbp), NULL); + struct proc *p = l->l_proc; + struct aioproc *aio = p->p_aio; + KASSERT(aio); + return aiosp_distribute_jobs(&aio->aiosp); } int sys_lio_listio(struct lwp *l, const struct sys_lio_listio_args *uap, - register_t *retval) + register_t *retval) { - /* { - syscallarg(int) mode; - syscallarg(struct aiocb *const[]) list; - syscallarg(int) nent; - syscallarg(struct sigevent *) sig; - } */ struct proc *p = l->l_proc; struct aioproc *aio; struct aiocb **aiocbp_list; struct lio_req *lio; - int i, error, errcnt, mode, nent; + int i, error = 0, errcnt, mode, nent; mode = SCARG(uap, mode); nent = SCARG(uap, nent); /* Non-accurate checks for the limit and invalid values */ - if (nent < 1 || nent > aio_listio_max) + if (nent < 1 || nent > aio_listio_max) { return SET_ERROR(EINVAL); - if (aio_jobs_count + nent > aio_max) - return SET_ERROR(EAGAIN); + } - /* Check if AIO structure is initialized, if not - initialize it */ - if (p->p_aio == NULL) - if (aio_procinit(p)) + /* Check if AIO structure is initialized, if not initialize it */ + if (p->p_aio == NULL) { + if (aio_procinit(p)) { return SET_ERROR(EAGAIN); + } + } aio = p->p_aio; /* Create a LIO structure */ @@ -957,7 +1985,7 @@ sys_lio_listio(struct lwp *l, const struct sys_lio_listio_args *uap, switch (mode) { case LIO_WAIT: - memset(&lio->sig, 0, sizeof(struct sigevent)); + memset(&lio->sig, 0, sizeof(lio->sig)); break; case LIO_NOWAIT: /* Check for signal, validate it */ @@ -965,15 +1993,16 @@ sys_lio_listio(struct lwp *l, const struct sys_lio_listio_args *uap, struct sigevent *sig = &lio->sig; error = copyin(SCARG(uap, sig), &lio->sig, 
- sizeof(struct sigevent)); + sizeof(lio->sig)); if (error == 0 && - (sig->sigev_signo < 0 || - sig->sigev_signo >= NSIG || - sig->sigev_notify < SIGEV_NONE || - sig->sigev_notify > SIGEV_SA)) + (sig->sigev_signo < 0 || + sig->sigev_signo >= NSIG || + sig->sigev_notify < SIGEV_NONE || + sig->sigev_notify > SIGEV_SA)) error = SET_ERROR(EINVAL); - } else - memset(&lio->sig, 0, sizeof(struct sigevent)); + } else { + memset(&lio->sig, 0, sizeof(lio->sig)); + } break; default: error = SET_ERROR(EINVAL); @@ -988,7 +2017,7 @@ sys_lio_listio(struct lwp *l, const struct sys_lio_listio_args *uap, /* Get the list from user-space */ aiocbp_list = kmem_alloc(nent * sizeof(*aiocbp_list), KM_SLEEP); error = copyin(SCARG(uap, list), aiocbp_list, - nent * sizeof(*aiocbp_list)); + nent * sizeof(*aiocbp_list)); if (error) { mutex_enter(&aio->aio_mtx); goto err; @@ -1002,35 +2031,36 @@ sys_lio_listio(struct lwp *l, const struct sys_lio_listio_args *uap, * According to POSIX, in such error case it may * fail with other I/O operations initiated. */ - if (error) + if (error) { errcnt++; + } + } + + error = aiosp_distribute_jobs(&aio->aiosp); + if (error) { + goto err; } mutex_enter(&aio->aio_mtx); - /* Return an error, if any */ + /* Return an error if any */ if (errcnt) { error = SET_ERROR(EIO); goto err; } if (mode == LIO_WAIT) { - /* - * Wait for AIO completion. In such case, - * the LIO structure will be freed here. - */ - while (lio->refcnt > 1 && error == 0) - error = cv_wait_sig(&aio->done_cv, &aio->aio_mtx); - if (error) - error = SET_ERROR(EINTR); + error = aiosp_suspend(&aio->aiosp, aiocbp_list, nent, + NULL, AIOSP_SUSPEND_ALL); } err: - if (--lio->refcnt != 0) + if (--lio->refcnt != 0) { lio = NULL; + } mutex_exit(&aio->aio_mtx); if (lio != NULL) { - aio_sendsig(p, &lio->sig); + aiost_sigsend(p, &lio->sig); pool_put(&aio_lio_pool, lio); } kmem_free(aiocbp_list, nent * sizeof(*aiocbp_list)); @@ -1040,7 +2070,6 @@ sys_lio_listio(struct lwp *l, const struct sys_lio_listio_args *uap, /* * SysCtl */ - static int sysctl_aio_listio_max(SYSCTLFN_ARGS) { @@ -1052,11 +2081,13 @@ sysctl_aio_listio_max(SYSCTLFN_ARGS) newsize = aio_listio_max; error = sysctl_lookup(SYSCTLFN_CALL(&node)); - if (error || newp == NULL) + if (error || newp == NULL) { return error; + } - if (newsize < 1 || newsize > aio_max) + if (newsize < 1 || newsize > aio_max) { return SET_ERROR(EINVAL); + } aio_listio_max = newsize; return 0; @@ -1073,11 +2104,13 @@ sysctl_aio_max(SYSCTLFN_ARGS) newsize = aio_max; error = sysctl_lookup(SYSCTLFN_CALL(&node)); - if (error || newp == NULL) + if (error || newp == NULL) { return error; + } - if (newsize < 1 || newsize < aio_listio_max) + if (newsize < 1 || newsize < aio_listio_max) { return SET_ERROR(EINVAL); + } aio_max = newsize; return 0; @@ -1088,35 +2121,37 @@ SYSCTL_SETUP(sysctl_aio_init, "aio sysctl") int rv; rv = sysctl_createv(clog, 0, NULL, NULL, - CTLFLAG_PERMANENT | CTLFLAG_IMMEDIATE, - CTLTYPE_INT, "posix_aio", - SYSCTL_DESCR("Version of IEEE Std 1003.1 and its " + CTLFLAG_PERMANENT | CTLFLAG_IMMEDIATE, + CTLTYPE_INT, "posix_aio", + SYSCTL_DESCR("Version of IEEE Std 1003.1 and its " "Asynchronous I/O option to which the " "system attempts to conform"), - NULL, _POSIX_ASYNCHRONOUS_IO, NULL, 0, - CTL_KERN, CTL_CREATE, CTL_EOL); + NULL, _POSIX_ASYNCHRONOUS_IO, NULL, 0, + CTL_KERN, CTL_CREATE, CTL_EOL); - if (rv != 0) + if (rv != 0) { return; + } rv = sysctl_createv(clog, 0, NULL, NULL, - CTLFLAG_PERMANENT | CTLFLAG_READWRITE, - CTLTYPE_INT, "aio_listio_max", - SYSCTL_DESCR("Maximum number of 
asynchronous I/O " + CTLFLAG_PERMANENT | CTLFLAG_READWRITE, + CTLTYPE_INT, "aio_listio_max", + SYSCTL_DESCR("Maximum number of asynchronous I/O " "operations in a single list I/O call"), - sysctl_aio_listio_max, 0, &aio_listio_max, 0, - CTL_KERN, CTL_CREATE, CTL_EOL); + sysctl_aio_listio_max, 0, &aio_listio_max, 0, + CTL_KERN, CTL_CREATE, CTL_EOL); - if (rv != 0) + if (rv != 0) { return; + } rv = sysctl_createv(clog, 0, NULL, NULL, - CTLFLAG_PERMANENT | CTLFLAG_READWRITE, - CTLTYPE_INT, "aio_max", - SYSCTL_DESCR("Maximum number of asynchronous I/O " + CTLFLAG_PERMANENT | CTLFLAG_READWRITE, + CTLTYPE_INT, "aio_max", + SYSCTL_DESCR("Maximum number of asynchronous I/O " "operations"), - sysctl_aio_max, 0, &aio_max, 0, - CTL_KERN, CTL_CREATE, CTL_EOL); + sysctl_aio_max, 0, &aio_max, 0, + CTL_KERN, CTL_CREATE, CTL_EOL); return; } @@ -1130,45 +2165,92 @@ aio_print_jobs(void (*pr)(const char *, ...)) { struct proc *p = curlwp->l_proc; struct aioproc *aio; - struct aio_job *a_job; - struct aiocb *aiocbp; + struct aiosp *sp; + struct aio_job *job; if (p == NULL) { - (*pr)("AIO: We are not in the processes right now.\n"); + (*pr)("AIO: no current process context.\n"); return; } aio = p->p_aio; if (aio == NULL) { - (*pr)("AIO data is not initialized (PID = %d).\n", p->p_pid); + (*pr)("AIO: not initialized (pid=%d).\n", p->p_pid); return; } - (*pr)("AIO: PID = %d\n", p->p_pid); - (*pr)("AIO: Global count of the jobs = %u\n", aio_jobs_count); - (*pr)("AIO: Count of the jobs = %u\n", aio->jobs_count); - - if (aio->curjob) { - a_job = aio->curjob; - (*pr)("\nAIO current job:\n"); - (*pr)(" opcode = %d, errno = %d, state = %d, aiocb_ptr = %p\n", - a_job->aio_op, a_job->aiocbp._errno, - a_job->aiocbp._state, a_job->aiocb_uptr); - aiocbp = &a_job->aiocbp; - (*pr)(" fd = %d, offset = %u, buf = %p, nbytes = %u\n", - aiocbp->aio_fildes, aiocbp->aio_offset, - aiocbp->aio_buf, aiocbp->aio_nbytes); - } - - (*pr)("\nAIO queue:\n"); - TAILQ_FOREACH(a_job, &aio->jobs_queue, list) { - (*pr)(" opcode = %d, errno = %d, state = %d, aiocb_ptr = %p\n", - a_job->aio_op, a_job->aiocbp._errno, - a_job->aiocbp._state, a_job->aiocb_uptr); - aiocbp = &a_job->aiocbp; - (*pr)(" fd = %d, offset = %u, buf = %p, nbytes = %u\n", - aiocbp->aio_fildes, aiocbp->aio_offset, - aiocbp->aio_buf, aiocbp->aio_nbytes); + sp = &aio->aiosp; + + (*pr)("AIO: pid=%d\n", p->p_pid); + (*pr)("AIO: global jobs=%u, proc jobs=%u\n", aio_jobs_count, + aio->jobs_count); + (*pr)("AIO: sp{ total_threads=%zu active=%zu free=%zu pending=%zu\n" + " processing=%lu hash_buckets=%zu mask=%#x }\n", + sp->nthreads_total, sp->nthreads_active, sp->nthreads_free, + sp->jobs_pending, (u_long)sp->njobs_processing, + sp->aio_hash_size, sp->aio_hash_mask); + + /* Pending queue */ + (*pr)("\nqueue (%zu pending):\n", sp->jobs_pending); + TAILQ_FOREACH(job, &sp->jobs, list) { + (*pr)(" op=%d err=%d state=%d uptr=%p completed=%d\n", + job->aio_op, job->aiocbp._errno, job->aiocbp._state, + job->aiocb_uptr, job->completed); + (*pr)(" fd=%d off=%llu buf=%p nbytes=%zu lio=%p\n", + job->aiocbp.aio_fildes, + (unsigned long long)job->aiocbp.aio_offset, + (void *)job->aiocbp.aio_buf, + (size_t)job->aiocbp.aio_nbytes, + job->lio); + } + + /* Active service threads */ + (*pr)("\nactive threads (%zu):\n", sp->nthreads_active); + { + struct aiost *st; + TAILQ_FOREACH(st, &sp->active, list) { + (*pr)(" lwp=%p state=%d freelist=%d\n", + (void *)st->lwp, st->state, st->freelist ? 
1 : 0); + + if (st->job) { + struct aio_job *j = st->job; + (*pr)(" job: op=%d err=%d state=%d uptr=%p\n", + j->aio_op, j->aiocbp._errno, + j->aiocbp._state, j->aiocb_uptr); + (*pr)(" fd=%d off=%llu buf=%p nbytes=%zu\n", + j->aiocbp.aio_fildes, + (unsigned long long)j->aiocbp.aio_offset, + (void *)j->aiocbp.aio_buf, + (size_t)j->aiocbp.aio_nbytes); + } + + if (st->fg) { + (*pr)(" file-group: fp=%p qlen=%zu\n", + (void *)st->fg->fp, + st->fg->queue_size); + } + } + } + + /* Freelist summary */ + (*pr)("\nfree threads (%zu)\n", sp->nthreads_free); + + /* aiocbp hash maps user aiocbp to kernel job */ + (*pr)("\naiocbp hash: buckets=%zu\n", sp->aio_hash_size); + if (sp->aio_hash != NULL && sp->aio_hash_size != 0) { + size_t b; + for (b = 0; b < sp->aio_hash_size; b++) { + struct aiocbp *hc; + if (TAILQ_EMPTY(&sp->aio_hash[b])) { + continue; + } + + (*pr)(" [%zu]:", b); + TAILQ_FOREACH(hc, &sp->aio_hash[b], list) { + (*pr)(" uptr=%p job=%p", hc->uptr, (void *)hc->job); + } + (*pr)("\n"); + } } } #endif /* defined(DDB) */ diff --git a/sys/sys/aio.h b/sys/sys/aio.h index cbf0959b02cee..183314634ee01 100644 --- a/sys/sys/aio.h +++ b/sys/sys/aio.h @@ -8,10 +8,10 @@ * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. + * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. 
* * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE @@ -31,6 +31,7 @@ #include #include +#include /* Returned by aio_cancel() */ #define AIO_CANCELED 0x1 @@ -52,7 +53,7 @@ */ struct aiocb { off_t aio_offset; /* File offset */ - volatile void *aio_buf; /* I/O buffer in process space */ + void *aio_buf; /* I/O buffer in process space */ size_t aio_nbytes; /* Length of transfer */ int aio_fildes; /* File descriptor */ int aio_lio_opcode; /* LIO opcode */ @@ -90,13 +91,97 @@ struct aiocb { #define JOB_WIP 0x1 #define JOB_DONE 0x2 +/* Structure for tracking the status of a collection of operations */ +struct aiowaitgroup { + kmutex_t mtx; /* Protects entire structure */ + kcondvar_t done_cv; /* Signaled when a job completes */ + size_t completed; /* Number of completed jobs in this wait group */ + size_t total; /* Total jobs being waited on */ + bool active; /* False after suspend returns/times out */ + int refcnt; /* Reference count */ +}; + +/* Set of waitgroups waiting on a single job */ +struct aiowaitgrouplk { + kmutex_t mtx; /* Protects wgs array modifications */ + void **wgs; /* Dynamic array of waiting aiowaitgroups */ + size_t s; /* Allocated size of wgs array */ + size_t n; /* Current number of waitgroups */ +}; + /* Structure of AIO job */ +struct aiost; +struct buf; struct aio_job { - int aio_op; /* Operation code */ - struct aiocb aiocbp; /* AIO data structure */ - void *aiocb_uptr; /* User-space pointer for identification of job */ + kmutex_t mtx; /* Protects completed flag */ + int aio_op; /* Operation type (AIO_READ/WRITE/SYNC) */ + struct aiocb aiocbp; /* User-visible AIO control block */ + void *aiocb_uptr; /* User pointer for job identification */ + struct proc *p; /* Originating process */ + bool completed; /* Job completion status */ + bool on_queue; /* Whether or not this job is on sp->jobs */ + struct file *fp; /* File pointer associated with the job */ + struct aiowaitgrouplk lk; /* List of waitgroups waiting on this job */ TAILQ_ENTRY(aio_job) list; - struct lio_req *lio; + struct lio_req *lio; /* List I/O request (if part of lio_listio) */ +}; + +#define AIOST_STATE_NONE 0x1 +#define AIOST_STATE_OPERATION 0x2 +#define AIOST_STATE_TERMINATE 0x4 + +#define AIOSP_SUSPEND_ANY 0x1 +#define AIOSP_SUSPEND_ALL 0x2 + +struct aiost; +struct aiost_file_group { + RB_ENTRY(aiost_file_group) tree; + struct file *fp; + struct aiost *aiost; + kmutex_t mtx; + TAILQ_HEAD(, aio_job) queue; + size_t queue_size; +}; + +/* Structure for AIO servicing thread */ +struct aiosp; +struct aiost { + TAILQ_ENTRY(aiost) list; + struct aiosp *aiosp; /* Servicing pool of this thread */ + kmutex_t mtx; /* Protects this structure */ + kcondvar_t service_cv; /* Signal to activate thread */ + struct lwp *lwp; /* Servicing thread LWP */ + int state; /* The state of the thread */ + bool freelist; /* Whether or not aiost is on freelist */ + struct aiost_file_group *fg; /* File group associated with the thread */ + struct aio_job *job; /* Directly assigned single job (no file group) */ +}; + +struct aiocbp { + TAILQ_ENTRY(aiocbp) list; + const void *uptr; + struct aio_job *job; +}; + +/* Structure for AIO servicing pool */ +TAILQ_HEAD(aiost_list, aiost); +TAILQ_HEAD(aiocbp_list, aiocbp); +struct aiost_file_tree; +struct aiosp { + struct aiost_list freelist; /* Available service threads */ + size_t nthreads_free; /* Length of freelist */ + struct aiost_list active; /* Active servicing threads */ + size_t nthreads_active; /* Length of active list */ + TAILQ_HEAD(, aio_job) jobs; /* Queue of pending
jobs */ + size_t jobs_pending; /* Number of pending jobs */ + kmutex_t mtx; /* Protects structure */ + size_t nthreads_total; /* Total number of servicing threads */ + volatile u_long njobs_processing; /* Number of jobs currently being processed */ + struct aiocbp_list *aio_hash; /* aiocbp hash table buckets */ + kmutex_t aio_hash_mtx; /* Protects the hash table */ + size_t aio_hash_size; /* Total number of buckets */ + u_int aio_hash_mask; /* Hash mask */ + struct aiost_file_tree *fg_root; /* RB tree of file groups */ }; /* LIO structure */ @@ -108,19 +193,40 @@ struct lio_req { /* Structure of AIO data for process */ struct aioproc { kmutex_t aio_mtx; /* Protects the entire structure */ - kcondvar_t aio_worker_cv; /* Signals on a new job */ - kcondvar_t done_cv; /* Signals when the job is done */ - struct aio_job *curjob; /* Currently processing AIO job */ unsigned int jobs_count; /* Count of the jobs */ - TAILQ_HEAD(, aio_job) jobs_queue;/* Queue of the AIO jobs */ - struct lwp *aio_worker; /* AIO worker thread */ + struct aiosp aiosp; /* Per-process service pool */ }; extern u_int aio_listio_max; -/* Prototypes */ + +/* + * Prototypes + */ + void aio_print_jobs(void (*)(const char *, ...) __printflike(1, 2)); int aio_suspend1(struct lwp *, struct aiocb **, int, struct timespec *); +int aiosp_initialize(struct aiosp *); +int aiosp_destroy(struct aiosp *, int *); +int aiosp_distribute_jobs(struct aiosp *); +int aiosp_enqueue_job(struct aiosp *, struct aio_job *); +int aiosp_suspend(struct aiosp *, struct aiocb **, int, struct timespec *, + int); +int aiosp_flush(struct aiosp *); +int aiosp_validate_conflicts(struct aiosp *, const void *); +int aiosp_error(struct aiosp *, const void *, register_t *); +int aiosp_return(struct aiosp *, const void *, register_t *); + +void aiowaitgroup_init(struct aiowaitgroup *); +void aiowaitgroup_fini(struct aiowaitgroup *); +int aiowaitgroup_wait(struct aiowaitgroup *, int); +void aiowaitgroup_done(struct aiowaitgroup *); +void aiowaitgroup_join(struct aiowaitgroup *, struct aiowaitgrouplk *); +void aiowaitgrouplk_init(struct aiowaitgrouplk *); +void aiowaitgrouplk_fini(struct aiowaitgrouplk *); +void aiowaitgrouplk_flush(struct aiowaitgrouplk *); + + #endif /* _KERNEL */ #endif /* _SYS_AIO_H_ */ diff --git a/tests/lib/libc/sys/Makefile b/tests/lib/libc/sys/Makefile index bde309956db03..193b9392c2df1 100644 --- a/tests/lib/libc/sys/Makefile +++ b/tests/lib/libc/sys/Makefile @@ -93,9 +93,18 @@ TESTS_C+= t_wait TESTS_C+= t_wait_noproc TESTS_C+= t_wait_noproc_wnohang TESTS_C+= t_write +TESTS_C+= t_aio_cancel +TESTS_C+= t_aio_suspend +TESTS_C+= t_aio_rw +TESTS_C+= t_aio_lio SRCS.t_mprotect= t_mprotect.c ${SRCS_EXEC_PROT} t_mprotect_helper.c +LDADD.t_aio_cancel+= -lrt -lpthread +LDADD.t_aio_suspend+= -lrt -lpthread +LDADD.t_aio_rw+= -lrt -lpthread +LDADD.t_aio_lio+= -lrt -lpthread + LDADD.t_eventfd+= -lpthread LDADD.t_getpid+= -lpthread LDADD.t_mmap+= -lpthread diff --git a/tests/lib/libc/sys/t_aio_cancel.c b/tests/lib/libc/sys/t_aio_cancel.c new file mode 100644 index 0000000000000..64bdd43561d30 --- /dev/null +++ b/tests/lib/libc/sys/t_aio_cancel.c @@ -0,0 +1,221 @@ +/* $NetBSD: t_aio_cancel.c,v 1.00 2025/08/26 00:00:00 ethan4984 Exp $ */ + +/* + * Copyright (c) 2025 The NetBSD Foundation, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1.
Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND + * CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE + * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN + * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int mktemp_file(char *, size_t); +static void fill_pattern(uint8_t *, size_t, uint8_t); +static void wait_all(const struct aiocb * const [], size_t); + +static int +mktemp_file(char *path, size_t pathlen) +{ + int fd, n; + + n = snprintf(path, pathlen, "t_aio_cancel.XXXXXX"); + ATF_REQUIRE(n > 0 && (size_t)n < pathlen); + + fd = mkstemp(path); + ATF_REQUIRE(fd >= 0); + + return fd; +} + +static void +fill_pattern(uint8_t *buf, size_t len, uint8_t seed) +{ + size_t i; + + for (i = 0; i < len; i++) { + buf[i] = (uint8_t)(seed + (i & 0xff)); + } +} + +static void +wait_all(const struct aiocb * const list[], size_t nent) +{ + size_t i; + int pending, rv; + + for (;;) { + pending = 0; + + for (i = 0; i < nent; i++) { + int err; + + if (list[i] == NULL) { + continue; + } + + err = aio_error(list[i]); + if (err == EINPROGRESS) { + pending = 1; + } + } + + if (!pending) { + break; + } + + rv = aio_suspend(list, (int)nent, NULL); + ATF_REQUIRE_EQ_MSG(0, rv, "aio_suspend failed: %s", strerror(errno)); + } +} + +ATF_TC_WITHOUT_HEAD(cancel_active_write); +ATF_TC_BODY(cancel_active_write, tc) +{ + char path[64]; + int fd, rv, crv, err; + const size_t blksz = 0x1000; + uint8_t *wbuf; + struct aiocb cb; + const struct aiocb *list[1]; + + fd = mktemp_file(path, sizeof(path)); + + wbuf = malloc(blksz); + ATF_REQUIRE(wbuf != NULL); + fill_pattern(wbuf, blksz, 0x33); + + memset(&cb, 0, sizeof(cb)); + cb.aio_fildes = fd; + cb.aio_buf = wbuf; + cb.aio_nbytes = blksz; + cb.aio_offset = 0; + + rv = aio_write(&cb); + ATF_REQUIRE_EQ(0, rv); + + crv = aio_cancel(fd, &cb); + ATF_REQUIRE(crv == AIO_CANCELED || crv == AIO_NOTCANCELED || crv == AIO_ALLDONE); + + if (crv == AIO_CANCELED) { + do { + err = aio_error(&cb); + } while (err == EINPROGRESS); + ATF_REQUIRE_EQ(ECANCELED, err); + ATF_REQUIRE_EQ(-1, aio_return(&cb)); + } else if (crv == AIO_NOTCANCELED) { + list[0] = &cb; + wait_all(list, 1); + ATF_REQUIRE_EQ(0, aio_error(&cb)); + ATF_REQUIRE_EQ((ssize_t)blksz, aio_return(&cb)); + } else { + do { + err = aio_error(&cb); + } while (err == EINPROGRESS); + ATF_REQUIRE_EQ(0, err); + ATF_REQUIRE_EQ((ssize_t)blksz, aio_return(&cb)); + } + + rv = close(fd); + ATF_REQUIRE_EQ(0, rv); + rv = unlink(path); + ATF_REQUIRE_EQ(0, rv); + + 
free(wbuf); +} + +ATF_TC_WITHOUT_HEAD(cancel_completed_request); +ATF_TC_BODY(cancel_completed_request, tc) +{ + char path[64]; + int fd, rv, crv; + const size_t blksz = 4096; + uint8_t *wbuf; + struct aiocb cb; + const struct aiocb *list[1]; + + fd = mktemp_file(path, sizeof(path)); + + wbuf = malloc(blksz); + ATF_REQUIRE(wbuf != NULL); + memset(wbuf, 0x7E, blksz); + + memset(&cb, 0, sizeof(cb)); + cb.aio_fildes = fd; + cb.aio_buf = wbuf; + cb.aio_nbytes = blksz; + cb.aio_offset = 0; + + rv = aio_write(&cb); + ATF_REQUIRE_EQ(0, rv); + + list[0] = &cb; + wait_all(list, 1); + ATF_REQUIRE_EQ(0, aio_error(&cb)); + ATF_REQUIRE_EQ((ssize_t)blksz, aio_return(&cb)); + + crv = aio_cancel(fd, &cb); + ATF_REQUIRE_EQ(AIO_ALLDONE, crv); + + rv = close(fd); + ATF_REQUIRE_EQ(0, rv); + rv = unlink(path); + ATF_REQUIRE_EQ(0, rv); + + free(wbuf); +} + +ATF_TC_WITHOUT_HEAD(cancel_invalid_fd); +ATF_TC_BODY(cancel_invalid_fd, tc) +{ + struct aiocb cb; + int crv; + + memset(&cb, 0, sizeof(cb)); + cb.aio_fildes = -1; + + errno = 0; + crv = aio_cancel(-1, &cb); + ATF_REQUIRE_EQ(-1, crv); + ATF_REQUIRE_EQ(EBADF, errno); +} + +ATF_TP_ADD_TCS(tp) +{ + ATF_TP_ADD_TC(tp, cancel_active_write); + ATF_TP_ADD_TC(tp, cancel_completed_request); + ATF_TP_ADD_TC(tp, cancel_invalid_fd); + return atf_no_error(); +} diff --git a/tests/lib/libc/sys/t_aio_lio.c b/tests/lib/libc/sys/t_aio_lio.c new file mode 100644 index 0000000000000..c841c9ed3376a --- /dev/null +++ b/tests/lib/libc/sys/t_aio_lio.c @@ -0,0 +1,262 @@ +/* $NetBSD: t_aio_lio.c,v 1.00 2025/08/26 00:00:00 ethan4984 Exp $ */ + +/* + * Copyright (c) 2025 The NetBSD Foundation, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND + * CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE + * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN + * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int mktemp_file(char *, size_t); +static void fill_pattern(uint8_t *, size_t, uint8_t); +static void wait_all(const struct aiocb * const [], size_t); + +static int +mktemp_file(char *path, size_t pathlen) +{ + int fd, n; + + n = snprintf(path, pathlen, "t_aio_lio.XXXXXX"); + ATF_REQUIRE(n > 0 && (size_t)n < pathlen); + + fd = mkstemp(path); + ATF_REQUIRE(fd >= 0); + + return fd; +} + +static void +fill_pattern(uint8_t *buf, size_t len, uint8_t seed) +{ + size_t i; + + for (i = 0; i < len; i++) { + buf[i] = (uint8_t)(seed + (i & 0xff)); + } +} + +static void +wait_all(const struct aiocb * const list[], size_t nent) +{ + size_t i; + int pending, rv; + + for (;;) { + pending = 0; + + for (i = 0; i < nent; i++) { + int err; + + if (list[i] == NULL) { + continue; + } + + err = aio_error(list[i]); + if (err == EINPROGRESS) { + pending = 1; + } + } + + if (!pending) { + break; + } + + rv = aio_suspend(list, (int)nent, NULL); + ATF_REQUIRE_EQ_MSG(0, rv, "aio_suspend failed: %s", + strerror(errno)); + } +} + +ATF_TC_WITHOUT_HEAD(lio_nowait); +ATF_TC_BODY(lio_nowait, tc) +{ + char path[64]; + int fd, rv; + const size_t nreq = 8, blksz = 8192; + uint8_t *bufs[nreq]; + struct aiocb cbs[nreq]; + struct aiocb *list[nreq]; + off_t off; + size_t i; + + fd = mktemp_file(path, sizeof(path)); + + off = 0; + for (i = 0; i < nreq; i++) { + bufs[i] = malloc(blksz); + ATF_REQUIRE(bufs[i] != NULL); + + fill_pattern(bufs[i], blksz, (uint8_t)i); + + memset(&cbs[i], 0, sizeof(cbs[i])); + cbs[i].aio_fildes = fd; + cbs[i].aio_buf = bufs[i]; + cbs[i].aio_nbytes = blksz; + cbs[i].aio_offset = off; + cbs[i].aio_lio_opcode = LIO_WRITE; + + list[i] = &cbs[i]; + off += (off_t)blksz; + } + + rv = lio_listio(LIO_NOWAIT, list, (int)nreq, NULL); + ATF_REQUIRE_EQ_MSG(0, rv, "lio_listio failed: %s", + strerror(errno)); + + wait_all((const struct aiocb * const *)list, nreq); + + for (i = 0; i < nreq; i++) { + int err; + ssize_t done; + + err = aio_error(&cbs[i]); + ATF_REQUIRE_EQ(0, err); + + done = aio_return(&cbs[i]); + ATF_REQUIRE_EQ((ssize_t)blksz, done); + + free(bufs[i]); + } + + rv = close(fd); + ATF_REQUIRE_EQ(0, rv); + rv = unlink(path); + ATF_REQUIRE_EQ(0, rv); +} + +ATF_TC_WITHOUT_HEAD(lio_wait_write_then_read); +ATF_TC_BODY(lio_wait_write_then_read, tc) +{ + char path[64]; + int fd, rv; + const size_t nreq = 4, blksz = 4096; + + uint8_t *wbufs[nreq]; + struct aiocb wcbs[nreq]; + struct aiocb *wlist[nreq]; + + uint8_t *rbufs[nreq]; + struct aiocb rcbs[nreq]; + struct aiocb *rlist[nreq]; + + size_t i; + off_t off; + + fd = mktemp_file(path, sizeof(path)); + + off = 0; + for (i = 0; i < nreq; i++) { + wbufs[i] = malloc(blksz); + ATF_REQUIRE(wbufs[i] != NULL); + + fill_pattern(wbufs[i], blksz, (uint8_t)(0xA0 + i)); + + memset(&wcbs[i], 0, sizeof(wcbs[i])); + wcbs[i].aio_fildes = fd; + wcbs[i].aio_buf = wbufs[i]; + wcbs[i].aio_nbytes = blksz; + wcbs[i].aio_offset = off; + wcbs[i].aio_lio_opcode = LIO_WRITE; + + wlist[i] = &wcbs[i]; + off += (off_t)blksz; + } + + rv = lio_listio(LIO_WAIT, wlist, (int)nreq, NULL); + ATF_REQUIRE_EQ_MSG(0, rv, "lio_listio write failed: %s", + strerror(errno)); + + for (i = 0; i < nreq; i++) { + int err; + ssize_t done; + + err = aio_error(&wcbs[i]); + ATF_REQUIRE_EQ(0, err); + + done = aio_return(&wcbs[i]); + ATF_REQUIRE_EQ((ssize_t)blksz, done); + } + + for (i = 0; i < nreq; i++) { + rbufs[i] = calloc(1, blksz); + ATF_REQUIRE(rbufs[i] != NULL); + + 
memset(&rcbs[i], 0, sizeof(rcbs[i])); + rcbs[i].aio_fildes = fd; + rcbs[i].aio_buf = rbufs[i]; + rcbs[i].aio_nbytes = blksz; + rcbs[i].aio_offset = (off_t)i * (off_t)blksz; + rcbs[i].aio_lio_opcode = LIO_READ; + + rlist[i] = &rcbs[i]; + } + + rv = lio_listio(LIO_NOWAIT, rlist, (int)nreq, NULL); + ATF_REQUIRE_EQ_MSG(0, rv, "lio_listio read failed: %s", + strerror(errno)); + + wait_all((const struct aiocb * const *)rlist, nreq); + + for (i = 0; i < nreq; i++) { + int err; + ssize_t done; + + err = aio_error(&rcbs[i]); + ATF_REQUIRE_EQ(0, err); + + done = aio_return(&rcbs[i]); + ATF_REQUIRE_EQ((ssize_t)blksz, done); + + ATF_REQUIRE_EQ(0, memcmp(wbufs[i], rbufs[i], blksz)); + + free(wbufs[i]); + free(rbufs[i]); + } + + rv = close(fd); + ATF_REQUIRE_EQ(0, rv); + rv = unlink(path); + ATF_REQUIRE_EQ(0, rv); +} + +ATF_TP_ADD_TCS(tp) +{ + ATF_TP_ADD_TC(tp, lio_nowait); + ATF_TP_ADD_TC(tp, lio_wait_write_then_read); + + return atf_no_error(); +} diff --git a/tests/lib/libc/sys/t_aio_rw.c b/tests/lib/libc/sys/t_aio_rw.c new file mode 100644 index 0000000000000..0c47d0f76c338 --- /dev/null +++ b/tests/lib/libc/sys/t_aio_rw.c @@ -0,0 +1,167 @@ +/* $NetBSD: t_aio_rw.c,v 1.00 2025/08/26 00:00:00 ethan4984 Exp $ */ + +/* + * Copyright (c) 2025 The NetBSD Foundation, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND + * CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE + * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN + * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int mktemp_file(char *, size_t); +static void fill_pattern(uint8_t *, size_t, uint8_t); +static void wait_all(const struct aiocb * const [], size_t); + +static int +mktemp_file(char *path, size_t pathlen) +{ + int fd, n; + + n = snprintf(path, pathlen, "t_aio_rw.XXXXXX"); + ATF_REQUIRE(n > 0 && (size_t)n < pathlen); + + fd = mkstemp(path); + ATF_REQUIRE(fd >= 0); + + return fd; +} + +static void +fill_pattern(uint8_t *buf, size_t len, uint8_t seed) +{ + size_t i; + + for (i = 0; i < len; i++) { + buf[i] = (uint8_t)(seed + (i & 0xff)); + } +} + +static void +wait_all(const struct aiocb * const list[], size_t nent) +{ + size_t i; + int pending, rv, error; + + for (;;) { + pending = 0; + + for (i = 0; i < nent; i++) { + if (list[i] == NULL) { + continue; + } + + error = aio_error(list[i]); + if (error == EINPROGRESS) { + pending = 1; + } + } + + if (!pending) { + break; + } + + rv = aio_suspend(list, (int)nent, NULL); + ATF_REQUIRE_EQ_MSG(0, rv, "aio_suspend failed: %s", + strerror(errno)); + } +} + +/* + * write_then_read_back + * Write a block then read it back asynchronously and compare. + */ +ATF_TC_WITHOUT_HEAD(write_then_read_back); +ATF_TC_BODY(write_then_read_back, tc) +{ + char path[64]; + int fd, rv; + const size_t blksz = 0x2000; + uint8_t *wbuf, *rbuf; + struct aiocb wcb, rcb; + const struct aiocb *wlist[1], *rlist[1]; + + fd = mktemp_file(path, sizeof(path)); + + wbuf = malloc(blksz); + rbuf = calloc(1, blksz); + ATF_REQUIRE(wbuf != NULL && rbuf != NULL); + + fill_pattern(wbuf, blksz, 0xA0); + + memset(&wcb, 0, sizeof(wcb)); + wcb.aio_fildes = fd; + wcb.aio_buf = wbuf; + wcb.aio_nbytes = blksz; + wcb.aio_offset = 0; + + rv = aio_write(&wcb); + ATF_REQUIRE_EQ(0, rv); + wlist[0] = &wcb; + wait_all(wlist, 1); + + ATF_REQUIRE_EQ(0, aio_error(&wcb)); + ATF_REQUIRE_EQ((ssize_t)blksz, aio_return(&wcb)); + + memset(&rcb, 0, sizeof(rcb)); + rcb.aio_fildes = fd; + rcb.aio_buf = rbuf; + rcb.aio_nbytes = blksz; + rcb.aio_offset = 0; + + rv = aio_read(&rcb); + ATF_REQUIRE_EQ(0, rv); + rlist[0] = &rcb; + wait_all(rlist, 1); + + ATF_REQUIRE_EQ(0, aio_error(&rcb)); + ATF_REQUIRE_EQ((ssize_t)blksz, aio_return(&rcb)); + ATF_REQUIRE_EQ(0, memcmp(wbuf, rbuf, blksz)); + + rv = close(fd); + ATF_REQUIRE_EQ(0, rv); + rv = unlink(path); + ATF_REQUIRE_EQ(0, rv); + + free(wbuf); + free(rbuf); +} + +ATF_TP_ADD_TCS(tp) +{ + ATF_TP_ADD_TC(tp, write_then_read_back); + return atf_no_error(); +} diff --git a/tests/lib/libc/sys/t_aio_suspend.c b/tests/lib/libc/sys/t_aio_suspend.c new file mode 100644 index 0000000000000..6f766d5a0e685 --- /dev/null +++ b/tests/lib/libc/sys/t_aio_suspend.c @@ -0,0 +1,170 @@ +/* $NetBSD: t_aio_suspend.c,v 1.00 2025/08/26 00:00:00 ethan4984 Exp $ */ + +/* + * Copyright (c) 2025 The NetBSD Foundation, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. 
AND + * CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE + * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN + * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int mktemp_file(char *, size_t); +static void fill_pattern(uint8_t *, size_t, uint8_t); +static void wait_cb(struct aiocb *); + +static int +mktemp_file(char *path, size_t pathlen) +{ + int fd, n; + + n = snprintf(path, pathlen, "t_aio_suspend.XXXXXX"); + ATF_REQUIRE(n > 0 && (size_t)n < pathlen); + + fd = mkstemp(path); + ATF_REQUIRE(fd >= 0); + + return fd; +} + +static void +fill_pattern(uint8_t *buf, size_t len, uint8_t seed) +{ + size_t i; + + for (i = 0; i < len; i++) { + buf[i] = (uint8_t)(seed + (i & 0xff)); + } +} + +static void +wait_cb(struct aiocb *cb) +{ + const struct aiocb *one[1]; + int rv; + + one[0] = cb; + while (aio_error(cb) == EINPROGRESS) { + rv = aio_suspend(one, 1, NULL); + ATF_REQUIRE_EQ(0, rv); + } + if (aio_error(cb) == 0) { + aio_return(cb); + } +} + +ATF_TC_WITHOUT_HEAD(suspend_any); +ATF_TC_BODY(suspend_any, tc) +{ + char path[64]; + int fd, rv; + const size_t blksz = 4096; + uint8_t *buf0, *buf1; + struct aiocb cb0, cb1; + const struct aiocb *list[2]; + int done; + + fd = mktemp_file(path, sizeof(path)); + + buf0 = malloc(blksz); + buf1 = malloc(blksz); + ATF_REQUIRE(buf0 != NULL && buf1 != NULL); + fill_pattern(buf0, blksz, 0x20); + fill_pattern(buf1, blksz, 0x40); + + memset(&cb0, 0, sizeof(cb0)); + cb0.aio_fildes = fd; + cb0.aio_buf = buf0; + cb0.aio_nbytes = blksz; + cb0.aio_offset = 0; + + memset(&cb1, 0, sizeof(cb1)); + cb1.aio_fildes = fd; + cb1.aio_buf = buf1; + cb1.aio_nbytes = blksz; + cb1.aio_offset = blksz; + + ATF_REQUIRE_EQ(0, aio_write(&cb0)); + ATF_REQUIRE_EQ(0, aio_write(&cb1)); + + list[0] = &cb0; + list[1] = &cb1; + + rv = aio_suspend(list, 2, NULL); + ATF_REQUIRE_EQ(0, rv); + + done = 0; + if (aio_error(&cb0) != EINPROGRESS) { + done++; + if (aio_error(&cb0) == 0) { + ATF_REQUIRE_EQ((ssize_t)blksz, aio_return(&cb0)); + } else { + ATF_REQUIRE_EQ(ECANCELED, aio_error(&cb0)); + ATF_REQUIRE_EQ(-1, aio_return(&cb0)); + } + } + if (aio_error(&cb1) != EINPROGRESS) { + done++; + if (aio_error(&cb1) == 0) { + ATF_REQUIRE_EQ((ssize_t)blksz, aio_return(&cb1)); + } else { + ATF_REQUIRE_EQ(ECANCELED, aio_error(&cb1)); + ATF_REQUIRE_EQ(-1, aio_return(&cb1)); + } + } + ATF_REQUIRE(done >= 1); + + if (aio_error(&cb0) == EINPROGRESS) { + wait_cb(&cb0); + } + if (aio_error(&cb1) == EINPROGRESS) { + wait_cb(&cb1); + } + + rv = close(fd); + ATF_REQUIRE_EQ(0, rv); + rv = unlink(path); + ATF_REQUIRE_EQ(0, rv); + + free(buf0); + free(buf1); +} + +ATF_TP_ADD_TCS(tp) +{ + ATF_TP_ADD_TC(tp, suspend_any); + return atf_no_error(); +}
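A note for reviewers, not part of the patch: sys/sys/aio.h above only declares the aiowaitgroup primitives; their bodies live in sys_aio.c. The sketch below illustrates the rendezvous that the field comments imply: a suspender joins one waitgroup to each job of interest and sleeps on done_cv until completed reaches total, while a completing worker flushes the job's waitgroup list. The names example_suspend and example_complete are hypothetical, and the bodies are an assumption about the intended semantics (reference counting and the timeout path are elided); this is not the patch's implementation.

/*
 * Illustrative sketch only.  Structure and function names are taken from
 * sys/sys/aio.h; the logic is inferred from the field comments and may
 * differ from what sys_aio.c actually does.
 */

/* Suspender side: wait until `total' of the joined jobs have completed. */
static int
example_suspend(struct aio_job **jobs, size_t njobs, size_t total)
{
	struct aiowaitgroup wg;
	size_t i;
	int error = 0;

	aiowaitgroup_init(&wg);
	wg.total = total;	/* 1 for AIOSP_SUSPEND_ANY, njobs for _ALL */

	/* Attach this waitgroup to every job of interest. */
	for (i = 0; i < njobs; i++)
		aiowaitgroup_join(&wg, &jobs[i]->lk);

	mutex_enter(&wg.mtx);
	while (wg.completed < wg.total && error == 0)
		error = cv_wait_sig(&wg.done_cv, &wg.mtx);
	wg.active = false;	/* later completions are no longer counted */
	mutex_exit(&wg.mtx);

	/* The real code presumably uses refcnt to defer the fini until the
	 * workers have detached; that bookkeeping is omitted here. */
	aiowaitgroup_fini(&wg);
	return error;
}

/* Worker side: on job completion, wake every waitgroup joined to the job. */
static void
example_complete(struct aio_job *job)
{
	mutex_enter(&job->mtx);
	job->completed = true;
	mutex_exit(&job->mtx);

	/* Presumably invokes aiowaitgroup_done() on each attached group. */
	aiowaitgrouplk_flush(&job->lk);
}

Once the sets-list and Makefile hooks are installed, the four new test programs should run under the usual atf/kyua harness, e.g. "kyua test t_aio_rw" from /usr/tests/lib/libc/sys.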