From c451b2a3e5dd704fd52ff948f27659f71a2f5f09 Mon Sep 17 00:00:00 2001
From: Jan Teske
Date: Tue, 21 Oct 2025 14:56:42 +0200
Subject: [PATCH 1/2] persist: register schema at write time, not writer open
 time

This commit changes persist clients to defer calling `register_schema`
on a shard from write handle creation time to the time of the first
append operation. The rationale is that we don't need to enforce a
schema if we are not attempting to write to a shard.

The plan is for 0dt upgrades to make good use of the new, more lenient
behavior. Read-only environments can open write handles with evolved
schemas without having to durably write down the new schemas. This will
allow us to back out of version upgrades without the risk of permanently
poisoning the persist state for lower versions.
---
 src/persist-client/src/internal/apply.rs    | 16 +++++
 src/persist-client/src/internal/encoding.rs |  2 -
 src/persist-client/src/internal/machine.rs  |  5 ++
 src/persist-client/src/lib.rs               | 28 ++------
 src/persist-client/src/schema.rs            |  4 +-
 src/persist-client/src/write.rs             | 35 ++++++++++
 src/txn-wal/src/txns.rs                     | 74 +++++++++++----------
 7 files changed, 105 insertions(+), 59 deletions(-)

diff --git a/src/persist-client/src/internal/apply.rs b/src/persist-client/src/internal/apply.rs
index 83274093fb68e..38c6fbb8df6dc 100644
--- a/src/persist-client/src/internal/apply.rs
+++ b/src/persist-client/src/internal/apply.rs
@@ -254,6 +254,22 @@ where
         })
     }
 
+    /// Returns the ID of the given schema, if it is known in the current state.
+    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
+        self.state
+            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
+                // The common case is that the requested schema is a recent one, so as a minor
+                // optimization, do this search in reverse order.
+                let mut schemas = state.collections.schemas.iter().rev();
+                schemas
+                    .find(|(_, x)| {
+                        K::decode_schema(&x.key) == *key_schema
+                            && V::decode_schema(&x.val) == *val_schema
+                    })
+                    .map(|(id, _)| *id)
+            })
+    }
+
     /// Returns whether the current state's `since` and `upper` are both empty.
     ///
     /// Due to sharing state with other handles, successive reads to this fn or any other may
diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs
index 782f6702c4f72..d15f43f6d33df 100644
--- a/src/persist-client/src/internal/encoding.rs
+++ b/src/persist-client/src/internal/encoding.rs
@@ -65,8 +65,6 @@ use crate::{PersistConfig, ShardId, WriterId, cfg};
 /// A key and value `Schema` of data written to a batch or shard.
 #[derive(Debug)]
 pub struct Schemas<K: Codec, V: Codec> {
-    // TODO: Remove the Option once this finishes rolling out and all shards
-    // have a registered schema.
     /// Id under which this schema is registered in the shard's schema registry,
     /// if any.
     pub id: Option<SchemaId>,
diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index ded5b05c5ad69..5f1cfdc29706d 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -700,6 +700,11 @@ where
         self.applier.latest_schema()
     }
 
+    /// Returns the ID of the given schema, if it is known in the current state.
+    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
+        self.applier.find_schema(key_schema, val_schema)
+    }
+
     /// See [crate::PersistClient::compare_and_evolve_schema].
     ///
     /// TODO: Unify this with [Self::register_schema]?
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 288983df7eda5..91253a19de2a1 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -24,7 +24,7 @@ use differential_dataflow::lattice::Lattice;
 use itertools::Itertools;
 use mz_build_info::{BuildInfo, build_info};
 use mz_dyncfg::ConfigSet;
-use mz_ore::{instrument, soft_assert_or_log};
+use mz_ore::instrument;
 use mz_persist::location::{Blob, Consensus, ExternalError};
 use mz_persist_types::schema::SchemaId;
 use mz_persist_types::{Codec, Codec64, Opaque};
@@ -490,10 +490,6 @@ impl PersistClient {
     ///
     /// Use this to save latency and a bit of persist traffic if you're just
     /// going to immediately drop or expire the [ReadHandle].
-    ///
-    /// The `_schema` parameter is currently unused, but should be an object
-    /// that represents the schema of the data in the shard. This will be required
-    /// in the future.
     #[instrument(level = "debug", fields(shard = %shard_id))]
     pub async fn open_writer(
         &self,
@@ -511,23 +507,11 @@ impl PersistClient {
         let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
         let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
 
-        // TODO: Because schemas are ordered, as part of the persist schema
-        // changes work, we probably want to build some way to allow persist
-        // users to control the order. For example, maybe a
-        // `PersistClient::compare_and_append_schema(current_schema_id,
-        // next_schema)`. Presumably this would then be passed in to open_writer
-        // instead of us implicitly registering it here.
-        // NB: The overwhelming common case is that this schema is already
-        // registered. In this case, the cmd breaks early and nothing is
-        // written to (or read from) CRDB.
-        let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
-        maintenance.start_performing(&machine, &gc);
-        soft_assert_or_log!(
-            schema_id.is_some(),
-            "unable to register schemas {:?} {:?}",
-            key_schema,
-            val_schema,
-        );
+        // We defer registering the schema until write time, to allow opening
+        // write handles in a "read-only" mode where they don't implicitly
+        // modify persist state. But it might already be registered, in which
+        // case we can fetch its ID.
+        let schema_id = machine.find_schema(&*key_schema, &*val_schema);
 
         let writer_id = WriterId::new();
         let schemas = Schemas {
diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs
index 70ee6303cede9..285facb963be5 100644
--- a/src/persist-client/src/schema.rs
+++ b/src/persist-client/src/schema.rs
@@ -605,7 +605,7 @@ mod tests {
         let schema0 = StringsSchema(vec![false]);
         let schema1 = StringsSchema(vec![false, true]);
 
-        let write0 = client
+        let mut write0 = client
             .open_writer::<Strings, (), u64, i64>(
                 shard_id,
                 Arc::new(schema0.clone()),
@@ -614,6 +614,8 @@ mod tests {
             .await
             .unwrap();
+
+        write0.ensure_schema_registered().await;
         assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));
 
         // Not backward compatible (yet... we don't support dropping a column at
diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs
index 52de8ea125f3d..07a85181d911c 100644
--- a/src/persist-client/src/write.rs
+++ b/src/persist-client/src/write.rs
@@ -221,6 +221,31 @@ where
         self.write_schemas.id
     }
 
+    /// Registers the write schema, if it isn't already registered.
+    ///
+    /// # Panics
+    ///
+    /// This method expects that either the shard doesn't yet have any schema registered, or one of
+    /// the registered schemas is the same as the write schema. If all registered schemas are
+    /// different from the write schema, it panics.
+    pub async fn ensure_schema_registered(&mut self) -> SchemaId {
+        let Schemas { id, key, val } = &self.write_schemas;
+
+        if let Some(id) = id {
+            return *id;
+        }
+
+        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
+        maintenance.start_performing(&self.machine, &self.gc);
+
+        let Some(schema_id) = schema_id else {
+            panic!("unable to register schemas: {key:?} {val:?}");
+        };
+
+        self.write_schemas.id = Some(schema_id);
+        schema_id
+    }
+
     /// A cached version of the shard-global `upper` frontier.
     ///
     /// This is the most recent upper discovered by this handle. It is
@@ -507,6 +532,16 @@ where
             }
         }
 
+        // If we are writing any data, we require a registered write schema.
+        //
+        // We allow appending empty batches without a registered schema, because
+        // some clients expect to be able to advance the write frontier without
+        // knowing the schema. For example, shard finalization may not always
+        // know the schemas of shards to be finalized.
+        if batches.iter().any(|b| b.batch.len > 0) && self.schema_id().is_none() {
+            self.ensure_schema_registered().await;
+        }
+
         let lower = expected_upper.clone();
         let upper = new_upper;
         let since = Antichain::from_elem(T::minimum());
diff --git a/src/txn-wal/src/txns.rs b/src/txn-wal/src/txns.rs
index 88d20f69e7226..6d862dceea4d5 100644
--- a/src/txn-wal/src/txns.rs
+++ b/src/txn-wal/src/txns.rs
@@ -214,7 +214,15 @@ where
     ) -> Result {
         let op = &Arc::clone(&self.metrics).register;
         op.run(async {
-            let data_writes = data_writes.into_iter().collect::<Vec<_>>();
+            let mut data_writes = data_writes.into_iter().collect::<Vec<_>>();
+
+            // The txns system requires that all participating data shards have a
+            // schema registered. Importantly, we must register a data shard's
+            // schema _before_ we publish it to the txns shard.
+            for data_write in &mut data_writes {
+                data_write.ensure_schema_registered().await;
+            }
+
             let updates = data_writes
                 .iter()
                 .map(|data_write| {
@@ -285,13 +293,11 @@ where
             }
         }
         for data_write in data_writes {
-            let new_schema_id = data_write.schema_id();
-
             // If we already have a write handle for a newer version of a table, don't replace
             // it! Currently we only support adding columns to tables with a default value, so
             // the latest/newest schema will always be the most complete.
             //
-            // TODO(alter_table): Revist when we support dropping columns.
+            // TODO(alter_table): Revisit when we support dropping columns.
match self.datas.data_write_for_commit.get(&data_write.shard_id()) { None => { self.datas @@ -299,31 +305,35 @@ where .insert(data_write.shard_id(), DataWriteCommit(data_write)); } Some(previous) => { - match (previous.schema_id(), new_schema_id) { - (Some(previous_id), None) => { - mz_ore::soft_panic_or_log!( - "tried registering a WriteHandle replacing one with a SchemaId prev_schema_id: {:?} shard_id: {:?}", - previous_id, - previous.shard_id(), - ); - }, - (Some(previous_id), Some(new_id)) if previous_id > new_id => { - mz_ore::soft_panic_or_log!( - "tried registering a WriteHandle with an older SchemaId prev_schema_id: {:?} new_schema_id: {:?} shard_id: {:?}", - previous_id, - new_id, - previous.shard_id(), - ); - }, - (previous_schema_id, new_schema_id) => { - if previous_schema_id.is_none() && new_schema_id.is_none() { - tracing::warn!("replacing WriteHandle without any SchemaIds to reason about"); - } else { - tracing::info!(?previous_schema_id, ?new_schema_id, shard_id = ?previous.shard_id(), "replacing WriteHandle"); - } - self.datas.data_write_for_commit.insert(data_write.shard_id(), DataWriteCommit(data_write)); - } + let new_schema_id = data_write.schema_id().expect("ensured above"); + + if let Some(prev_schema_id) = previous.schema_id() + && prev_schema_id > new_schema_id + { + mz_ore::soft_panic_or_log!( + "tried registering a WriteHandle with an older SchemaId; \ + prev_schema_id: {} new_schema_id: {} shard_id: {}", + prev_schema_id, + new_schema_id, + previous.shard_id(), + ); + continue; + } else if previous.schema_id().is_none() { + mz_ore::soft_panic_or_log!( + "encountered data shard without a schema; shard_id: {}", + previous.shard_id(), + ); } + + tracing::info!( + prev_schema_id = ?previous.schema_id(), + ?new_schema_id, + shard_id = %previous.shard_id(), + "replacing WriteHandle" + ); + self.datas + .data_write_for_commit + .insert(data_write.shard_id(), DataWriteCommit(data_write)); } } } @@ -756,12 +766,8 @@ where .expect("codecs have not changed"); let (key_schema, val_schema) = match schemas { Some((_, key_schema, val_schema)) => (Arc::new(key_schema), Arc::new(val_schema)), - // - For new shards we will always have at least one schema - // registered by the time we reach this point, because that - // happens at txn-registration time. - // - For pre-existing shards, every txns shard will have had - // open_writer called on it at least once in the previous release, - // so the schema should exist. + // We will always have at least one schema registered by the time we reach this point, + // because that is ensured at txn-registration time. None => unreachable!("data shard {} should have a schema", data_id), }; let wrapped = self From 990ffbbb7f382c1b6603c0c743c49f7eff335c46 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Tue, 28 Oct 2025 12:56:58 +0100 Subject: [PATCH 2/2] persist: provide `WriteHandle::advance_upper` ... and use it to enable shard finalization without a known shard schema. 
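
For illustration, with this change shard finalization boils down to the
following pattern (mirroring the storage_collections diff below; dummy
schemas are fine because advancing the upper appends only empty
batches, which does not require a registered schema):

    let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
        persist_client
            .open_writer(
                shard_id,
                Arc::new(RelationDesc::empty()), // dummy key schema
                Arc::new(UnitSchema),            // dummy val schema
                diagnostics,
            )
            .await
            .expect("invalid persist usage");
    write_handle.advance_upper(&Antichain::new()).await;
    write_handle.expire().await;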
--- src/persist-client/src/cli/admin.rs | 17 +----- src/persist-client/src/lib.rs | 14 +---- src/persist-client/src/write.rs | 55 ++++++++++++++++--- src/storage-client/src/storage_collections.rs | 33 +++-------- 4 files changed, 56 insertions(+), 63 deletions(-) diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs index e5367f9caf108..65d8b0bc3f43b 100644 --- a/src/persist-client/src/cli/admin.rs +++ b/src/persist-client/src/cli/admin.rs @@ -231,7 +231,6 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> { Arc::clone(&pubsub_sender), )); - // We need a PersistClient to open a write handle so we can append an empty batch. let persist_client = PersistClient::new( cfg, blob, @@ -259,21 +258,7 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> { diagnostics, ) .await?; - - if !write_handle.upper().is_empty() { - let empty_batch: Vec<( - (crate::cli::inspect::K, crate::cli::inspect::V), - u64, - i64, - )> = vec![]; - let lower = write_handle.upper().clone(); - let upper = Antichain::new(); - - let result = write_handle.append(empty_batch, lower, upper).await?; - if let Err(err) = result { - anyhow::bail!("failed to force downgrade upper, {err:?}"); - } - } + write_handle.advance_upper(&Antichain::new()).await; } if force_downgrade_since { diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 91253a19de2a1..34420f25b2536 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -1976,7 +1976,6 @@ mod tests { #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented async fn finalize_empty_shard(dyncfgs: ConfigUpdates) { - const EMPTY: &[(((), ()), u64, i64)] = &[]; let persist_client = new_test_client(&dyncfgs).await; let shard_id = ShardId::new(); @@ -1990,11 +1989,7 @@ mod tests { // Advance since and upper to empty, which is a pre-requisite for // finalization/tombstoning. let () = read.downgrade_since(&Antichain::new()).await; - let () = write - .compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new()) - .await - .expect("usage should be valid") - .expect("upper should match"); + let () = write.advance_upper(&Antichain::new()).await; let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests()) @@ -2031,7 +2026,6 @@ mod tests { #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented async fn finalize_shard(dyncfgs: ConfigUpdates) { - const EMPTY: &[(((), ()), u64, i64)] = &[]; const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)]; let persist_client = new_test_client(&dyncfgs).await; @@ -2053,11 +2047,7 @@ mod tests { // Advance since and upper to empty, which is a pre-requisite for // finalization/tombstoning. 
         let () = read.downgrade_since(&Antichain::new()).await;
-        let () = write
-            .compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
-            .await
-            .expect("usage should be valid")
-            .expect("upper should match");
+        let () = write.advance_upper(&Antichain::new()).await;
 
         let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
             .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs
index 07a85181d911c..4927e9008df06 100644
--- a/src/persist-client/src/write.rs
+++ b/src/persist-client/src/write.rs
@@ -281,6 +281,50 @@ where
         &self.upper
     }
 
+    /// Advances the shard's upper to the given frontier.
+    ///
+    /// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
+    ///
+    /// In contrast to the various compare-and-append methods, this method does not require the
+    /// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
+    /// schema when creating a writer just to advance a shard upper.
+    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
+        // We start from the cached upper, rather than calling `fetch_recent_upper`, to save a
+        // consensus roundtrip when the known upper is already beyond the target.
+        let mut lower = self.shared_upper().clone();
+
+        while !PartialOrder::less_equal(target, &lower) {
+            let since = Antichain::from_elem(T::minimum());
+            let desc = Description::new(lower.clone(), target.clone(), since);
+            let batch = HollowBatch::empty(desc);
+
+            let heartbeat_timestamp = (self.cfg.now)();
+            let res = self
+                .machine
+                .compare_and_append(
+                    &batch,
+                    &self.writer_id,
+                    &self.debug_state,
+                    heartbeat_timestamp,
+                )
+                .await;
+
+            use CompareAndAppendRes::*;
+            let new_upper = match res {
+                Success(_seq_no, maintenance) => {
+                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
+                    batch.desc.upper().clone()
+                }
+                UpperMismatch(_seq_no, actual_upper) => actual_upper,
+                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
+                InlineBackpressure => unreachable!("batch was empty"),
+            };
+
+            self.upper.clone_from(&new_upper);
+            lower = new_upper;
+        }
+    }
+
     /// Applies `updates` to this shard and downgrades this handle's upper to
     /// `upper`.
     ///
@@ -532,15 +576,8 @@ where
             }
         }
 
-        // If we are writing any data, we require a registered write schema.
-        //
-        // We allow appending empty batches without a registered schema, because
-        // some clients expect to be able to advance the write frontier without
-        // knowing the schema. For example, shard finalization may not always
-        // know the schemas of shards to be finalized.
-        if batches.iter().any(|b| b.batch.len > 0) && self.schema_id().is_none() {
-            self.ensure_schema_registered().await;
-        }
+        // Before we append any data, we require a registered write schema.
+        self.ensure_schema_registered().await;
 
         let lower = expected_upper.clone();
         let upper = new_upper;
diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs
index ace20dd86e326..1c1eef7c780b9 100644
--- a/src/storage-client/src/storage_collections.rs
+++ b/src/storage-client/src/storage_collections.rs
@@ -3248,40 +3248,23 @@ async fn finalize_shards_task(
                 Some(shard_id)
             } else {
                 debug!(%shard_id, "finalizing shard");
-                let finalize = || async move {
+                let finalize = || async move {
                     // TODO: thread the global ID into the shard finalization WAL
                     let diagnostics = Diagnostics::from_purpose("finalizing shards");
 
-                    let schemas = persist_client.latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics.clone()).await.expect("codecs have not changed");
-                    let (key_schema, val_schema) = match schemas {
-                        Some((_, key_schema, val_schema)) => (key_schema, val_schema),
-                        None => (RelationDesc::empty(), UnitSchema),
-                    };
-
-                    let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
+                    // We only use the writer to advance the upper, so using a dummy schema is
+                    // fine.
                     let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> = persist_client
                         .open_writer(
                             shard_id,
-                            Arc::new(key_schema),
-                            Arc::new(val_schema),
+                            Arc::new(RelationDesc::empty()),
+                            Arc::new(UnitSchema),
                             diagnostics,
                         )
                         .await
                         .expect("invalid persist usage");
-
-                    let upper = write_handle.upper();
-
-                    if !upper.is_empty() {
-                        let append = write_handle
-                            .append(empty_batch, upper.clone(), Antichain::new())
-                            .await?;
-
-                        if let Err(e) = append {
-                            warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
-                            return Ok(());
-                        }
-                    }
+                    write_handle.advance_upper(&Antichain::new()).await;
 
                     write_handle.expire().await;
 
                     if force_downgrade_since {
@@ -3317,9 +3300,7 @@
                             .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                             .await;
                         if let Err(e) = downgrade {
-                            warn!(
-                                "tried to finalize a shard with an advancing epoch: {e:?}"
-                            );
+                            warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                             return Ok(());
                         }
                         // Not available now, so finalization is broken.
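
Usage sketch of the resulting schema-registration behavior across both
patches (handle and schema names are illustrative; error handling
elided):

    // Opening a writer no longer registers the write schema with the
    // shard; `schema_id()` is `Some` only if the schema was already
    // registered by an earlier writer.
    let mut write = persist_client
        .open_writer(shard_id, key_schema, val_schema, diagnostics)
        .await
        .expect("invalid persist usage");

    // Register explicitly, e.g. before publishing a data shard to the
    // txns shard. This panics if the shard has registered schemas and
    // none of them matches the write schema.
    let schema_id = write.ensure_schema_registered().await;

    // Or rely on implicit registration: compare_and_append now ensures
    // a registered schema before appending, while advance_upper never
    // registers one, which is what makes schema-less finalization
    // possible.
    write.advance_upper(&Antichain::new()).await;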