Skip to content

Commit 3eb3259

Browse files
authored
Add wrapper struct to durable events and store trace context (#65)
* Add wrapper struct to durable events and store trace context This should result in Sentry displaying a link from the task execution trace (where 'await_event' is called) back to the trace that performed the 'emit_event' call Note that this is a *breaking change*, as we now wrap the user's payload in a struct when reading/writing to the database. Going forward, we'll be able to add new (optional) fields to this wrapper struct without breaking existing durable deployments * Run fmt * Fix metadata * Fix optional * Run fmt * Enforce that 'inner' and 'metadata' exist * Call process_event_payload_wrapper
1 parent b779f5a commit 3eb3259

9 files changed

Lines changed: 332 additions & 49 deletions

File tree

sql/schema.sql

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,10 @@ begin
792792
perform durable.emit_event(
793793
p_queue_name,
794794
'$child:' || p_task_id::text,
795-
jsonb_build_object('status', p_status) || coalesce(p_payload, '{}'::jsonb)
795+
jsonb_build_object(
796+
'inner', jsonb_build_object('status', p_status) || coalesce(p_payload, '{}'::jsonb),
797+
'metadata', '{}'::jsonb
798+
)
796799
);
797800
end if;
798801

@@ -1423,6 +1426,23 @@ begin
14231426
raise exception 'event_name must be provided';
14241427
end if;
14251428

1429+
-- Validate that if p_payload is not null, it has exactly the allowed keys ('inner' and 'metadata')
1430+
if p_payload is not null and jsonb_typeof(p_payload) = 'object' then
1431+
if exists (
1432+
select 1
1433+
from jsonb_object_keys(p_payload) as k
1434+
where k not in ('inner', 'metadata')
1435+
) then
1436+
raise exception 'p_payload may only contain ''inner'' and ''metadata'' keys';
1437+
end if;
1438+
if not p_payload ? 'inner' then
1439+
raise exception 'p_payload must contain an ''inner'' key';
1440+
end if;
1441+
if not p_payload ? 'metadata' then
1442+
raise exception 'p_payload must contain a ''metadata'' key';
1443+
end if;
1444+
end if;
1445+
14261446
-- Insert the event into the events table (first-writer-wins).
14271447
-- Subsequent emits for the same event are no-ops.
14281448
-- We use DO UPDATE WHERE payload IS NULL to handle the case where await_event

src/client.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use uuid::Uuid;
1111
use crate::error::{DurableError, DurableResult};
1212
use crate::task::{Task, TaskRegistry, TaskWrapper};
1313
use crate::types::{
14-
CancellationPolicy, RetryStrategy, SpawnDefaults, SpawnOptions, SpawnResult, SpawnResultRow,
15-
WorkerOptions,
14+
CancellationPolicy, DurableEventPayload, RetryStrategy, SpawnDefaults, SpawnOptions,
15+
SpawnResult, SpawnResultRow, WorkerOptions,
1616
};
1717

1818
/// Internal struct for serializing spawn options to the database.
@@ -684,7 +684,21 @@ where
684684
#[cfg(feature = "telemetry")]
685685
tracing::Span::current().record("queue", queue);
686686

687-
let payload_json = serde_json::to_value(payload)?;
687+
let inner_payload_json = serde_json::to_value(payload)?;
688+
689+
let mut payload_wrapper = DurableEventPayload {
690+
inner: inner_payload_json,
691+
metadata: JsonValue::Null,
692+
};
693+
694+
#[allow(unused_mut)] // mut is needed when telemetry feature is enabled
695+
let mut metadata_map: HashMap<String, JsonValue> = HashMap::new();
696+
697+
#[cfg(feature = "telemetry")]
698+
crate::telemetry::inject_trace_context(&mut metadata_map);
699+
payload_wrapper.metadata = serde_json::to_value(metadata_map)?;
700+
701+
let payload_json = serde_json::to_value(payload_wrapper)?;
688702

689703
let query = "SELECT durable.emit_event($1, $2, $3)";
690704
sqlx::query(query)

src/context.rs

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use uuid::Uuid;
88
use crate::Durable;
99
use crate::error::{ControlFlow, TaskError, TaskResult};
1010
use crate::task::Task;
11+
use crate::types::DurableEventPayload;
1112
use crate::types::{
1213
AwaitEventResult, CheckpointRow, ChildCompletePayload, ChildStatus, ClaimedTask, SpawnOptions,
1314
TaskHandle,
@@ -351,7 +352,9 @@ where
351352

352353
// Check cache for already-received event
353354
if let Some(cached) = self.checkpoint_cache.get(&checkpoint_name) {
354-
return Ok(serde_json::from_value(cached.clone())?);
355+
let durable_event_payload: DurableEventPayload =
356+
serde_json::from_value(cached.clone())?;
357+
return self.process_event_payload_wrapper(durable_event_payload);
355358
}
356359

357360
// Check if we were woken by this event but it timed out (null payload)
@@ -383,10 +386,39 @@ where
383386
}
384387

385388
// Event arrived - cache and return
386-
let payload = result.payload.unwrap_or(JsonValue::Null);
387-
self.checkpoint_cache
388-
.insert(checkpoint_name, payload.clone());
389-
Ok(serde_json::from_value(payload)?)
389+
let durable_event_payload = result.payload.unwrap_or(DurableEventPayload {
390+
inner: JsonValue::Null,
391+
metadata: JsonValue::Null,
392+
});
393+
self.checkpoint_cache.insert(
394+
checkpoint_name,
395+
serde_json::to_value(durable_event_payload.clone())?,
396+
);
397+
398+
self.process_event_payload_wrapper(durable_event_payload)
399+
}
400+
401+
fn process_event_payload_wrapper<T: DeserializeOwned>(
402+
&self,
403+
value: DurableEventPayload,
404+
) -> TaskResult<T> {
405+
#[cfg(feature = "telemetry")]
406+
{
407+
use opentelemetry::KeyValue;
408+
use opentelemetry::trace::TraceContextExt;
409+
use tracing_opentelemetry::OpenTelemetrySpanExt;
410+
411+
let metadata: Option<HashMap<String, JsonValue>> =
412+
serde_json::from_value(value.metadata)?;
413+
if let Some(metadata) = metadata {
414+
let context = crate::telemetry::extract_trace_context(&metadata);
415+
tracing::Span::current().add_link_with_attributes(
416+
context.span().span_context().clone(),
417+
vec![KeyValue::new("sentry.link.type", "previous_trace")],
418+
);
419+
}
420+
}
421+
Ok(serde_json::from_value(value.inner)?)
390422
}
391423

392424
/// Emit an event to this task's queue.
@@ -404,22 +436,13 @@ where
404436
)
405437
)]
406438
pub async fn emit_event<T: Serialize>(&self, event_name: &str, payload: &T) -> TaskResult<()> {
407-
if event_name.is_empty() {
408-
return Err(TaskError::Validation {
409-
message: "event_name must be non-empty".to_string(),
410-
});
411-
}
412-
413-
let payload_json = serde_json::to_value(payload)?;
414-
let query = "SELECT durable.emit_event($1, $2, $3)";
415-
sqlx::query(query)
416-
.bind(self.durable.queue_name())
417-
.bind(event_name)
418-
.bind(&payload_json)
419-
.execute(self.durable.pool())
420-
.await?;
421-
422-
Ok(())
439+
self.durable
440+
.emit_event(event_name, payload, None)
441+
.await
442+
.map_err(|e| TaskError::EmitEventFailed {
443+
event_name: event_name.to_string(),
444+
error: e,
445+
})
423446
}
424447

425448
/// Extend the task's lease to prevent timeout.
@@ -693,8 +716,11 @@ where
693716

694717
// Check cache for already-received event
695718
if let Some(cached) = self.checkpoint_cache.get(&checkpoint_name) {
696-
let payload: ChildCompletePayload = serde_json::from_value(cached.clone())?;
697-
return Self::process_child_payload(&step_name, payload);
719+
let durable_event_payload: DurableEventPayload =
720+
serde_json::from_value(cached.clone())?;
721+
let child_complete_payload: ChildCompletePayload =
722+
self.process_event_payload_wrapper(durable_event_payload)?;
723+
return Self::process_child_payload(&step_name, child_complete_payload);
698724
}
699725

700726
// Check if we were woken by this event but it timed out (null payload)
@@ -724,12 +750,18 @@ where
724750
}
725751

726752
// Event arrived - parse and return
727-
let payload_json = result.payload.unwrap_or(JsonValue::Null);
728-
self.checkpoint_cache
729-
.insert(checkpoint_name, payload_json.clone());
753+
let durable_event_payload = result.payload.unwrap_or(DurableEventPayload {
754+
inner: JsonValue::Null,
755+
metadata: JsonValue::Null,
756+
});
757+
self.checkpoint_cache.insert(
758+
checkpoint_name,
759+
serde_json::to_value(durable_event_payload.clone())?,
760+
);
730761

731-
let payload: ChildCompletePayload = serde_json::from_value(payload_json)?;
732-
Self::process_child_payload(&step_name, payload)
762+
let child_complete_payload: ChildCompletePayload =
763+
self.process_event_payload_wrapper(durable_event_payload)?;
764+
Self::process_child_payload(&step_name, child_complete_payload)
733765
}
734766

735767
/// Process the child completion payload and return the appropriate result.

src/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ pub enum TaskError {
7777
#[error("failed to spawn subtask `{name}`: {error}")]
7878
SubtaskSpawnFailed { name: String, error: DurableError },
7979

80+
/// Error occurred while trying to emit an event.
81+
#[error("failed to emit event `{event_name}`: {error}")]
82+
EmitEventFailed {
83+
event_name: String,
84+
error: DurableError,
85+
},
86+
8087
/// A child task failed.
8188
///
8289
/// Returned by [`TaskContext::join`](crate::TaskContext::join) when the child
@@ -231,6 +238,13 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue {
231238
"subtask_name": name,
232239
})
233240
}
241+
TaskError::EmitEventFailed { event_name, error } => {
242+
serde_json::json!({
243+
"name": "EmitEventFailed",
244+
"message": error.to_string(),
245+
"event_name": event_name,
246+
})
247+
}
234248
TaskError::ChildFailed { step_name, message } => {
235249
serde_json::json!({
236250
"name": "ChildFailed",

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ pub use context::TaskContext;
109109
pub use error::{ControlFlow, DurableError, DurableResult, TaskError, TaskResult};
110110
pub use task::{ErasedTask, Task, TaskWrapper};
111111
pub use types::{
112-
CancellationPolicy, ClaimedTask, RetryStrategy, SpawnDefaults, SpawnOptions, SpawnResult,
113-
TaskHandle, WorkerOptions,
112+
CancellationPolicy, ClaimedTask, DurableEventPayload, RetryStrategy, SpawnDefaults,
113+
SpawnOptions, SpawnResult, TaskHandle, WorkerOptions,
114114
};
115115
pub use worker::Worker;
116116

0 commit comments

Comments
 (0)