Skip to content

Commit da489fa

Browse files
committed
feat: basic envoy tunnel impl
1 parent 5c9a464 commit da489fa

34 files changed

Lines changed: 1071 additions & 135 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/config-schema.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-peer/src/actors/delete.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
3434
}),
3535
)?;
3636

37+
let namespace = namespace_res.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
38+
3739
let actor = actors_res
3840
.actors
3941
.into_iter()
@@ -45,13 +47,12 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
4547
return Err(pegboard::errors::Actor::NotFound.build());
4648
}
4749

48-
let namespace = namespace_res.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
49-
5050
// Verify the actor belongs to the specified namespace
5151
if actor.namespace_id != namespace.namespace_id {
5252
return Err(pegboard::errors::Actor::NotFound.build());
5353
}
5454

55+
// TODO: Actor v2
5556
let res = ctx
5657
.signal(pegboard::workflows::actor::Destroy {})
5758
.to_workflow::<pegboard::workflows::actor::Workflow>()

engine/packages/config/src/config/pegboard.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ pub struct Pegboard {
125125
/// Max time since last seen before actor is considered stale, in milliseconds.
126126
pub envoy_event_demuxer_max_last_seen_threshold: Option<u64>,
127127
/// Max response payload size in bytes from actors.
128-
pub envoy_max_response_payload_body_size: Option<usize>,
128+
pub envoy_max_response_payload_size: Option<usize>,
129129
/// Ping interval for envoy updates in milliseconds.
130130
pub envoy_update_ping_interval: Option<u64>,
131131
/// How long after last ping before considering a envoy ineligible for allocation.
@@ -282,8 +282,8 @@ impl Pegboard {
282282
.unwrap_or(30_000)
283283
}
284284

285-
pub fn envoy_max_response_payload_body_size(&self) -> usize {
286-
self.envoy_max_response_payload_body_size
285+
pub fn envoy_max_response_payload_size(&self) -> usize {
286+
self.envoy_max_response_payload_size
287287
.unwrap_or(20 * 1024 * 1024) // 20 MiB
288288
}
289289

engine/packages/gasoline/src/builder/common/workflow.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ where
148148
} else {
149149
tracing::debug!(?tags, "unique workflow already exists");
150150
}
151+
} else {
152+
tracing::debug!(?actual_workflow_id, "dispatched workflow");
151153
}
152154

153155
if workflow_id == actual_workflow_id {

engine/packages/guard/src/shared_state.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ impl SharedState {
1717
}
1818

1919
pub async fn start(&self) -> Result<()> {
20-
self.pegboard_gateway.start().await?;
20+
tokio::try_join!(
21+
self.pegboard_gateway.start(),
22+
self.pegboard_gateway2.start(),
23+
)?;
24+
2125
Ok(())
2226
}
2327
}

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ pub async fn handle_init(
334334
serverless_drain_grace_period: conn
335335
.is_serverless
336336
.then(|| pb.serverless_drain_grace_period() as i64),
337+
max_response_payload_size: pb.envoy_max_response_payload_size() as u64,
337338
},
338339
}));
339340
let init_msg_serialized = init_msg.serialize(conn.protocol_version)?;

engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -454,12 +454,7 @@ async fn handle_tunnel_message(
454454
let inner_data_len = tunnel_message_inner_data_len(&msg.message_kind);
455455

456456
// Enforce incoming payload size
457-
if inner_data_len
458-
> ctx
459-
.config()
460-
.pegboard()
461-
.envoy_max_response_payload_body_size()
462-
{
457+
if inner_data_len > ctx.config().pegboard().envoy_max_response_payload_size() {
463458
return Err(errors::WsError::InvalidPacket("payload too large".to_string()).build());
464459
}
465460

engine/packages/pegboard-gateway2/src/shared_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ impl SharedState {
308308
while let Ok(NextOutput::Message(msg)) = sub.next().await {
309309
tracing::trace!(
310310
payload_len = msg.payload.len(),
311-
"received message from pubsub"
311+
"received message from envoy"
312312
);
313313

314314
match versioned::ToGateway::deserialize_with_embedded_version(&msg.payload) {

engine/packages/pegboard/src/workflows/actor2/mod.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,17 @@ struct CheckEnvoyLivenessInput {
358358
envoy_key: String,
359359
}
360360

361+
#[derive(Debug, Serialize, Deserialize)]
362+
struct CheckEnvoyLivenessOutput {
363+
expired: bool,
364+
now: i64,
365+
}
366+
361367
#[activity(CheckEnvoyLiveness)]
362-
async fn check_envoy_liveness(ctx: &ActivityCtx, input: &CheckEnvoyLivenessInput) -> Result<bool> {
368+
async fn check_envoy_liveness(
369+
ctx: &ActivityCtx,
370+
input: &CheckEnvoyLivenessInput,
371+
) -> Result<CheckEnvoyLivenessOutput> {
363372
let state = ctx.state::<State>()?;
364373
let envoy_lost_threshold = ctx.config().pegboard().envoy_lost_threshold();
365374

@@ -378,7 +387,7 @@ async fn check_envoy_liveness(ctx: &ActivityCtx, input: &CheckEnvoyLivenessInput
378387
let now = util::timestamp::now();
379388
let expired = last_ping_ts < now - envoy_lost_threshold;
380389

381-
Ok(expired)
390+
Ok(CheckEnvoyLivenessOutput { expired, now })
382391
})
383392
.custom_instrument(tracing::info_span!("actor_check_envoy_liveness_tx"))
384393
.await
@@ -392,7 +401,7 @@ async fn listen_for_signals(
392401
metrics_workflow_id: Id,
393402
) -> Result<Vec<Main>> {
394403
// Listen for signals based on transition
395-
let signals = match &state.transition {
404+
let signals = match &mut state.transition {
396405
Transition::Allocating {
397406
lost_timeout_ts, ..
398407
}
@@ -432,20 +441,22 @@ async fn listen_for_signals(
432441
// Listen for signals with periodic liveness check timeout
433442
let signals = ctx
434443
.listen_n_until::<Main>(
435-
last_liveness_check_ts + ctx.config().pegboard().envoy_lost_threshold(),
444+
*last_liveness_check_ts + ctx.config().pegboard().envoy_lost_threshold(),
436445
256,
437446
)
438447
.await?;
439448

440449
// Perform liveness check
441450
if signals.is_empty() {
442-
let expired = ctx
451+
let res = ctx
443452
.activity(CheckEnvoyLivenessInput {
444453
envoy_key: envoy.envoy_key.clone(),
445454
})
446455
.await?;
447456

448-
if expired {
457+
*last_liveness_check_ts = res.now;
458+
459+
if res.expired {
449460
vec![Main::Lost(Lost {
450461
generation: state.generation,
451462
reason: LostReason::EnvoyConnectionLost,

0 commit comments

Comments
 (0)