diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs
index 60d3ee3ce4cd8..ed41e768c8c20 100644
--- a/src/adapter/src/flags.rs
+++ b/src/adapter/src/flags.rs
@@ -180,6 +180,7 @@ fn persist_config(config: &SystemVars) -> PersistParameters {
         storage_sink_minimum_batch_updates: Some(
             config.storage_persist_sink_minimum_batch_updates(),
         ),
+        storage_source_decode_fuel: Some(config.storage_source_decode_fuel()),
         next_listen_batch_retryer: Some(RetryParameters {
             initial_backoff: config.persist_next_listen_batch_retryer_initial_backoff(),
             multiplier: config.persist_next_listen_batch_retryer_multiplier(),
diff --git a/src/compute/src/render/mod.rs b/src/compute/src/render/mod.rs
index cbea84e29ceb5..1ab233d8fd6b1 100644
--- a/src/compute/src/render/mod.rs
+++ b/src/compute/src/render/mod.rs
@@ -236,8 +236,6 @@ pub fn build_compute_dataflow(
                         dataflow.until.clone(),
                         mfp.as_mut(),
                         Some(flow_control),
-                        // Copy the logic in DeltaJoin/Get/Join to start.
-                        |_timer, count| count > 1_000_000,
                     );

                     // If `mfp` is non-identity, we need to apply what remains.
diff --git a/src/compute/src/sink/persist_sink.rs b/src/compute/src/sink/persist_sink.rs
index 5f47b2ae66ca9..2fc738a294941 100644
--- a/src/compute/src/sink/persist_sink.rs
+++ b/src/compute/src/sink/persist_sink.rs
@@ -102,8 +102,6 @@ where
         Antichain::new(), // we want all updates
         None, // no MFP
         None, // no flow control
-        // Copy the logic in DeltaJoin/Get/Join to start.
-        |_timer, count| count > 1_000_000,
     );
     use differential_dataflow::AsCollection;
     let persist_collection = ok_stream
diff --git a/src/persist-client/src/cfg.proto b/src/persist-client/src/cfg.proto
index 6afd83c80b51e..5ecaa848c129d 100644
--- a/src/persist-client/src/cfg.proto
+++ b/src/persist-client/src/cfg.proto
@@ -19,6 +19,7 @@ message ProtoPersistParameters {
     mz_proto.ProtoDuration consensus_connect_timeout = 3;
     optional uint64 sink_minimum_batch_updates = 4;
     optional uint64 storage_sink_minimum_batch_updates = 8;
+    optional uint64 storage_source_decode_fuel = 21;
    optional ProtoRetryParameters next_listen_batch_retryer = 5;
    optional uint64 stats_audit_percent = 9;
    optional bool stats_collection_enabled = 6;
diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs
index bce68788c2127..1c2f9f7a223e5 100644
--- a/src/persist-client/src/cfg.rs
+++ b/src/persist-client/src/cfg.rs
@@ -191,6 +191,9 @@ impl PersistConfig {
                 storage_sink_minimum_batch_updates: AtomicUsize::new(
                     Self::DEFAULT_SINK_MINIMUM_BATCH_UPDATES,
                 ),
+                storage_source_decode_fuel: AtomicUsize::new(
+                    Self::DEFAULT_STORAGE_SOURCE_DECODE_FUEL,
+                ),
                 next_listen_batch_retryer: RwLock::new(Self::DEFAULT_NEXT_LISTEN_BATCH_RETRYER),
                 stats_audit_percent: AtomicUsize::new(Self::DEFAULT_STATS_AUDIT_PERCENT),
                 stats_collection_enabled: AtomicBool::new(Self::DEFAULT_STATS_COLLECTION_ENABLED),
@@ -250,6 +253,14 @@ impl PersistConfig {
             .load(DynamicConfig::LOAD_ORDERING)
     }

+    /// The maximum amount of work to do in the persist_source mfp_and_decode
+    /// operator before yielding.
+    pub fn storage_source_decode_fuel(&self) -> usize {
+        self.dynamic
+            .storage_source_decode_fuel
+            .load(DynamicConfig::LOAD_ORDERING)
+    }
+
     /// Returns a new instance of [PersistConfig] for tests.
     pub fn new_for_tests() -> Self {
         use mz_build_info::DUMMY_BUILD_INFO;
@@ -319,6 +330,9 @@ impl PersistConfig {
     /// Default value for [`PersistConfig::sink_minimum_batch_updates`].
     pub const DEFAULT_SINK_MINIMUM_BATCH_UPDATES: usize = 0;

+    /// Default value for [`PersistConfig::storage_source_decode_fuel`].
+    pub const DEFAULT_STORAGE_SOURCE_DECODE_FUEL: usize = 1_000_000;
+
     /// Default value for [`DynamicConfig::next_listen_batch_retry_params`].
     pub const DEFAULT_NEXT_LISTEN_BATCH_RETRYER: RetryParameters = RetryParameters {
         initial_backoff: Duration::from_millis(4),
@@ -425,6 +439,7 @@ pub struct DynamicConfig {
     reader_lease_duration: RwLock<Duration>,
     sink_minimum_batch_updates: AtomicUsize,
     storage_sink_minimum_batch_updates: AtomicUsize,
+    storage_source_decode_fuel: AtomicUsize,
     stats_audit_percent: AtomicUsize,
     stats_collection_enabled: AtomicBool,
     stats_filter_enabled: AtomicBool,
@@ -780,6 +795,8 @@ pub struct PersistParameters {
     pub sink_minimum_batch_updates: Option<usize>,
     /// Configures [`PersistConfig::storage_sink_minimum_batch_updates`].
     pub storage_sink_minimum_batch_updates: Option<usize>,
+    /// Configures [`PersistConfig::storage_source_decode_fuel`].
+    pub storage_source_decode_fuel: Option<usize>,
     /// Configures [`DynamicConfig::stats_audit_percent`].
     pub stats_audit_percent: Option<usize>,
     /// Configures [`DynamicConfig::stats_collection_enabled`].
     pub stats_collection_enabled: Option<bool>,
@@ -816,6 +833,7 @@ impl PersistParameters {
             reader_lease_duration: self_reader_lease_duration,
             sink_minimum_batch_updates: self_sink_minimum_batch_updates,
             storage_sink_minimum_batch_updates: self_storage_sink_minimum_batch_updates,
+            storage_source_decode_fuel: self_storage_source_decode_fuel,
             next_listen_batch_retryer: self_next_listen_batch_retryer,
             stats_audit_percent: self_stats_audit_percent,
             stats_collection_enabled: self_stats_collection_enabled,
@@ -838,6 +856,7 @@ impl PersistParameters {
             reader_lease_duration: other_reader_lease_duration,
             sink_minimum_batch_updates: other_sink_minimum_batch_updates,
             storage_sink_minimum_batch_updates: other_storage_sink_minimum_batch_updates,
+            storage_source_decode_fuel: other_storage_source_decode_fuel,
             next_listen_batch_retryer: other_next_listen_batch_retryer,
             stats_audit_percent: other_stats_audit_percent,
             stats_collection_enabled: other_stats_collection_enabled,
@@ -879,6 +898,9 @@ impl PersistParameters {
         if let Some(v) = other_storage_sink_minimum_batch_updates {
             *self_storage_sink_minimum_batch_updates = Some(v);
         }
+        if let Some(v) = other_storage_source_decode_fuel {
+            *self_storage_source_decode_fuel = Some(v);
+        }
         if let Some(v) = other_next_listen_batch_retryer {
             *self_next_listen_batch_retryer = Some(v);
         }
@@ -926,6 +948,7 @@ impl PersistParameters {
             reader_lease_duration,
             sink_minimum_batch_updates,
             storage_sink_minimum_batch_updates,
+            storage_source_decode_fuel,
             next_listen_batch_retryer,
             stats_audit_percent,
             stats_collection_enabled,
@@ -947,6 +970,7 @@ impl PersistParameters {
             && reader_lease_duration.is_none()
             && sink_minimum_batch_updates.is_none()
             && storage_sink_minimum_batch_updates.is_none()
+            && storage_source_decode_fuel.is_none()
             && next_listen_batch_retryer.is_none()
             && stats_audit_percent.is_none()
             && stats_collection_enabled.is_none()
@@ -977,6 +1001,7 @@ impl PersistParameters {
             reader_lease_duration,
             sink_minimum_batch_updates,
             storage_sink_minimum_batch_updates,
+            storage_source_decode_fuel,
             next_listen_batch_retryer,
             stats_audit_percent,
             stats_collection_enabled,
@@ -1053,6 +1078,11 @@ impl PersistParameters {
                 DynamicConfig::STORE_ORDERING,
             );
         }
+        if let Some(storage_source_decode_fuel) = storage_source_decode_fuel {
+            cfg.dynamic
+                .storage_source_decode_fuel
+                .store(*storage_source_decode_fuel, DynamicConfig::STORE_ORDERING);
+        }
         if let Some(retry_params) = next_listen_batch_retryer {
             let mut retry = cfg
                 .dynamic
@@ -1129,6 +1159,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
             storage_sink_minimum_batch_updates: self
                 .storage_sink_minimum_batch_updates
                 .into_proto(),
+            storage_source_decode_fuel: self.storage_source_decode_fuel.into_proto(),
             next_listen_batch_retryer: self.next_listen_batch_retryer.into_proto(),
             stats_audit_percent: self.stats_audit_percent.into_proto(),
             stats_collection_enabled: self.stats_collection_enabled.into_proto(),
@@ -1159,6 +1190,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
             storage_sink_minimum_batch_updates: proto
                 .storage_sink_minimum_batch_updates
                 .into_rust()?,
+            storage_source_decode_fuel: proto.storage_source_decode_fuel.into_rust()?,
             next_listen_batch_retryer: proto.next_listen_batch_retryer.into_rust()?,
             stats_audit_percent: proto.stats_audit_percent.into_rust()?,
             stats_collection_enabled: proto.stats_collection_enabled.into_rust()?,
diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs
index 498cb6e982062..fd552b338b03e 100644
--- a/src/sql/src/session/vars.rs
+++ b/src/sql/src/session/vars.rs
@@ -1086,6 +1086,15 @@ const STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES: ServerVar<usize> = ServerVar {
     internal: true,
 };

+/// Controls [`mz_persist_client::cfg::PersistConfig::storage_source_decode_fuel`].
+const STORAGE_SOURCE_DECODE_FUEL: ServerVar<usize> = ServerVar {
+    name: UncasedStr::new("storage_source_decode_fuel"),
+    value: &PersistConfig::DEFAULT_STORAGE_SOURCE_DECODE_FUEL,
+    description: "The maximum amount of work to do in the persist_source mfp_and_decode \
+        operator before yielding.",
+    internal: true,
+};
+
 const STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: ServerVar<bool> = ServerVar {
     name: UncasedStr::new("storage_record_source_sink_namespaced_errors"),
     value: &true,
@@ -2459,6 +2468,7 @@ impl SystemVars {
             .with_var(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
             .with_var(&PERSIST_SINK_MINIMUM_BATCH_UPDATES)
             .with_var(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
+            .with_var(&STORAGE_SOURCE_DECODE_FUEL)
             .with_var(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
             .with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
             .with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
@@ -3054,6 +3064,10 @@ impl SystemVars {
         *self.expect_value(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
     }

+    pub fn storage_source_decode_fuel(&self) -> usize {
+        *self.expect_value(&STORAGE_SOURCE_DECODE_FUEL)
+    }
+
     /// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter.
     pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
         *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
@@ -4879,6 +4893,7 @@ fn is_persist_config_var(name: &str) -> bool {
         || name == PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
         || name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
         || name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
+        || name == STORAGE_SOURCE_DECODE_FUEL.name()
         || name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.name()
         || name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.name()
         || name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP.name()
diff --git a/src/storage-operators/src/persist_source.rs b/src/storage-operators/src/persist_source.rs
index 86f9b38682086..3490ee0894f1a 100644
--- a/src/storage-operators/src/persist_source.rs
+++ b/src/storage-operators/src/persist_source.rs
@@ -22,6 +22,7 @@ use mz_ore::cast::CastFrom;
 use mz_ore::collections::CollectionExt;
 use mz_ore::vec::VecExt;
 use mz_persist_client::cache::PersistClientCache;
+use mz_persist_client::cfg::PersistConfig;
 use mz_persist_client::fetch::FetchedPart;
 use mz_persist_client::fetch::SerdeLeasedBatchPart;
 use mz_persist_client::operators::shard_source::shard_source;
@@ -86,7 +87,7 @@ use crate::metrics::BackpressureMetrics;
 /// using [`timely::dataflow::operators::generic::operator::empty`].
 ///
 /// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
-pub fn persist_source<G, YFn>(
+pub fn persist_source<G>(
     scope: &mut G,
     source_id: GlobalId,
     persist_clients: Arc<PersistClientCache>,
@@ -95,7 +96,6 @@ pub fn persist_source<G, YFn>(
     until: Antichain<Timestamp>,
     map_filter_project: Option<&mut MfpPlan>,
     flow_control: Option<FlowControl<G>>,
-    yield_fn: YFn,
 ) -> (
     Stream<G, (Row, Timestamp, Diff)>,
     Stream<G, (DataflowError, Timestamp, Diff)>,
 )
 where
     G: Scope<Timestamp = mz_repr::Timestamp>,
-    YFn: Fn(Instant, usize) -> bool + 'static,
 {
     let (stream, token) = scope.scoped(
         &format!("skip_granular_backpressure({})", source_id),
@@ -122,7 +121,6 @@ where
                     summary: Refines::to_inner(fc.summary),
                     metrics: fc.metrics,
                 }),
-                yield_fn,
             );
             (stream.leave(), token)
         },
@@ -143,7 +141,7 @@ type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, u64)>;
 ///
 /// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
 #[allow(clippy::needless_borrow)]
-pub fn persist_source_core<'g, G, YFn>(
+pub fn persist_source_core<'g, G>(
     scope: &RefinedScope<'g, G>,
     source_id: GlobalId,
     persist_clients: Arc<PersistClientCache>,
@@ -152,15 +150,14 @@ pub fn persist_source_core<'g, G, YFn>(
     until: Antichain<Timestamp>,
     map_filter_project: Option<&mut MfpPlan>,
     flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
-    yield_fn: YFn,
 ) -> (
     Stream<RefinedScope<'g, G>, (Result<Row, DataflowError>, (mz_repr::Timestamp, u64), Diff)>,
     Rc<dyn Any>,
 )
 where
     G: Scope<Timestamp = mz_repr::Timestamp>,
-    YFn: Fn(Instant, usize) -> bool + 'static,
 {
+    let cfg = persist_clients.cfg().clone();
     let name = source_id.to_string();
     let desc = metadata.relation_desc.clone();
     let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());
@@ -214,7 +211,7 @@ where
             }
         },
     );
-    let rows = decode_and_mfp(&fetched, &name, until, map_filter_project, yield_fn);
+    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);

     (rows, token)
 }
@@ -242,16 +239,15 @@ fn filter_may_match(
     result.may_contain(Datum::True) || result.may_fail()
 }

-pub fn decode_and_mfp<G, YFn>(
+pub fn decode_and_mfp<G>(
+    cfg: PersistConfig,
     fetched: &Stream<G, FetchedPart<SourceData, (), Timestamp, Diff>>,
     name: &str,
     until: Antichain<Timestamp>,
     mut map_filter_project: Option<&mut MfpPlan>,
-    yield_fn: YFn,
 ) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
 where
     G: Scope,
-    YFn: Fn(Instant, usize) -> bool + 'static,
 {
     let scope = fetched.scope();
     let mut builder = OperatorBuilder::new(
@@ -291,6 +287,11 @@ where
                 }
             });

+            // Get the yield fuel once per schedule to amortize the cost of
+            // loading the atomic.
+            let yield_fuel = cfg.storage_source_decode_fuel();
+            let yield_fn = |_, work| work >= yield_fuel;
+
             let mut work = 0;
             let start_time = Instant::now();
             let mut output = updates_output.activate();
@@ -300,7 +301,7 @@ where
                     &mut work,
                     &name,
                     start_time,
-                    &yield_fn,
+                    yield_fn,
                     &until,
                     map_filter_project.as_ref(),
                     &mut datum_vec,
@@ -365,6 +366,13 @@ impl PendingWork {
             match (key, val) {
                 (Ok(SourceData(Ok(row))), Ok(())) => {
                     if let Some(mfp) = map_filter_project {
+                        // We originally accounted work as the number of outputs, to give downstream
+                        // operators a chance to reduce down anything we've emitted. This mfp call
+                        // might have a restrictive filter, which would have been counted as no
+                        // work. However, in practice, we've seen decode_and_mfp be a source of
+                        // interactivity loss during rehydration, so we now also count each mfp
+                        // evaluation against our fuel.
+                        *work += 1;
                         let arena = mz_repr::RowArena::new();
                         let mut datums_local = datum_vec.borrow_with(&row);
                         for result in mfp.evaluate(
diff --git a/src/storage/src/render/sinks.rs b/src/storage/src/render/sinks.rs
index 03116d7a09382..ae3fae0e0e939 100644
--- a/src/storage/src/render/sinks.rs
+++ b/src/storage/src/render/sinks.rs
@@ -56,8 +56,6 @@ pub(crate) fn render_sink<'g, G: Scope>(
         timely::progress::Antichain::new(),
         None,
         None,
-        // Copy the logic in DeltaJoin/Get/Join to start.
-        |_timer, count| count > 1_000_000,
     );

     tokens.push(source_token);
diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs
index b3d6fbfa3fc38..05bbfe6ab0b0a 100644
--- a/src/storage/src/render/sources.rs
+++ b/src/storage/src/render/sources.rs
@@ -360,8 +360,6 @@ where
                 Antichain::new(),
                 None,
                 None,
-                // Copy the logic in DeltaJoin/Get/Join to start.
-                |_timer, count| count > 1_000_000,
             );
             let (tx_source_ok, tx_source_err) = (
                 tx_source_ok_stream.as_collection(),
                 tx_source_err_stream.as_collection(),
@@ -483,8 +481,6 @@ where
                 Antichain::new(),
                 None,
                 flow_control,
-                // Copy the logic in DeltaJoin/Get/Join to start.
-                |_timer, count| count > 1_000_000,
             );
             (
                 stream.as_collection(),
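
Editor's note (not part of the patch): the change above replaces the hard-coded `|_timer, count| count > 1_000_000` yield closures with a fuel limit read from the dynamically updatable `storage_source_decode_fuel` config, and it now counts every MFP evaluation against that fuel, not just emitted outputs. The sketch below is a minimal, self-contained illustration of that fueled-yield pattern; the names `FUEL`, `run_one_schedule`, and the filter stand-in are invented for this example and are not Materialize APIs.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Stand-in for the dynamically configurable fuel value. In the real patch this
// lives in PersistConfig::storage_source_decode_fuel, backed by an AtomicUsize
// that can be updated at runtime via the system variable.
static FUEL: AtomicUsize = AtomicUsize::new(1_000_000);

/// One "schedule" of a hypothetical decode operator: process pending items until
/// either the input is exhausted or the fuel budget is spent, then return so the
/// worker can run other operators. Leftover input is picked up next schedule.
fn run_one_schedule(pending: &mut Vec<u64>) {
    // Load the fuel once per schedule to amortize the cost of the atomic load.
    let yield_fuel = FUEL.load(Ordering::SeqCst);
    let yield_fn = |work: usize| work >= yield_fuel;

    let mut work = 0;
    while let Some(item) = pending.pop() {
        // Count every evaluation (even ones a filter would drop) against the
        // budget, mirroring the `*work += 1` the patch adds before the MFP call.
        work += 1;
        let _keep = item % 2 == 0; // stand-in for a restrictive MFP filter

        if yield_fn(work) {
            // Out of fuel: stop here and let the scheduler run something else.
            break;
        }
    }
}

fn main() {
    let mut pending: Vec<u64> = (0..3_000_000).collect();
    while !pending.is_empty() {
        run_one_schedule(&mut pending);
    }
}
```

Reading the limit once per operator invocation keeps the atomic load off the per-record path, while counting evaluations rather than outputs bounds how long a single schedule can run even when a selective filter emits almost nothing.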