Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ fn persist_config(config: &SystemVars) -> PersistParameters {
storage_sink_minimum_batch_updates: Some(
config.storage_persist_sink_minimum_batch_updates(),
),
storage_source_decode_fuel: Some(config.storage_source_decode_fuel()),
next_listen_batch_retryer: Some(RetryParameters {
initial_backoff: config.persist_next_listen_batch_retryer_initial_backoff(),
multiplier: config.persist_next_listen_batch_retryer_multiplier(),
Expand Down
2 changes: 0 additions & 2 deletions src/compute/src/render/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ pub fn build_compute_dataflow<A: Allocate>(
dataflow.until.clone(),
mfp.as_mut(),
Some(flow_control),
// Copy the logic in DeltaJoin/Get/Join to start.
|_timer, count| count > 1_000_000,
);

// If `mfp` is non-identity, we need to apply what remains.
Expand Down
2 changes: 0 additions & 2 deletions src/compute/src/sink/persist_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ where
Antichain::new(), // we want all updates
None, // no MFP
None, // no flow control
// Copy the logic in DeltaJoin/Get/Join to start.
|_timer, count| count > 1_000_000,
);
use differential_dataflow::AsCollection;
let persist_collection = ok_stream
Expand Down
1 change: 1 addition & 0 deletions src/persist-client/src/cfg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message ProtoPersistParameters {
mz_proto.ProtoDuration consensus_connect_timeout = 3;
optional uint64 sink_minimum_batch_updates = 4;
optional uint64 storage_sink_minimum_batch_updates = 8;
optional uint64 storage_source_decode_fuel = 21;
optional ProtoRetryParameters next_listen_batch_retryer = 5;
optional uint64 stats_audit_percent = 9;
optional bool stats_collection_enabled = 6;
Expand Down
32 changes: 32 additions & 0 deletions src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ impl PersistConfig {
storage_sink_minimum_batch_updates: AtomicUsize::new(
Self::DEFAULT_SINK_MINIMUM_BATCH_UPDATES,
),
storage_source_decode_fuel: AtomicUsize::new(
Self::DEFAULT_STORAGE_SOURCE_DECODE_FUEL,
),
next_listen_batch_retryer: RwLock::new(Self::DEFAULT_NEXT_LISTEN_BATCH_RETRYER),
stats_audit_percent: AtomicUsize::new(Self::DEFAULT_STATS_AUDIT_PERCENT),
stats_collection_enabled: AtomicBool::new(Self::DEFAULT_STATS_COLLECTION_ENABLED),
Expand Down Expand Up @@ -250,6 +253,14 @@ impl PersistConfig {
.load(DynamicConfig::LOAD_ORDERING)
}

/// Fuel budget for the persist_source `decode_and_mfp` operator: the
/// maximum amount of work it does before yielding.
///
/// The value is loaded from the dynamic config's atomic on every call.
pub fn storage_source_decode_fuel(&self) -> usize {
    let fuel = &self.dynamic.storage_source_decode_fuel;
    fuel.load(DynamicConfig::LOAD_ORDERING)
}

/// Returns a new instance of [PersistConfig] for tests.
pub fn new_for_tests() -> Self {
use mz_build_info::DUMMY_BUILD_INFO;
Expand Down Expand Up @@ -319,6 +330,9 @@ impl PersistConfig {
/// Default value for [`PersistConfig::sink_minimum_batch_updates`].
pub const DEFAULT_SINK_MINIMUM_BATCH_UPDATES: usize = 0;

/// Default value for [`PersistConfig::storage_source_decode_fuel`].
pub const DEFAULT_STORAGE_SOURCE_DECODE_FUEL: usize = 1_000_000;

/// Default value for [`DynamicConfig::next_listen_batch_retry_params`].
pub const DEFAULT_NEXT_LISTEN_BATCH_RETRYER: RetryParameters = RetryParameters {
initial_backoff: Duration::from_millis(4),
Expand Down Expand Up @@ -425,6 +439,7 @@ pub struct DynamicConfig {
reader_lease_duration: RwLock<Duration>,
sink_minimum_batch_updates: AtomicUsize,
storage_sink_minimum_batch_updates: AtomicUsize,
storage_source_decode_fuel: AtomicUsize,
stats_audit_percent: AtomicUsize,
stats_collection_enabled: AtomicBool,
stats_filter_enabled: AtomicBool,
Expand Down Expand Up @@ -780,6 +795,8 @@ pub struct PersistParameters {
pub sink_minimum_batch_updates: Option<usize>,
/// Configures [`PersistConfig::storage_sink_minimum_batch_updates`].
pub storage_sink_minimum_batch_updates: Option<usize>,
/// Configures [`PersistConfig::storage_source_decode_fuel`].
pub storage_source_decode_fuel: Option<usize>,
/// Configures [`DynamicConfig::stats_audit_percent`].
pub stats_audit_percent: Option<usize>,
/// Configures [`DynamicConfig::stats_collection_enabled`].
Expand Down Expand Up @@ -816,6 +833,7 @@ impl PersistParameters {
reader_lease_duration: self_reader_lease_duration,
sink_minimum_batch_updates: self_sink_minimum_batch_updates,
storage_sink_minimum_batch_updates: self_storage_sink_minimum_batch_updates,
storage_source_decode_fuel: self_storage_source_decode_fuel,
next_listen_batch_retryer: self_next_listen_batch_retryer,
stats_audit_percent: self_stats_audit_percent,
stats_collection_enabled: self_stats_collection_enabled,
Expand All @@ -838,6 +856,7 @@ impl PersistParameters {
reader_lease_duration: other_reader_lease_duration,
sink_minimum_batch_updates: other_sink_minimum_batch_updates,
storage_sink_minimum_batch_updates: other_storage_sink_minimum_batch_updates,
storage_source_decode_fuel: other_storage_source_decode_fuel,
next_listen_batch_retryer: other_next_listen_batch_retryer,
stats_audit_percent: other_stats_audit_percent,
stats_collection_enabled: other_stats_collection_enabled,
Expand Down Expand Up @@ -879,6 +898,9 @@ impl PersistParameters {
if let Some(v) = other_storage_sink_minimum_batch_updates {
*self_storage_sink_minimum_batch_updates = Some(v);
}
if let Some(v) = other_storage_source_decode_fuel {
*self_storage_source_decode_fuel = Some(v);
}
if let Some(v) = other_next_listen_batch_retryer {
*self_next_listen_batch_retryer = Some(v);
}
Expand Down Expand Up @@ -926,6 +948,7 @@ impl PersistParameters {
reader_lease_duration,
sink_minimum_batch_updates,
storage_sink_minimum_batch_updates,
storage_source_decode_fuel,
next_listen_batch_retryer,
stats_audit_percent,
stats_collection_enabled,
Expand All @@ -947,6 +970,7 @@ impl PersistParameters {
&& reader_lease_duration.is_none()
&& sink_minimum_batch_updates.is_none()
&& storage_sink_minimum_batch_updates.is_none()
&& storage_source_decode_fuel.is_none()
&& next_listen_batch_retryer.is_none()
&& stats_audit_percent.is_none()
&& stats_collection_enabled.is_none()
Expand Down Expand Up @@ -977,6 +1001,7 @@ impl PersistParameters {
reader_lease_duration,
sink_minimum_batch_updates,
storage_sink_minimum_batch_updates,
storage_source_decode_fuel,
next_listen_batch_retryer,
stats_audit_percent,
stats_collection_enabled,
Expand Down Expand Up @@ -1053,6 +1078,11 @@ impl PersistParameters {
DynamicConfig::STORE_ORDERING,
);
}
if let Some(storage_source_decode_fuel) = storage_source_decode_fuel {
cfg.dynamic
.storage_source_decode_fuel
.store(*storage_source_decode_fuel, DynamicConfig::STORE_ORDERING);
}
if let Some(retry_params) = next_listen_batch_retryer {
let mut retry = cfg
.dynamic
Expand Down Expand Up @@ -1129,6 +1159,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
storage_sink_minimum_batch_updates: self
.storage_sink_minimum_batch_updates
.into_proto(),
storage_source_decode_fuel: self.storage_source_decode_fuel.into_proto(),
next_listen_batch_retryer: self.next_listen_batch_retryer.into_proto(),
stats_audit_percent: self.stats_audit_percent.into_proto(),
stats_collection_enabled: self.stats_collection_enabled.into_proto(),
Expand Down Expand Up @@ -1159,6 +1190,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
storage_sink_minimum_batch_updates: proto
.storage_sink_minimum_batch_updates
.into_rust()?,
storage_source_decode_fuel: proto.storage_source_decode_fuel.into_rust()?,
next_listen_batch_retryer: proto.next_listen_batch_retryer.into_rust()?,
stats_audit_percent: proto.stats_audit_percent.into_rust()?,
stats_collection_enabled: proto.stats_collection_enabled.into_rust()?,
Expand Down
15 changes: 15 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,15 @@ const STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES: ServerVar<usize> = ServerVar {
internal: true
};

/// Controls [`mz_persist_client::cfg::PersistConfig::storage_source_decode_fuel`].
///
/// Defaults to [`PersistConfig::DEFAULT_STORAGE_SOURCE_DECODE_FUEL`].
const STORAGE_SOURCE_DECODE_FUEL: ServerVar<usize> = ServerVar {
name: UncasedStr::new("storage_source_decode_fuel"),
value: &PersistConfig::DEFAULT_STORAGE_SOURCE_DECODE_FUEL,
description: "The maximum amount of work to do in the persist_source mfp_and_decode \
operator before yielding.",
internal: true,
};

const STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: ServerVar<bool> = ServerVar {
name: UncasedStr::new("storage_record_source_sink_namespaced_errors"),
value: &true,
Expand Down Expand Up @@ -2459,6 +2468,7 @@ impl SystemVars {
.with_var(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
.with_var(&PERSIST_SINK_MINIMUM_BATCH_UPDATES)
.with_var(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
.with_var(&STORAGE_SOURCE_DECODE_FUEL)
.with_var(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
.with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
.with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
Expand Down Expand Up @@ -3054,6 +3064,10 @@ impl SystemVars {
*self.expect_value(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
}

/// Returns the `storage_source_decode_fuel` configuration parameter.
pub fn storage_source_decode_fuel(&self) -> usize {
*self.expect_value(&STORAGE_SOURCE_DECODE_FUEL)
}

/// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter.
pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
*self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
Expand Down Expand Up @@ -4879,6 +4893,7 @@ fn is_persist_config_var(name: &str) -> bool {
|| name == PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
|| name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
|| name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
|| name == STORAGE_SOURCE_DECODE_FUEL.name()
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.name()
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.name()
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP.name()
Expand Down
32 changes: 20 additions & 12 deletions src/storage-operators/src/persist_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::vec::VecExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::fetch::FetchedPart;
use mz_persist_client::fetch::SerdeLeasedBatchPart;
use mz_persist_client::operators::shard_source::shard_source;
Expand Down Expand Up @@ -86,7 +87,7 @@ use crate::metrics::BackpressureMetrics;
/// using [`timely::dataflow::operators::generic::operator::empty`].
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn persist_source<G, YFn>(
pub fn persist_source<G>(
scope: &mut G,
source_id: GlobalId,
persist_clients: Arc<PersistClientCache>,
Expand All @@ -95,15 +96,13 @@ pub fn persist_source<G, YFn>(
until: Antichain<Timestamp>,
map_filter_project: Option<&mut MfpPlan>,
flow_control: Option<FlowControl<G>>,
yield_fn: YFn,
) -> (
Stream<G, (Row, Timestamp, Diff)>,
Stream<G, (DataflowError, Timestamp, Diff)>,
Rc<dyn Any>,
)
where
G: Scope<Timestamp = mz_repr::Timestamp>,
YFn: Fn(Instant, usize) -> bool + 'static,
{
let (stream, token) = scope.scoped(
&format!("skip_granular_backpressure({})", source_id),
Expand All @@ -122,7 +121,6 @@ where
summary: Refines::to_inner(fc.summary),
metrics: fc.metrics,
}),
yield_fn,
);
(stream.leave(), token)
},
Expand All @@ -143,7 +141,7 @@ type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, u64)>;
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G, YFn>(
pub fn persist_source_core<'g, G>(
scope: &RefinedScope<'g, G>,
source_id: GlobalId,
persist_clients: Arc<PersistClientCache>,
Expand All @@ -152,15 +150,14 @@ pub fn persist_source_core<'g, G, YFn>(
until: Antichain<Timestamp>,
map_filter_project: Option<&mut MfpPlan>,
flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
yield_fn: YFn,
) -> (
Stream<RefinedScope<'g, G>, (Result<Row, DataflowError>, (mz_repr::Timestamp, u64), Diff)>,
Rc<dyn Any>,
)
where
G: Scope<Timestamp = mz_repr::Timestamp>,
YFn: Fn(Instant, usize) -> bool + 'static,
{
let cfg = persist_clients.cfg().clone();
let name = source_id.to_string();
let desc = metadata.relation_desc.clone();
let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());
Expand Down Expand Up @@ -214,7 +211,7 @@ where
}
},
);
let rows = decode_and_mfp(&fetched, &name, until, map_filter_project, yield_fn);
let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
(rows, token)
}

Expand Down Expand Up @@ -242,16 +239,15 @@ fn filter_may_match(
result.may_contain(Datum::True) || result.may_fail()
}

pub fn decode_and_mfp<G, YFn>(
pub fn decode_and_mfp<G>(
cfg: PersistConfig,
fetched: &Stream<G, FetchedPart<SourceData, (), Timestamp, Diff>>,
name: &str,
until: Antichain<Timestamp>,
mut map_filter_project: Option<&mut MfpPlan>,
yield_fn: YFn,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
G: Scope<Timestamp = (mz_repr::Timestamp, u64)>,
YFn: Fn(Instant, usize) -> bool + 'static,
{
let scope = fetched.scope();
let mut builder = OperatorBuilder::new(
Expand Down Expand Up @@ -291,6 +287,11 @@ where
}
});

// Get the yield fuel once per schedule to amortize the cost of
// loading the atomic.
let yield_fuel = cfg.storage_source_decode_fuel();
let yield_fn = |_, work| work >= yield_fuel;

let mut work = 0;
let start_time = Instant::now();
let mut output = updates_output.activate();
Expand All @@ -300,7 +301,7 @@ where
&mut work,
&name,
start_time,
&yield_fn,
yield_fn,
&until,
map_filter_project.as_ref(),
&mut datum_vec,
Expand Down Expand Up @@ -365,6 +366,13 @@ impl PendingWork {
match (key, val) {
(Ok(SourceData(Ok(row))), Ok(())) => {
if let Some(mfp) = map_filter_project {
// We originally accounted work as the number of outputs, to give downstream
// operators a chance to reduce down anything we've emitted. This mfp call
// might have a restrictive filter, which would have been counted as no
// work. However, in practice, we've seen decode_and_mfp be a source of
// interactivity loss during rehydration, so we now also count each mfp
// evaluation against our fuel.
*work += 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if we want to move this up even further... both FetchedPart::next and until.less_equal(&time) can filter out an arbitrary number of tuples before producing a candidate for mfp to evaluate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Not sure what we could do about the filtering inside FetchedPart::next but maybe that's okay because it's only looking at the timestamp (we always return after decoding key and val, which is presumably the expensive part).

The until.less_equal(&time) is probably not a terribly selective filter in the rehydration case, but might be worth it in the spirit of being defensive. We'd probably be double counting work then in the common case, no idea if we would feel bad about that. Maybe we should change it to be "1 work per decoded key/val plus `mfp_output.len().saturating_sub(2)`" (the latter basically being how many "extra" outputs are generated)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double counting seems fine to me, it's all an approximation right now. I think I'd feel better about that if the yield threshold were configurable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other thought: we could add a debug! that outputs the time spent before yielding and # of inputs and # of outputs (or create additional metrics for those) to guide any future tuning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that there's a decent amount of hesitation around this, I've decided to keep the change as minimal as possible. Adding something for each mfp eval is pretty critical, so I think we have to live with that change, but I think moving the filtering is a bit speculative.

If possible, I'd also love to punt on any additional metrics or introspection in this PR because I don't really have a ton of time to devote to it atm, but I really think we want the knob in prod.

let arena = mz_repr::RowArena::new();
let mut datums_local = datum_vec.borrow_with(&row);
for result in mfp.evaluate(
Expand Down
2 changes: 0 additions & 2 deletions src/storage/src/render/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
timely::progress::Antichain::new(),
None,
None,
// Copy the logic in DeltaJoin/Get/Join to start.
|_timer, count| count > 1_000_000,
);
tokens.push(source_token);

Expand Down
4 changes: 0 additions & 4 deletions src/storage/src/render/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,6 @@ where
Antichain::new(),
None,
None,
// Copy the logic in DeltaJoin/Get/Join to start.
|_timer, count| count > 1_000_000,
);
let (tx_source_ok, tx_source_err) = (
tx_source_ok_stream.as_collection(),
Expand Down Expand Up @@ -483,8 +481,6 @@ where
Antichain::new(),
None,
flow_control,
// Copy the logic in DeltaJoin/Get/Join to start.
|_timer, count| count > 1_000_000,
);
(
stream.as_collection(),
Expand Down