Skip to content

Commit db1fe83

Browse files
committed
DRY ephemeral key matching, fix visibility and appropriate kvstore usage
- Restrict `TierStoreInner` visibility from `pub` to `pub(crate)` - Primary store can be either local or remote - Extract repeated ephemeral key matching into a standalone `is_ephemerally_cached_key` helper to DRY up `read_internal`, `write_internal`, and `remove_internal` - Replace `KVStoreSync::list` with async `KVStore::list` in `list_internal` to avoid blocking the async runtime
1 parent 08fe778 commit db1fe83

File tree

1 file changed

+71
-92
lines changed

1 file changed

+71
-92
lines changed

src/io/tier_store.rs

Lines changed: 71 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,8 @@ impl KVStoreSync for TierStore {
230230
}
231231
}
232232

233-
pub struct TierStoreInner {
234-
/// For remote data.
233+
struct TierStoreInner {
234+
/// For local or remote data.
235235
primary_store: Arc<DynStore>,
236236
/// For local non-critical/ephemeral data.
237237
ephemeral_store: Option<Arc<DynStore>>,
@@ -395,9 +395,7 @@ impl TierStoreInner {
395395
match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace)
396396
.await
397397
{
398-
Ok(keys) => {
399-
return Ok(keys);
400-
},
398+
Ok(keys) => Ok(keys),
401399
Err(e) => {
402400
log_error!(
403401
self.logger,
@@ -505,104 +503,76 @@ impl TierStoreInner {
505503
"read",
506504
)?;
507505

508-
match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) {
509-
(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY)
510-
| (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => {
511-
if let Some(eph_store) = self.ephemeral_store.as_ref() {
512-
// We only try once here (without retry logic) because local failure might be indicative
513-
// of a more serious issue (e.g. full memory, memory corruption, permissions change) that
514-
// do not self-resolve such that retrying would negate the latency benefits.
515-
516-
// The following questions remain:
517-
// 1. Are there situations where local transient errors may warrant a retry?
518-
// 2. Can we reliably identify/detect these transient errors?
519-
// 3. Should we fall back to the primary or backup stores in the event of any error?
520-
KVStore::read(
521-
eph_store.as_ref(),
522-
&primary_namespace,
523-
&secondary_namespace,
524-
&key,
525-
)
526-
.await
527-
} else {
528-
self.read_primary(&primary_namespace, &secondary_namespace, &key).await
529-
}
530-
},
531-
_ => self.read_primary(&primary_namespace, &secondary_namespace, &key).await,
506+
if let Some(eph_store) = self
507+
.ephemeral_store
508+
.as_ref()
509+
.filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key))
510+
{
511+
// We only try once here (without retry logic) because local failure might be indicative
512+
// of a more serious issue (e.g. full memory, memory corruption, permissions change) that
513+
// do not self-resolve such that retrying would negate the latency benefits.
514+
515+
// The following questions remain:
516+
// 1. Are there situations where local transient errors may warrant a retry?
517+
// 2. Can we reliably identify/detect these transient errors?
518+
// 3. Should we fall back to the primary or backup stores in the event of any error?
519+
KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await
520+
} else {
521+
self.read_primary(&primary_namespace, &secondary_namespace, &key).await
532522
}
533523
}
534524

535525
async fn write_internal(
536526
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
537527
) -> io::Result<()> {
538-
match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) {
539-
(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY)
540-
| (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => {
541-
if let Some(eph_store) = &self.ephemeral_store {
542-
KVStore::write(
543-
eph_store.as_ref(),
544-
primary_namespace.as_str(),
545-
secondary_namespace.as_str(),
546-
key.as_str(),
547-
buf,
548-
)
549-
.await
550-
} else {
551-
self.primary_write_then_schedule_backup(
552-
primary_namespace.as_str(),
553-
secondary_namespace.as_str(),
554-
key.as_str(),
555-
buf,
556-
)
557-
.await
558-
}
559-
},
560-
_ => {
561-
self.primary_write_then_schedule_backup(
562-
primary_namespace.as_str(),
563-
secondary_namespace.as_str(),
564-
key.as_str(),
565-
buf,
566-
)
567-
.await
568-
},
528+
if let Some(eph_store) = self
529+
.ephemeral_store
530+
.as_ref()
531+
.filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key))
532+
{
533+
KVStore::write(
534+
eph_store.as_ref(),
535+
primary_namespace.as_str(),
536+
secondary_namespace.as_str(),
537+
key.as_str(),
538+
buf,
539+
)
540+
.await
541+
} else {
542+
self.primary_write_then_schedule_backup(
543+
primary_namespace.as_str(),
544+
secondary_namespace.as_str(),
545+
key.as_str(),
546+
buf,
547+
)
548+
.await
569549
}
570550
}
571551

572552
async fn remove_internal(
573553
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
574554
) -> io::Result<()> {
575-
match (primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str()) {
576-
(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY)
577-
| (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) => {
578-
if let Some(eph_store) = &self.ephemeral_store {
579-
KVStore::remove(
580-
eph_store.as_ref(),
581-
primary_namespace.as_str(),
582-
secondary_namespace.as_str(),
583-
key.as_str(),
584-
lazy,
585-
)
586-
.await
587-
} else {
588-
self.primary_remove_then_schedule_backup(
589-
primary_namespace.as_str(),
590-
secondary_namespace.as_str(),
591-
key.as_str(),
592-
lazy,
593-
)
594-
.await
595-
}
596-
},
597-
_ => {
598-
self.primary_remove_then_schedule_backup(
599-
primary_namespace.as_str(),
600-
secondary_namespace.as_str(),
601-
key.as_str(),
602-
lazy,
603-
)
604-
.await
605-
},
555+
if let Some(eph_store) = self
556+
.ephemeral_store
557+
.as_ref()
558+
.filter(|_s| is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key))
559+
{
560+
KVStore::remove(
561+
eph_store.as_ref(),
562+
primary_namespace.as_str(),
563+
secondary_namespace.as_str(),
564+
key.as_str(),
565+
lazy,
566+
)
567+
.await
568+
} else {
569+
self.primary_remove_then_schedule_backup(
570+
primary_namespace.as_str(),
571+
secondary_namespace.as_str(),
572+
key.as_str(),
573+
lazy,
574+
)
575+
.await
606576
}
607577
}
608578

@@ -616,7 +586,8 @@ impl TierStoreInner {
616586
)
617587
| (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => {
618588
if let Some(eph_store) = self.ephemeral_store.as_ref() {
619-
KVStoreSync::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace)
589+
KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace)
590+
.await
620591
} else {
621592
self.list_primary(&primary_namespace, &secondary_namespace).await
622593
}
@@ -653,6 +624,14 @@ impl BackupOp {
653624
}
654625
}
655626

627+
fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool {
628+
matches!(
629+
(pn, sn, key),
630+
(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY)
631+
| (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY)
632+
)
633+
}
634+
656635
#[cfg(test)]
657636
mod tests {
658637
use std::panic::RefUnwindSafe;

0 commit comments

Comments
 (0)