From 7aefbc97bb6d8e1e2ba8fb9df79e68a7c906118e Mon Sep 17 00:00:00 2001 From: Anthony Pesch Date: Mon, 16 Feb 2026 23:57:37 +0100 Subject: [PATCH 1/2] test / convert new select and poll tests to using clock_gettime with CLOCK_MONOTONIC instead of gettimeofday whose implementation isn't guaranteed to be monotonic --- test/core/test_poll_blocking.c | 43 +++++++++++++--------- test/core/test_poll_blocking_asyncify.c | 33 +++++++++++------ test/core/test_select_blocking.c | 47 +++++++++++++++---------- 3 files changed, 77 insertions(+), 46 deletions(-) diff --git a/test/core/test_poll_blocking.c b/test/core/test_poll_blocking.c index bdbd6b1ee1571..680b770a3e673 100644 --- a/test/core/test_poll_blocking.c +++ b/test/core/test_poll_blocking.c @@ -22,21 +22,30 @@ void sleep_ms(int ms) { usleep(ms * 1000); } -int64_t timeval_delta_ms(struct timeval* begin, struct timeval* end) { - int64_t delta_s = end->tv_sec - begin->tv_sec; - int64_t delta_us = end->tv_usec - begin->tv_usec; - assert(delta_s >= 0); - return (delta_s * 1000) + (delta_us / 1000); +int64_t timeval_delta_ms(struct timespec* begin, struct timespec* end) { + struct timespec diff = { + .tv_sec = end->tv_sec - begin->tv_sec, + .tv_nsec = end->tv_nsec - begin->tv_nsec + }; + + if (diff.tv_nsec < 0) { + diff.tv_nsec += 1000000000; + diff.tv_sec -= 1; + } + + assert(diff.tv_sec >= 0); + + return (diff.tv_sec * 1000) + (diff.tv_nsec / 1000000); } // Check if timeout works without fds void test_timeout_without_fds() { printf("test_timeout_without_fds\n"); - struct timeval begin, end; + struct timespec begin, end; - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); assert(poll(NULL, 0, TIMEOUT_MS) == 0); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); int64_t duration = timeval_delta_ms(&begin, &end); printf(" -> duration: %lld ms\n", duration); @@ -46,15 +55,15 @@ void test_timeout_without_fds() { // Check if timeout works with fds without events void test_timeout_with_fds_without_events() { printf("test_timeout_with_fds_without_events\n"); - struct timeval begin, end; + struct timespec begin, end; int pipe_a[2]; assert(pipe(pipe_a) == 0); - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); struct pollfd fds = {pipe_a[0], 0, 0}; assert(poll(&fds, 1, TIMEOUT_MS) == 0); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); int64_t duration = timeval_delta_ms(&begin, &end); printf(" -> duration: %lld ms\n", duration); @@ -77,7 +86,7 @@ void *write_after_sleep(void * arg) { // Check if poll can unblock on an event void test_unblock_poll() { printf("test_unblock_poll\n"); - struct timeval begin, end; + struct timespec begin, end; pthread_t tid; int pipe_a[2]; @@ -88,10 +97,10 @@ void test_unblock_poll() { {pipe_a[0], POLLIN, 0}, {pipe_shared[0], POLLIN, 0}, }; - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); assert(pthread_create(&tid, NULL, write_after_sleep, NULL) == 0); assert(poll(fds, 2, -1) == 1); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); assert(fds[1].revents & POLLIN); int64_t duration = timeval_delta_ms(&begin, &end); @@ -105,12 +114,12 @@ void test_unblock_poll() { } void *do_poll_in_thread(void * arg) { - struct timeval begin, end; + struct timespec begin, end; - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); struct pollfd fds = {pipe_shared[0], POLLIN, 0}; assert(poll(&fds, 1, 4000) == 1); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); assert(fds.revents & POLLIN); int64_t duration = timeval_delta_ms(&begin, &end); diff --git a/test/core/test_poll_blocking_asyncify.c b/test/core/test_poll_blocking_asyncify.c index b4d9bf6c735a5..6adcd427f9ff9 100644 --- a/test/core/test_poll_blocking_asyncify.c +++ b/test/core/test_poll_blocking_asyncify.c @@ -19,21 +19,31 @@ #include -int64_t timeval_delta_ms(struct timeval* begin, struct timeval* end) { - int64_t delta_s = end->tv_sec - begin->tv_sec; - int64_t delta_us = end->tv_usec - begin->tv_usec; - assert(delta_s >= 0); - return (delta_s * 1000) + (delta_us / 1000); +int64_t timeval_delta_ms(struct timespec* begin, struct timespec* end) { + struct timespec diff = { + .tv_sec = end->tv_sec - begin->tv_sec, + .tv_nsec = end->tv_nsec - begin->tv_nsec + }; + + if (diff.tv_nsec < 0) { + diff.tv_nsec += 1000000000; + diff.tv_sec -= 1; + } + + assert(diff.tv_sec >= 0); + + return (diff.tv_sec * 1000) + (diff.tv_nsec / 1000000); } // Check if timeout works without fds void test_timeout_without_fds() { printf("test_timeout_without_fds\n"); - struct timeval begin, end; + struct timespec begin = {0}; + struct timespec end = {0}; - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); assert(poll(NULL, 0, 1000) == 0); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); int64_t duration = timeval_delta_ms(&begin, &end); printf(" -> duration: %lld ms\n", duration); @@ -51,7 +61,8 @@ void write_to_pipe(void * arg) { // Check if poll can unblock on an event void test_unblock_poll() { printf("test_unblock_poll\n"); - struct timeval begin, end; + struct timespec begin = {0}; + struct timespec end = {0}; int pipe_a[2]; assert(pipe(pipe_a) == 0); @@ -62,9 +73,9 @@ void test_unblock_poll() { {pipe_shared[0], POLLIN, 0}, }; emscripten_set_timeout(write_to_pipe, 1000, NULL); - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); assert(poll(fds, 2, -1) == 1); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); assert(fds[1].revents & POLLIN); int64_t duration = timeval_delta_ms(&begin, &end); diff --git a/test/core/test_select_blocking.c b/test/core/test_select_blocking.c index ddedab1aee851..8b422bedf2c3a 100644 --- a/test/core/test_select_blocking.c +++ b/test/core/test_select_blocking.c @@ -19,23 +19,33 @@ void sleep_ms(int ms) { usleep(ms * 1000); } -int64_t timeval_delta_ms(struct timeval* begin, struct timeval* end) { - int64_t delta_s = end->tv_sec - begin->tv_sec; - int64_t delta_us = end->tv_usec - begin->tv_usec; - assert(delta_s >= 0); - return (delta_s * 1000) + (delta_us / 1000); +int64_t timeval_delta_ms(struct timespec* begin, struct timespec* end) { + struct timespec diff = { + .tv_sec = end->tv_sec - begin->tv_sec, + .tv_nsec = end->tv_nsec - begin->tv_nsec + }; + + if (diff.tv_nsec < 0) { + diff.tv_nsec += 1000000000; + diff.tv_sec -= 1; + } + + assert(diff.tv_sec >= 0); + + return (diff.tv_sec * 1000) + (diff.tv_nsec / 1000000); } // Check if timeout works without fds void test_timeout_without_fds() { printf("test_timeout_without_fds\n"); - struct timeval tv, begin, end; + struct timespec begin, end; + struct timeval tv; tv.tv_sec = 0; tv.tv_usec = TIMEOUT_MS * 1000; - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); assert(select(0, NULL, NULL, NULL, &tv) == 0); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); int64_t duration = timeval_delta_ms(&begin, &end); printf(" -> duration: %lld ms\n", duration); @@ -45,7 +55,8 @@ void test_timeout_without_fds() { // Check if timeout works with fds without events void test_timeout_with_fds_without_events() { printf("test_timeout_with_fds_without_events\n"); - struct timeval tv, begin, end; + struct timespec begin, end; + struct timeval tv; fd_set readfds; int pipe_a[2]; @@ -55,9 +66,9 @@ void test_timeout_with_fds_without_events() { tv.tv_usec = TIMEOUT_MS * 1000; FD_ZERO(&readfds); FD_SET(pipe_a[0], &readfds); - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); assert(select(pipe_a[0] + 1, &readfds, NULL, NULL, &tv) == 0); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); int64_t duration = timeval_delta_ms(&begin, &end); printf(" -> duration: %lld ms\n", duration); @@ -80,7 +91,7 @@ void *write_after_sleep(void * arg) { // Check if select can unblock on an event void test_unblock_select() { printf("test_unblock_select\n"); - struct timeval begin, end; + struct timespec begin, end; fd_set readfds; pthread_t tid; int pipe_a[2]; @@ -92,10 +103,10 @@ void test_unblock_select() { FD_SET(pipe_a[0], &readfds); FD_SET(pipe_shared[0], &readfds); int maxfd = (pipe_a[0] > pipe_shared[0] ? pipe_a[0] : pipe_shared[0]); - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); assert(pthread_create(&tid, NULL, write_after_sleep, NULL) == 0); assert(select(maxfd + 1, &readfds, NULL, NULL, NULL) == 1); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); assert(FD_ISSET(pipe_shared[0], &readfds)); int64_t duration = timeval_delta_ms(&begin, &end); @@ -109,9 +120,9 @@ void test_unblock_select() { } void *do_select_in_thread(void * arg) { - struct timeval begin, end; - fd_set readfds; + struct timespec begin, end; struct timeval tv; + fd_set readfds; tv.tv_sec = 4; tv.tv_usec = 0; @@ -119,9 +130,9 @@ void *do_select_in_thread(void * arg) { FD_SET(pipe_shared[0], &readfds); int maxfd = pipe_shared[0]; - gettimeofday(&begin, NULL); + clock_gettime(CLOCK_MONOTONIC, &begin); assert(select(maxfd + 1, &readfds, NULL, NULL, &tv) == 1); - gettimeofday(&end, NULL); + clock_gettime(CLOCK_MONOTONIC, &end); assert(FD_ISSET(pipe_shared[0], &readfds)); int64_t duration = timeval_delta_ms(&begin, &end); From 6172f0de67ebddf2acc8aa799539bb6fdd33a732 Mon Sep 17 00:00:00 2001 From: Anthony Pesch Date: Thu, 18 Dec 2025 17:06:25 +0100 Subject: [PATCH 2/2] wip async poll for websockets --- src/lib/libpipefs.js | 23 ++++--- src/lib/libsockfs.js | 21 +++++-- src/lib/libsyscall.js | 138 +++++++++++++++++------------------------- 3 files changed, 85 insertions(+), 97 deletions(-) diff --git a/src/lib/libpipefs.js b/src/lib/libpipefs.js index 7de0c8817f5f2..db030d2821048 100644 --- a/src/lib/libpipefs.js +++ b/src/lib/libpipefs.js @@ -23,19 +23,18 @@ addToLibrary({ timestamp: new Date(), #if PTHREADS || ASYNCIFY readableHandlers: [], - registerReadableHandler: (callback) => { - callback.registerCleanupFunc(() => { - const i = pipe.readableHandlers.indexOf(callback); - if (i !== -1) pipe.readableHandlers.splice(i, 1); - }); - pipe.readableHandlers.push(callback); + registerReadableHandler: (events, wake) => { + pipe.readableHandlers.push({ events, wake }); }, notifyReadableHandlers: () => { - while (pipe.readableHandlers.length > 0) { - const cb = pipe.readableHandlers.shift(); - if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); + for (let i = pipe.readableHandlers.length - 1; i >= 0; i--) { + const { events, wake } = pipe.readableHandlers[i]; + + if (events & ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}})) { + pipe.readableHandlers.splice(i, 1); + wake(); + } } - pipe.readableHandlers = []; } #endif }; @@ -97,7 +96,7 @@ addToLibrary({ blocks: 0, }; }, - poll(stream, timeout, notifyCallback) { + poll(stream, events, wake) { var pipe = stream.node.pipe; if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) { @@ -110,7 +109,7 @@ addToLibrary({ } #if PTHREADS || ASYNCIFY - if (notifyCallback) pipe.registerReadableHandler(notifyCallback); + if (wake) pipe.registerReadableHandler(events, wake); #endif return 0; }, diff --git a/src/lib/libsockfs.js b/src/lib/libsockfs.js index 01d6f831da2bf..39f29850772d5 100644 --- a/src/lib/libsockfs.js +++ b/src/lib/libsockfs.js @@ -68,6 +68,7 @@ addToLibrary({ peers: {}, pending: [], recv_queue: [], + wake_queue: [], #if SOCKET_WEBRTC #else sock_ops: SOCKFS.websocket_sock_ops @@ -104,9 +105,9 @@ addToLibrary({ }, // node and stream ops are backend agnostic stream_ops: { - poll(stream) { + poll(stream, events, wake) { var sock = stream.node.sock; - return sock.sock_ops.poll(sock); + return sock.sock_ops.poll(sock, events, wake); }, ioctl(stream, request, varargs) { var sock = stream.node.sock; @@ -336,6 +337,15 @@ addToLibrary({ sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data }); SOCKFS.emit('message', sock.stream.fd); + + for (let i = sock.wake_queue.length - 1; i >= 0; i--) { + const { events, wake } = sock.wake_queue[i]; + + if (events & ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}})) { + sock.wake_queue.splice(i, 1); + wake(); + } + } }; if (ENVIRONMENT_IS_NODE) { @@ -378,20 +388,23 @@ addToLibrary({ // // actual sock ops // - poll(sock) { + poll(sock, events, wake) { if (sock.type === {{{ cDefs.SOCK_STREAM }}} && sock.server) { // listen sockets should only say they're available for reading // if there are pending clients. return sock.pending.length ? ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}) : 0; } + if (wake) { + sock.wake_queue.push({ events, wake }); + } + var mask = 0; var dest = sock.type === {{{ cDefs.SOCK_STREAM }}} ? // we only care about the socket state for connection-based sockets SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport) : null; if (sock.recv_queue.length || - !dest || // connection-less sockets are always ready to read (dest && dest.socket.readyState === dest.socket.CLOSING) || (dest && dest.socket.readyState === dest.socket.CLOSED)) { // let recv return 0 once closed mask |= ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); diff --git a/src/lib/libsyscall.js b/src/lib/libsyscall.js index d7cf372e7e3ef..3ba6cee57f7c8 100644 --- a/src/lib/libsyscall.js +++ b/src/lib/libsyscall.js @@ -554,102 +554,78 @@ var SyscallsLibrary = { __syscall_poll__proxy: 'sync', __syscall_poll__async: 'auto', __syscall_poll: (fds, nfds, timeout) => { -#if PTHREADS || ASYNCIFY -#if PTHREADS - const isAsyncContext = PThread.currentProxiedOperationCallerThread; -#else - const isAsyncContext = true; -#endif // Enable event handlers only when the poll call is proxied from a worker. // TODO: Could use `Promise.withResolvers` here if we know its available. - var resolve; - var promise = new Promise((resolve_) => { resolve = resolve_; }); - var cleanupFuncs = []; - var notifyDone = false; - function asyncPollComplete(count) { - if (notifyDone) { - return; - } - notifyDone = true; -#if RUNTIME_DEBUG - dbg('asyncPollComplete', count); -#endif - cleanupFuncs.forEach(cb => cb()); - resolve(count); - } - function makeNotifyCallback(stream, pollfd) { - var cb = (flags) => { - if (notifyDone) { - return; + function dopoll(wakepoll) { + let count = 0; + + for (let i = 0; i < nfds; i++) { + let pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i; + let fd = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.fd, 'i32') }}}; + let events = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.events, 'i16') }}}; + let flags = {{{ cDefs.POLLNVAL }}}; + let stream = FS.getStream(fd); + if (stream) { + if (stream.stream_ops.poll) { + flags = stream.stream_ops.poll(stream, events, wakepoll); + } else { + flags = {{{ cDefs.POLLIN | cDefs.POLLOUT }}}; + } } -#if RUNTIME_DEBUG - dbg(`async poll notify: stream=${stream}`); -#endif - var events = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.events, 'i16') }}}; flags &= events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}}; -#if ASSERTIONS - assert(flags) -#endif + if (flags) count++; {{{ makeSetValue('pollfd', C_STRUCTS.pollfd.revents, 'flags', 'i16') }}}; - asyncPollComplete(1); - } - cb.registerCleanupFunc = (f) => { - if (f) cleanupFuncs.push(f); } - return cb; - } - if (isAsyncContext) { -#if RUNTIME_DEBUG - dbg('async poll start'); -#endif - if (timeout > 0) { - var t = setTimeout(() => { -#if RUNTIME_DEBUG - dbg('poll: timeout', timeout); -#endif - asyncPollComplete(0); - }, timeout); - cleanupFuncs.push(() => clearTimeout(t)); - } - } -#endif + return count; + }; - var count = 0; - for (var i = 0; i < nfds; i++) { - var pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i; - var fd = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.fd, 'i32') }}}; - var events = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.events, 'i16') }}}; - var flags = {{{ cDefs.POLLNVAL }}}; - var stream = FS.getStream(fd); - if (stream) { - if (stream.stream_ops.poll) { #if PTHREADS || ASYNCIFY - if (isAsyncContext && timeout) { - flags = stream.stream_ops.poll(stream, timeout, makeNotifyCallback(stream, pollfd)); - } else +#if PTHREADS + const isAsyncContext = PThread.currentProxiedOperationCallerThread; +#else + const isAsyncContext = true; #endif - flags = stream.stream_ops.poll(stream, -1); - } else { - flags = {{{ cDefs.POLLIN | cDefs.POLLOUT }}}; - } - } - flags &= events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}}; - if (flags) count++; - {{{ makeSetValue('pollfd', C_STRUCTS.pollfd.revents, 'flags', 'i16') }}}; - } - -#if PTHREADS || ASYNCIFY if (isAsyncContext) { - if (count || !timeout) { - asyncPollComplete(count); - } - return promise; + return new Promise((resolve) => { + const count = dopoll(() => { + if (awakened) { + return; + } + + awakened = true; + + // cancel timeout timer + if (timer) { + clearTimeout(timer); + timer = null; + } + + // give the event loop a chance to service other streams before polling again + setTimeout(() => { + resolve(dopoll(null)); + }, 0); + }); + let awakened = false; + let timer = null; + + if (count) { + resolve(count); + } else if (timeout >= 0) { + timer = setTimeout(() => { +#if RUNTIME_DEBUG + dbg('poll: timeout', timeout); +#endif + resolve(0); + }, timeout); + } + }); } #endif + const count = dopoll(null); #if ASSERTIONS - if (!count && timeout != 0) warnOnce('non-zero poll() timeout not supported: ' + timeout) + if (!count && timeout) warnOnce('non-zero poll() timeout not supported in synchronous contexts'); #endif return count; },