Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/developer/generated/compute-types/dyncfgs.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Declares all dynamic configuration (`dyncfg`) constants for the compute layer. K

- **Rendering**: `ENABLE_HALF_JOIN2`, `ENABLE_MZ_JOIN_CORE`, `LINEAR_JOIN_YIELDING`, `ENABLE_TEMPORAL_BUCKETING`/`TEMPORAL_BUCKETING_SUMMARY`, `COMPUTE_FLAT_MAP_FUEL`, `ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`, `COMPUTE_APPLY_COLUMN_DEMANDS`.
- **MV sink correction**: `ENABLE_CORRECTION_V2`, `CONSOLIDATING_VEC_GROWTH_DAMPENER`, `MV_SINK_ADVANCE_PERSIST_FRONTIERS`.
- **Memory management**: `ENABLE_LGALLOC`, `ENABLE_LGALLOC_EAGER_RECLAMATION`, `LGALLOC_BACKGROUND_INTERVAL`, `LGALLOC_FILE_GROWTH_DAMPENER`, `LGALLOC_LOCAL_BUFFER_BYTES`, `LGALLOC_SLOW_CLEAR_BYTES`, `ENABLE_COLUMNATION_LGALLOC`, `ENABLE_COLUMNAR_LGALLOC`, `MEMORY_LIMITER_INTERVAL`, `MEMORY_LIMITER_USAGE_BIAS`, `MEMORY_LIMITER_BURST_FACTOR`.
- **Memory management**: `ENABLE_LGALLOC`, `LGALLOC_BACKGROUND_INTERVAL`, `LGALLOC_FILE_GROWTH_DAMPENER`, `LGALLOC_LOCAL_BUFFER_BYTES`, `LGALLOC_SLOW_CLEAR_BYTES`, `ENABLE_COLUMNATION_LGALLOC`, `ENABLE_COLUMNAR_LGALLOC`, `MEMORY_LIMITER_INTERVAL`, `MEMORY_LIMITER_USAGE_BIAS`, `MEMORY_LIMITER_BURST_FACTOR`.
- **Backpressure**: `DATAFLOW_MAX_INFLIGHT_BYTES`, `DATAFLOW_MAX_INFLIGHT_BYTES_CC`, `ENABLE_COMPUTE_LOGICAL_BACKPRESSURE`, `COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES`, `COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK`.
- **Peek stash**: `ENABLE_PEEK_RESPONSE_STASH`, `PEEK_RESPONSE_STASH_THRESHOLD_BYTES`, `PEEK_RESPONSE_STASH_BATCH_MAX_RUNS`, `PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES`, `PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES`, `PEEK_STASH_NUM_BATCHES`, `PEEK_STASH_BATCH_SIZE`.
- **Other**: `HYDRATION_CONCURRENCY`, `COMPUTE_SERVER_MAINTENANCE_INTERVAL`, `ENABLE_COMPUTE_REPLICA_EXPIRATION`, `COMPUTE_REPLICA_EXPIRATION_OFFSET`, `COPY_TO_S3_*`, `COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL`, `SUBSCRIBE_SNAPSHOT_OPTIMIZATION`.
Expand Down
11 changes: 0 additions & 11 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def get_minimal_system_parameters(
"enable_lgalloc": "false",
"enable_load_generator_counter": "true",
"enable_logical_compaction_window": "true",
"enable_multi_worker_storage_persist_sink": "true",
"enable_multi_replica_sources": "true",
"enable_rbac_checks": "true",
"enable_reduce_mfp_fusion": "true",
Expand Down Expand Up @@ -307,12 +306,6 @@ def get_variable_system_parameters(
"1048576",
["65536", "262144", "1048576", "4194304"],
),
VariableSystemParameter(
"persist_pubsub_client_enabled", "true", ["true", "false"]
),
VariableSystemParameter(
"persist_pubsub_push_diff_enabled", "true", ["true", "false"]
),
VariableSystemParameter(
"persist_record_compactions", "true", ["true", "false"]
),
Expand Down Expand Up @@ -471,7 +464,6 @@ def get_default_system_parameters(
"enable_compute_half_join2",
"enable_mz_join_core",
"linear_join_yielding",
"enable_lgalloc_eager_reclamation",
"lgalloc_background_interval",
"lgalloc_file_growth_dampener",
"lgalloc_local_buffer_bytes",
Expand All @@ -496,7 +488,6 @@ def get_default_system_parameters(
"compute_logical_backpressure_inflight_slack",
"persist_fetch_semaphore_cost_adjustment",
"persist_fetch_semaphore_permit_adjustment",
"persist_optimize_ignored_data_fetch",
"persist_pubsub_same_process_delegate_enabled",
"persist_pubsub_connect_attempt_timeout",
"persist_pubsub_request_timeout",
Expand All @@ -520,7 +511,6 @@ def get_default_system_parameters(
"crdb_keepalives_interval",
"crdb_keepalives_retries",
"persist_use_critical_since_txn",
"use_global_txn_cache_source",
"persist_batch_builder_max_outstanding_parts",
"persist_compaction_heuristic_min_inputs",
"persist_compaction_heuristic_min_parts",
Expand Down Expand Up @@ -573,7 +563,6 @@ def get_default_system_parameters(
"persist_txns_data_shard_retryer_multiplier",
"persist_txns_data_shard_retryer_clamp",
"storage_cluster_shutdown_grace_period",
"storage_dataflow_delay_sources_past_rehydration",
"storage_dataflow_suspendable_sources",
"storage_downgrade_since_during_finalization",
"replica_metrics_history_retention_interval",
Expand Down
8 changes: 0 additions & 8 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,9 +1477,6 @@ def __init__(
self.flags_with_values["persist_claim_unclaimed_compactions"] = (
BOOLEAN_FLAG_VALUES
)
self.flags_with_values["persist_optimize_ignored_data_fetch"] = (
BOOLEAN_FLAG_VALUES
)
self.flags_with_values["enable_variadic_left_join_lowering"] = (
BOOLEAN_FLAG_VALUES
)
Expand Down Expand Up @@ -1585,7 +1582,6 @@ def __init__(
"enable_compute_correction_v2",
"linear_join_yielding",
"enable_lgalloc",
"enable_lgalloc_eager_reclamation",
"enable_s3_tables_region_check",
"lgalloc_background_interval",
"lgalloc_file_growth_dampener",
Expand Down Expand Up @@ -1617,8 +1613,6 @@ def __init__(
"compute_logical_backpressure_inflight_slack",
"persist_fetch_semaphore_cost_adjustment",
"persist_fetch_semaphore_permit_adjustment",
"persist_pubsub_client_enabled",
"persist_pubsub_push_diff_enabled",
"persist_pubsub_same_process_delegate_enabled",
"persist_pubsub_connect_attempt_timeout",
"persist_pubsub_request_timeout",
Expand All @@ -1645,7 +1639,6 @@ def __init__(
"crdb_keepalives_idle",
"crdb_keepalives_interval",
"crdb_keepalives_retries",
"use_global_txn_cache_source",
"persist_batch_builder_max_outstanding_parts",
"persist_compaction_heuristic_min_inputs",
"persist_compaction_heuristic_min_parts",
Expand Down Expand Up @@ -1711,7 +1704,6 @@ def __init__(
"persist_txns_data_shard_retryer_multiplier",
"persist_txns_data_shard_retryer_clamp",
"storage_cluster_shutdown_grace_period",
"storage_dataflow_delay_sources_past_rehydration",
"storage_dataflow_suspendable_sources",
"storage_downgrade_since_during_finalization",
"replica_metrics_history_retention_interval",
Expand Down
8 changes: 0 additions & 8 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
/// Enable lgalloc.
pub const ENABLE_LGALLOC: Config<bool> = Config::new("enable_lgalloc", true, "Enable lgalloc.");

/// Enable lgalloc's eager memory return/reclamation feature.
pub const ENABLE_LGALLOC_EAGER_RECLAMATION: Config<bool> = Config::new(
"enable_lgalloc_eager_reclamation",
true,
"Enable lgalloc's eager return behavior.",
);

/// The interval at which the background thread wakes.
pub const LGALLOC_BACKGROUND_INTERVAL: Config<Duration> = Config::new(
"lgalloc_background_interval",
Expand Down Expand Up @@ -379,7 +372,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&MEMORY_LIMITER_INTERVAL)
.add(&MEMORY_LIMITER_USAGE_BIAS)
.add(&MEMORY_LIMITER_BURST_FACTOR)
.add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
.add(&ENABLE_COLUMNATION_LGALLOC)
.add(&ENABLE_COLUMNAR_LGALLOC)
.add(&COMPUTE_SERVER_MAINTENANCE_INTERVAL)
Expand Down
4 changes: 1 addition & 3 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,13 @@ impl ComputeState {
if ENABLE_LGALLOC.get(config) {
if let Some(path) = &self.context.scratch_directory {
let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
info!(
?path,
backgrund_interval=?interval,
clear_bytes,
eager_return,
file_growth_dampener,
local_buffer_bytes,
"enabling lgalloc"
Expand All @@ -271,7 +269,7 @@ impl ComputeState {
.enable()
.with_path(path.clone())
.with_background_config(background_worker_config)
.eager_return(eager_return)
.eager_return(true)
.file_growth_dampener(file_growth_dampener)
.local_buffer_bytes(local_buffer_bytes),
);
Expand Down
3 changes: 0 additions & 3 deletions src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&crate::fetch::FETCH_SEMAPHORE_COST_ADJUSTMENT)
.add(&crate::fetch::FETCH_SEMAPHORE_PERMIT_ADJUSTMENT)
.add(&crate::fetch::VALIDATE_PART_BOUNDS_ON_READ)
.add(&crate::fetch::OPTIMIZE_IGNORED_DATA_FETCH)
.add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
.add(&crate::internal::cache::BLOB_CACHE_SCALE_WITH_THREADS)
.add(&crate::internal::cache::BLOB_CACHE_SCALE_FACTOR_BYTES)
Expand All @@ -351,8 +350,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&crate::internal::state::ENABLE_INCREMENTAL_COMPACTION)
.add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
.add(&crate::read::READER_LEASE_DURATION)
.add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
.add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
.add(&crate::rpc::PUBSUB_SAME_PROCESS_DELEGATE_ENABLED)
.add(&crate::rpc::PUBSUB_CONNECT_ATTEMPT_TIMEOUT)
.add(&crate::rpc::PUBSUB_REQUEST_TIMEOUT)
Expand Down
13 changes: 2 additions & 11 deletions src/persist-client/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use itertools::EitherOrBoth;
use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
use mz_dyncfg::{Config, ConfigValHandle};
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::{soft_assert_or_log, soft_panic_no_log, soft_panic_or_log};
Expand Down Expand Up @@ -85,12 +85,6 @@ pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
'row_with_validate', or 'arrow' (Materialize).",
);

pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
"persist_optimize_ignored_data_fetch",
true,
"CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);

pub(crate) const VALIDATE_PART_BOUNDS_ON_READ: Config<bool> = Config::new(
"persist_validate_part_bounds_on_read",
false,
Expand Down Expand Up @@ -567,16 +561,13 @@ where

/// Apply any relevant projection pushdown optimizations, assuming that the data in the part
/// is equivalent to the provided key and value.
pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
pub fn maybe_optimize(&mut self, key: ArrayRef, val: ArrayRef) {
assert_eq!(key.len(), 1, "expect a single-row key array");
assert_eq!(val.len(), 1, "expect a single-row val array");
let as_of = match &self.filter {
FetchBatchFilter::Snapshot { as_of } => as_of,
FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
};
if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
return;
}
let (diffs_sum, _stats) = match &self.part {
BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
BatchPart::Inline { .. } => return,
Expand Down
6 changes: 2 additions & 4 deletions src/persist-client/src/internal/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::read::LeasedReaderId;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::rpc::PubSubSender;
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId, cfg};
use differential_dataflow::difference::Monoid;
Expand Down Expand Up @@ -383,9 +383,7 @@ where
cmd.succeeded.inc();
self.shard_metrics.cmd_succeeded.inc();
self.update_state(new_state);
if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
self.pubsub_sender.push_diff(&self.shard_id, &diff);
}
self.pubsub_sender.push_diff(&self.shard_id, &diff);
return Ok((diff.seqno, Ok(res), maintenance));
}
ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
Expand Down
2 changes: 1 addition & 1 deletion src/persist-client/src/operators/shard_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ where
}
}
FilterResult::ReplaceWith { key, val } => {
part_desc.maybe_optimize(&cfg, key, val);
part_desc.maybe_optimize(key, val);
audit_budget_bytes = audit_budget_bytes.saturating_add(bytes);
}
}
Expand Down
32 changes: 2 additions & 30 deletions src/persist-client/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,6 @@ use crate::internal::service::{
};
use crate::metrics::Metrics;

/// Determines whether PubSub clients should connect to the PubSub server.
pub(crate) const PUBSUB_CLIENT_ENABLED: Config<bool> = Config::new(
"persist_pubsub_client_enabled",
true,
"Whether to connect to the Persist PubSub service.",
);

/// For connected clients, determines whether to push state diffs to the PubSub
/// server. For the server, determines whether to broadcast state diffs to
/// subscribed clients.
pub(crate) const PUBSUB_PUSH_DIFF_ENABLED: Config<bool> = Config::new(
"persist_pubsub_push_diff_enabled",
true,
"Whether to push state diffs to Persist PubSub.",
);

/// For connected clients, determines whether to push state diffs to the PubSub
/// server. For the server, determines whether to broadcast state diffs to
/// subscribed clients.
Expand Down Expand Up @@ -292,11 +276,6 @@ impl GrpcPubSubClient {
let sender = Arc::clone(&sender);
metrics.pubsub_client.grpc_connection.connected.set(0);

if !PUBSUB_CLIENT_ENABLED.get(&config.persist_cfg) {
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}

// add a bit of backoff when reconnecting after some network/server failure
if is_first_connection_attempt {
is_first_connection_attempt = false;
Expand Down Expand Up @@ -443,10 +422,6 @@ impl GrpcPubSubClient {
metrics: &Metrics,
) -> Result<(), Error> {
loop {
if !PUBSUB_CLIENT_ENABLED.get(&config.persist_cfg) {
return Ok(());
}

debug!("awaiting next pubsub response");
match responses.next().await {
Some(Ok(message)) => {
Expand Down Expand Up @@ -1168,9 +1143,7 @@ impl proto_persist_pub_sub_server::ProtoPersistPubSub for PersistGrpcPubSubServe
seqno: req.seqno.into_rust().expect("valid seqno"),
data: req.diff.clone(),
};
if PUBSUB_PUSH_DIFF_ENABLED.get(&cfg) {
connection.push_diff(&shard_id, &diff);
}
connection.push_diff(&shard_id, &diff);
}
Some(proto_pub_sub_message::Message::Subscribe(diff)) => {
let shard_id = diff.shard_id.parse().expect("valid shard id");
Expand Down Expand Up @@ -1466,7 +1439,7 @@ mod grpc {
use crate::internal::service::proto_pub_sub_message::Message;
use crate::metrics::Metrics;
use crate::rpc::{
GrpcPubSubClient, PUBSUB_CLIENT_ENABLED, PUBSUB_RECONNECT_BACKOFF, PersistGrpcPubSubServer,
GrpcPubSubClient, PUBSUB_RECONNECT_BACKOFF, PersistGrpcPubSubServer,
PersistPubSubClient, PersistPubSubClientConfig, PubSubState,
};

Expand Down Expand Up @@ -1874,7 +1847,6 @@ mod grpc {
let cfg = PersistConfig::new_for_tests();

let mut updates = ConfigUpdates::default();
updates.add(&PUBSUB_CLIENT_ENABLED, true);
updates.add(&PUBSUB_RECONNECT_BACKOFF, Duration::ZERO);
cfg.apply_from(&updates);

Expand Down
6 changes: 0 additions & 6 deletions src/sql/src/session/vars/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1850,12 +1850,6 @@ feature_flags!(
default: true,
enable_for_item_parsing: true,
},
{
name: enable_multi_worker_storage_persist_sink,
desc: "multi-worker storage persist sink",
default: true,
enable_for_item_parsing: true,
},
{
name: enable_persist_streaming_snapshot_and_fetch,
desc: "use the new streaming consolidate for snapshot_and_fetch",
Expand Down
11 changes: 0 additions & 11 deletions src/storage-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,6 @@ pub const CLUSTER_SHUTDOWN_GRACE_PERIOD: Config<Duration> = Config::new(

// Flow control

/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
/// Configuration for basic hydration backpressure.
pub const DELAY_SOURCES_PAST_REHYDRATION: Config<bool> = Config::new(
"storage_dataflow_delay_sources_past_rehydration",
// This was original `false`, but it is not enabled everywhere.
true,
"Whether or not to delay sources producing values in some scenarios \
(namely, upsert) till after rehydration is finished",
);

/// Whether storage dataflows should suspend execution while downstream operators are still
/// processing data.
pub const SUSPENDABLE_SOURCES: Config<bool> = Config::new(
Expand Down Expand Up @@ -321,7 +311,6 @@ pub const STATISTICS_RETENTION_DURATION: Config<Duration> = Config::new(
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
.add(&CLUSTER_SHUTDOWN_GRACE_PERIOD)
.add(&DELAY_SOURCES_PAST_REHYDRATION)
.add(&ENFORCE_EXTERNAL_ADDRESSES)
.add(&KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS)
.add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES)
Expand Down
25 changes: 8 additions & 17 deletions src/storage/src/render/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
Expand Down Expand Up @@ -353,22 +352,14 @@ where
// rehydration processing the `persist_source` input above.
needed_tokens.push(upsert_token);

// If configured, delay raw sources until we rehydrate the upsert
// source. Otherwise, drop the token, unblocking the sources at the
// end rendering.
if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
.get(storage_state.storage_configuration.config_set())
{
crate::upsert::rehydration_finished(
scope.clone(),
base_source_config,
rehydrated_token,
refine_antichain(&resume_upper),
snapshot_progress.clone(),
);
} else {
drop(rehydrated_token)
};
// Delay raw sources until we rehydrate the upsert source.
crate::upsert::rehydration_finished(
scope.clone(),
base_source_config,
rehydrated_token,
refine_antichain(&resume_upper),
snapshot_progress.clone(),
);

// If backpressure from persist is enabled, we connect the upsert operator's
// snapshot progress to the persist source feedback handle.
Expand Down
Loading
Loading