Commit 97679bc

chore: remove global epoxy contention
1 parent 866069d commit 97679bc

86 files changed

Lines changed: 5905 additions & 9558 deletions

.agent/specs/epoxy-mutable-keys.md

Lines changed: 784 additions & 0 deletions
Large diffs are not rendered by default.

.agent/todo/epoxy-pr-fixes.md

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+# Epoxy PR Fixes
+
+TODO items for the `04-04-chore_remove_global_epoxy_contention` PR.
+
+## Items
+
+- [ ] **Use BARE for `KvAcceptedValue` serialization.** `engine/packages/epoxy/src/keys/keys.rs:13` uses raw `serde_bare` with a comment claiming versioned protocol is unnecessary because accepted state is transient. We should still use the versioned BARE protocol path for consistency. Remove the comment justifying raw serde_bare and switch to the protocol-based serialization.
+
+- [ ] **Add doc comment to `LegacyCommittedValueKey`.** `engine/packages/epoxy/src/keys/keys.rs:73` has no doc comment. Add a brief comment explaining this is the v1 committed value key used for dual-read fallback, and link to the "Legacy Fallback" section in `engine/packages/epoxy/README.md`.
+
+- [ ] **Add file-level doc comment to `keys/keys.rs`.** Add a module-level comment at the top of `engine/packages/epoxy/src/keys/keys.rs` linking to `engine/packages/epoxy/README.md` for the full key layout and subspace structure.
+
+- [ ] **Delete `read_value.rs`, inline the read logic into each caller.** `get_local.rs` needs only the value (no cache), `get_optimistic.rs` needs value + cache. Inline the legacy fallback transaction directly into each op and remove `engine/packages/epoxy/src/ops/kv/read_value.rs` and its `mod` entry. Update `engine/CLAUDE.md` references to `read_value.rs`.
+
+- [ ] **Flatten `Proposal`/`Command`/`CommandKind` into a single key+value input.** Every caller wraps exactly one command in `vec![Command { kind: ... }]` and `from_proposal` bails if `len() != 1`. Remove the `Proposal`, `Command`, `CommandKind`, and `NoopCommand` types. `Input` should take key + value directly (or just `SetIfAbsentProposal`). Update callers in `pegboard/workflows/actor/keys.rs`, `pegboard/workflows/actor2/keys.rs`, `api-peer/src/internal.rs`, and test utils.
+
+- [ ] **Design request-level caching/mutability control for epoxy.** This is a design task -- write a spec in `.agent/specs/` before implementing. Key changes needed:
+  - **Versioned values:** Add version numbers or timestamps to committed values so ballot selection and commit handlers can distinguish newer writes from older ones (currently short-circuit on first committed value at `ballot.rs:126`, `commit.rs:15`, `accept.rs:29`).
+  - **Ballot selection:** Can't short-circuit on first committed value. Must allow new proposals for already-committed keys.
+  - **Changelog catch-up:** `changelog.rs:123-128` asserts duplicate entries have identical values (`ensure!(existing_value == entry.value)`). With mutable keys, same key can appear multiple times with different values. Fix: overwrite with newer value instead of asserting equality (entries are versionstamp-ordered).
+  - **Cache invalidation:** Restore the old purger workflow mechanism (deleted in this PR as `workflows/purger.rs`). No TTLs.
+  - **No quorum reads needed.** Caller controls staleness tradeoff at request level by choosing whether to use cache or do fresh fanout. Per-request `CachingBehavior` on reads (e.g. `Optimistic` = use cache, `SkipCache` = fresh fanout).
+
+- [ ] **Reword quorum read comment in `get_optimistic.rs:45`.** Replace "We cannot use quorum reads for the fanout read because the optimistic path is intentionally a best-effort lookup" with something like "The fanout asks any single remote replica rather than waiting for a quorum. Since committed values are immutable, any replica that has the value is authoritative. The tradeoff is that if all replicas holding the value are offline, the fanout returns None."
+
+- [ ] **Rename `EPOXY` to `EPOXY_V1` in `universaldb/src/utils/keys.rs`.** The constant currently named `EPOXY` is the v1 subspace integer. Rename it to `EPOXY_V1` (keeping the same integer value) for clarity now that `EPOXY_V2` exists. Update all references (e.g. `keys::legacy_subspace` in `engine/packages/epoxy/src/keys/mod.rs`).
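
The "Changelog catch-up" item above contrasts two ways of applying duplicate keys during catch-up. The following is a minimal in-memory sketch of that contrast, not the code in `changelog.rs`: `ChangelogEntry`, `apply_strict`, and `apply_overwrite` are hypothetical names, the `u64` versionstamp is a simplification, and a `BTreeMap` stands in for the replica's durable KV state.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified changelog entry. The real entry type and its
/// versionstamp encoding are not shown in this commit view.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChangelogEntry {
    versionstamp: u64,
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Immutable-key behaviour: a key may repeat, but only with an identical value.
fn apply_strict(
    state: &mut BTreeMap<Vec<u8>, Vec<u8>>,
    entries: &[ChangelogEntry],
) -> Result<(), String> {
    for entry in entries {
        if let Some(existing) = state.get(&entry.key) {
            if existing != &entry.value {
                return Err(format!("conflicting value for key {:?}", entry.key));
            }
        }
        state.insert(entry.key.clone(), entry.value.clone());
    }
    Ok(())
}

/// Mutable-key behaviour proposed in the TODO: the later entry wins.
/// Assumes `entries` is already ordered by versionstamp.
fn apply_overwrite(state: &mut BTreeMap<Vec<u8>, Vec<u8>>, entries: &[ChangelogEntry]) {
    debug_assert!(entries
        .windows(2)
        .all(|pair| pair[0].versionstamp <= pair[1].versionstamp));
    for entry in entries {
        state.insert(entry.key.clone(), entry.value.clone());
    }
}
```

With two entries for the same key and different values, `apply_strict` returns an error while `apply_overwrite` leaves the value from the higher versionstamp in place. The overwrite variant is only correct if the ordering assumption holds, which is why the sketch checks it in debug builds.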

Cargo.lock

Lines changed: 2 additions & 2 deletions
Generated file; diff not rendered by default.

docs-internal/engine/ACTOR_KEY_RESERVATION.md

Lines changed: 3 additions & 4 deletions
@@ -1,6 +1,6 @@
 # Actor Key Reservation
 
-Epoxy is used to globally store which datacenter an actor lives in for a given actor ID.
+Epoxy uses per-key Paxos to globally store which datacenter an actor key resolves to.
 
 ## Schema
 
@@ -12,8 +12,8 @@ The reservation ID includes the datacenter of where the actor lives.
 
 Actors have 2 touch points with Epoxy:
 
-- Reserve actor key (epaxos fast path = 1 RTT, epaxos slow path = 2 RTT)
-- Resolve actor ID for key (fast path = 0 RTT, slow path = 1 RTT to nearest DC that has the key)
+- Reserve actor key. Fresh keys use the per-key Paxos fast path in 1 RTT. If contention or recovery forces `Prepare`, the retry path is `Prepare -> PreAccept -> Commit`, which adds another round and makes a failed-then-retried write 3 RTT total.
+- Resolve actor ID for key. Hot reads are local. Cache misses use one best-effort fanout round to the nearest datacenter that has the key.
 
 Resolving actor ID uses `kv_get_optimistic`, which assumes a value does not change after being set. This allows us to cache the actor's datacenter locally and never have to read from other nodes once we've resolved it once.
 
@@ -98,4 +98,3 @@ _This section needs more elaboration on edge cases._
 ## Current Limitation: Keys are tied to a datacenter
 
 Because we cannot change the value of the reservation ID, actors can only be created in the original for a given ID.
-
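
The read path described in this document (local hit first, then a cached remote value, then one best-effort fanout) can be sketched roughly as follows. This is an illustrative in-memory model under stated assumptions, not the `kv_get_optimistic` implementation: `LocalReplica` and its fields are hypothetical, the `fanout` closure stands in for the remote round, and the choice to populate the cache only under `Optimistic` is an assumption. `CachingBehavior::Optimistic` appears in this commit; `SkipCache` is only named in the TODO file.

```rust
use std::collections::HashMap;

/// Local stand-in for the per-request caching mode.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CachingBehavior {
    Optimistic,
    SkipCache,
}

/// Hypothetical in-memory model of one replica's local state.
#[derive(Default)]
struct LocalReplica {
    committed: HashMap<Vec<u8>, Vec<u8>>,
    optimistic_cache: HashMap<Vec<u8>, Vec<u8>>,
}

impl LocalReplica {
    /// Resolve `key`, calling `fanout` (one remote round) only on a local miss.
    fn get_optimistic<F>(
        &mut self,
        key: &[u8],
        behavior: CachingBehavior,
        fanout: F,
    ) -> Option<Vec<u8>>
    where
        F: FnOnce(&[u8]) -> Option<Vec<u8>>,
    {
        // 1. A locally committed value needs no network round.
        if let Some(value) = self.committed.get(key) {
            return Some(value.clone());
        }
        // 2. A previously fetched value is reused only in optimistic mode.
        if behavior == CachingBehavior::Optimistic {
            if let Some(value) = self.optimistic_cache.get(key) {
                return Some(value.clone());
            }
        }
        // 3. Best-effort fanout; `None` means no reachable replica had the key.
        let value = fanout(key)?;
        if behavior == CachingBehavior::Optimistic {
            self.optimistic_cache.insert(key.to_vec(), value.clone());
        }
        Some(value)
    }
}
```

Caching the fanout result is safe here only because the value is assumed never to change once set, which is the same assumption the document states for `kv_get_optimistic`.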

engine/CLAUDE.md

Lines changed: 22 additions & 0 deletions
@@ -26,3 +26,25 @@ When changing a versioned VBARE schema, follow the existing migration pattern.
 - `engine/sdks/rust/runner-protocol/src/lib.rs` `PROTOCOL_MK2_VERSION`
 - `engine/sdks/typescript/runner/src/mod.ts` `PROTOCOL_VERSION`
 - Update the Rust latest re-export in `engine/sdks/rust/runner-protocol/src/lib.rs` to the new generated module.
+
+## Epoxy durable keys
+
+- Keep shared epoxy tuple layouts in `engine/packages/epoxy/src/keys/keys.rs`. Replica modules should import those `FormalKey` types instead of redefining local tuple paths.
+- When adding a new epoxy tuple segment, add the shared constant in `engine/packages/universaldb/src/utils/keys.rs` first so all helpers serialize the same on-disk path.
+- In epoxy's per-key replica handlers, scope transactions with `keys::subspace(replica_id)` and use the shared `KvValueKey`, `KvBallotKey`, and `KvAcceptedKey` types directly. Successful Prepare replies should echo the promised ballot in `PrepareResponseOk.highest_ballot`, while any prior accepted record comes from `kv/{key}/accepted`.
+- Epoxy changelog writes use versionstamped keys. `Transaction::write` only does plain `pack`, so changelog code should build the full key with `keys::subspace(replica_id).pack_with_versionstamp(...)`, substitute the incomplete versionstamp, and then write it with `tx.set`.
+- Epoxy changelog catch-up must stick to one active source replica for the entire pagination loop. Changelog cursors are per-replica versionstamps, so switching sources mid-stream is invalid.
+- During epoxy replica bootstrap, `catch_up_replica` should treat "no active source replica yet" as a successful no-op. Fresh clusters can start with every replica in `Joining`, so retrying forever on a missing active source stalls initial bring-up.
+- Epoxy replica data lives under per-replica subspaces inside the shared UniversalDB. If a workflow needs another replica's changelog or KV state for coordination or metrics, read that replica's `keys::subspace(replica_id)` directly instead of adding a second RPC just to count local durable state.
+- Epoxy quorum helpers use Fast Paxos math, not the older `f`-based EPaxos formula. Keep `slow_q = floor(n / 2) + 1`, derive `fast_q = n - floor((slow_q - 1) / 2)`, and keep sender-excluded fanout quorums as those sizes minus one for `Fast`, `Slow`, and `All` while `Any` still targets one remote response.
+- Broadcast the updated epoxy config to all replicas before sending `BeginLearning`. Commit fanout uses `get_all_replicas(config)`, so the learner only receives live commits during catch-up after the rest of the cluster knows about it.
+- Epoxy v2 durable state lives under `keys::subspace(replica_id)`, which now points at the `EPOXY_V2` subspace. Legacy committed values remain read-only under `keys::legacy_subspace(replica_id)`.
+- Reuse the common local read order from `engine/packages/epoxy/src/ops/kv/read_value.rs`: v2 `KvValueKey`, legacy `LegacyCommittedValueKey`, legacy `KvValueKey`, then v2 `KvOptimisticCacheKey`.
+- Ballot selection should only read legacy committed values from `keys::legacy_subspace(replica_id)`. Do not write new ballot, accepted, config, or changelog state into the legacy subspace.
+- `engine/packages/epoxy/src/ops/propose.rs` keeps a compatibility `Proposal` and `Command` surface, but the v2 consensus path only supports a single-key set-if-absent value. Use `CheckAndSetCommand { expect_one_of: [None], new_value: Some(...) }` for actor key reservation, or a single `SetCommand` with `value: Some(...)` when you only need the same immutable insert semantics.
+- Epoxy slow-path Prepare retries should not immediately rebump ballots in a tight loop. Keep the retry path in `engine/packages/epoxy/src/ops/propose.rs` on a bounded exponential backoff with jitter: start at 10 ms, double each retry to a 1 s base cap, add 0-50% jitter, and fail after 10 retries instead of spinning until the overall request timeout.
+- The per-key Prepare and PreAccept rounds should include the local replica by sending a self-targeted epoxy request through `http_client::send_message`. Self-routing lands in `message_request` directly, so local and remote replicas share the same handler path and quorum accounting.
+- Keep the epoxy HTTP transport split between `/epoxy/message` for replica RPCs and `/epoxy/changelog-read` for pagination. Route handlers should reject the wrong request kind so changelog reads cannot silently drift back onto the generic message endpoint.
+- When you add fields to epoxy workflow state structs, mark the new fields with `#[serde(default)]`. Gasoline replays serialized workflow state, so new coordinator or replica state fields must deserialize cleanly from older runs.
+- Epoxy changelog GC should truncate through the latest currently visible changelog key, not clear the whole subspace blindly. Use `KeySelector::last_less_than(changelog_subspace_end)` plus `end_of_key_range(last_key)` so concurrent later appends stay outside the cleared range.
+- Epoxy integration tests that spin up `tests/common::TestCtx` must call `shutdown()` before returning. Dropping the harness alone leaves workflow workers and API tasks alive long enough to interfere with later tests in the same process.
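
Two of the notes above are numeric rules: the quorum sizes and the Prepare retry backoff. The sketch below works both through in plain Rust. It is not the code in `propose.rs` or the quorum helpers: every name here (`QuorumType`, `slow_quorum`, `fast_quorum`, `fanout_quorum`, `prepare_retry_delay`) is hypothetical. Two further assumptions are that retries are numbered from 0 and that the caller supplies the jitter percentage (for example from an RNG), so the function stays deterministic.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the quorum kinds named in the notes above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum QuorumType {
    Fast,
    Slow,
    All,
    Any,
}

/// Slow quorum: floor(n / 2) + 1.
fn slow_quorum(n: usize) -> usize {
    n / 2 + 1
}

/// Fast quorum derived from the slow quorum: n - floor((slow_q - 1) / 2).
fn fast_quorum(n: usize) -> usize {
    n - (slow_quorum(n) - 1) / 2
}

/// Remote responses needed when the sender is excluded from the fanout.
fn fanout_quorum(n: usize, quorum: QuorumType) -> usize {
    match quorum {
        QuorumType::Fast => fast_quorum(n).saturating_sub(1),
        QuorumType::Slow => slow_quorum(n).saturating_sub(1),
        QuorumType::All => n.saturating_sub(1),
        // `Any` is not reduced: it still waits for one remote response.
        QuorumType::Any => 1,
    }
}

const INITIAL_BACKOFF_MS: u64 = 10;
const MAX_BASE_BACKOFF_MS: u64 = 1_000;
const MAX_RETRIES: u32 = 10;

/// Delay before retry number `retry` (0-based), or `None` once the retry
/// budget is spent. `jitter_percent` is clamped to the 0..=50 range.
fn prepare_retry_delay(retry: u32, jitter_percent: u64) -> Option<Duration> {
    if retry >= MAX_RETRIES {
        return None;
    }
    // 10 ms doubled per retry, capped at the 1 s base.
    let base_ms = INITIAL_BACKOFF_MS
        .saturating_mul(1u64 << retry)
        .min(MAX_BASE_BACKOFF_MS);
    let jitter_ms = base_ms * jitter_percent.min(50) / 100;
    Some(Duration::from_millis(base_ms + jitter_ms))
}
```

For a 5-replica cluster this gives a slow quorum of 3 and a fast quorum of 4, so the sender waits for 2 or 3 remote responses respectively. Under the 0-based numbering assumed here, the base delay reaches the 1 s cap at retry 7 (10 ms × 128 = 1280 ms, capped), and the worst-case single delay is 1.5 s.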

engine/packages/api-peer/src/internal.rs

Lines changed: 47 additions & 53 deletions
@@ -1,5 +1,8 @@
 use anyhow::*;
-use epoxy_protocol::protocol::{ReplicaId, SlotId};
+use epoxy::{
+	ops::propose::{Command, CommandKind, Proposal, SetCommand},
+	protocol::ReplicaId,
+};
 use gas::prelude::*;
 use indexmap::IndexMap;
 use rivet_api_builder::ApiCtx;
@@ -226,13 +229,11 @@ pub async fn get_epoxy_replica_debug(
 ) -> Result<GetEpoxyReplicaDebugResponse> {
 	let replica_id = ctx.config().epoxy_replica_id();
 
-	let (config, ballot, instance_number) = ctx
+	let config = ctx
 		.udb()?
 		.run(|tx| async move {
 			let config = epoxy::utils::read_config(&tx, replica_id).await?;
-			let ballot = epoxy::replica::ballot::get_ballot(&tx, replica_id).await?;
-			let instance_number = epoxy::utils::read_instance_number(&tx, replica_id).await?;
-			Result::Ok((config, ballot, instance_number))
+			Result::Ok(config)
 		})
 		.await?;
 
@@ -284,11 +285,12 @@ pub async fn get_epoxy_replica_debug(
 		},
 		state: EpoxyReplicaDebugState {
 			ballot: EpoxyBallot {
-				epoch: ballot.epoch,
-				ballot: ballot.ballot,
-				replica_id: ballot.replica_id,
+				epoch: 0,
+				ballot: 0,
+				replica_id,
 			},
-			instance_number,
+			// Epoxy v2 no longer maintains replica-global ballot or instance counters.
+			instance_number: 0,
 		},
 	})
 }
@@ -310,7 +312,7 @@ pub struct GetEpoxyKeyDebugResponse {
 #[derive(Serialize, Deserialize, Clone)]
 pub struct EpoxyKeyInstance {
 	pub replica_id: ReplicaId,
-	pub slot_id: SlotId,
+	pub slot_id: u64,
 	pub log_entry: Option<EpoxyKeyLogEntry>,
 }
 
@@ -325,7 +327,7 @@ pub struct EpoxyKeyLogEntry {
 #[derive(Serialize, Deserialize, Clone)]
 pub struct EpoxyInstance {
 	pub replica_id: ReplicaId,
-	pub slot_id: SlotId,
+	pub slot_id: u64,
 }
 
 /// Returns debug information for a specific key on this replica.
@@ -342,46 +344,33 @@ pub async fn get_epoxy_key_debug(
 	let key_bytes = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &path.key)
 		.context("invalid base64 key")?;
 
-	let instances = ctx
-		.udb()?
-		.run(|tx| {
-			let key_bytes = key_bytes.clone();
-			async move {
-				let protocol_instances =
-					epoxy::utils::read_key_instances(&tx, replica_id, key_bytes).await?;
-
-				let mut instances = Vec::new();
-				for instance in protocol_instances {
-					let log_entry =
-						epoxy::utils::read_log_entry(&tx, replica_id, &instance).await?;
-					instances.push(EpoxyKeyInstance {
-						replica_id: instance.replica_id,
-						slot_id: instance.slot_id,
-						log_entry: log_entry.map(|entry| EpoxyKeyLogEntry {
-							status: format!("{:?}", entry.state),
-							ballot: EpoxyBallot {
-								epoch: entry.ballot.epoch,
-								ballot: entry.ballot.ballot,
-								replica_id: entry.ballot.replica_id,
-							},
-							seq: entry.seq,
-							deps: entry
-								.deps
-								.into_iter()
-								.map(|d| EpoxyInstance {
-									replica_id: d.replica_id,
-									slot_id: d.slot_id,
-								})
-								.collect(),
-						}),
-					});
-				}
-
-				Result::Ok(instances)
-			}
+	let local_value = ctx
+		.op(epoxy::ops::kv::get_local::Input {
+			replica_id,
+			key: key_bytes,
 		})
 		.await?;
 
+	let instances = local_value
+		.value
+		.map(|_| {
+			vec![EpoxyKeyInstance {
+				replica_id,
+				slot_id: 0,
+				log_entry: Some(EpoxyKeyLogEntry {
+					status: "committed".to_string(),
+					ballot: EpoxyBallot {
+						epoch: 0,
+						ballot: 0,
+						replica_id,
+					},
+					seq: 0,
+					deps: Vec::new(),
+				}),
+			}]
+		})
+		.unwrap_or_default();
+
 	// Compute instances by status
 	let mut instances_by_status = IndexMap::new();
 	for instance in &instances {
@@ -536,6 +525,7 @@ pub async fn get_epoxy_kv_optimistic(
 		.op(epoxy::ops::kv::get_optimistic::Input {
 			replica_id,
 			key: key_bytes,
+			caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
 		})
 		.await?;
 
@@ -567,7 +557,6 @@ pub async fn set_epoxy_kv(
 	body: SetEpoxyKvRequest,
 ) -> Result<SetEpoxyKvResponse> {
 	use base64::Engine;
-	use epoxy_protocol::protocol;
 
 	let key_bytes = base64::engine::general_purpose::STANDARD
 		.decode(&path.key)
@@ -582,16 +571,21 @@ pub async fn set_epoxy_kv(
 		})
 		.transpose()?;
 
+	let value_bytes = value_bytes.ok_or_else(|| {
+		anyhow!("epoxy v2 debug set only supports immutable set-if-absent writes")
+	})?;
+
 	let result = ctx
 		.op(epoxy::ops::propose::Input {
-			proposal: protocol::Proposal {
-				commands: vec![protocol::Command {
-					kind: protocol::CommandKind::SetCommand(protocol::SetCommand {
+			proposal: Proposal {
+				commands: vec![Command {
+					kind: CommandKind::SetCommand(SetCommand {
 						key: key_bytes,
-						value: value_bytes,
+						value: Some(value_bytes),
 					}),
 				}],
 			},
+			mutable: false,
 			purge_cache: true,
 			target_replicas: None,
 		})

engine/packages/epoxy/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -14,6 +14,7 @@ gas.workspace = true
 epoxy-protocol.workspace = true
 futures-util.workspace = true
 lazy_static.workspace = true
+rand.workspace = true
 reqwest.workspace = true
 rivet-api-builder.workspace = true
 rivet-config.workspace = true
@@ -34,7 +35,6 @@ tracing.workspace = true
 universaldb.workspace = true
 url.workspace = true
 uuid.workspace = true
-vbare.workspace = true
 
 [dev-dependencies]
 gas.workspace = true
