Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5f48f12
feat(sync): add delayed task submission for throttling
jpnurmi Feb 5, 2026
6d98462
Make flush delay-aware so it waits for delayed tasks
jpnurmi Feb 6, 2026
a399d2a
Add sentry__bgworker_submit_at for absolute timestamps
jpnurmi Feb 6, 2026
b97d337
fix(sync): discard pending delayed tasks on shutdown
jpnurmi Feb 13, 2026
c28a807
fix(sync): let is_done handle delayed tasks on shutdown
jpnurmi Feb 13, 2026
3e19e39
test(sync): add delayed task tests for insert-at-head and cleanup
jpnurmi Feb 15, 2026
90295b8
fix(bgworker): skip far-future delayed tasks in flush delay calculation
jpnurmi Feb 16, 2026
ba36aa7
fix(bgworker): walk queue to find last eligible task in flush
jpnurmi Feb 16, 2026
b827096
fix(bgworker): prevent unsigned wraparound in delayed task deadline
jpnurmi Feb 16, 2026
6ee3c9f
fix(bgworker): prevent head insertion from re-executing current task
jpnurmi Feb 16, 2026
bfbfe36
fix(bgworker): use absolute timestamp for flush sentinel scheduling
jpnurmi Feb 16, 2026
c24c7f8
fix(bgworker): update current_task->next_task in foreach_matching
jpnurmi Feb 17, 2026
1adbbd2
fix(bgworker): clear current_task when foreach_matching drops it
jpnurmi Feb 17, 2026
499c76b
fix(bgworker): use stable shutdown completion check
jpnurmi Feb 17, 2026
e830200
fix(bgworker): check timeout before join in shutdown
jpnurmi Feb 17, 2026
e25efac
test(bgworker): verify submit after foreach drops current_task
jpnurmi Feb 17, 2026
b686fda
test(bgworker): verify submit after foreach drops next task
jpnurmi Feb 17, 2026
a8081eb
docs(sync): add docstring to add_saturate helper
jpnurmi Feb 17, 2026
4969b03
fix(bgworker): skip current_task in flush deadline scan
jpnurmi Feb 17, 2026
ba02fbb
fix(bgworker): clamp delayed task wait to prevent uint32 wrapping
jpnurmi Feb 17, 2026
7a15acc
Merge remote-tracking branch 'upstream/master' into jpnurmi/feat/dela…
jpnurmi Feb 18, 2026
3a477a5
Merge remote-tracking branch 'upstream/master' into jpnurmi/feat/dela…
jpnurmi Feb 23, 2026
49ad07b
docs: add comment about benign race window in bgworker flush
jpnurmi Feb 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 107 additions & 13 deletions src/sentry_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,20 @@ thread_setname(sentry_threadid_t thread_id, const char *thread_name)
* `done` *from* the worker signaling that it will close down and can be joined.
*/

/**
* Overflow-safe addition that clamps to UINT64_MAX instead of wrapping.
*/
static uint64_t
add_saturate(uint64_t a, uint64_t b)
{
return b <= UINT64_MAX - a ? a + b : UINT64_MAX;
}

struct sentry_bgworker_task_s;
typedef struct sentry_bgworker_task_s {
struct sentry_bgworker_task_s *next_task;
long refcount;
uint64_t execute_after;
sentry_task_exec_func_t exec_func;
void (*cleanup_func)(void *task_data);
void *task_data;
Expand Down Expand Up @@ -155,6 +165,7 @@ struct sentry_bgworker_s {
sentry_mutex_t task_lock;
sentry_bgworker_task_t *first_task;
sentry_bgworker_task_t *last_task;
sentry_bgworker_task_t *current_task;
void *state;
void (*free_state)(void *state);
long refcount;
Expand Down Expand Up @@ -225,7 +236,9 @@ sentry__bgworker_get_state(sentry_bgworker_t *bgw)
static bool
sentry__bgworker_is_done(sentry_bgworker_t *bgw)
{
return !bgw->first_task && !sentry__atomic_fetch(&bgw->running);
return (!bgw->first_task
|| sentry__monotonic_time() < bgw->first_task->execute_after)
&& !sentry__atomic_fetch(&bgw->running);
}

SENTRY_THREAD_FN
Expand Down Expand Up @@ -260,7 +273,18 @@ worker_thread(void *data)
continue;
}

// wait for a delayed task, wake up to new submissions
{
uint64_t now = sentry__monotonic_time();
if (now < task->execute_after) {
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
(uint32_t)MIN(task->execute_after - now, UINT32_MAX));
continue;
}
}

sentry__task_incref(task);
bgw->current_task = task;
sentry__mutex_unlock(&bgw->task_lock);

SENTRY_DEBUG("executing task on worker thread");
Expand All @@ -274,6 +298,7 @@ worker_thread(void *data)
// if not, we pop it and `decref` again, removing the _is inside
// list_ refcount.
sentry__mutex_lock(&bgw->task_lock);
bgw->current_task = NULL;
if (bgw->first_task == task) {
bgw->first_task = task->next_task;
if (task == bgw->last_task) {
Expand Down Expand Up @@ -350,11 +375,28 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
sentry__cond_init(&flush_task->signal);
sentry__mutex_init(&flush_task->lock);

// place the flush sentinel after the last task due within the timeout;
// tasks delayed beyond the timeout cannot complete in time anyway
uint64_t before = sentry__monotonic_time();
uint64_t deadline = add_saturate(before, timeout);
uint64_t execute_after = before;
sentry__mutex_lock(&bgw->task_lock);
for (sentry_bgworker_task_t *t
= bgw->current_task ? bgw->current_task->next_task : bgw->first_task;
t && t->execute_after <= deadline; t = t->next_task) {
if (t->execute_after > execute_after) {
execute_after = t->execute_after;
}
}
// NOTE: another thread could submit between unlock and submit_at, making
// execute_after stale. Flush semantics make this harmless.
sentry__mutex_unlock(&bgw->task_lock);

sentry__mutex_lock(&flush_task->lock);

/* submit the task that triggers our condvar once it runs */
sentry__bgworker_submit(bgw, sentry__flush_task,
(void (*)(void *))sentry__flush_task_decref, flush_task);
sentry__bgworker_submit_at(bgw, sentry__flush_task,
(void (*)(void *))sentry__flush_task_decref, flush_task, execute_after);

uint64_t started = sentry__monotonic_time();
bool was_flushed = false;
Expand Down Expand Up @@ -397,12 +439,6 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
uint64_t started = sentry__monotonic_time();
sentry__mutex_lock(&bgw->task_lock);
while (true) {
if (sentry__bgworker_is_done(bgw)) {
sentry__mutex_unlock(&bgw->task_lock);
sentry__thread_join(bgw->thread_id);
return 0;
}

uint64_t now = sentry__monotonic_time();
if (now > started && now - started > timeout) {
sentry__atomic_store(&bgw->running, 0);
Expand All @@ -413,6 +449,12 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
return 1;
}

if (!sentry__atomic_fetch(&bgw->running)) {
sentry__mutex_unlock(&bgw->task_lock);
sentry__thread_join(bgw->thread_id);
return 0;
}

// this will implicitly release the lock, and re-acquire on wake
sentry__cond_wait_timeout(&bgw->done_signal, &bgw->task_lock, 250);
}
Expand All @@ -422,6 +464,29 @@ int
sentry__bgworker_submit(sentry_bgworker_t *bgw,
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
void *task_data)
{
SENTRY_DEBUG("submitting task to background worker thread");
return sentry__bgworker_submit_at(
bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time());
}

int
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
void *task_data, uint64_t delay_ms)
{
SENTRY_DEBUGF("submitting %" PRIu64
" ms delayed task to background worker thread",
delay_ms);
uint64_t execute_after = add_saturate(sentry__monotonic_time(), delay_ms);
return sentry__bgworker_submit_at(
bgw, exec_func, cleanup_func, task_data, execute_after);
}

int
sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
void *task_data, uint64_t execute_after)
{
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
if (!task) {
Expand All @@ -432,19 +497,42 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
}
task->next_task = NULL;
task->refcount = 1;
task->execute_after = execute_after;
task->exec_func = exec_func;
task->cleanup_func = cleanup_func;
task->task_data = task_data;

SENTRY_DEBUG("submitting task to background worker thread");
sentry__mutex_lock(&bgw->task_lock);

if (!bgw->first_task) {
// empty queue
bgw->first_task = task;
}
if (bgw->last_task) {
bgw->last_task = task;
} else if (bgw->last_task->execute_after <= task->execute_after) {
// append last (common fast path for FIFO immediates)
bgw->last_task->next_task = task;
bgw->last_task = task;
} else {
// insert sorted by execute_after; skip past current_task which
// may be executing without the lock held
sentry_bgworker_task_t *prev = bgw->current_task;
sentry_bgworker_task_t *cur = prev ? prev->next_task : bgw->first_task;
while (cur && cur->execute_after <= task->execute_after) {
prev = cur;
cur = cur->next_task;
}

task->next_task = cur;
if (prev) {
prev->next_task = task;
} else {
bgw->first_task = task;
}
if (!task->next_task) {
bgw->last_task = task;
}
}
bgw->last_task = task;

sentry__cond_wake(&bgw->submit_signal);
sentry__mutex_unlock(&bgw->task_lock);

Expand Down Expand Up @@ -475,6 +563,12 @@ sentry__bgworker_foreach_matching(sentry_bgworker_t *bgw,
} else {
bgw->first_task = next_task;
}
if (bgw->current_task == task) {
bgw->current_task = NULL;
} else if (bgw->current_task
&& bgw->current_task->next_task == task) {
bgw->current_task->next_task = next_task;
}
sentry__task_decref(task);
dropped++;
} else {
Expand Down
11 changes: 11 additions & 0 deletions src/sentry_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,12 +489,23 @@ const char *sentry__bgworker_get_thread_name(sentry_bgworker_t *bgw);
/**
* This will submit a new task to the background thread.
*
* The `_delayed` variant delays execution by the specified delay in
* milliseconds, and the `_at` variant executes after the specified monotonic
* timestamp. The latter is mostly useful for testing to ensure deterministic
* ordering of tasks regardless of OS preemption between submissions.
*
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
* Returns 0 on success.
*/
int sentry__bgworker_submit(sentry_bgworker_t *bgw,
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
void *task_data);
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
void *task_data, uint64_t delay_ms);
int sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
void *task_data, uint64_t execute_after);

/**
* This function will iterate through all the current tasks of the worker
Expand Down
Loading
Loading