Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 5 additions & 65 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//! Persistent metadata storage for the coordinator.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet};
use std::convert;
use std::sync::Arc;

Expand Down Expand Up @@ -48,7 +48,6 @@ use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::OptimizedMirRelationExpr;
use mz_license_keys::ValidatedLicenseKey;
use mz_ore::collections::HashSet;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
use mz_ore::result::ResultExt as _;
Expand All @@ -63,9 +62,9 @@ use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarT
use mz_secrets::InMemorySecretsController;
use mz_sql::catalog::{
CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
SessionCatalog, SystemObjectType,
};
use mz_sql::names::{
CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
Expand Down Expand Up @@ -325,58 +324,6 @@ impl Catalog {

dropped_notices
}

/// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
/// as a result of creating new indexes on the items in `ons`.
///
/// When creating and inserting a new index, we need to invalidate some entries that may
/// optimize to new expressions. When creating index `i` on object `o`, we need to invalidate
/// the following objects:
///
/// - `o`.
/// - All compute objects that depend directly on `o`.
/// - All compute objects that would directly depend on `o`, if all views were inlined.
pub(crate) fn invalidate_for_index(
&self,
ons: impl Iterator<Item = GlobalId>,
) -> BTreeSet<GlobalId> {
let mut dependencies = BTreeSet::new();
let mut queue = VecDeque::new();
let mut seen = HashSet::new();
for on in ons {
let entry = self.get_entry_by_global_id(&on);
dependencies.insert(on);
seen.insert(entry.id);
let uses = entry.uses();
queue.extend(uses.clone());
}

while let Some(cur) = queue.pop_front() {
let entry = self.get_entry(&cur);
if seen.insert(cur) {
let global_ids = entry.global_ids();
match entry.item_type() {
CatalogItemType::Table
| CatalogItemType::Source
| CatalogItemType::MaterializedView
| CatalogItemType::Sink
| CatalogItemType::Index
| CatalogItemType::Type
| CatalogItemType::Func
| CatalogItemType::Secret
| CatalogItemType::Connection
| CatalogItemType::ContinualTask => {
dependencies.extend(global_ids);
}
CatalogItemType::View => {
dependencies.extend(global_ids);
queue.extend(entry.uses());
}
}
}
}
dependencies
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -1466,18 +1413,11 @@ impl Catalog {
new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
) -> BoxFuture<'b, ()> {
if let Some(expr_cache) = &self.expr_cache_handle {
let ons = new_local_expressions
.iter()
.map(|(id, _)| id)
.chain(new_global_expressions.iter().map(|(id, _)| id))
.map(|id| self.get_entry_by_global_id(id))
.filter_map(|entry| entry.index().map(|index| index.on));
let invalidate_ids = self.invalidate_for_index(ons);
expr_cache
.update(
new_local_expressions,
new_global_expressions,
invalidate_ids,
Default::default(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All calls to update are now passing an empty set to the invalidate_ids parameter, so you could consider removing that parameter, and downstream things. (But it's also ok to leave that to follow-up PRs.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left it in because I want to follow up with a PR that also updates the cache in response to DDL. And I suspect there is a good chance we'll need to pass this argument when removing plans on DROP.

)
.boxed()
} else {
Expand Down