Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,9 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
}
}

auto currentTime = kj::systemPreciseCalendarClock().now();
KJ_SWITCH_ONEOF(persistent.armAlarmHandler(
scheduledTime, context.getCurrentTraceSpan(), false, actorId)) {
scheduledTime, context.getCurrentTraceSpan(), currentTime, false, actorId)) {
KJ_CASE_ONEOF(armResult, ActorCacheInterface::RunAlarmHandler) {
auto& handler = KJ_REQUIRE_NONNULL(exportedHandler);
if (handler.alarm == kj::none) {
Expand Down
16 changes: 10 additions & 6 deletions src/workerd/io/actor-cache-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4938,6 +4938,8 @@ KJ_TEST("ActorCache alarm get/put") {

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
auto twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;
// Used as the "current time" parameter for armAlarmHandler in tests.
auto testCurrentTime = kj::UNIX_EPOCH;
{
// Test alarm writes happen transactionally with storage ops
test.setAlarm(oneMs);
Expand Down Expand Up @@ -4979,7 +4981,8 @@ KJ_TEST("ActorCache alarm get/put") {

{
// we have a cached time == nullptr, so we should not attempt to run an alarm
auto armResult = test.cache.armAlarmHandler(10 * kj::SECONDS + kj::UNIX_EPOCH, nullptr);
auto armResult =
test.cache.armAlarmHandler(10 * kj::SECONDS + kj::UNIX_EPOCH, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorCache::CancelAlarmHandler>());
auto cancelResult = kj::mv(armResult.get<ActorCache::CancelAlarmHandler>());
KJ_ASSERT(cancelResult.waitBeforeCancel.poll(ws));
Expand All @@ -4997,7 +5000,7 @@ KJ_TEST("ActorCache alarm get/put") {
{
// Test that alarm handler handle clears alarm when dropped with no writes
{
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr);
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
}
mockStorage->expectCall("deleteAlarm", ws)
Expand All @@ -5010,7 +5013,7 @@ KJ_TEST("ActorCache alarm get/put") {

// Test that alarm handler handle does not clear alarm when dropped with writes
{
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr);
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
test.setAlarm(twoMs);
}
Expand All @@ -5024,7 +5027,7 @@ KJ_TEST("ActorCache alarm get/put") {

// Test that alarm handler handle does not cache delete when it fails
{
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr);
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
}
mockStorage->expectCall("deleteAlarm", ws)
Expand All @@ -5036,7 +5039,7 @@ KJ_TEST("ActorCache alarm get/put") {
{
// Test that alarm handler handle does not cache alarm delete when noCache == true
{
auto armResult = test.cache.armAlarmHandler(twoMs, nullptr, true);
auto armResult = test.cache.armAlarmHandler(twoMs, nullptr, testCurrentTime, true);
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
}
mockStorage->expectCall("deleteAlarm", ws)
Expand Down Expand Up @@ -5073,6 +5076,7 @@ KJ_TEST("ActorCache alarm delete when flush fails") {
auto& mockStorage = test.mockStorage;

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
auto testCurrentTime = kj::UNIX_EPOCH;

{
auto time = expectUncached(test.getAlarm());
Expand All @@ -5090,7 +5094,7 @@ KJ_TEST("ActorCache alarm delete when flush fails") {
// we want to test that even if a flush is retried
// that the post-delete actions for a checked delete happen.
{
auto handle = test.cache.armAlarmHandler(oneMs, nullptr);
auto handle = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);

auto time = expectCached(test.getAlarm());
KJ_ASSERT(time == kj::none);
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/io/actor-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ kj::Maybe<kj::Promise<void>> ActorCache::evictStale(kj::Date now) {
}

kj::OneOf<ActorCache::CancelAlarmHandler, ActorCache::RunAlarmHandler> ActorCache::armAlarmHandler(
kj::Date scheduledTime, SpanParent parentSpan, bool noCache, kj::StringPtr actorId) {
kj::Date scheduledTime,
SpanParent parentSpan,
kj::Date /*currentTime -- unused*/,
bool noCache,
kj::StringPtr actorId) {
noCache = noCache || lru.options.noCache;

KJ_ASSERT(!currentAlarmTime.is<DeferredAlarmDelete>());
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/io/actor-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,12 @@ class ActorCacheInterface: public ActorCacheOps {
};

// Call when entering the alarm handler.
//
// The result is one of two alternatives: `RunAlarmHandler` tells the caller to go ahead and run
// the handler, while `CancelAlarmHandler` tells the caller that this run should be canceled.
//
// `scheduledTime` is the time for which the caller believes this handler run was scheduled.
//
// `currentTime` is the caller's notion of "now". It is used to determine if an overdue alarm
// should run immediately even when the local alarm state differs from the scheduled time (to
// avoid blocking on storage sync). Not every implementation consults it.
//
// `noCache` defaults to false and `actorId` defaults to the empty string, so callers only need
// to supply the first three arguments.
virtual kj::OneOf<CancelAlarmHandler, RunAlarmHandler> armAlarmHandler(kj::Date scheduledTime,
SpanParent parentSpan,
kj::Date currentTime,
bool noCache = false,
kj::StringPtr actorId = "") = 0;

Expand Down Expand Up @@ -363,6 +367,7 @@ class ActorCache final: public ActorCacheInterface {

// ActorCacheInterface override. `currentTime` is accepted only to satisfy the interface
// signature: the ActorCache definition leaves that parameter unnamed and does not read it.
kj::OneOf<CancelAlarmHandler, RunAlarmHandler> armAlarmHandler(kj::Date scheduledTime,
SpanParent parentSpan,
kj::Date currentTime,
bool noCache = false,
kj::StringPtr actorId = "") override;
void cancelDeferredAlarmDeletion() override;
Expand Down
38 changes: 21 additions & 17 deletions src/workerd/io/actor-sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ static constexpr kj::Date twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;
// Fixed alarm timestamps used throughout these tests, expressed as millisecond offsets from
// the Unix epoch.
static constexpr kj::Date threeMs = 3 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date fourMs = 4 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date fiveMs = 5 * kj::MILLISECONDS + kj::UNIX_EPOCH;
// Value passed as the "current time" argument of armAlarmHandler() in tests.
// It is pinned to the epoch itself, i.e. earlier than every alarm time the tests use, so
// that no test alarm counts as overdue and the pre-existing tests are unaffected by the
// overdue-alarm check.
static constexpr kj::Date testCurrentTime = kj::UNIX_EPOCH;

template <typename T>
kj::Promise<T> eagerlyReportExceptions(kj::Promise<T> promise, kj::SourceLocation location = {}) {
Expand Down Expand Up @@ -586,7 +590,7 @@ KJ_TEST("tells alarm handler to cancel when committed alarm is empty") {
ActorSqliteTest test;

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
// We expect armAlarmHandler() to tell us to cancel the alarm.
KJ_ASSERT(armResult.is<ActorCache::CancelAlarmHandler>());
auto waitPromise = kj::mv(armResult.get<ActorCache::CancelAlarmHandler>().waitBeforeCancel);
Expand All @@ -612,7 +616,7 @@ KJ_TEST("tells alarm handler to reschedule when handler alarm is later than comm
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

// Request handler run at 2ms. Expect cancellation with rescheduling.
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::CancelAlarmHandler>());
auto cancelResult = kj::mv(armResult.get<ActorSqlite::CancelAlarmHandler>());

Expand All @@ -636,7 +640,7 @@ KJ_TEST("tells alarm handler to reschedule when handler alarm is earlier than co
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

// Expect that armAlarmHandler() tells caller to cancel after rescheduling completes.
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::CancelAlarmHandler>());
auto cancelResult = kj::mv(armResult.get<ActorSqlite::CancelAlarmHandler>());

Expand All @@ -661,7 +665,7 @@ KJ_TEST("does not cancel handler when local db alarm state is later than schedul

test.setAlarm(twoMs);
{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
}
test.pollAndExpectCalls({"commit"})[0]->fulfill();
Expand All @@ -680,7 +684,7 @@ KJ_TEST("does not cancel handler when local db alarm state is earlier than sched

test.setAlarm(oneMs);
{
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
}
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
Expand All @@ -698,7 +702,7 @@ KJ_TEST("getAlarm() returns null during handler") {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
test.pollAndExpectCalls({});

Expand All @@ -719,7 +723,7 @@ KJ_TEST("alarm handler handle clears alarm when dropped with no writes") {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
}
test.pollAndExpectCalls({"commit"})[0]->fulfill();
Expand All @@ -738,7 +742,7 @@ KJ_TEST("alarm deleter does not clear alarm when dropped with writes") {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
test.setAlarm(twoMs);
}
Expand All @@ -759,7 +763,7 @@ KJ_TEST("can cancel deferred alarm deletion during handler") {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
test.actor.cancelDeferredAlarmDeletion();
}
Expand All @@ -778,7 +782,7 @@ KJ_TEST("canceling deferred alarm deletion outside handler has no effect") {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
}
test.pollAndExpectCalls({"commit"})[0]->fulfill();
Expand All @@ -803,7 +807,7 @@ KJ_TEST("canceling deferred alarm deletion outside handler edge case") {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
}
test.actor.cancelDeferredAlarmDeletion();
Expand All @@ -825,7 +829,7 @@ KJ_TEST("canceling deferred alarm deletion is idempotent") {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
test.actor.cancelDeferredAlarmDeletion();
test.actor.cancelDeferredAlarmDeletion();
Expand All @@ -846,7 +850,7 @@ KJ_TEST("alarm handler cleanup succeeds when output gate is broken") {
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
auto deferredDelete = kj::mv(armResult.get<ActorSqlite::RunAlarmHandler>().deferredDelete);

Expand Down Expand Up @@ -893,7 +897,7 @@ KJ_TEST("handler alarm is not deleted when commit fails") {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());

KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
Expand Down Expand Up @@ -1340,7 +1344,7 @@ KJ_TEST("rolling back transaction leaves deferred alarm deletion in expected sta
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

{
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());

auto txn = test.actor.startTransaction();
Expand Down Expand Up @@ -1373,7 +1377,7 @@ KJ_TEST("committing transaction leaves deferred alarm deletion in expected state
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

{
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());

auto txn = test.actor.startTransaction();
Expand Down Expand Up @@ -1404,7 +1408,7 @@ KJ_TEST("rolling back nested transaction leaves deferred alarm deletion in expec
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

{
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());

auto txn1 = test.actor.startTransaction();
Expand Down
35 changes: 32 additions & 3 deletions src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -863,8 +863,11 @@ void ActorSqlite::shutdown(kj::Maybe<const kj::Exception&> maybeException) {
}

kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSqlite::
armAlarmHandler(
kj::Date scheduledTime, SpanParent parentSpan, bool noCache, kj::StringPtr actorId) {
armAlarmHandler(kj::Date scheduledTime,
SpanParent parentSpan,
kj::Date currentTime,
bool noCache,
kj::StringPtr actorId) {
KJ_ASSERT(!inAlarmHandler);

if (haveDeferredDelete) {
Expand All @@ -879,6 +882,19 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
// If there's a clean db time that differs from the requested handler's scheduled time, this
// run should be canceled.
if (willFireEarlier(scheduledTime, localAlarmState)) {
// If the local alarm time is already in the past, just run the handler now. This avoids
// blocking alarm execution on storage sync when storage is overloaded. The alarm will
// either delete itself on success or reschedule on failure.
if (localAlarmState != kj::none && willFireEarlier(localAlarmState, currentTime)) {
LOG_WARNING_PERIODICALLY(
"NOSENTRY SQLite alarm overdue, running despite AlarmManager mismatch", scheduledTime,
KJ_ASSERT_NONNULL(localAlarmState), currentTime, actorId);
haveDeferredDelete = true;
inAlarmHandler = true;
static const DeferredAlarmDeleter disposer;
return RunAlarmHandler{.deferredDelete = kj::Own<void>(this, disposer)};
}

// If the handler's scheduled time is earlier than the clean scheduled time, we may be
// recovering from a failed db commit or scheduling request, so we need to request that
// the alarm be rescheduled for the current db time, and tell the caller to wait for
Expand Down Expand Up @@ -909,10 +925,23 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
// which suggests that either the alarm manager is working with stale data or that local
// alarm time has somehow gotten out of sync with the scheduled alarm time.

// Only log if the alarm manager is significantly late (>10 seconds behind SQLite)
// We know localAlarmState has a value here because we're in the branch where it's earlier
// than scheduledTime (not equal, and not later).
auto localTime = KJ_ASSERT_NONNULL(localAlarmState);

// If the local alarm time is already in the past, just run the handler now. This avoids
// blocking alarm execution on storage sync when storage is overloaded.
if (localTime <= currentTime) {
LOG_WARNING_PERIODICALLY(
"NOSENTRY SQLite alarm overdue, running despite stale AlarmManager time",
scheduledTime, localTime, currentTime, actorId);
haveDeferredDelete = true;
inAlarmHandler = true;
static const DeferredAlarmDeleter disposer;
return RunAlarmHandler{.deferredDelete = kj::Own<void>(this, disposer)};
}

// Only log if the alarm manager is significantly late (>10 seconds behind SQLite)
if (scheduledTime - localTime > 10 * kj::SECONDS) {
LOG_WARNING_PERIODICALLY(
"NOSENTRY SQLite alarm handler canceled.", scheduledTime, actorId, localTime);
Expand Down
1 change: 1 addition & 0 deletions src/workerd/io/actor-sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
void shutdown(kj::Maybe<const kj::Exception&> maybeException) override;
// ActorCacheInterface override. When the requested `scheduledTime` does not match the local
// alarm state, ActorSqlite compares the local alarm time against `currentTime`; if the local
// alarm is already overdue it returns RunAlarmHandler (and logs a warning that includes
// `actorId`) instead of canceling this run.
kj::OneOf<CancelAlarmHandler, RunAlarmHandler> armAlarmHandler(kj::Date scheduledTime,
SpanParent parentSpan,
kj::Date currentTime,
bool noCache = false,
kj::StringPtr actorId = "") override;
void cancelDeferredAlarmDeletion() override;
Expand Down