Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions src/lib/libpipefs.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ addToLibrary({
timestamp: new Date(),
#if PTHREADS || ASYNCIFY
readableHandlers: [],
registerReadableHandler: (callback) => {
callback.registerCleanupFunc(() => {
const i = pipe.readableHandlers.indexOf(callback);
if (i !== -1) pipe.readableHandlers.splice(i, 1);
});
pipe.readableHandlers.push(callback);
registerReadableHandler: (events, wake) => {
pipe.readableHandlers.push({ events, wake });
},
notifyReadableHandlers: () => {
while (pipe.readableHandlers.length > 0) {
const cb = pipe.readableHandlers.shift();
if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
for (let i = pipe.readableHandlers.length - 1; i >= 0; i--) {
const { events, wake } = pipe.readableHandlers[i];

if (events & ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}})) {
pipe.readableHandlers.splice(i, 1);
wake();
}
}
pipe.readableHandlers = [];
}
#endif
};
Expand Down Expand Up @@ -97,7 +96,7 @@ addToLibrary({
blocks: 0,
};
},
poll(stream, timeout, notifyCallback) {
poll(stream, events, wake) {
var pipe = stream.node.pipe;

if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
Expand All @@ -110,7 +109,7 @@ addToLibrary({
}

#if PTHREADS || ASYNCIFY
if (notifyCallback) pipe.registerReadableHandler(notifyCallback);
if (wake) pipe.registerReadableHandler(events, wake);
#endif
return 0;
},
Expand Down
21 changes: 17 additions & 4 deletions src/lib/libsockfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ addToLibrary({
peers: {},
pending: [],
recv_queue: [],
wake_queue: [],
#if SOCKET_WEBRTC
#else
sock_ops: SOCKFS.websocket_sock_ops
Expand Down Expand Up @@ -104,9 +105,9 @@ addToLibrary({
},
// node and stream ops are backend agnostic
stream_ops: {
poll(stream) {
poll(stream, events, wake) {
var sock = stream.node.sock;
return sock.sock_ops.poll(sock);
return sock.sock_ops.poll(sock, events, wake);
},
ioctl(stream, request, varargs) {
var sock = stream.node.sock;
Expand Down Expand Up @@ -336,6 +337,15 @@ addToLibrary({

sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data });
SOCKFS.emit('message', sock.stream.fd);

for (let i = sock.wake_queue.length - 1; i >= 0; i--) {
const { events, wake } = sock.wake_queue[i];

if (events & ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}})) {
sock.wake_queue.splice(i, 1);
wake();
}
}
};

if (ENVIRONMENT_IS_NODE) {
Expand Down Expand Up @@ -378,20 +388,23 @@ addToLibrary({
//
// actual sock ops
//
poll(sock) {
poll(sock, events, wake) {
if (sock.type === {{{ cDefs.SOCK_STREAM }}} && sock.server) {
// listen sockets should only say they're available for reading
// if there are pending clients.
return sock.pending.length ? ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}) : 0;
}

if (wake) {
sock.wake_queue.push({ events, wake });
}

var mask = 0;
var dest = sock.type === {{{ cDefs.SOCK_STREAM }}} ? // we only care about the socket state for connection-based sockets
SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport) :
null;

if (sock.recv_queue.length ||
!dest || // connection-less sockets are always ready to read
(dest && dest.socket.readyState === dest.socket.CLOSING) ||
(dest && dest.socket.readyState === dest.socket.CLOSED)) { // let recv return 0 once closed
mask |= ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
Expand Down
138 changes: 57 additions & 81 deletions src/lib/libsyscall.js
Original file line number Diff line number Diff line change
Expand Up @@ -554,102 +554,78 @@ var SyscallsLibrary = {
__syscall_poll__proxy: 'sync',
__syscall_poll__async: 'auto',
__syscall_poll: (fds, nfds, timeout) => {
#if PTHREADS || ASYNCIFY
#if PTHREADS
const isAsyncContext = PThread.currentProxiedOperationCallerThread;
#else
const isAsyncContext = true;
#endif
// Enable event handlers only when the poll call is proxied from a worker.
// TODO: Could use `Promise.withResolvers` here if we know its available.
var resolve;
var promise = new Promise((resolve_) => { resolve = resolve_; });
var cleanupFuncs = [];
var notifyDone = false;
function asyncPollComplete(count) {
if (notifyDone) {
return;
}
notifyDone = true;
#if RUNTIME_DEBUG
dbg('asyncPollComplete', count);
#endif
cleanupFuncs.forEach(cb => cb());
resolve(count);
}
function makeNotifyCallback(stream, pollfd) {
var cb = (flags) => {
if (notifyDone) {
return;
function dopoll(wakepoll) {
let count = 0;

for (let i = 0; i < nfds; i++) {
let pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i;
let fd = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.fd, 'i32') }}};
let events = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.events, 'i16') }}};
let flags = {{{ cDefs.POLLNVAL }}};
let stream = FS.getStream(fd);
if (stream) {
if (stream.stream_ops.poll) {
flags = stream.stream_ops.poll(stream, events, wakepoll);
} else {
flags = {{{ cDefs.POLLIN | cDefs.POLLOUT }}};
}
}
#if RUNTIME_DEBUG
dbg(`async poll notify: stream=${stream}`);
#endif
var events = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.events, 'i16') }}};
flags &= events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}};
#if ASSERTIONS
assert(flags)
#endif
if (flags) count++;
{{{ makeSetValue('pollfd', C_STRUCTS.pollfd.revents, 'flags', 'i16') }}};
asyncPollComplete(1);
}
cb.registerCleanupFunc = (f) => {
if (f) cleanupFuncs.push(f);
}
return cb;
}

if (isAsyncContext) {
#if RUNTIME_DEBUG
dbg('async poll start');
#endif
if (timeout > 0) {
var t = setTimeout(() => {
#if RUNTIME_DEBUG
dbg('poll: timeout', timeout);
#endif
asyncPollComplete(0);
}, timeout);
cleanupFuncs.push(() => clearTimeout(t));
}
}
#endif
return count;
};

var count = 0;
for (var i = 0; i < nfds; i++) {
var pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i;
var fd = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.fd, 'i32') }}};
var events = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.events, 'i16') }}};
var flags = {{{ cDefs.POLLNVAL }}};
var stream = FS.getStream(fd);
if (stream) {
if (stream.stream_ops.poll) {
#if PTHREADS || ASYNCIFY
if (isAsyncContext && timeout) {
flags = stream.stream_ops.poll(stream, timeout, makeNotifyCallback(stream, pollfd));
} else
#if PTHREADS
const isAsyncContext = PThread.currentProxiedOperationCallerThread;
#else
const isAsyncContext = true;
#endif
flags = stream.stream_ops.poll(stream, -1);
} else {
flags = {{{ cDefs.POLLIN | cDefs.POLLOUT }}};
}
}
flags &= events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}};
if (flags) count++;
{{{ makeSetValue('pollfd', C_STRUCTS.pollfd.revents, 'flags', 'i16') }}};
}

#if PTHREADS || ASYNCIFY
if (isAsyncContext) {
if (count || !timeout) {
asyncPollComplete(count);
}
return promise;
return new Promise((resolve) => {
const count = dopoll(() => {
if (awakened) {
return;
}

awakened = true;

// cancel timeout timer
if (timer) {
clearTimeout(timer);
timer = null;
}

// give the event loop a chance to service other streams before polling again
setTimeout(() => {
resolve(dopoll(null));
}, 0);
});
let awakened = false;
let timer = null;

if (count) {
resolve(count);
} else if (timeout >= 0) {
timer = setTimeout(() => {
#if RUNTIME_DEBUG
dbg('poll: timeout', timeout);
#endif
resolve(0);
}, timeout);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I love the timeout = 0x7fffffff; thing above.. and one downside is that it means we end up setting this timeout here even for the "infinite timeout" case.

i.e. i think it would be nice to skip this timeout completely in that case.

Does timeout = 0x7fffffff; really make things easier in other places?

Copy link
Collaborator Author

@inolen inolen Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code, it doesn't matter now. I had added that while shifting the code around trying to get the async code into the single #ifdef / Promise constructor, but didn't pay attention to its value after the dust settled.

}
});
}
#endif

const count = dopoll(null);
#if ASSERTIONS
if (!count && timeout != 0) warnOnce('non-zero poll() timeout not supported: ' + timeout)
if (!count && timeout) warnOnce('non-zero poll() timeout not supported in synchronous contexts');
#endif
return count;
},
Expand Down
43 changes: 26 additions & 17 deletions test/core/test_poll_blocking.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,30 @@ void sleep_ms(int ms) {
usleep(ms * 1000);
}

int64_t timeval_delta_ms(struct timeval* begin, struct timeval* end) {
int64_t delta_s = end->tv_sec - begin->tv_sec;
int64_t delta_us = end->tv_usec - begin->tv_usec;
assert(delta_s >= 0);
return (delta_s * 1000) + (delta_us / 1000);
int64_t timeval_delta_ms(struct timespec* begin, struct timespec* end) {
struct timespec diff = {
.tv_sec = end->tv_sec - begin->tv_sec,
.tv_nsec = end->tv_nsec - begin->tv_nsec
};

if (diff.tv_nsec < 0) {
diff.tv_nsec += 1000000000;
diff.tv_sec -= 1;
}

assert(diff.tv_sec >= 0);

return (diff.tv_sec * 1000) + (diff.tv_nsec / 1000000);
}

// Check if timeout works without fds
void test_timeout_without_fds() {
printf("test_timeout_without_fds\n");
struct timeval begin, end;
struct timespec begin, end;

gettimeofday(&begin, NULL);
clock_gettime(CLOCK_MONOTONIC, &begin);
assert(poll(NULL, 0, TIMEOUT_MS) == 0);
gettimeofday(&end, NULL);
clock_gettime(CLOCK_MONOTONIC, &end);

int64_t duration = timeval_delta_ms(&begin, &end);
printf(" -> duration: %lld ms\n", duration);
Expand All @@ -46,15 +55,15 @@ void test_timeout_without_fds() {
// Check if timeout works with fds without events
void test_timeout_with_fds_without_events() {
printf("test_timeout_with_fds_without_events\n");
struct timeval begin, end;
struct timespec begin, end;
int pipe_a[2];

assert(pipe(pipe_a) == 0);

gettimeofday(&begin, NULL);
clock_gettime(CLOCK_MONOTONIC, &begin);
struct pollfd fds = {pipe_a[0], 0, 0};
assert(poll(&fds, 1, TIMEOUT_MS) == 0);
gettimeofday(&end, NULL);
clock_gettime(CLOCK_MONOTONIC, &end);

int64_t duration = timeval_delta_ms(&begin, &end);
printf(" -> duration: %lld ms\n", duration);
Expand All @@ -77,7 +86,7 @@ void *write_after_sleep(void * arg) {
// Check if poll can unblock on an event
void test_unblock_poll() {
printf("test_unblock_poll\n");
struct timeval begin, end;
struct timespec begin, end;
pthread_t tid;
int pipe_a[2];

Expand All @@ -88,10 +97,10 @@ void test_unblock_poll() {
{pipe_a[0], POLLIN, 0},
{pipe_shared[0], POLLIN, 0},
};
gettimeofday(&begin, NULL);
clock_gettime(CLOCK_MONOTONIC, &begin);
assert(pthread_create(&tid, NULL, write_after_sleep, NULL) == 0);
assert(poll(fds, 2, -1) == 1);
gettimeofday(&end, NULL);
clock_gettime(CLOCK_MONOTONIC, &end);
assert(fds[1].revents & POLLIN);

int64_t duration = timeval_delta_ms(&begin, &end);
Expand All @@ -105,12 +114,12 @@ void test_unblock_poll() {
}

void *do_poll_in_thread(void * arg) {
struct timeval begin, end;
struct timespec begin, end;

gettimeofday(&begin, NULL);
clock_gettime(CLOCK_MONOTONIC, &begin);
struct pollfd fds = {pipe_shared[0], POLLIN, 0};
assert(poll(&fds, 1, 4000) == 1);
gettimeofday(&end, NULL);
clock_gettime(CLOCK_MONOTONIC, &end);
assert(fds.revents & POLLIN);

int64_t duration = timeval_delta_ms(&begin, &end);
Expand Down
Loading
Loading