diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index e4c6995d639..e027f00d2cc 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -1648,7 +1648,6 @@ mod tests { index_id: "test-index".to_string(), source_id: INGEST_V2_SOURCE_ID.to_string(), }], - closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }; let get_open_shards_response = control_plane_mailbox @@ -2411,7 +2410,6 @@ mod tests { index_id: "test-index-foo".to_string(), source_id: INGEST_V2_SOURCE_ID.to_string(), }], - closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }) .await @@ -2626,7 +2624,6 @@ mod tests { index_id: "test-index".to_string(), source_id: INGEST_V2_SOURCE_ID.to_string(), }], - closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }; control_plane_mailbox @@ -2780,7 +2777,6 @@ mod tests { index_id: "test-index".to_string(), source_id: INGEST_V2_SOURCE_ID.to_string(), }], - closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }; control_plane_mailbox diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 63295d61eca..829fa7fbf28 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -26,7 +26,6 @@ use futures::stream::FuturesUnordered; use itertools::{Itertools as _, MinMaxResult}; use quickwit_actors::Mailbox; use quickwit_common::Progress; -use quickwit_common::pretty::PrettySample; use quickwit_ingest::{IngesterPool, LeaderId, LocalShardsUpdate}; use quickwit_proto::control_plane::{ AdviseResetShardsRequest, AdviseResetShardsResponse, GetOrCreateOpenShardsFailureReason, @@ -45,7 +44,7 @@ use quickwit_proto::metastore::{ MetastoreResult, MetastoreService, MetastoreServiceClient, OpenShardSubrequest, OpenShardsRequest, OpenShardsResponse, serde_utils, }; -use quickwit_proto::types::{IndexUid, NodeId, NodeIdRef, Position, ShardId, SourceUid}; +use quickwit_proto::types::{NodeId, NodeIdRef, Position, ShardId, SourceUid}; use rand::prelude::IndexedRandom; use rand::rngs::ThreadRng; use rand::seq::SliceRandom; @@ -363,29 +362,6 @@ impl IngestController { wait_handle } - fn handle_closed_shards(&self, closed_shards: Vec, model: &mut ControlPlaneModel) { - for closed_shard in closed_shards { - let index_uid: IndexUid = closed_shard.index_uid().clone(); - let source_id = closed_shard.source_id; - - let source_uid = SourceUid { - index_uid, - source_id, - }; - let closed_shard_ids = model.close_shards(&source_uid, &closed_shard.shard_ids); - - if !closed_shard_ids.is_empty() { - info!( - index_id=%source_uid.index_uid.index_id, - source_id=%source_uid.source_id, - shard_ids=?PrettySample::new(&closed_shard_ids, 5), - "closed {} shards reported by router", - closed_shard_ids.len() - ); - } - } - } - pub(crate) async fn handle_local_shards_update( &mut self, local_shards_update: LocalShardsUpdate, @@ -442,13 +418,6 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult { - // Closing shards is an operation performed by ingesters, - // so the control plane is not necessarily aware that they are closed. - // - // Routers can report closed shards so that we can update our - // internal state. - self.handle_closed_shards(get_open_shards_request.closed_shards, model); - let num_subrequests = get_open_shards_request.subrequests.len(); let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests); let mut get_or_create_open_shards_failures = Vec::new(); @@ -1324,7 +1293,7 @@ mod tests { use quickwit_proto::metastore::{ self, MetastoreError, MockMetastoreService, OpenShardSubresponse, }; - use quickwit_proto::types::{DocMappingUid, Position, SourceId}; + use quickwit_proto::types::{DocMappingUid, IndexUid, Position, SourceId}; use super::*; @@ -1461,7 +1430,6 @@ mod tests { let request = GetOrCreateOpenShardsRequest { subrequests: Vec::new(), - closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }; let response = controller @@ -1494,11 +1462,9 @@ mod tests { source_id: "source-not-found".to_string(), }, ]; - let closed_shards = Vec::new(); let unavailable_leaders = vec!["test-ingester-0".to_string()]; let request = GetOrCreateOpenShardsRequest { subrequests, - closed_shards, unavailable_leaders, }; let response = controller @@ -1627,7 +1593,6 @@ mod tests { }]; let request = GetOrCreateOpenShardsRequest { subrequests, - closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }; @@ -1639,57 +1604,6 @@ mod tests { assert!(!metastore_error.is_transaction_certainly_aborted()); } - #[tokio::test] - async fn test_ingest_controller_get_open_shards_handles_closed_shards() { - let metastore = MetastoreServiceClient::mocked(); - let ingester_pool = IngesterPool::default(); - let replication_factor = 2; - - let mut controller = IngestController::new( - metastore, - ingester_pool, - replication_factor, - TEST_SHARD_THROUGHPUT_LIMIT_MIB, - 1.001, - ); - let mut model = ControlPlaneModel::default(); - - let index_uid = IndexUid::for_test("test-index-0", 0); - let source_id: SourceId = "test-source".to_string(); - - let shards = vec![Shard { - shard_id: Some(ShardId::from(1)), - index_uid: Some(index_uid.clone()), - source_id: source_id.clone(), - leader_id: "test-ingester-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }]; - model.insert_shards(&index_uid, &source_id, shards); - - let request = GetOrCreateOpenShardsRequest { - subrequests: Vec::new(), - closed_shards: vec![ShardIds { - index_uid: index_uid.clone().into(), - source_id: source_id.clone(), - shard_ids: vec![ShardId::from(1), ShardId::from(2)], - }], - unavailable_leaders: Vec::new(), - }; - let progress = Progress::default(); - - controller - .get_or_create_open_shards(request, &mut model, &progress) - .await - .unwrap(); - - let shard_1 = model - .all_shards() - .find(|shard| shard.shard_id() == ShardId::from(1)) - .unwrap(); - assert!(shard_1.is_closed()); - } - #[test] fn test_ingest_controller_allocate_shards() { let metastore = MetastoreServiceClient::mocked(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs b/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs index 19d6f5d691d..c571721fa7d 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs @@ -84,7 +84,6 @@ impl DebouncedGetOrCreateOpenShardsRequest { let request = GetOrCreateOpenShardsRequest { subrequests: self.subrequests, unavailable_leaders: self.unavailable_leaders, - ..Default::default() }; (Some(request), self.rendezvous) } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 5cd231d5a4a..1046c0d343b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -459,7 +459,6 @@ impl Ingester { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, - shard_id: subrequest.shard_id, reason: PersistFailureReason::NodeUnavailable as i32, }; persist_failures.push(persist_failure); @@ -489,7 +488,6 @@ impl Ingester { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, - shard_id: subrequest.shard_id, reason: PersistFailureReason::NoShardsAvailable as i32, }; persist_failures.push(persist_failure); @@ -530,7 +528,6 @@ impl Ingester { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, - shard_id: Some(shard_id), reason: PersistFailureReason::WalFull as i32, }; persist_failures.push(persist_failure); @@ -548,7 +545,6 @@ impl Ingester { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, - shard_id: Some(shard_id), reason: PersistFailureReason::NoShardsAvailable as i32, }; persist_failures.push(persist_failure); @@ -686,7 +682,6 @@ impl Ingester { subrequest_id: replicate_failure.subrequest_id, index_uid: replicate_failure.index_uid, source_id: replicate_failure.source_id, - shard_id: replicate_failure.shard_id, reason: persist_failure_reason as i32, }; persist_failures.push(persist_failure); @@ -736,7 +731,6 @@ impl Ingester { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, - shard_id: subrequest.shard_id, reason: reason as i32, }; persist_failures.push(persist_failure); @@ -1751,14 +1745,12 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([r#"{"doc": "test-doc-010"}"#])), }, PersistSubrequest { subrequest_id: 1, index_uid: Some(index_uid2.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([ r#"{"doc": "test-doc-110"}"#, r#"{"doc": "test-doc-111"}"#, @@ -1872,7 +1864,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(0)), doc_batch: None, }], }; @@ -1933,7 +1924,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(0)), doc_batch: Some(DocBatchV2::for_test([ "", // invalid "[]", // invalid @@ -2008,7 +1998,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(0)), doc_batch: Some(DocBatchV2::for_test([ "", // invalid "[]", // invalid @@ -2071,7 +2060,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(0)), doc_batch: Some(DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#])), }], }; @@ -2132,7 +2120,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(0)), doc_batch: Some(DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#])), }], }; @@ -2193,7 +2180,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([r#"test-doc-foo"#])), }], }; @@ -2206,7 +2192,6 @@ mod tests { assert_eq!(persist_failure.subrequest_id, 0); assert_eq!(persist_failure.index_uid(), &index_uid); assert_eq!(persist_failure.source_id, "test-source"); - assert_eq!(persist_failure.shard_id(), ShardId::from(1)); assert_eq!( persist_failure.reason(), PersistFailureReason::NodeUnavailable, @@ -2246,7 +2231,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([r#"{"doc": "test-doc-foo"}"#])), }], }; @@ -2259,7 +2243,6 @@ mod tests { assert_eq!(persist_failure.subrequest_id, 0); assert_eq!(persist_failure.index_uid(), &index_uid); assert_eq!(persist_failure.source_id, "test-source"); - assert_eq!(persist_failure.shard_id(), ShardId::from(1)); assert_eq!( persist_failure.reason(), PersistFailureReason::NodeUnavailable @@ -2342,14 +2325,12 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([r#"{"doc": "test-doc-010"}"#])), }, PersistSubrequest { subrequest_id: 1, index_uid: Some(index_uid2.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([ r#"{"doc": "test-doc-110"}"#, r#"{"doc": "test-doc-111"}"#, @@ -2550,14 +2531,12 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([r#"{"doc": "test-doc-010"}"#])), }, PersistSubrequest { subrequest_id: 1, index_uid: Some(index_uid2.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([ r#"{"doc": "test-doc-110"}"#, r#"{"doc": "test-doc-111"}"#, @@ -2676,7 +2655,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([r#"{"doc": "test-doc-010"}"#])), }], }; @@ -2689,7 +2667,6 @@ mod tests { assert_eq!(persist_failure.subrequest_id, 0); assert_eq!(persist_failure.index_uid(), &index_uid); assert_eq!(persist_failure.source_id, "test-source"); - assert_eq!(persist_failure.shard_id(), ShardId::from(1)); assert_eq!( persist_failure.reason(), PersistFailureReason::NoShardsAvailable @@ -2755,7 +2732,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([r#"{"doc": "test-doc-010"}"#])), }], }; @@ -2768,7 +2744,6 @@ mod tests { assert_eq!(persist_failure.subrequest_id, 0); assert_eq!(persist_failure.index_uid(), &index_uid); assert_eq!(persist_failure.source_id, "test-source"); - assert_eq!(persist_failure.shard_id(), ShardId::from(1)); assert_eq!( persist_failure.reason(), PersistFailureReason::NoShardsAvailable @@ -2836,7 +2811,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test([r#"{"doc": "test-doc-010"}"#])), }], }; @@ -2849,7 +2823,6 @@ mod tests { assert_eq!(persist_failure.subrequest_id, 0); assert_eq!(persist_failure.index_uid(), &index_uid); assert_eq!(persist_failure.source_id, "test-source"); - assert_eq!(persist_failure.shard_id(), ShardId::from(1)); assert_eq!(persist_failure.reason(), PersistFailureReason::WalFull); let state_guard = ingester.state.lock_fully().await.unwrap(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index da3d989d93e..7e293cff0ee 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -337,7 +337,6 @@ impl IngestRouter { subrequest_id: subrequest.subrequest_id, index_uid: Some(ingester_node.index_uid.clone()), source_id: subrequest.source_id.clone(), - shard_id: None, doc_batch: subrequest.doc_batch.clone(), }; per_leader_persist_subrequests @@ -881,7 +880,6 @@ mod tests { source_id: "source-not-found".to_string(), }, ], - closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }; router @@ -1091,7 +1089,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), reason: PersistFailureReason::NoShardsAvailable as i32, }], }); @@ -1247,7 +1244,6 @@ mod tests { .returning(move |request| { assert_eq!(request.leader_id, "test-ingester-0"); assert_eq!(request.subrequests.len(), 1); - assert!(request.subrequests[0].shard_id.is_none()); Ok(PersistResponse { leader_id: request.leader_id, @@ -1279,7 +1275,6 @@ mod tests { .returning(move |request| { assert_eq!(request.leader_id, "test-ingester-1"); assert_eq!(request.subrequests.len(), 1); - assert!(request.subrequests[0].shard_id.is_none()); Ok(PersistResponse { leader_id: request.leader_id, @@ -1373,7 +1368,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid_clone.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), reason: PersistFailureReason::NoShardsAvailable as i32, }], }) @@ -1513,7 +1507,6 @@ mod tests { assert_eq!(subrequest.subrequest_id, 0); let index_uid = subrequest.index_uid().clone(); assert_eq!(subrequest.source_id, "test-source"); - assert!(subrequest.shard_id.is_none()); assert_eq!( subrequest.doc_batch, Some(DocBatchV2::for_test(["test-doc-foo"])) @@ -1526,7 +1519,6 @@ mod tests { subrequest_id: 0, index_uid: Some(index_uid), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), reason: PersistFailureReason::NoShardsAvailable as i32, }], }; @@ -1626,7 +1618,6 @@ mod tests { subrequest_id: 0, index_uid: Some(IndexUid::for_test("test-index-0", 0)), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), reason: PersistFailureReason::NoShardsAvailable as i32, }], }); @@ -1655,7 +1646,6 @@ mod tests { subrequest_id: 1, index_uid: Some(IndexUid::for_test("test-index-1", 0)), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), reason: PersistFailureReason::NodeUnavailable as i32, }], }); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index 3e7b22969e8..4fafcf2bc0a 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -559,7 +559,6 @@ mod tests { let persist_failure = PersistFailure { subrequest_id: 1, - shard_id: Some(shard_id_2.clone()), ..Default::default() }; workbench.record_persist_failure(&persist_failure); @@ -806,7 +805,6 @@ mod tests { let persist_failure = PersistFailure { subrequest_id: 0, - shard_id: Some(ShardId::from(1)), reason: PersistFailureReason::WalFull as i32, ..Default::default() }; diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index d0850091280..0e60be0fd93 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -76,7 +76,6 @@ service ControlPlaneService { message GetOrCreateOpenShardsRequest { // There should be at most one subrequest per index per request. repeated GetOrCreateOpenShardsSubrequest subrequests = 1; - repeated quickwit.ingest.ShardIds closed_shards = 2; // The control plane should return shards that are not present on the supplied leaders. // // The control plane does not change the status of those leaders just from this signal. diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index 25a4705d58a..7a8d0c27105 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -73,7 +73,6 @@ message PersistSubrequest { uint32 subrequest_id = 1; quickwit.common.IndexUid index_uid = 2; string source_id = 3; - quickwit.ingest.ShardId shard_id = 4; quickwit.ingest.DocBatchV2 doc_batch = 5; } @@ -106,7 +105,6 @@ message PersistFailure { uint32 subrequest_id = 1; quickwit.common.IndexUid index_uid = 2; string source_id = 3; - quickwit.ingest.ShardId shard_id = 4; PersistFailureReason reason = 5; } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 09cfbdebf58..28eaaa18ed9 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -5,8 +5,6 @@ pub struct GetOrCreateOpenShardsRequest { /// There should be at most one subrequest per index per request. #[prost(message, repeated, tag = "1")] pub subrequests: ::prost::alloc::vec::Vec, - #[prost(message, repeated, tag = "2")] - pub closed_shards: ::prost::alloc::vec::Vec, /// The control plane should return shards that are not present on the supplied leaders. /// /// The control plane does not change the status of those leaders just from this signal. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 07b8d5b64a1..3ad77cd0a19 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -37,8 +37,6 @@ pub struct PersistSubrequest { pub index_uid: ::core::option::Option, #[prost(string, tag = "3")] pub source_id: ::prost::alloc::string::String, - #[prost(message, optional, tag = "4")] - pub shard_id: ::core::option::Option, #[prost(message, optional, tag = "5")] pub doc_batch: ::core::option::Option, } @@ -79,8 +77,6 @@ pub struct PersistFailure { pub index_uid: ::core::option::Option, #[prost(string, tag = "3")] pub source_id: ::prost::alloc::string::String, - #[prost(message, optional, tag = "4")] - pub shard_id: ::core::option::Option, #[prost(enumeration = "PersistFailureReason", tag = "5")] pub reason: i32, } diff --git a/quickwit/quickwit-proto/src/getters.rs b/quickwit/quickwit-proto/src/getters.rs index a327c1717a7..2607bfd2d74 100644 --- a/quickwit/quickwit-proto/src/getters.rs +++ b/quickwit/quickwit-proto/src/getters.rs @@ -209,8 +209,6 @@ generate_getters! { InitShardFailure, OpenFetchStreamRequest, OpenShardSubrequest, - PersistFailure, - PersistSubrequest, PersistSuccess, ReplicateFailure, ReplicateSubrequest, diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs index d2da3f8d9bd..3d6bb896e37 100644 --- a/quickwit/quickwit-proto/src/ingest/ingester.rs +++ b/quickwit/quickwit-proto/src/ingest/ingester.rs @@ -85,12 +85,6 @@ impl OpenFetchStreamRequest { } } -impl PersistSubrequest { - pub fn queue_id(&self) -> QueueId { - queue_id(self.index_uid(), &self.source_id, self.shard_id()) - } -} - impl PersistSuccess { pub fn queue_id(&self) -> QueueId { queue_id(self.index_uid(), &self.source_id, self.shard_id())