From e1974639c16e7b15f3947d3f2bf399b60bbba67f Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Tue, 16 Dec 2025 23:39:41 +0200 Subject: [PATCH 1/3] store: Reject incompatible graft schemas during site allocation Signed-off-by: Maksim Dimitrov --- store/postgres/src/deployment_store.rs | 16 +----- store/postgres/src/subgraph_store.rs | 75 ++++++++++++++++---------- 2 files changed, 49 insertions(+), 42 deletions(-) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 3703534979c..a9fcc833e99 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -184,7 +184,6 @@ impl DeploymentStore { schema: &InputSchema, deployment: DeploymentCreate, site: Arc, - graft_base: Option>, replace: bool, on_sync: OnSync, index_def: Option, @@ -217,7 +216,7 @@ impl DeploymentStore { let query = format!("create schema {}", &site.namespace); conn.batch_execute(&query).await?; - let layout = Layout::create_relational_schema( + let _ = Layout::create_relational_schema( conn, site.clone(), schema, @@ -225,19 +224,6 @@ impl DeploymentStore { index_def, ) .await?; - // See if we are grafting and check that the graft is permissible - if let Some(base) = graft_base { - let errors = layout.can_copy_from(&base); - if !errors.is_empty() { - return Err(StoreError::Unknown(anyhow!( - "The subgraph `{}` cannot be used as the graft base \ - for `{}` because the schemas are incompatible:\n - {}", - &base.catalog.site.namespace, - &layout.catalog.site.namespace, - errors.join("\n - ") - ))); - } - } // Create data sources table if site.schema_version.private_data_sources() { diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 478d21eba02..f47049d0e39 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -39,6 +39,7 @@ use graph::{ use graph::{derive::CheapClone, futures03::future::join_all, prelude::alloy::primitives::Address}; use crate::{ + catalog::Catalog, deployment::{OnSync, SubgraphHealth}, primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site}, relational::{ @@ -88,7 +89,7 @@ impl Shard { .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') { return Err(StoreError::InvalidIdentifier(format!( - "shard name `{}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'", name + "shard name `{name}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'" ))); } Ok(Shard(name)) @@ -351,33 +352,60 @@ impl SubgraphStore { // assignment that we used last time to avoid creating // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id).await?; + let mut conn = self.primary_conn().await?; - let (site, site_was_created) = conn - .allocate_site(shard, schema.id(), network_name, graft_base) - .await?; - let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id); - (site, !site_was_created, node_id) + conn.transaction(|conn| { + async { + let (site, site_was_created) = conn + .allocate_site(shard, schema.id(), network_name, graft_base) + .await?; + let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id); + let site = Arc::new(site); + + if let Some(graft_base) = graft_base { + // Ensure that the graft base exists + let base_layout = self.layout(graft_base).await?; + let entities_with_causality_region = + deployment.manifest.entities_with_causality_region.clone(); + let catalog = Catalog::for_tests( + site.cheap_clone(), + entities_with_causality_region.into_iter().collect(), + )?; + let layout = Layout::new(site.cheap_clone(), schema, catalog)?; + + let errors = layout.can_copy_from(&base_layout); + if !errors.is_empty() { + return Err(StoreError::Unknown(anyhow!( + "The subgraph `{}` cannot be used as the graft base \ + for `{}` because the schemas are incompatible:\n - {}", + &base_layout.catalog.site.namespace, + &layout.catalog.site.namespace, + errors.join("\n - ") + ))); + } + } + + Ok((site, !site_was_created, node_id)) + } + .scope_boxed() + }) + .await? }; - let site = Arc::new(site); - // if the deployment already exists, we don't need to perform any copying - // so we can set graft_base to None - // if it doesn't exist, we need to copy the graft base to the new deployment - let graft_base_layout = if !exists { - let graft_base = match deployment.graft_base.as_ref() { + // If the deployment already exists, we don't need to perform any copying + // If it doesn't exist, we need to copy the graft base to the new deployment + if !exists { + let graft_base_layout = match graft_base { Some(base) => Some(self.layout(base).await?), None => None, }; - if let Some(graft_base) = &graft_base { + if let Some(graft_base_layout) = &graft_base_layout { self.primary_conn() .await? - .record_active_copy(graft_base.site.as_ref(), site.as_ref()) + .record_active_copy(graft_base_layout.site.as_ref(), site.as_ref()) .await?; } - graft_base - } else { - None }; // Create the actual databases schema and metadata entries @@ -386,7 +414,7 @@ impl SubgraphStore { .get(&site.shard) .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - let index_def = if let Some(graft) = &graft_base.clone() { + let index_def = if let Some(graft) = graft_base { if let Some(site) = self.sites.get(graft) { let store = self .stores @@ -406,7 +434,6 @@ impl SubgraphStore { schema, deployment, site.clone(), - graft_base_layout, replace, OnSync::None, index_def, @@ -731,8 +758,7 @@ impl Inner { if src.id == dst.id { return Err(StoreError::Unknown(anyhow!( - "can not copy deployment {} onto itself", - src_loc + "can not copy deployment {src_loc} onto itself" ))); } // The very last thing we do when we set up a copy here is assign it @@ -740,9 +766,7 @@ impl Inner { // should not have been called. if let Some(node) = self.mirror.assigned_node(dst.as_ref()).await? { return Err(StoreError::Unknown(anyhow!( - "can not copy into deployment {} since it is already assigned to node `{}`", - dst_loc, - node + "can not copy into deployment {dst_loc} since it is already assigned to node `{node}`" ))); } let deployment = src_store.load_deployment(src.clone()).await?; @@ -758,8 +782,6 @@ impl Inner { history_blocks_override: None, }; - let graft_base = self.layout(&src.deployment).await?; - self.primary_conn() .await? .record_active_copy(src.as_ref(), dst.as_ref()) @@ -776,7 +798,6 @@ impl Inner { &src_layout.input_schema, deployment, dst.clone(), - Some(graft_base), false, on_sync, Some(index_def), From 2e46515a8739c60e21eb633d4cfefa7a9862a192 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Thu, 22 Jan 2026 11:28:06 +0200 Subject: [PATCH 2/3] store: Move recording the active_copy in the transaction Signed-off-by: Maksim Dimitrov --- store/postgres/src/subgraph_store.rs | 108 +++++++++++++-------------- 1 file changed, 51 insertions(+), 57 deletions(-) diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index f47049d0e39..d56c2e29792 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -340,7 +340,7 @@ impl SubgraphStore { self.evict(schema.id())?; let graft_base = deployment.graft_base.as_ref(); - let (site, exists, node_id) = { + let (site, deployment_store, node_id) = { // We need to deal with two situations: // (1) We are really creating a new subgraph; it therefore needs // to go in the shard and onto the node that the placement @@ -353,67 +353,61 @@ impl SubgraphStore { // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id).await?; - let mut conn = self.primary_conn().await?; - conn.transaction(|conn| { - async { - let (site, site_was_created) = conn - .allocate_site(shard, schema.id(), network_name, graft_base) - .await?; - let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id); - let site = Arc::new(site); - - if let Some(graft_base) = graft_base { - // Ensure that the graft base exists - let base_layout = self.layout(graft_base).await?; - let entities_with_causality_region = - deployment.manifest.entities_with_causality_region.clone(); - let catalog = Catalog::for_tests( - site.cheap_clone(), - entities_with_causality_region.into_iter().collect(), - )?; - let layout = Layout::new(site.cheap_clone(), schema, catalog)?; - - let errors = layout.can_copy_from(&base_layout); - if !errors.is_empty() { - return Err(StoreError::Unknown(anyhow!( - "The subgraph `{}` cannot be used as the graft base \ - for `{}` because the schemas are incompatible:\n - {}", - &base_layout.catalog.site.namespace, - &layout.catalog.site.namespace, - errors.join("\n - ") - ))); + let mut pconn = self.primary_conn().await?; + pconn + .transaction(|pconn| { + async { + let (site, site_was_created) = pconn + .allocate_site(shard, schema.id(), network_name, graft_base) + .await?; + let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id); + let site = Arc::new(site); + let deployment_store = self + .stores + .get(&site.shard) + .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; + + if site_was_created { + if let Some(graft_base) = graft_base { + // Ensure that the graft base exists + let base_layout = self.layout(graft_base).await?; + let mut shard_conn = + deployment_store.get_replica_conn(ReplicaId::Main).await?; + let entities_with_causality_region = + deployment.manifest.entities_with_causality_region.clone(); + let catalog = Catalog::for_creation( + &mut shard_conn, + site.cheap_clone(), + entities_with_causality_region.into_iter().collect(), + ) + .await?; + let layout = Layout::new(site.cheap_clone(), schema, catalog)?; + + let errors = layout.can_copy_from(&base_layout); + if !errors.is_empty() { + return Err(StoreError::Unknown(anyhow!( + "The subgraph `{}` cannot be used as the graft base \ + for `{}` because the schemas are incompatible:\n - {}", + &base_layout.catalog.site.namespace, + &layout.catalog.site.namespace, + errors.join("\n - ") + ))); + } + + pconn + .record_active_copy(base_layout.site.as_ref(), site.as_ref()) + .await?; + } } - } - - Ok((site, !site_was_created, node_id)) - } - .scope_boxed() - }) - .await? - }; - // If the deployment already exists, we don't need to perform any copying - // If it doesn't exist, we need to copy the graft base to the new deployment - if !exists { - let graft_base_layout = match graft_base { - Some(base) => Some(self.layout(base).await?), - None => None, - }; - - if let Some(graft_base_layout) = &graft_base_layout { - self.primary_conn() - .await? - .record_active_copy(graft_base_layout.site.as_ref(), site.as_ref()) - .await?; - } + Ok((site, deployment_store, node_id)) + } + .scope_boxed() + }) + .await? }; // Create the actual databases schema and metadata entries - let deployment_store = self - .stores - .get(&site.shard) - .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - let index_def = if let Some(graft) = graft_base { if let Some(site) = self.sites.get(graft) { let store = self From 0d307c9894e3745e2fb90d0e7254d502acac9a83 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Tue, 27 Jan 2026 14:54:26 +0200 Subject: [PATCH 3/3] fix(store): handle orphaned sites in graft compatibility check Move graft compatibility validation out of primary transaction to avoid holding primary and shard connections simultaneously, which could exhaust connection pools when they share the same database. - Detect orphaned sites (site exists but deployment doesn't) and re-run the graft `can_copy_from` check on redeploy - Only insert into `active_copies` when a copy is actually needed, avoiding spurious records for already-copied deployments - Maintain idempotency: failed deployments leave state that will be properly validated on the next attempt Signed-off-by: Maksim Dimitrov --- store/postgres/src/subgraph_store.rs | 104 ++++++++++++++------------- 1 file changed, 55 insertions(+), 49 deletions(-) diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index d56c2e29792..74ec326de6c 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -354,57 +354,63 @@ impl SubgraphStore { let (shard, node_id) = self.place(&name, &network_name, node_id).await?; let mut pconn = self.primary_conn().await?; - pconn - .transaction(|pconn| { - async { - let (site, site_was_created) = pconn - .allocate_site(shard, schema.id(), network_name, graft_base) - .await?; - let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id); - let site = Arc::new(site); - let deployment_store = self - .stores - .get(&site.shard) - .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - - if site_was_created { - if let Some(graft_base) = graft_base { - // Ensure that the graft base exists - let base_layout = self.layout(graft_base).await?; - let mut shard_conn = - deployment_store.get_replica_conn(ReplicaId::Main).await?; - let entities_with_causality_region = - deployment.manifest.entities_with_causality_region.clone(); - let catalog = Catalog::for_creation( - &mut shard_conn, - site.cheap_clone(), - entities_with_causality_region.into_iter().collect(), - ) - .await?; - let layout = Layout::new(site.cheap_clone(), schema, catalog)?; - - let errors = layout.can_copy_from(&base_layout); - if !errors.is_empty() { - return Err(StoreError::Unknown(anyhow!( - "The subgraph `{}` cannot be used as the graft base \ + + let (site, site_was_created) = pconn + .allocate_site(shard, schema.id(), network_name, graft_base) + .await?; + let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id); + let site = Arc::new(site); + let deployment_store = self + .stores + .get(&site.shard) + .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; + + let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?; + let needs_check = if site_was_created { + true + } else { + // If deployment does not exist, but site exists it means + // that we are recovering from a failed deployment creation with an orphaned site. + // In that case, we should check graft compatibility again. + let exists = crate::deployment::exists(&mut shard_conn, &site).await?; + !exists + }; + + if let Some(graft_base) = graft_base { + let base_layout = self.layout(graft_base).await?; + + if needs_check { + let entities_with_causality_region = + deployment.manifest.entities_with_causality_region.clone(); + let catalog = Catalog::for_creation( + &mut shard_conn, + site.cheap_clone(), + entities_with_causality_region.into_iter().collect(), + ) + .await?; + let layout = Layout::new(site.cheap_clone(), schema, catalog)?; + + let errors = layout.can_copy_from(&base_layout); + if !errors.is_empty() { + return Err(StoreError::Unknown(anyhow!( + "The subgraph `{}` cannot be used as the graft base \ for `{}` because the schemas are incompatible:\n - {}", - &base_layout.catalog.site.namespace, - &layout.catalog.site.namespace, - errors.join("\n - ") - ))); - } - - pconn - .record_active_copy(base_layout.site.as_ref(), site.as_ref()) - .await?; - } - } - - Ok((site, deployment_store, node_id)) + &base_layout.catalog.site.namespace, + &layout.catalog.site.namespace, + errors.join("\n - ") + ))); } - .scope_boxed() - }) - .await? + + // Only record active copy when the graft check passes and a copy is needed. + // If deployment already exists, the copy has either completed (no active_copies + // record) or is in progress (active_copies record already exists). + pconn + .record_active_copy(base_layout.site.as_ref(), site.as_ref()) + .await?; + } + } + + (site, deployment_store, node_id) }; // Create the actual databases schema and metadata entries