Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -184,27 +184,23 @@ HibernatableWebSocketCustomEvent::HibernatableWebSocketCustomEvent(
params(kj::mv(params)),
manager(manager) {}

// TODO(cleanup): Try to reduce duplication with consumeParams()
tracing::EventInfo HibernatableWebSocketCustomEvent::getEventInfo() const {
// Try to extract event type from params if available
// Try to extract event type from params if available
tracing::HibernatableWebSocketEventInfo::Type HibernatableWebSocketCustomEvent::getEventType()
const {
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(socketParams, HibernatableSocketParams) {
KJ_SWITCH_ONEOF(socketParams.eventType) {
KJ_CASE_ONEOF(text, HibernatableSocketParams::Text) {
return tracing::HibernatableWebSocketEventInfo(
tracing::HibernatableWebSocketEventInfo::Message());
KJ_CASE_ONEOF(_, HibernatableSocketParams::Text) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) {
return tracing::HibernatableWebSocketEventInfo(
tracing::HibernatableWebSocketEventInfo::Message());
KJ_CASE_ONEOF(_, HibernatableSocketParams::Data) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) {
return tracing::HibernatableWebSocketEventInfo(
tracing::HibernatableWebSocketEventInfo::Close{close.code, close.wasClean});
return tracing::HibernatableWebSocketEventInfo::Close{close.code, close.wasClean};
}
KJ_CASE_ONEOF(error, HibernatableSocketParams::Error) {
return tracing::HibernatableWebSocketEventInfo(
tracing::HibernatableWebSocketEventInfo::Error());
KJ_CASE_ONEOF(_, HibernatableSocketParams::Error) {
return tracing::HibernatableWebSocketEventInfo::Error{};
}
}
}
Expand All @@ -214,23 +210,24 @@ tracing::EventInfo HibernatableWebSocketCustomEvent::getEventInfo() const {
switch (payload.which()) {
case rpc::HibernatableWebSocketEventMessage::Payload::TEXT:
case rpc::HibernatableWebSocketEventMessage::Payload::DATA:
return tracing::HibernatableWebSocketEventInfo(
tracing::HibernatableWebSocketEventInfo::Message());
return tracing::HibernatableWebSocketEventInfo::Message{};
case rpc::HibernatableWebSocketEventMessage::Payload::CLOSE: {
auto close = payload.getClose();
return tracing::HibernatableWebSocketEventInfo(
tracing::HibernatableWebSocketEventInfo::Close{close.getCode(), close.getWasClean()});
return tracing::HibernatableWebSocketEventInfo::Close{
close.getCode(), close.getWasClean()};
}
case rpc::HibernatableWebSocketEventMessage::Payload::ERROR:
return tracing::HibernatableWebSocketEventInfo(
tracing::HibernatableWebSocketEventInfo::Error());
return tracing::HibernatableWebSocketEventInfo::Error{};
}
KJ_UNREACHABLE;
}
}
KJ_UNREACHABLE;
}

tracing::EventInfo HibernatableWebSocketCustomEvent::getEventInfo() const {
return tracing::HibernatableWebSocketEventInfo(getEventType());
}

HibernatableSocketParams HibernatableWebSocketCustomEvent::consumeParams() {
KJ_IF_SOME(p, params.tryGet<kj::Own<HibernationReader>>()) {
kj::Maybe<HibernatableSocketParams> eventParameters;
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/api/hibernatable-web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class HibernatableWebSocketCustomEvent final: public WorkerInterface::CustomEven
// HibernatableSocketParams first.
HibernatableSocketParams consumeParams();

// Peeks at params to extract the event type for tracing, without consuming them.
tracing::HibernatableWebSocketEventInfo::Type getEventType() const;

uint16_t typeId;
kj::OneOf<HibernatableSocketParams, kj::Own<HibernationReader>> params;
kj::Maybe<uint32_t> timeoutMs;
Expand Down
21 changes: 12 additions & 9 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
auto incomingRequest =
kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "request() can only be called once"));
this->incomingRequest = kj::none;
incomingRequest->delivered();
auto& context = incomingRequest->getContext();

auto wrappedResponse = kj::heap<ResponseSentTracker>(response);
Expand Down Expand Up @@ -292,6 +291,8 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
workerTracer = t;
}

incomingRequest->delivered();

auto metricsForCatch = kj::addRef(incomingRequest->getMetrics());
auto metricsForProxyTask = kj::addRef(incomingRequest->getMetrics());

Expand Down Expand Up @@ -575,7 +576,6 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
auto incomingRequest =
kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "runScheduled() can only be called once"));
this->incomingRequest = kj::none;
incomingRequest->delivered();
auto& context = incomingRequest->getContext();

KJ_ASSERT(context.getActor() == kj::none);
Expand All @@ -585,10 +585,12 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(

double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;

KJ_IF_SOME(t, context.getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(*incomingRequest, tracing::ScheduledEventInfo(eventTime, kj::str(cron)));
}

incomingRequest->delivered();

// Scheduled handlers run entirely in waitUntil() tasks.
context.addWaitUntil(context.run(
[scheduledTime, cron, entrypointName = entrypointName, props = kj::mv(props), &context,
Expand Down Expand Up @@ -642,13 +644,14 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
co_return result;
}

// There isn't a pre-existing alarm, we can call `delivered()` (and emit metrics events).
incomingRequest->delivered();

// There isn't a pre-existing alarm, we can set event info and call `delivered()` (which emits
// metrics events).
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(*incomingRequest, tracing::AlarmEventInfo(scheduledTime));
}

incomingRequest->delivered();

auto scheduleAlarmResult = co_await actor.scheduleAlarm(scheduledTime);
KJ_SWITCH_ONEOF(scheduleAlarmResult) {
KJ_CASE_ONEOF(af, WorkerInterface::AlarmFulfiller) {
Expand Down Expand Up @@ -736,13 +739,13 @@ kj::Promise<bool> WorkerEntrypoint::test() {
auto incomingRequest =
kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "test() can only be called once"));
this->incomingRequest = kj::none;
incomingRequest->delivered();

auto& context = incomingRequest->getContext();
KJ_IF_SOME(t, context.getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(*incomingRequest, tracing::CustomEventInfo());
}

incomingRequest->delivered();

context.addWaitUntil(context.run([entrypointName = entrypointName, props = kj::mv(props),
&context, &metrics = incomingRequest->getMetrics()](
Worker::Lock& lock) mutable -> kj::Promise<void> {
Expand Down
1 change: 0 additions & 1 deletion src/workerd/io/worker-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class WorkerInterface: public kj::HttpService {
virtual uint16_t getType() = 0;

// Get event info for tracing.
// Return none if this event type doesn't need tracing.
virtual tracing::EventInfo getEventInfo() const = 0;

// If the CustomEvent fails before any of the other methods are called, this may be invoked
Expand Down
Loading