diff --git a/CHANGELOG.md b/CHANGELOG.md index 84d4804..022681a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Fixed +- **`ThreadPool` synchronous task: snapshot use-after-free when the task spawns an un-awaited coroutine** — a sync-mode task body ran inline in the worker and its per-task snapshot arena (which backs every spawned closure's op_array) was freed the instant the body returned, while a coroutine the body had spawned was still pending; running it later dereferenced freed memory (Windows debug-heap crash; ASAN-caught on Linux). The body now runs as a coroutine in its own per-task **nursery** `Scope`: `Async\spawn()` inside the body lands in that scope on its own (no scope-pointer hijacking), and on task exit the scope is cancelled and *drained* — awaited until every spawned coroutine is physically disposed — before the snapshot is freed. ABI bumped to v0.20.0: new `zend_async_scope_await_after_cancellation_fn` exposes the C core of `Scope::awaitAfterCancellation` so the worker reuses the canonical zombie-aware drain instead of hand-rolling it. Regression test `tests/thread_pool/065-task_scope_nursery_no_uaf.phpt`. +- **`ThreadPool`: fatal in a task no longer leaves a use-after-free or a leaked libuv loop** — a fatal (e.g. OOM) in a task body now rejects the future with `ThreadTransferException` and tears the pool down cleanly. The snapshot's op_array name strings (`function_name`, `filename`) are materialized into refcounted heap strings so holders that outlive the snapshot arena (the closure, `PG(last_error_file)`) are freed by refcount instead of dangling. 
The `zend_bailout()` that a fatal re-raises through a parked `ThreadChannel` send/recv or the worker's slot wait is now caught so the channel/slot trigger is disposed before re-raising — an undisposed trigger's open `uv_async` would block `uv_loop_close` and leak the reactor's loop internals. Debug builds dump any libuv handle that survives reactor shutdown. Regression tests `tests/thread_pool/066`–`068`. +- **`ThreadPool` coroutine-mode task: a fatal now reports its cause instead of resolving to `null`** — in `coroutine: true` mode a task that hit a fatal/OOM (or `exit()`/`die()`) resolved its future to a silent `null`, because the completion path only checked for a thrown exception and a bailout is not one. It now detects the bailout (no exception, `UNDEF` result) and rejects the future with a `ThreadTransferException` carrying the fatal message, matching synchronous mode. Tests `tests/thread_pool/067`, `068`. +- **`spawn_thread`/`ThreadPool`: cross-thread use-after-free of a closure's captured-variable names** — a transferred closure's `use`-variable names were copied with `zend_string_dup()`, which returns interned strings unchanged; the variable names are interned in the parent thread, so the worker's snapshot held pointers into the parent's interned-string table. When the parent aborted/ended its request (`zend_interned_strings_deactivate`) while a worker was still building the closure, the worker read freed memory in `async_thread_create_closure` (ASAN `heap-use-after-free`). The keys are now copied into private persistent strings owned by the snapshot. Regression tests `tests/thread/050`, `052`. - **`TaskSet`/`TaskGroup(scope: $scope)` use-after-free on teardown** — the group held only an event refcount on a PHP-supplied scope, which is shared with coroutine bookkeeping, so a finishing coroutine could free the scope while `group->scope` still pointed at it (`task_group.c:486`, seen as `zend_mm_heap corrupted`). 
The group now holds a strong ref to the external Scope object for its lifetime, so the scope can't be disposed while in use. Test `tests/task_group/043-task_group_external_scope_uaf.phpt`. ## [0.7.0] - 2026-06-02 diff --git a/async_API.c b/async_API.c index 371f48b..b456279 100644 --- a/async_API.c +++ b/async_API.c @@ -1304,6 +1304,7 @@ void async_api_register(void) async_scheduler_coroutine_enqueue, async_coroutine_resume, async_coroutine_cancel, + async_scope_await_after_cancellation, async_spawn_and_throw, start_graceful_shutdown, async_waker_new, diff --git a/libuv_reactor.c b/libuv_reactor.c index 799aaa1..00b868d 100644 --- a/libuv_reactor.c +++ b/libuv_reactor.c @@ -411,6 +411,21 @@ static void libuv_reactor_stop_with_exception(void) /* }}} */ +#ifdef ZEND_DEBUG +/* uv_walk() callback: dump a libuv handle that survived reactor shutdown — such a + * handle keeps the loop from closing (uv_loop_close => EBUSY) and leaks the + * loop's internals. `arg` is unused. */ +static void libuv_debug_dump_handle(uv_handle_t *handle, void *arg) +{ + /* uv_handle_type_name() returns NULL for an unknown type; NULL with %s is undefined. */ + const char *type_name = uv_handle_type_name(uv_handle_get_type(handle)); + (void) arg; + fprintf(stderr, "async: leftover libuv handle: type=%s active=%d closing=%d has_ref=%d\n", + type_name != NULL ? type_name : "unknown", + uv_is_active(handle), uv_is_closing(handle), uv_has_ref(handle)); +} +#endif + /* {{{ libuv_reactor_shutdown */ bool libuv_reactor_shutdown(void) { @@ -434,9 +449,20 @@ bool libuv_reactor_shutdown(void) if (uv_loop_alive(UVLOOP)) { #ifdef ZEND_DEBUG fprintf(stderr, "async: libuv shutdown timeout; loop left open\n"); + uv_walk(UVLOOP, libuv_debug_dump_handle, NULL); #endif } else { - uv_loop_close(UVLOOP); + /* uv_loop_close fails with EBUSY if any handle is still open (even an + * unref'd one, which uv_loop_alive ignores) — that would silently + * leak the loop's internals. Surface it in debug builds. 
*/ + const int close_result = uv_loop_close(UVLOOP); + if (UNEXPECTED(close_result != 0)) { +#ifdef ZEND_DEBUG + fprintf(stderr, "async: uv_loop_close failed (%s); leftover handles:\n", + uv_err_name(close_result)); + uv_walk(UVLOOP, libuv_debug_dump_handle, NULL); +#endif + } } ASYNC_G(reactor_started) = false; diff --git a/scope.c b/scope.c index 013019f..ecefbaa 100644 --- a/scope.c +++ b/scope.c @@ -371,95 +371,115 @@ METHOD(awaitCompletion) zend_async_waker_clean(current_coroutine); } -METHOD(awaitAfterCancellation) +/* C core of Scope::awaitAfterCancellation, also used by the thread pool via the + * async API. Suspends `awaiter` until the scope is COMPLETELY_DONE (active and + * zombie counts both zero — physical disposal, not just "completed"). `awaiter` + * must not belong to `scope`. `error_fci`/`cancellation` are optional. */ +void async_scope_await_after_cancellation( + zend_async_scope_t *zend_scope, zend_coroutine_t *awaiter, + zend_fcall_info *error_fci, zend_fcall_info_cache *error_fci_cache, + zend_async_event_t *cancellation) { - zend_fcall_info error_handler_fci = { 0 }; - zend_fcall_info_cache error_handler_fcc = { 0 }; - zend_object *cancellation_obj = NULL; - - ZEND_PARSE_PARAMETERS_START(0, 2) - Z_PARAM_OPTIONAL - Z_PARAM_FUNC_OR_NULL(error_handler_fci, error_handler_fcc) - Z_PARAM_OBJ_OR_NULL(cancellation_obj) - ZEND_PARSE_PARAMETERS_END(); - - // Mark cancellation token as used immediately, before any early returns - if (cancellation_obj != NULL) { - zend_async_event_t *cancellation_event = ZEND_ASYNC_OBJECT_TO_EVENT(cancellation_obj); - ZEND_ASYNC_EVENT_SET_RESULT_USED(cancellation_event); - ZEND_ASYNC_EVENT_SET_EXC_CAUGHT(cancellation_event); - } - - zend_coroutine_t *current_coroutine = ZEND_ASYNC_CURRENT_COROUTINE; - if (UNEXPECTED(current_coroutine == NULL)) { + if (UNEXPECTED(awaiter == NULL || zend_scope == NULL || ZEND_ASYNC_SCOPE_IS_CLOSED(zend_scope))) { return; } - async_scope_object_t *scope_object = THIS_SCOPE; - if 
(UNEXPECTED(scope_object->scope == NULL || ZEND_ASYNC_SCOPE_IS_CLOSED(&scope_object->scope->scope))) { - return; - } - - if (false == ZEND_ASYNC_SCOPE_IS_CANCELLED(&scope_object->scope->scope)) { - async_throw_error("Attempt to await a Scope that has not been cancelled"); - } + async_scope_t *scope = (async_scope_t *) zend_scope; - // Check for deadlock: current coroutine belongs to this scope or its children - if (async_scope_contains_coroutine(scope_object->scope, current_coroutine, 0)) { + // Deadlock guard: the awaiter must not belong to this scope or its children. + if (async_scope_contains_coroutine(scope, awaiter, 0)) { async_throw_error( "Cannot await completion of scope from a coroutine that belongs to the same scope or its children"); - RETURN_THROWS(); + return; } if (UNEXPECTED(EG(exception))) { - RETURN_THROWS(); + return; } - // Check if scope is already finished (no active coroutines and no child scopes) - if (scope_object->scope->coroutines.length == 0 && scope_object->scope->scope.scopes.length == 0) { + // Already drained — no active coroutines and no child scopes. + if (scope->coroutines.length == 0 && scope->scope.scopes.length == 0) { return; } - ZEND_ASYNC_WAKER_NEW(current_coroutine); + ZEND_ASYNC_WAKER_NEW(awaiter); if (UNEXPECTED(EG(exception))) { - RETURN_THROWS(); + return; } - // We need to create a custom callback to handle errors coming from coroutines. + // Resumes only when COMPLETELY_DONE; routes any child error through the handler. 
scope_coroutine_callback_t *scope_callback = (scope_coroutine_callback_t *) zend_async_coroutine_callback_new( - current_coroutine, callback_resolve_when_zombie_completed, sizeof(scope_coroutine_callback_t)); + awaiter, callback_resolve_when_zombie_completed, sizeof(scope_coroutine_callback_t)); if (UNEXPECTED(scope_callback == NULL)) { - ZEND_ASYNC_WAKER_DESTROY(current_coroutine); - RETURN_THROWS(); + ZEND_ASYNC_WAKER_DESTROY(awaiter); + return; } - if (error_handler_fci.size != 0) { - scope_callback->error_fci = &error_handler_fci; - scope_callback->error_fci_cache = &error_handler_fcc; + if (error_fci != NULL && error_fci->size != 0) { + scope_callback->error_fci = error_fci; + scope_callback->error_fci_cache = error_fci_cache; } else { scope_callback->error_fci = NULL; scope_callback->error_fci_cache = NULL; } - if (UNEXPECTED(!zend_async_resume_when(current_coroutine, &scope_object->scope->scope.event, false, NULL, - &scope_callback->callback))) { - ZEND_ASYNC_WAKER_DESTROY(current_coroutine); - RETURN_THROWS(); + if (UNEXPECTED(!zend_async_resume_when( + awaiter, &zend_scope->event, false, NULL, &scope_callback->callback))) { + ZEND_ASYNC_WAKER_DESTROY(awaiter); + return; } - if (cancellation_obj != NULL) { - zend_async_resume_when(current_coroutine, - ZEND_ASYNC_OBJECT_TO_EVENT(cancellation_obj), - false, - zend_async_waker_callback_cancel, - NULL); + if (cancellation != NULL) { + zend_async_resume_when(awaiter, cancellation, false, zend_async_waker_callback_cancel, NULL); if (UNEXPECTED(EG(exception))) { - zend_async_waker_clean(current_coroutine); - RETURN_THROWS(); + zend_async_waker_clean(awaiter); + return; } } ZEND_ASYNC_SUSPEND(); - zend_async_waker_clean(current_coroutine); + zend_async_waker_clean(awaiter); +} + +METHOD(awaitAfterCancellation) +{ + zend_fcall_info error_handler_fci = { 0 }; + zend_fcall_info_cache error_handler_fcc = { 0 }; + zend_object *cancellation_obj = NULL; + + ZEND_PARSE_PARAMETERS_START(0, 2) + Z_PARAM_OPTIONAL + 
Z_PARAM_FUNC_OR_NULL(error_handler_fci, error_handler_fcc) + Z_PARAM_OBJ_OR_NULL(cancellation_obj) + ZEND_PARSE_PARAMETERS_END(); + + // Mark cancellation token as used immediately, before any early returns + zend_async_event_t *cancellation_event = NULL; + if (cancellation_obj != NULL) { + cancellation_event = ZEND_ASYNC_OBJECT_TO_EVENT(cancellation_obj); + ZEND_ASYNC_EVENT_SET_RESULT_USED(cancellation_event); + ZEND_ASYNC_EVENT_SET_EXC_CAUGHT(cancellation_event); + } + + zend_coroutine_t *current_coroutine = ZEND_ASYNC_CURRENT_COROUTINE; + if (UNEXPECTED(current_coroutine == NULL)) { + return; + } + + async_scope_object_t *scope_object = THIS_SCOPE; + if (UNEXPECTED(scope_object->scope == NULL || ZEND_ASYNC_SCOPE_IS_CLOSED(&scope_object->scope->scope))) { + return; + } + + if (false == ZEND_ASYNC_SCOPE_IS_CANCELLED(&scope_object->scope->scope)) { + async_throw_error("Attempt to await a Scope that has not been cancelled"); + RETURN_THROWS(); + } + + async_scope_await_after_cancellation( + &scope_object->scope->scope, current_coroutine, + error_handler_fci.size != 0 ? &error_handler_fci : NULL, + error_handler_fci.size != 0 ? &error_handler_fcc : NULL, + cancellation_event); } METHOD(isFinished) diff --git a/scope.h b/scope.h index baed803..7a02dc0 100644 --- a/scope.h +++ b/scope.h @@ -83,6 +83,12 @@ void async_register_scope_ce(void); /* Check if coroutine belongs to this scope or any of its child scopes */ bool async_scope_contains_coroutine(async_scope_t *scope, zend_coroutine_t *coroutine, uint32_t depth); +/* C core of Scope::awaitAfterCancellation (see scope.c). 
*/ +void async_scope_await_after_cancellation( + zend_async_scope_t *scope, zend_coroutine_t *awaiter, + zend_fcall_info *error_fci, zend_fcall_info_cache *error_fci_cache, + zend_async_event_t *cancellation); + void async_scope_notify_coroutine_finished(async_coroutine_t *coroutine); /* Mark coroutine as zombie and update active count */ diff --git a/tests/thread_pool/065-task_scope_nursery_no_uaf.phpt b/tests/thread_pool/065-task_scope_nursery_no_uaf.phpt new file mode 100644 index 0000000..e174592 --- /dev/null +++ b/tests/thread_pool/065-task_scope_nursery_no_uaf.phpt @@ -0,0 +1,46 @@ +--TEST-- +ThreadPool: a coroutine spawned but never awaited inside a sync task cannot outlive the per-task snapshot (UAF regression) +--SKIPIF-- + +--FILE-- +submit(function () { + // Spawned, never awaited: still pending when the task body returns, so the + // worker must cancel and drain it before freeing this task's snapshot. + spawn(function () { delay(10000); }); + return 'task-done'; +}); + +var_dump(await($f)); + +$pool->close(); +echo "ok\n"; +--EXPECT-- +string(9) "task-done" +ok diff --git a/tests/thread_pool/066-task_fatal_rejects_future.phpt b/tests/thread_pool/066-task_fatal_rejects_future.phpt new file mode 100644 index 0000000..cc4a3b8 --- /dev/null +++ b/tests/thread_pool/066-task_fatal_rejects_future.phpt @@ -0,0 +1,45 @@ +--TEST-- +ThreadPool: a fatal (OOM) in a sync task rejects its future with ThreadTransferException (no hang/UAF/leak) +--SKIPIF-- + +--INI-- +memory_limit=64M +--FILE-- +submit(function () { + $s = str_repeat('x', 500 * 1024 * 1024); // exceeds memory_limit -> bailout + return strlen($s); +}); + +try { + var_dump(await($f)); +} catch (\Throwable $e) { + printf("%s: %s\n", get_class($e), + str_contains($e->getMessage(), 'memory size') ? 
'memory exhausted' : 'other'); +} + +echo "done\n"; +--EXPECTF-- +%AAsync\ThreadTransferException: memory exhausted +done diff --git a/tests/thread_pool/067-coroutine_task_fatal_no_trigger_leak.phpt b/tests/thread_pool/067-coroutine_task_fatal_no_trigger_leak.phpt new file mode 100644 index 0000000..f923240 --- /dev/null +++ b/tests/thread_pool/067-coroutine_task_fatal_no_trigger_leak.phpt @@ -0,0 +1,43 @@ +--TEST-- +ThreadPool: a fatal in a coroutine-mode task delivers the cause and disposes the worker's channel trigger (no libuv loop leak) +--SKIPIF-- + +--INI-- +memory_limit=64M +--FILE-- +submit(function () { + $s = str_repeat('x', 500 * 1024 * 1024); // exceeds memory_limit -> bailout + return strlen($s); +}); + +try { + var_dump(await($f)); +} catch (\Throwable $e) { + printf("%s: %s\n", get_class($e), + str_contains($e->getMessage(), 'memory size') ? 'memory exhausted' : 'other'); +} +echo "done\n"; +--EXPECTF-- +%AAsync\ThreadTransferException: memory exhausted +done diff --git a/tests/thread_pool/068-concurrency_task_fatal_no_slot_leak.phpt b/tests/thread_pool/068-concurrency_task_fatal_no_slot_leak.phpt new file mode 100644 index 0000000..fe79ceb --- /dev/null +++ b/tests/thread_pool/068-concurrency_task_fatal_no_slot_leak.phpt @@ -0,0 +1,39 @@ +--TEST-- +ThreadPool: a fatal with a concurrency limit disposes the worker's slot_event trigger (no libuv loop leak) +--SKIPIF-- + +--INI-- +memory_limit=64M +--FILE-- + 0 the worker parks on its slot_event trigger + * at the limit. A fatal in a task longjmps past the worker's `done:` cleanup, + * which disposes slot_event — leaving its open uv_async to block uv_loop_close + * and leak the libuv loop. The worker's bailout handler now disposes slot_event + * (no-leak verified by LeakSanitizer). Also asserts the fatal cause reaches the + * awaiter (reject, not a silent null). 
+ */ +use Async\ThreadPool; +use function Async\await; + +$pool = new ThreadPool(1, 0, null, true, 1); // coroutine mode, concurrency = 1 + +$f = $pool->submit(function () { + $s = str_repeat('x', 500 * 1024 * 1024); + return strlen($s); +}); + +try { + var_dump(await($f)); +} catch (\Throwable $e) { + printf("%s: %s\n", get_class($e), + str_contains($e->getMessage(), 'memory size') ? 'memory exhausted' : 'other'); +} +echo "done\n"; +--EXPECTF-- +%AAsync\ThreadTransferException: memory exhausted +done diff --git a/thread.c b/thread.c index 179391d..e5dd913 100644 --- a/thread.c +++ b/thread.c @@ -1604,7 +1604,10 @@ static void thread_copy_callable( thread_release_subgraph_zval(&transferred); break; } - zend_string *pkey = zend_string_dup(key, 1); + /* Private persistent copy: an interned key aliases the parent's + * interned table and dangles after the parent shuts down. */ + zend_string *pkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 1); + GC_MAKE_PERSISTENT_LOCAL(pkey); zend_hash_add(dst->bound_vars, pkey, &transferred); zend_string_release(pkey); } ZEND_HASH_FOREACH_END(); @@ -1645,6 +1648,16 @@ static void thread_copy_callable( */ static void thread_release_closure_copy(thread_release_ctx_t *ctx, async_thread_closure_copy_t *copy) { + /* Release name strings (no-op while interned; the base ref once materialized). */ + if (copy->func != NULL) { + if (copy->func->function_name != NULL) { + zend_string_release(copy->func->function_name); + } + if (copy->func->filename != NULL) { + zend_string_release(copy->func->filename); + } + } + if (copy->bound_vars) { zval *val; ZEND_HASH_FOREACH_VAL(copy->bound_vars, val) { @@ -1692,6 +1705,28 @@ async_thread_snapshot_t *async_thread_snapshot_create(const zend_fcall_t *entry, return snapshot; } +/* Heap-copy the op_array's interned (arena-backed) name strings so holders that + * outlive the arena (closure, PG(last_error_file)) are freed by refcount. 
*/ +static void thread_materialize_op_array_names(zend_op_array *op_array) +{ + if (op_array->function_name != NULL && ZSTR_IS_INTERNED(op_array->function_name)) { + op_array->function_name = zend_string_init( + ZSTR_VAL(op_array->function_name), ZSTR_LEN(op_array->function_name), 0); + } + + if (op_array->filename != NULL && ZSTR_IS_INTERNED(op_array->filename)) { + op_array->filename = zend_string_init( + ZSTR_VAL(op_array->filename), ZSTR_LEN(op_array->filename), 0); + } +} + +void async_thread_snapshot_materialize_entry(async_thread_snapshot_t *snapshot) +{ + if (snapshot != NULL && snapshot->entry.func != NULL) { + thread_materialize_op_array_names(snapshot->entry.func); + } +} + /** * Free snapshot resources. */ diff --git a/thread.h b/thread.h index 08da0aa..95a752f 100644 --- a/thread.h +++ b/thread.h @@ -73,6 +73,10 @@ async_thread_snapshot_t *async_thread_snapshot_create( */ void async_thread_snapshot_destroy(async_thread_snapshot_t *snapshot); +/* Heap-copy the entry op_array's name strings so they outlive the snapshot + * arena. Per-task snapshots only; idempotent. */ +void async_thread_snapshot_materialize_entry(async_thread_snapshot_t *snapshot); + /////////////////////////////////////////////////////////// /// Thread lifecycle — PHP request in child thread /////////////////////////////////////////////////////////// diff --git a/thread_channel.c b/thread_channel.c index 4166264..837a9bd 100644 --- a/thread_channel.c +++ b/thread_channel.c @@ -112,7 +112,26 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) zend_async_resume_when(ZEND_ASYNC_CURRENT_COROUTINE, &trigger->base, false, zend_async_waker_callback_resolve, NULL); - ZEND_ASYNC_SUSPEND(); + + /* A bailout through SUSPEND would skip the dispose paths below and leak the + * trigger (open uv_async blocks uv_loop_close). Catch, dispose, re-raise. 
*/ + bool channel_bailed = false; + zend_try { + ZEND_ASYNC_SUSPEND(); + } zend_catch { + channel_bailed = true; + } zend_end_try(); + + if (UNEXPECTED(channel_bailed)) { + ASYNC_MUTEX_LOCK(ch->mutex); + zend_hash_index_del(&ch->sender_triggers, (zend_ulong)(uintptr_t) trigger); + ASYNC_MUTEX_UNLOCK(ch->mutex); + ZEND_ASYNC_WAKER_DESTROY(ZEND_ASYNC_CURRENT_COROUTINE); + async_thread_release_transferred_zval(&persistent_copy); + trigger->base.dispose(&trigger->base); + zend_bailout(); + } + ZEND_ASYNC_WAKER_DESTROY(ZEND_ASYNC_CURRENT_COROUTINE); /* Woke up — remove from sender queue */ @@ -182,7 +201,25 @@ static bool thread_channel_receive( zend_async_resume_when(ZEND_ASYNC_CURRENT_COROUTINE, cancellation, false, zend_async_waker_callback_resolve, NULL); } - ZEND_ASYNC_SUSPEND(); + + /* A bailout through SUSPEND would skip the dispose paths below and leak the + * trigger (open uv_async blocks uv_loop_close). Catch, dispose, re-raise. */ + bool channel_bailed = false; + zend_try { + ZEND_ASYNC_SUSPEND(); + } zend_catch { + channel_bailed = true; + } zend_end_try(); + + if (UNEXPECTED(channel_bailed)) { + ASYNC_MUTEX_LOCK(ch->mutex); + zend_hash_index_del(&ch->receiver_triggers, (zend_ulong)(uintptr_t) trigger); + ASYNC_MUTEX_UNLOCK(ch->mutex); + ZEND_ASYNC_WAKER_DESTROY(ZEND_ASYNC_CURRENT_COROUTINE); + trigger->base.dispose(&trigger->base); + zend_bailout(); + } + ZEND_ASYNC_WAKER_DESTROY(ZEND_ASYNC_CURRENT_COROUTINE); /* Woke up — remove from receiver queue, observe closed state */ diff --git a/thread_pool.c b/thread_pool.c index 0c23cbd..5a4fa4b 100644 --- a/thread_pool.c +++ b/thread_pool.c @@ -158,11 +158,11 @@ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *c * this one (see thread_pool_spawn_task_coroutine). Created lazily, only * when coroutine_mode is enabled — sync workers don't need it. */ zend_async_scope_t *pool_scope = NULL; - /* Concurrency slot accounting (only used when pool->concurrency > 0). 
- * Worker parks on slot_event when at the limit; dispose decrements - * active and fires slot_event to wake it. */ + /* Concurrency accounting (pool->concurrency > 0 only). Worker parks on + * slot_event at the limit. Volatile: assigned in the try, read by the + * bailout handler below, so it must survive the longjmp. */ int32_t active_count = 0; - zend_async_trigger_event_t *slot_event = NULL; + zend_async_trigger_event_t * volatile slot_event = NULL; ZEND_ASSERT(event == NULL); @@ -338,6 +338,9 @@ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *c ZVAL_UNDEF(&retval); ZVAL_UNDEF(&callable); + /* Heap-copy op_array names so they outlive the arena on a fatal. */ + async_thread_snapshot_materialize_entry(snapshot); + async_thread_create_closure(&snapshot->entry, &callable); if (UNEXPECTED(EG(exception))) { @@ -433,15 +436,63 @@ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *c goto task_cleanup; } - /* A real fatal error longjmps (zend_bailout); exit()/die() instead - * throws an unwind-exit token into EG(exception). Neither may be - * re-raised or passed to reject() — the token can't cross the fiber - * boundary and would crash the worker fiber. */ - volatile bool task_bailed = false; + /* Sync mode: run the body as a coroutine in a per-task nursery scope so + * Async\spawn() inside it lands there. Cancel + drain before freeing the + * snapshot so an un-awaited child can't outlive its arena. */ + zend_coroutine_t *worker_coro = ZEND_ASYNC_CURRENT_COROUTINE; + zend_async_scope_t *task_scope = + worker_coro != NULL ? 
ZEND_ASYNC_NEW_SCOPE(ZEND_ASYNC_CURRENT_SCOPE) : NULL; + + if (UNEXPECTED(task_scope == NULL)) { + zend_atomic_int_dec(&pool->base.running_count); + zend_atomic_int_inc(&pool->base.completed_count); + zend_object *setup_error = EG(exception) != NULL ? EG(exception) : thread_pool_bailout_exception(); /* always settle the future: with no pending exception it was left unresolved */ + if (EG(exception) != NULL) { GC_ADDREF(setup_error); zend_clear_exception(); } + async_future_shared_state_reject(state, setup_error); + OBJ_RELEASE(setup_error); + goto task_cleanup; + } + + /* Nursery (NOT-safe): un-awaited children cancelled at exit. Pinned so + * it survives the drain; unpinned before RELEASE. */ + ZEND_ASYNC_SCOPE_CLR_DISPOSE_SAFELY(task_scope); + ZEND_ASYNC_SCOPE_SET_OWNER_PINNED(task_scope); + + zend_coroutine_t *body = ZEND_ASYNC_SPAWN_WITH(task_scope); + if (UNEXPECTED(body == NULL)) { + ZEND_ASYNC_SCOPE_CLR_OWNER_PINNED(task_scope); + ZEND_ASYNC_SCOPE_RELEASE(task_scope); + zend_atomic_int_dec(&pool->base.running_count); + zend_atomic_int_inc(&pool->base.completed_count); + zend_object *setup_error = EG(exception) != NULL ? EG(exception) : thread_pool_bailout_exception(); /* always settle the future: with no pending exception it was left unresolved */ + if (EG(exception) != NULL) { GC_ADDREF(setup_error); zend_clear_exception(); } + async_future_shared_state_reject(state, setup_error); + OBJ_RELEASE(setup_error); + goto task_cleanup; + } + + /* Hand the call to the body; params ownership moves to it, snapshot + * stays ours to free after the drain. */ + zend_fcall_t *fcall = ecalloc(1, sizeof(zend_fcall_t)); + fcall->fci = fci; + fcall->fci_cache = fcc; + fcall->fci.param_count = param_count; + fcall->fci.params = params; + fcall->fci.retval = &body->result; + Z_TRY_ADDREF(fcall->fci.function_name); + body->fcall = fcall; + params = NULL; + + /* Await the body; its callback copies result/error into our waker. A + * fatal re-raises zend_bailout() out of the coroutine — caught here. 
*/ + bool body_bailed = false; + ZEND_ASYNC_WAKER_NEW(worker_coro); + zend_async_resume_when(worker_coro, &body->event, false, + zend_async_waker_callback_resolve, NULL); zend_try { - zend_call_function(&fci, &fcc); + ZEND_ASYNC_SUSPEND(); } zend_catch { - task_bailed = true; + body_bailed = true; } zend_end_try(); /* Decrement running and bump completed BEFORE notifying the awaiter @@ -450,34 +501,69 @@ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *c zend_atomic_int_dec(&pool->base.running_count); zend_atomic_int_inc(&pool->base.completed_count); - const bool task_exited = (EG(exception) != NULL - && (zend_is_unwind_exit(EG(exception)) - || zend_is_graceful_exit(EG(exception)))); - - if (task_exited) { - /* exit()/die() is graceful "this task is done" — the worker's - * request survives it, so resolve the future with null (no return - * value) and keep serving the next task. Never pass the unwind - * token to reject(): it can't cross the fiber to the awaiter. */ - zend_clear_exception(); - zval null_result; - ZVAL_NULL(&null_result); - async_future_shared_state_complete(state, &null_result); - } else if (task_bailed) { - /* A real fatal (e.g. OOM) leaves the worker's request unusable — - * deliver a transfer exception and tear the pool down. */ + if (UNEXPECTED(body_bailed)) { + /* Fatal in the body: reject this task and tear the pool down. 
*/ + zend_async_waker_clean(worker_coro); zend_object *bex = thread_pool_bailout_exception(); async_future_shared_state_reject(state, bex); thread_pool_close(pool); thread_pool_drain_tasks(pool, true, bex); OBJ_RELEASE(bex); + ZEND_ASYNC_SCOPE_CLR_OWNER_PINNED(task_scope); + ZEND_ASYNC_SCOPE_RELEASE(task_scope); + zval_ptr_dtor(&callable); + zval_ptr_dtor(&retval); + async_thread_snapshot_destroy(snapshot); + async_future_shared_state_delref(state); + zval_ptr_dtor(&task); + break; + } + + zend_object *body_error = NULL; + if (worker_coro->waker != NULL && worker_coro->waker->error != NULL) { + body_error = worker_coro->waker->error; + worker_coro->waker->error = NULL; } else if (EG(exception) != NULL) { - async_future_shared_state_reject(state, EG(exception)); + body_error = EG(exception); + GC_ADDREF(body_error); zend_clear_exception(); + } + + if (body_error != NULL) { + async_future_shared_state_reject(state, body_error); + OBJ_RELEASE(body_error); + } else if (worker_coro->waker != NULL + && Z_TYPE(worker_coro->waker->result) != IS_UNDEF) { + async_future_shared_state_complete(state, &worker_coro->waker->result); } else { - async_future_shared_state_complete(state, &retval); + zval null_result; + ZVAL_NULL(&null_result); + async_future_shared_state_complete(state, &null_result); + } + + zend_async_waker_clean(worker_coro); + + /* Cancel + await un-awaited children before freeing the snapshot + * arena that backs their op_arrays. */ + if (!ZEND_ASYNC_SCOPE_IS_CLOSED(task_scope)) { + ZEND_ASYNC_SCOPE_CANCEL(task_scope, NULL, false, false); + ZEND_ASYNC_SCOPE_AWAIT_AFTER_CANCELLATION(task_scope, worker_coro, NULL, NULL, NULL); + if (UNEXPECTED(EG(exception))) { + zend_clear_exception(); + } } + ZEND_ASYNC_SCOPE_CLR_OWNER_PINNED(task_scope); + ZEND_ASYNC_SCOPE_RELEASE(task_scope); + + /* Drop the closure ref and free the snapshot. 
*/ + zval_ptr_dtor(&callable); + zval_ptr_dtor(&retval); + async_thread_snapshot_destroy(snapshot); + async_future_shared_state_delref(state); + zval_ptr_dtor(&task); + continue; + task_cleanup: if (params) { for (uint32_t i = 0; i < param_count; i++) { @@ -539,6 +625,13 @@ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *c thread_pool_close(pool); thread_pool_drain_tasks(pool, true, bex); OBJ_RELEASE(bex); + + /* Bailout longjmped past `done:` (which disposes slot_event). Its open + * uv_async would block uv_loop_close — dispose it while reactor is up. */ + if (slot_event != NULL) { + slot_event->base.dispose(&slot_event->base); + slot_event = NULL; + } } /* Release worker's ref on pool */ @@ -584,9 +677,11 @@ static void pool_task_dispose(zend_coroutine_t *coroutine) } else if (Z_TYPE(coroutine->result) != IS_UNDEF) { async_future_shared_state_complete(ctx->state, &coroutine->result); } else { - zval undef; - ZVAL_UNDEF(&undef); - async_future_shared_state_complete(ctx->state, &undef); + /* UNDEF result, no exception = the body bailed out (fatal/OOM/exit). + * Reject with the cause instead of resolving to a silent null. */ + zend_object *bex = thread_pool_bailout_exception(); + async_future_shared_state_reject(ctx->state, bex); + OBJ_RELEASE(bex); } async_future_shared_state_delref(ctx->state);