From f55db1bb82ec399e4699ac27758b8df806711120 Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Thu, 12 Oct 2023 10:49:26 -0700 Subject: [PATCH] storage: make decode_and_mfp yield configurable We've seen evidence in production of this operator often going more than 5s without yielding, especially during rehydration and large unindexed SELECTs. This affects interactivity as well as the time between when a timely-driven Future is woken and when it is scheduled again. Attempt to fix this with two tweaks to our yield heuristics. First, make the fuel between yields configurable through LD. Second, also count each mfp evaluation against our fuel, in case it contains a restrictive filter. --- src/adapter/src/flags.rs | 1 + src/compute/src/render/mod.rs | 2 -- src/compute/src/sink/persist_sink.rs | 2 -- src/persist-client/src/cfg.proto | 1 + src/persist-client/src/cfg.rs | 32 +++++++++++++++++++++ src/sql/src/session/vars.rs | 15 ++++++++++ src/storage-operators/src/persist_source.rs | 32 +++++++++++++-------- src/storage/src/render/sinks.rs | 2 -- src/storage/src/render/sources.rs | 4 --- 9 files changed, 69 insertions(+), 22 deletions(-) diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs index 60d3ee3ce4cd8..ed41e768c8c20 100644 --- a/src/adapter/src/flags.rs +++ b/src/adapter/src/flags.rs @@ -180,6 +180,7 @@ fn persist_config(config: &SystemVars) -> PersistParameters { storage_sink_minimum_batch_updates: Some( config.storage_persist_sink_minimum_batch_updates(), ), + storage_source_decode_fuel: Some(config.storage_source_decode_fuel()), next_listen_batch_retryer: Some(RetryParameters { initial_backoff: config.persist_next_listen_batch_retryer_initial_backoff(), multiplier: config.persist_next_listen_batch_retryer_multiplier(), diff --git a/src/compute/src/render/mod.rs b/src/compute/src/render/mod.rs index cbea84e29ceb5..1ab233d8fd6b1 100644 --- a/src/compute/src/render/mod.rs +++ b/src/compute/src/render/mod.rs @@ -236,8 +236,6 @@ pub fn build_compute_dataflow( dataflow.until.clone(), mfp.as_mut(), Some(flow_control), - // Copy the logic in DeltaJoin/Get/Join to start. - |_timer, count| count > 1_000_000, ); // If `mfp` is non-identity, we need to apply what remains. diff --git a/src/compute/src/sink/persist_sink.rs b/src/compute/src/sink/persist_sink.rs index 5f47b2ae66ca9..2fc738a294941 100644 --- a/src/compute/src/sink/persist_sink.rs +++ b/src/compute/src/sink/persist_sink.rs @@ -102,8 +102,6 @@ where Antichain::new(), // we want all updates None, // no MFP None, // no flow control - // Copy the logic in DeltaJoin/Get/Join to start. - |_timer, count| count > 1_000_000, ); use differential_dataflow::AsCollection; let persist_collection = ok_stream diff --git a/src/persist-client/src/cfg.proto b/src/persist-client/src/cfg.proto index 6afd83c80b51e..5ecaa848c129d 100644 --- a/src/persist-client/src/cfg.proto +++ b/src/persist-client/src/cfg.proto @@ -19,6 +19,7 @@ message ProtoPersistParameters { mz_proto.ProtoDuration consensus_connect_timeout = 3; optional uint64 sink_minimum_batch_updates = 4; optional uint64 storage_sink_minimum_batch_updates = 8; + optional uint64 storage_source_decode_fuel = 21; optional ProtoRetryParameters next_listen_batch_retryer = 5; optional uint64 stats_audit_percent = 9; optional bool stats_collection_enabled = 6; diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs index bce68788c2127..1c2f9f7a223e5 100644 --- a/src/persist-client/src/cfg.rs +++ b/src/persist-client/src/cfg.rs @@ -191,6 +191,9 @@ impl PersistConfig { storage_sink_minimum_batch_updates: AtomicUsize::new( Self::DEFAULT_SINK_MINIMUM_BATCH_UPDATES, ), + storage_source_decode_fuel: AtomicUsize::new( + Self::DEFAULT_STORAGE_SOURCE_DECODE_FUEL, + ), next_listen_batch_retryer: RwLock::new(Self::DEFAULT_NEXT_LISTEN_BATCH_RETRYER), stats_audit_percent: AtomicUsize::new(Self::DEFAULT_STATS_AUDIT_PERCENT), stats_collection_enabled: AtomicBool::new(Self::DEFAULT_STATS_COLLECTION_ENABLED), @@ -250,6 +253,14 @@ impl PersistConfig { .load(DynamicConfig::LOAD_ORDERING) } + /// The maximum amount of work to do in the persist_source mfp_and_decode + /// operator before yielding. + pub fn storage_source_decode_fuel(&self) -> usize { + self.dynamic + .storage_source_decode_fuel + .load(DynamicConfig::LOAD_ORDERING) + } + /// Returns a new instance of [PersistConfig] for tests. pub fn new_for_tests() -> Self { use mz_build_info::DUMMY_BUILD_INFO; @@ -319,6 +330,9 @@ impl PersistConfig { /// Default value for [`PersistConfig::sink_minimum_batch_updates`]. pub const DEFAULT_SINK_MINIMUM_BATCH_UPDATES: usize = 0; + /// Default value for [`PersistConfig::storage_source_decode_fuel`]. + pub const DEFAULT_STORAGE_SOURCE_DECODE_FUEL: usize = 1_000_000; + /// Default value for [`DynamicConfig::next_listen_batch_retry_params`]. pub const DEFAULT_NEXT_LISTEN_BATCH_RETRYER: RetryParameters = RetryParameters { initial_backoff: Duration::from_millis(4), @@ -425,6 +439,7 @@ pub struct DynamicConfig { reader_lease_duration: RwLock, sink_minimum_batch_updates: AtomicUsize, storage_sink_minimum_batch_updates: AtomicUsize, + storage_source_decode_fuel: AtomicUsize, stats_audit_percent: AtomicUsize, stats_collection_enabled: AtomicBool, stats_filter_enabled: AtomicBool, @@ -780,6 +795,8 @@ pub struct PersistParameters { pub sink_minimum_batch_updates: Option, /// Configures [`PersistConfig::storage_sink_minimum_batch_updates`]. pub storage_sink_minimum_batch_updates: Option, + /// Configures [`PersistConfig::storage_source_decode_fuel`]. + pub storage_source_decode_fuel: Option, /// Configures [`DynamicConfig::stats_audit_percent`]. pub stats_audit_percent: Option, /// Configures [`DynamicConfig::stats_collection_enabled`]. @@ -816,6 +833,7 @@ impl PersistParameters { reader_lease_duration: self_reader_lease_duration, sink_minimum_batch_updates: self_sink_minimum_batch_updates, storage_sink_minimum_batch_updates: self_storage_sink_minimum_batch_updates, + storage_source_decode_fuel: self_storage_source_decode_fuel, next_listen_batch_retryer: self_next_listen_batch_retryer, stats_audit_percent: self_stats_audit_percent, stats_collection_enabled: self_stats_collection_enabled, @@ -838,6 +856,7 @@ impl PersistParameters { reader_lease_duration: other_reader_lease_duration, sink_minimum_batch_updates: other_sink_minimum_batch_updates, storage_sink_minimum_batch_updates: other_storage_sink_minimum_batch_updates, + storage_source_decode_fuel: other_storage_source_decode_fuel, next_listen_batch_retryer: other_next_listen_batch_retryer, stats_audit_percent: other_stats_audit_percent, stats_collection_enabled: other_stats_collection_enabled, @@ -879,6 +898,9 @@ impl PersistParameters { if let Some(v) = other_storage_sink_minimum_batch_updates { *self_storage_sink_minimum_batch_updates = Some(v); } + if let Some(v) = other_storage_source_decode_fuel { + *self_storage_source_decode_fuel = Some(v); + } if let Some(v) = other_next_listen_batch_retryer { *self_next_listen_batch_retryer = Some(v); } @@ -926,6 +948,7 @@ impl PersistParameters { reader_lease_duration, sink_minimum_batch_updates, storage_sink_minimum_batch_updates, + storage_source_decode_fuel, next_listen_batch_retryer, stats_audit_percent, stats_collection_enabled, @@ -947,6 +970,7 @@ impl PersistParameters { && reader_lease_duration.is_none() && sink_minimum_batch_updates.is_none() && storage_sink_minimum_batch_updates.is_none() + && storage_source_decode_fuel.is_none() && next_listen_batch_retryer.is_none() && stats_audit_percent.is_none() && stats_collection_enabled.is_none() @@ -977,6 +1001,7 @@ impl PersistParameters { reader_lease_duration, sink_minimum_batch_updates, storage_sink_minimum_batch_updates, + storage_source_decode_fuel, next_listen_batch_retryer, stats_audit_percent, stats_collection_enabled, @@ -1053,6 +1078,11 @@ impl PersistParameters { DynamicConfig::STORE_ORDERING, ); } + if let Some(storage_source_decode_fuel) = storage_source_decode_fuel { + cfg.dynamic + .storage_source_decode_fuel + .store(*storage_source_decode_fuel, DynamicConfig::STORE_ORDERING); + } if let Some(retry_params) = next_listen_batch_retryer { let mut retry = cfg .dynamic @@ -1129,6 +1159,7 @@ impl RustType for PersistParameters { storage_sink_minimum_batch_updates: self .storage_sink_minimum_batch_updates .into_proto(), + storage_source_decode_fuel: self.storage_source_decode_fuel.into_proto(), next_listen_batch_retryer: self.next_listen_batch_retryer.into_proto(), stats_audit_percent: self.stats_audit_percent.into_proto(), stats_collection_enabled: self.stats_collection_enabled.into_proto(), @@ -1159,6 +1190,7 @@ impl RustType for PersistParameters { storage_sink_minimum_batch_updates: proto .storage_sink_minimum_batch_updates .into_rust()?, + storage_source_decode_fuel: proto.storage_source_decode_fuel.into_rust()?, next_listen_batch_retryer: proto.next_listen_batch_retryer.into_rust()?, stats_audit_percent: proto.stats_audit_percent.into_rust()?, stats_collection_enabled: proto.stats_collection_enabled.into_rust()?, diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 498cb6e982062..fd552b338b03e 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1086,6 +1086,15 @@ const STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES: ServerVar = ServerVar { internal: true }; +/// Controls [`mz_persist_client::cfg::PersistConfig::storage_source_decode_fuel`]. +const STORAGE_SOURCE_DECODE_FUEL: ServerVar = ServerVar { + name: UncasedStr::new("storage_source_decode_fuel"), + value: &PersistConfig::DEFAULT_STORAGE_SOURCE_DECODE_FUEL, + description: "The maximum amount of work to do in the persist_source mfp_and_decode \ + operator before yielding.", + internal: true, +}; + const STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: ServerVar = ServerVar { name: UncasedStr::new("storage_record_source_sink_namespaced_errors"), value: &true, @@ -2459,6 +2468,7 @@ impl SystemVars { .with_var(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO) .with_var(&PERSIST_SINK_MINIMUM_BATCH_UPDATES) .with_var(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES) + .with_var(&STORAGE_SOURCE_DECODE_FUEL) .with_var(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS) .with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF) .with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER) @@ -3054,6 +3064,10 @@ impl SystemVars { *self.expect_value(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES) } + pub fn storage_source_decode_fuel(&self) -> usize { + *self.expect_value(&STORAGE_SOURCE_DECODE_FUEL) + } + /// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter. pub fn storage_record_source_sink_namespaced_errors(&self) -> bool { *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS) @@ -4879,6 +4893,7 @@ fn is_persist_config_var(name: &str) -> bool { || name == PERSIST_SINK_MINIMUM_BATCH_UPDATES.name() || name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name() || name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name() + || name == STORAGE_SOURCE_DECODE_FUEL.name() || name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.name() || name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.name() || name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP.name() diff --git a/src/storage-operators/src/persist_source.rs b/src/storage-operators/src/persist_source.rs index 86f9b38682086..3490ee0894f1a 100644 --- a/src/storage-operators/src/persist_source.rs +++ b/src/storage-operators/src/persist_source.rs @@ -22,6 +22,7 @@ use mz_ore::cast::CastFrom; use mz_ore::collections::CollectionExt; use mz_ore::vec::VecExt; use mz_persist_client::cache::PersistClientCache; +use mz_persist_client::cfg::PersistConfig; use mz_persist_client::fetch::FetchedPart; use mz_persist_client::fetch::SerdeLeasedBatchPart; use mz_persist_client::operators::shard_source::shard_source; @@ -86,7 +87,7 @@ use crate::metrics::BackpressureMetrics; /// using [`timely::dataflow::operators::generic::operator::empty`]. /// /// [advanced by]: differential_dataflow::lattice::Lattice::advance_by -pub fn persist_source( +pub fn persist_source( scope: &mut G, source_id: GlobalId, persist_clients: Arc, @@ -95,7 +96,6 @@ pub fn persist_source( until: Antichain, map_filter_project: Option<&mut MfpPlan>, flow_control: Option>, - yield_fn: YFn, ) -> ( Stream, Stream, @@ -103,7 +103,6 @@ pub fn persist_source( ) where G: Scope, - YFn: Fn(Instant, usize) -> bool + 'static, { let (stream, token) = scope.scoped( &format!("skip_granular_backpressure({})", source_id), @@ -122,7 +121,6 @@ where summary: Refines::to_inner(fc.summary), metrics: fc.metrics, }), - yield_fn, ); (stream.leave(), token) }, @@ -143,7 +141,7 @@ type RefinedScope<'g, G> = Child<'g, G, (::Timestamp, u64)>; /// /// [advanced by]: differential_dataflow::lattice::Lattice::advance_by #[allow(clippy::needless_borrow)] -pub fn persist_source_core<'g, G, YFn>( +pub fn persist_source_core<'g, G>( scope: &RefinedScope<'g, G>, source_id: GlobalId, persist_clients: Arc, @@ -152,15 +150,14 @@ pub fn persist_source_core<'g, G, YFn>( until: Antichain, map_filter_project: Option<&mut MfpPlan>, flow_control: Option>>, - yield_fn: YFn, ) -> ( Stream, (Result, (mz_repr::Timestamp, u64), Diff)>, Rc, ) where G: Scope, - YFn: Fn(Instant, usize) -> bool + 'static, { + let cfg = persist_clients.cfg().clone(); let name = source_id.to_string(); let desc = metadata.relation_desc.clone(); let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone()); @@ -214,7 +211,7 @@ where } }, ); - let rows = decode_and_mfp(&fetched, &name, until, map_filter_project, yield_fn); + let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project); (rows, token) } @@ -242,16 +239,15 @@ fn filter_may_match( result.may_contain(Datum::True) || result.may_fail() } -pub fn decode_and_mfp( +pub fn decode_and_mfp( + cfg: PersistConfig, fetched: &Stream>, name: &str, until: Antichain, mut map_filter_project: Option<&mut MfpPlan>, - yield_fn: YFn, ) -> Stream, G::Timestamp, Diff)> where G: Scope, - YFn: Fn(Instant, usize) -> bool + 'static, { let scope = fetched.scope(); let mut builder = OperatorBuilder::new( @@ -291,6 +287,11 @@ where } }); + // Get the yield fuel once per schedule to amortize the cost of + // loading the atomic. + let yield_fuel = cfg.storage_source_decode_fuel(); + let yield_fn = |_, work| work >= yield_fuel; + let mut work = 0; let start_time = Instant::now(); let mut output = updates_output.activate(); @@ -300,7 +301,7 @@ where &mut work, &name, start_time, - &yield_fn, + yield_fn, &until, map_filter_project.as_ref(), &mut datum_vec, @@ -365,6 +366,13 @@ impl PendingWork { match (key, val) { (Ok(SourceData(Ok(row))), Ok(())) => { if let Some(mfp) = map_filter_project { + // We originally accounted work as the number of outputs, to give downstream + // operators a chance to reduce down anything we've emitted. This mfp call + // might have a restrictive filter, which would have been counted as no + // work. However, in practice, we've been decode_and_mfp be a source of + // interactivity loss during rehydration, so we now also count each mfp + // evaluation against our fuel. + *work += 1; let arena = mz_repr::RowArena::new(); let mut datums_local = datum_vec.borrow_with(&row); for result in mfp.evaluate( diff --git a/src/storage/src/render/sinks.rs b/src/storage/src/render/sinks.rs index 03116d7a09382..ae3fae0e0e939 100644 --- a/src/storage/src/render/sinks.rs +++ b/src/storage/src/render/sinks.rs @@ -56,8 +56,6 @@ pub(crate) fn render_sink<'g, G: Scope>( timely::progress::Antichain::new(), None, None, - // Copy the logic in DeltaJoin/Get/Join to start. - |_timer, count| count > 1_000_000, ); tokens.push(source_token); diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index b3d6fbfa3fc38..05bbfe6ab0b0a 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -360,8 +360,6 @@ where Antichain::new(), None, None, - // Copy the logic in DeltaJoin/Get/Join to start. - |_timer, count| count > 1_000_000, ); let (tx_source_ok, tx_source_err) = ( tx_source_ok_stream.as_collection(), @@ -483,8 +481,6 @@ where Antichain::new(), None, flow_control, - // Copy the logic in DeltaJoin/Get/Join to start. - |_timer, count| count > 1_000_000, ); ( stream.as_collection(),