Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions src/storage-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,6 @@ pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
"Delay interval when reconnecting to a source / sink after halt.",
);

/// Whether to mint reclock bindings based on the latest probed frontier or the currently ingested
/// frontier.
pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
"storage_reclock_to_latest",
true,
"Whether to mint reclock bindings based on the latest probed offset or the latest ingested offset.",
);

/// Whether to use the new continual feedback upsert operator.
pub const STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT: Config<bool> = Config::new(
"storage_use_continual_feedback_upsert",
Expand Down Expand Up @@ -373,7 +365,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&SINK_PROGRESS_SEARCH)
.add(&SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY)
.add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION)
.add(&STORAGE_RECLOCK_TO_LATEST)
.add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
.add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
.add(&STORAGE_SERVER_MAINTENANCE_INTERVAL)
Expand Down
89 changes: 82 additions & 7 deletions src/storage/src/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,28 @@ use std::ops::Rem;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::AsCollection;
use differential_dataflow::{AsCollection, Hashable};
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::iter::IteratorExt;
use mz_ore::now::NowFn;
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::load_generator::{
Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput,
LoadGeneratorSourceConnection,
};
use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::builder_async::{
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use timely::progress::{Antichain, Timestamp};
use tokio::time::{Instant, interval_at};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
Expand Down Expand Up @@ -207,9 +211,8 @@ impl SourceRender for LoadGeneratorSourceConnection {
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
Stream<G, Infallible>,
Stream<G, HealthStatusMessage>,
Option<Stream<G, Probe<MzOffset>>>,
Stream<G, Probe<MzOffset>>,
Vec<PressOnDropButton>,
) {
let generator_kind = GeneratorKind::new(
Expand All @@ -218,10 +221,17 @@ impl SourceRender for LoadGeneratorSourceConnection {
self.as_of,
self.up_to,
);
let (updates, uppers, health, button) =
let (updates, progress, health, button) =
generator_kind.render(scope, config, committed_uppers, start_signal);

(updates, uppers, health, None, button)
let probe_stream = synthesize_probes(
config.id,
&progress,
config.timestamp_interval,
config.now_fn.clone(),
);

(updates, health, probe_stream, button)
}
}

Expand Down Expand Up @@ -450,3 +460,68 @@ fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
vec![button.press_on_drop()],
)
}

/// Synthesizes a probe stream that produces the frontier of the given progress stream at the given
/// interval.
///
/// This is used as a fallback for sources that don't support probing the frontier of the upstream
/// system.
fn synthesize_probes<G>(
    source_id: GlobalId,
    progress: &Stream<G, Infallible>,
    interval: Duration,
    now_fn: NowFn,
) -> Stream<G, Probe<G::Timestamp>>
where
    G: Scope,
{
    let scope = progress.scope();

    // Elect a single worker, chosen by hashing the source ID, to emit probes.
    let chosen_worker = usize::cast_from(source_id.hashed()) % scope.peers();
    let active = chosen_worker == scope.index();

    let mut builder = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input = builder.new_input_for(progress, Pipeline, &output);

    builder.build(|capabilities| async move {
        // Non-elected workers drop their capability immediately and emit nothing.
        if !active {
            return;
        }

        let [cap] = capabilities.try_into().expect("one capability per output");

        let mut ticker = super::probe::Ticker::new(move || interval, now_fn.clone());

        let minimum = Antichain::from_elem(Timestamp::minimum());
        let mut upper = minimum.clone();
        loop {
            tokio::select! {
                event = input.next() => match event {
                    Some(AsyncEvent::Progress(new_upper)) => upper = new_upper,
                    Some(AsyncEvent::Data(..)) => unreachable!(),
                    None => break,
                },
                // We only report a probe if the source upper frontier is not the minimum
                // frontier. This makes it so the first remap binding corresponds to the
                // snapshot of the source, and because the first binding always maps to the
                // minimum *target* frontier we guarantee that the source will never appear
                // empty.
                ts = ticker.tick(), if upper != minimum => {
                    output.give(&cap, Probe {
                        probe_ts: ts,
                        upstream_frontier: upper.clone(),
                    });
                }
            }
        }

        // The progress input is exhausted; emit one final probe with the empty frontier to
        // signal that no further upstream progress is possible.
        output.give(
            &cap,
            Probe {
                probe_ts: now_fn().into(),
                upstream_frontier: Antichain::new(),
            },
        );
    });

    output_stream
}
38 changes: 4 additions & 34 deletions src/storage/src/source/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::convert::Infallible;
use std::str::{self};
use std::sync::Arc;
use std::thread;
Expand Down Expand Up @@ -136,8 +135,6 @@ pub struct KafkaSourceReader {
struct PartitionCapability {
/// The capability of the data produced
data: Capability<KafkaTimestamp>,
/// The capability of the progress stream
progress: Capability<KafkaTimestamp>,
}

/// The high watermark offsets of a Kafka partition.
Expand Down Expand Up @@ -186,14 +183,13 @@ impl SourceRender for KafkaSourceConnection {
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
Stream<G, Infallible>,
Stream<G, HealthStatusMessage>,
Option<Stream<G, Probe<KafkaTimestamp>>>,
Stream<G, Probe<KafkaTimestamp>>,
Vec<PressOnDropButton>,
) {
let (metadata, probes, metadata_token) =
render_metadata_fetcher(scope, self.clone(), config.clone());
let (data, progress, health, reader_token) = render_reader(
let (data, health, reader_token) = render_reader(
scope,
self,
config.clone(),
Expand Down Expand Up @@ -221,9 +217,8 @@ impl SourceRender for KafkaSourceConnection {

(
data_collections,
progress,
health,
Some(probes),
probes,
vec![metadata_token, reader_token],
)
}
Expand All @@ -242,15 +237,13 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
Stream<G, Infallible>,
Stream<G, HealthStatusMessage>,
PressOnDropButton,
) {
let name = format!("KafkaReader({})", config.id);
let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);
Expand Down Expand Up @@ -307,7 +300,7 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
let busy_signal = Arc::clone(&config.busy_signal);
let button = builder.build(move |caps| {
SignaledFuture::new(busy_signal, async move {
let [mut data_cap, mut progress_cap, health_cap] = caps.try_into().unwrap();
let [mut data_cap, health_cap] = caps.try_into().unwrap();

let client_id = connection.client_id(
config.config.config_set(),
Expand Down Expand Up @@ -363,7 +356,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
);
let part_cap = PartitionCapability {
data: data_cap.delayed(&part_ts),
progress: progress_cap.delayed(&part_ts),
};
partition_capabilities.insert(*pid, part_cap);
}
Expand All @@ -375,7 +367,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
let future_ts =
Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
data_cap.downgrade(&future_ts);
progress_cap.downgrade(&future_ts);

info!(
source_id = config.id.to_string(),
Expand Down Expand Up @@ -620,23 +611,9 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
RangeBound::exact(pid),
MzOffset::from(start_offset),
);
let part_upper_ts = Partitioned::new_singleton(
RangeBound::exact(pid),
MzOffset::from(high_watermark),
);

// This is the moment at which we have discovered a new partition
// and we need to make sure we produce its initial snapshot at a,
// single timestamp so that the source transitions from no data
// from this partition to all the data of this partition. We do
// this by initializing the data capability to the starting offset
// and, importantly, the progress capability directly to the high
// watermark. This jump of the progress capability ensures that
// everything until the high watermark will be reclocked to a
// single point.
entry.insert(PartitionCapability {
data: data_cap.delayed(&part_since_ts),
progress: progress_cap.delayed(&part_upper_ts),
});
}
}
Expand Down Expand Up @@ -682,7 +659,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
}

data_cap.downgrade(&future_ts);
progress_cap.downgrade(&future_ts);
}
Some(MetadataUpdate::TransientError(status)) => {
if let Some(update) = status.kafka {
Expand Down Expand Up @@ -955,11 +931,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
}
};

// We use try_downgrade here because during the initial snapshot phase the
// data capability is not beyond the progress capability and therefore a
// normal downgrade would panic. Once it catches up though the data
// capbility is what's pushing the progress capability forward.
let _ = part_cap.progress.try_downgrade(&upper);
}
}

Expand All @@ -980,7 +951,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(

(
stream.as_collection(),
progress_stream,
health_stream,
button.press_on_drop(),
)
Expand Down
9 changes: 3 additions & 6 deletions src/storage/src/source/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
//! trigger a restart of the dataflow.

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt;
use std::io;
use std::rc::Rc;
Expand Down Expand Up @@ -110,9 +109,8 @@ impl SourceRender for MySqlSourceConnection {
_start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
Stream<G, Infallible>,
Stream<G, HealthStatusMessage>,
Option<Stream<G, Probe<GtidPartition>>>,
Stream<G, Probe<GtidPartition>>,
Vec<PressOnDropButton>,
) {
// Collect the source outputs that we will be exporting.
Expand Down Expand Up @@ -163,7 +161,7 @@ impl SourceRender for MySqlSourceConnection {
metrics.snapshot_metrics.clone(),
);

let (repl_updates, uppers, repl_err, repl_token) = replication::render(
let (repl_updates, repl_err, repl_token) = replication::render(
scope.clone(),
config.clone(),
self.clone(),
Expand Down Expand Up @@ -234,9 +232,8 @@ impl SourceRender for MySqlSourceConnection {

(
data_collections,
uppers,
health,
Some(probe_stream),
probe_stream,
vec![snapshot_token, repl_token, stats_token],
)
}
Expand Down
Loading