Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -585,12 +585,14 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
}
return WorkerInterface::AlarmResult{.retry = true,
.retryCountsAgainstLimit = shouldRetryCountsAgainstLimits,
.outcome = outcome};
.outcome = outcome,
.errorDescription = kj::str(description)};
})
.then([&context](WorkerInterface::AlarmResult result)
-> kj::Promise<WorkerInterface::AlarmResult> {
return context.waitForOutputLocks().then(
[result]() { return kj::mv(result); }, [&context](kj::Exception&& e) {
return context.waitForOutputLocks().then([result = kj::mv(result)]() mutable {
return kj::mv(result);
}, [&context](kj::Exception&& e) {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
kj::String actorId;
KJ_SWITCH_ONEOF(actor.getId()) {
Expand Down Expand Up @@ -620,7 +622,8 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
}
return WorkerInterface::AlarmResult{.retry = true,
.retryCountsAgainstLimit = shouldRetryCountsAgainstLimits,
.outcome = EventOutcome::EXCEPTION};
.outcome = EventOutcome::EXCEPTION,
.errorDescription = kj::str(e.getDescription())};
});
});
}
Expand Down
16 changes: 10 additions & 6 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -745,8 +745,10 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
// TODO(someday) If the request responsible for fulfilling this alarm were to be cancelled, then
// we could probably take over and try to fulfill it ourselves. Maybe we'd want to loop on
// `actor.getAlarm()`? We'd have to distinguish between rescheduling and request cancellation.
auto result = co_await promise;
co_return result;
auto outcome = co_await promise;
co_return AlarmResult{.retry = outcome.retry,
.retryCountsAgainstLimit = outcome.retryCountsAgainstLimit,
.outcome = outcome.outcome};
}

// There isn't a pre-existing alarm, we can set event info and call `delivered()` (which emits
Expand Down Expand Up @@ -806,19 +808,21 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
}

// We succeeded, inform any other entrypoints that may be waiting upon us.
af.fulfill(result);
af.fulfill(result.asOutcome());
cancellationGuard.cancel();
co_return result;
co_return kj::mv(result);
} catch (const kj::Exception& e) {
// We failed, inform any other entrypoints that may be waiting upon us.
af.reject(e);
cancellationGuard.cancel();
throw;
}
}
KJ_CASE_ONEOF(result, WorkerInterface::AlarmResult) {
KJ_CASE_ONEOF(outcome, WorkerInterface::AlarmOutcome) {
// The alarm was cancelled while we were waiting to run, go ahead and return the result.
co_return result;
co_return AlarmResult{.retry = outcome.retry,
.retryCountsAgainstLimit = outcome.retryCountsAgainstLimit,
.outcome = outcome.outcome};
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/workerd/io/worker-interface.c++
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,14 @@ kj::Promise<WorkerInterface::AlarmResult> RpcWorkerInterface::runAlarm(
req.setRetryCount(retryCount);
return req.send().then([](auto resp) {
auto respResult = resp.getResult();
kj::Maybe<kj::String> errorDescription;
if (respResult.hasErrorDescription()) {
errorDescription = kj::str(respResult.getErrorDescription());
}
return WorkerInterface::AlarmResult{.retry = respResult.getRetry(),
.retryCountsAgainstLimit = respResult.getRetryCountsAgainstLimit(),
.outcome = respResult.getOutcome()};
.outcome = respResult.getOutcome(),
.errorDescription = kj::mv(errorDescription)};
});
}

Expand All @@ -419,7 +424,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> RpcWorkerInterface::customEven

// ======================================================================================
WorkerInterface::AlarmFulfiller::AlarmFulfiller(
kj::Own<kj::PromiseFulfiller<AlarmResult>> fulfiller)
kj::Own<kj::PromiseFulfiller<AlarmOutcome>> fulfiller)
: maybeFulfiller(kj::mv(fulfiller)) {}

WorkerInterface::AlarmFulfiller::~AlarmFulfiller() noexcept(false) {
Expand All @@ -428,7 +433,7 @@ WorkerInterface::AlarmFulfiller::~AlarmFulfiller() noexcept(false) {
}
}

void WorkerInterface::AlarmFulfiller::fulfill(const AlarmResult& result) {
void WorkerInterface::AlarmFulfiller::fulfill(const AlarmOutcome& result) {
KJ_IF_SOME(fulfiller, getFulfiller()) {
fulfiller.fulfill(kj::cp(result));
}
Expand All @@ -442,14 +447,14 @@ void WorkerInterface::AlarmFulfiller::reject(const kj::Exception& e) {

void WorkerInterface::AlarmFulfiller::cancel() {
KJ_IF_SOME(fulfiller, getFulfiller()) {
fulfiller.fulfill(AlarmResult{
fulfiller.fulfill(AlarmOutcome{
.retry = false,
.outcome = EventOutcome::CANCELED,
});
}
}

kj::Maybe<kj::PromiseFulfiller<WorkerInterface::AlarmResult>&> WorkerInterface::AlarmFulfiller::
kj::Maybe<kj::PromiseFulfiller<WorkerInterface::AlarmOutcome>&> WorkerInterface::AlarmFulfiller::
getFulfiller() {
KJ_IF_SOME(fulfiller, maybeFulfiller) {
if (fulfiller.get()->isWaiting()) {
Expand Down
1 change: 1 addition & 0 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ struct AlarmRun @0xfa8ea4e97e23b03d {

retry @1 :Bool;
retryCountsAgainstLimit @2 :Bool = true;
errorDescription @3 :Text;
}

struct QueueMessage @0x944adb18c0352295 {
Expand Down
23 changes: 18 additions & 5 deletions src/workerd/io/worker-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,42 @@ class WorkerInterface: public kj::HttpService {
EventOutcome outcome = EventOutcome::UNKNOWN;
};

// Copyable subset of AlarmResult, used by ForkedPromise for alarm deduplication in Worker::Actor.
struct AlarmOutcome {
bool retry = true;
bool retryCountsAgainstLimit = true;
EventOutcome outcome = EventOutcome::UNKNOWN;
};

struct AlarmResult {
Comment thread
jqmmes marked this conversation as resolved.
bool retry = true;
bool retryCountsAgainstLimit = true;
EventOutcome outcome = EventOutcome::UNKNOWN;
kj::Maybe<kj::String> errorDescription;

AlarmOutcome asOutcome() const {
return {
.retry = retry, .retryCountsAgainstLimit = retryCountsAgainstLimit, .outcome = outcome};
}
};

class AlarmFulfiller {
public:
AlarmFulfiller(kj::Own<kj::PromiseFulfiller<AlarmResult>> fulfiller);
AlarmFulfiller(kj::Own<kj::PromiseFulfiller<AlarmOutcome>> fulfiller);
KJ_DISALLOW_COPY(AlarmFulfiller);
AlarmFulfiller(AlarmFulfiller&&) = default;
AlarmFulfiller& operator=(AlarmFulfiller&&) = default;
~AlarmFulfiller() noexcept(false);
void fulfill(const AlarmResult& result);
void fulfill(const AlarmOutcome& result);
void reject(const kj::Exception& e);
void cancel();

private:
kj::Maybe<kj::Own<kj::PromiseFulfiller<AlarmResult>>> maybeFulfiller;
kj::Maybe<kj::PromiseFulfiller<AlarmResult>&> getFulfiller();
kj::Maybe<kj::Own<kj::PromiseFulfiller<AlarmOutcome>>> maybeFulfiller;
kj::Maybe<kj::PromiseFulfiller<AlarmOutcome>&> getFulfiller();
};

using ScheduleAlarmResult = kj::OneOf<AlarmResult, AlarmFulfiller>;
using ScheduleAlarmResult = kj::OneOf<AlarmOutcome, AlarmFulfiller>;

// Trigger a scheduled event with the given scheduled (unix timestamp) time and cron string.
// The cron string must be valid until the returned promise completes.
Expand Down
12 changes: 6 additions & 6 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3727,7 +3727,7 @@ struct Worker::Actor::Impl {

struct ScheduledAlarm {
ScheduledAlarm(
kj::Date scheduledTime, kj::PromiseFulfillerPair<WorkerInterface::AlarmResult> pf)
kj::Date scheduledTime, kj::PromiseFulfillerPair<WorkerInterface::AlarmOutcome> pf)
: scheduledTime(scheduledTime),
resultFulfiller(kj::mv(pf.fulfiller)),
resultPromise(pf.promise.fork()) {}
Expand All @@ -3737,16 +3737,16 @@ struct Worker::Actor::Impl {

kj::Date scheduledTime;
WorkerInterface::AlarmFulfiller resultFulfiller;
kj::ForkedPromise<WorkerInterface::AlarmResult> resultPromise;
kj::ForkedPromise<WorkerInterface::AlarmOutcome> resultPromise;
kj::Promise<void> cleanupPromise = resultPromise.addBranch().then(
[](WorkerInterface::AlarmResult&&) {}, [](kj::Exception&&) {});
[](WorkerInterface::AlarmOutcome&&) {}, [](kj::Exception&&) {});
// The first thing we do after we get a result should be to remove the running alarm (if we got
// that far). So we grab the first branch now and ignore any results, before anyone else has a
// chance to do so.
};
struct RunningAlarm {
kj::Date scheduledTime;
kj::ForkedPromise<WorkerInterface::AlarmResult> resultPromise;
kj::ForkedPromise<WorkerInterface::AlarmOutcome> resultPromise;
};
// If valid, we have an alarm invocation that has not yet received an `AlarmFulfiller` and thus
// is either waiting for a running alarm or its scheduled time.
Expand Down Expand Up @@ -4104,7 +4104,7 @@ void Worker::Actor::Impl::HooksImpl::updateAlarmInMemory(kj::Maybe<kj::Date> new
maybeAlarmPreviewTask = retry();
}

kj::Maybe<kj::Promise<WorkerInterface::AlarmResult>> Worker::Actor::getAlarm(
kj::Maybe<kj::Promise<WorkerInterface::AlarmOutcome>> Worker::Actor::getAlarm(
kj::Date scheduledTime) {
KJ_IF_SOME(runningAlarm, impl->maybeRunningAlarm) {
if (runningAlarm.scheduledTime == scheduledTime) {
Expand Down Expand Up @@ -4141,7 +4141,7 @@ kj::Promise<WorkerInterface::ScheduleAlarmResult> Worker::Actor::scheduleAlarm(

KJ_IASSERT(impl->maybeScheduledAlarm == kj::none);
auto& scheduledAlarm = impl->maybeScheduledAlarm.emplace(
scheduledTime, kj::newPromiseAndFulfiller<WorkerInterface::AlarmResult>());
scheduledTime, kj::newPromiseAndFulfiller<WorkerInterface::AlarmOutcome>());

// Probably don't need to use kj::coCapture for this but doing so just to be on the
// safe side...
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ class Worker::Actor final: public kj::Refcounted {

// If there is a scheduled or running alarm with the given `scheduledTime`, return a promise to
// its result. This allows use to de-dupe multiple requests to a single `IoContext::run()`.
kj::Maybe<kj::Promise<WorkerInterface::AlarmResult>> getAlarm(kj::Date scheduledTime);
kj::Maybe<kj::Promise<WorkerInterface::AlarmOutcome>> getAlarm(kj::Date scheduledTime);

// Wait for `Date.now()` to be greater than or equal to `scheduledTime`. If the promise resolves
// to an `AlarmFulfiller`, then the caller is responsible for invoking `fulfill()`, `reject()`, or
Expand Down
Loading