Skip to content

Commit 8cc0b26

Browse files
committed
perf: short-circuit skip-existing reuse
1 parent 86b85bb commit 8cc0b26

7 files changed

Lines changed: 435 additions & 186 deletions

File tree

atlas_patch/cli.py

Lines changed: 73 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -449,46 +449,84 @@ def _run_pipeline_from_config(
449449

450450
configure_logging(verbose)
451451

452-
from atlas_patch.orchestration.runner import ProcessingRunner
453-
from atlas_patch.services.extraction import PatchExtractionService
454-
from atlas_patch.services.feature_embedding import PatchFeatureEmbeddingService
455-
from atlas_patch.services.mpp import CSVMPPResolver
456-
from atlas_patch.services.segmentation import SAM2SegmentationService
457-
from atlas_patch.services.visualization import DefaultVisualizationService
458-
from atlas_patch.services.wsi_loader import DefaultWSILoader
452+
from atlas_patch.orchestration.runner import ProcessingRunner, classify_existing_slide_output
453+
454+
if slides is None:
455+
slides = [
456+
Slide(path=Path(path))
457+
for path in get_wsi_files(
458+
str(app_cfg.processing.input_path),
459+
recursive=app_cfg.processing.recursive,
460+
)
461+
]
462+
else:
463+
slides = list(slides)
459464

460-
segmentation_service = SAM2SegmentationService(app_cfg.segmentation)
461-
extractor_service = PatchExtractionService(app_cfg.extraction, app_cfg.output)
462-
visualizer_service = None
463-
if (
464-
app_cfg.output.visualize_grids
465-
or app_cfg.output.visualize_mask
466-
or app_cfg.output.visualize_contours
467-
):
468-
visualizer_service = DefaultVisualizationService(
469-
app_cfg.output,
470-
app_cfg.extraction,
471-
app_cfg.visualization,
465+
if not slides:
466+
logging.getLogger("atlas_patch.runner").warning("No slides found to process.")
467+
return [], []
468+
469+
preflight_results: list = []
470+
slides_to_process: list[Slide] = []
471+
for slide in slides:
472+
decision, existing_result = classify_existing_slide_output(app_cfg, slide)
473+
if decision == "skip":
474+
continue
475+
if decision == "reuse" and existing_result is not None:
476+
preflight_results.append(existing_result)
477+
continue
478+
slides_to_process.append(slide)
479+
480+
results = list(preflight_results)
481+
failures: list = []
482+
wsi_loader = None
483+
484+
if slides_to_process:
485+
from atlas_patch.services.extraction import PatchExtractionService
486+
from atlas_patch.services.mpp import CSVMPPResolver
487+
from atlas_patch.services.segmentation import SAM2SegmentationService
488+
from atlas_patch.services.visualization import DefaultVisualizationService
489+
from atlas_patch.services.wsi_loader import DefaultWSILoader
490+
491+
wsi_loader = DefaultWSILoader()
492+
segmentation_service = SAM2SegmentationService(app_cfg.segmentation)
493+
extractor_service = PatchExtractionService(app_cfg.extraction, app_cfg.output)
494+
visualizer_service = None
495+
if (
496+
app_cfg.output.visualize_grids
497+
or app_cfg.output.visualize_mask
498+
or app_cfg.output.visualize_contours
499+
):
500+
visualizer_service = DefaultVisualizationService(
501+
app_cfg.output,
502+
app_cfg.extraction,
503+
app_cfg.visualization,
504+
)
505+
506+
mpp_resolver = CSVMPPResolver(app_cfg.processing.mpp_csv)
507+
runner = ProcessingRunner(
508+
config=app_cfg,
509+
segmentation=segmentation_service,
510+
extractor=extractor_service,
511+
visualizer=visualizer_service,
512+
mpp_resolver=mpp_resolver,
513+
wsi_loader=wsi_loader,
514+
show_progress=not verbose,
472515
)
473516

474-
mpp_resolver = CSVMPPResolver(app_cfg.processing.mpp_csv)
475-
wsi_loader = DefaultWSILoader()
476-
runner = ProcessingRunner(
477-
config=app_cfg,
478-
segmentation=segmentation_service,
479-
extractor=extractor_service,
480-
visualizer=visualizer_service,
481-
mpp_resolver=mpp_resolver,
482-
wsi_loader=wsi_loader,
483-
show_progress=not verbose,
484-
)
517+
try:
518+
run_results, run_failures = runner.run(slides=slides_to_process)
519+
results.extend(run_results)
520+
failures.extend(run_failures)
521+
finally:
522+
segmentation_service.close()
485523

486-
try:
487-
results, failures = runner.run(slides=slides)
488-
finally:
489-
segmentation_service.close()
524+
if app_cfg.features is not None and results:
525+
from atlas_patch.services.feature_embedding import PatchFeatureEmbeddingService
526+
from atlas_patch.services.wsi_loader import DefaultWSILoader
490527

491-
if app_cfg.features is not None:
528+
if wsi_loader is None:
529+
wsi_loader = DefaultWSILoader()
492530
feature_service = PatchFeatureEmbeddingService(
493531
app_cfg.extraction,
494532
app_cfg.output,

atlas_patch/orchestration/runner.py

Lines changed: 79 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,78 @@ def _chunked(items: Sequence[Slide], size: int) -> Iterable[Sequence[Slide]]:
3636
yield items[i : i + size]
3737

3838

39+
def build_existing_extraction_result(slide: Slide, h5_path: Path) -> ExtractionResult | None:
40+
"""Create a lightweight ExtractionResult from an existing H5."""
41+
metadata: dict[str, Any] = {}
42+
num_patches: int | None = None
43+
patch_size_level0: int | None = None
44+
try:
45+
with h5py.File(h5_path, "r") as f:
46+
num_attr = f.attrs.get("num_patches")
47+
if num_attr is not None:
48+
num_patches = int(num_attr)
49+
elif "coords" in f:
50+
num_patches = int(f["coords"].shape[0])
51+
52+
ps_level0_attr = f.attrs.get("patch_size_level0")
53+
if ps_level0_attr is not None:
54+
patch_size_level0 = int(ps_level0_attr)
55+
except Exception as e: # noqa: BLE001
56+
logger.warning(
57+
"Failed to read existing output for %s; will reprocess. Error: %s",
58+
slide.path.name,
59+
e,
60+
)
61+
return None
62+
63+
if num_patches is None or num_patches <= 0:
64+
return None
65+
66+
return ExtractionResult(
67+
slide=slide,
68+
h5_path=h5_path,
69+
num_patches=int(num_patches),
70+
patch_size_level0=patch_size_level0,
71+
metadata=metadata,
72+
)
73+
74+
75+
def classify_existing_slide_output(
76+
config: AppConfig,
77+
slide: Slide,
78+
) -> tuple[str | None, ExtractionResult | None]:
79+
"""Return how an existing patch H5 should be treated for this run.
80+
81+
Returns one of:
82+
- (None, None): no reusable output; the slide needs full processing
83+
- ("skip", None): output is fully complete for this run
84+
- ("reuse", ExtractionResult): reuse patches/H5 and continue with downstream features
85+
"""
86+
if not config.output.skip_existing:
87+
return None, None
88+
89+
existing_path = find_existing_patch(slide, config.output, config.extraction)
90+
if existing_path is None:
91+
return None, None
92+
93+
feat_cfg = config.features
94+
if feat_cfg is None or not feat_cfg.extractors:
95+
return "skip", None
96+
97+
existing_result = build_existing_extraction_result(slide, existing_path)
98+
if existing_result is None:
99+
return None, None
100+
101+
missing = missing_features(
102+
existing_path,
103+
feat_cfg.extractors,
104+
expected_total=existing_result.num_patches,
105+
)
106+
if not missing:
107+
return "skip", None
108+
return "reuse", existing_result
109+
110+
39111
class ProcessingRunner:
40112
"""High-level orchestration of WSI segmentation, patch extraction, and visualization."""
41113

@@ -68,41 +140,6 @@ def discover_slides(self) -> list[Slide]:
68140
slides.append(slide)
69141
return slides
70142

71-
def _build_existing_result(self, slide: Slide, h5_path: Path) -> ExtractionResult | None:
72-
"""Create a lightweight ExtractionResult from an existing H5 (no re-segmentation)."""
73-
metadata: dict[str, Any] = {}
74-
num_patches: int | None = None
75-
patch_size_level0: int | None = None
76-
try:
77-
with h5py.File(h5_path, "r") as f:
78-
num_attr = f.attrs.get("num_patches")
79-
if num_attr is not None:
80-
num_patches = int(num_attr)
81-
elif "coords" in f:
82-
num_patches = int(f["coords"].shape[0])
83-
84-
ps_level0_attr = f.attrs.get("patch_size_level0")
85-
if ps_level0_attr is not None:
86-
patch_size_level0 = int(ps_level0_attr)
87-
except Exception as e: # noqa: BLE001
88-
logger.warning(
89-
"Failed to read existing output for %s; will reprocess. Error: %s",
90-
slide.path.name,
91-
e,
92-
)
93-
return None
94-
95-
if num_patches is None or num_patches <= 0:
96-
return None
97-
98-
return ExtractionResult(
99-
slide=slide,
100-
h5_path=h5_path,
101-
num_patches=int(num_patches),
102-
patch_size_level0=patch_size_level0,
103-
metadata=metadata,
104-
)
105-
106143
def _handle_existing_slide(
107144
self,
108145
slide: Slide,
@@ -113,35 +150,22 @@ def _handle_existing_slide(
113150
114151
Returns True when the slide is fully handled (skip or reuse), False to continue processing.
115152
"""
116-
if not self.config.output.skip_existing:
117-
return False
118-
119-
existing_path = find_existing_patch(slide, self.config.output, self.config.extraction)
120-
if existing_path is None:
153+
decision, existing_result = classify_existing_slide_output(self.config, slide)
154+
if decision is None:
121155
return False
122-
123-
feat_cfg = self.config.features
124-
if feat_cfg is None or not feat_cfg.extractors:
156+
if decision == "skip":
125157
logger.info("Skipping %s (already processed).", slide.path.name)
126158
if progress:
127159
progress.update(1)
128160
return True
129-
130-
existing_result = self._build_existing_result(slide, existing_path)
131161
if existing_result is None:
132-
logger.info("Existing output invalid for %s; reprocessing.", slide.path.name)
133162
return False
134-
163+
results.append(existing_result)
135164
missing = missing_features(
136-
existing_path, feat_cfg.extractors, expected_total=existing_result.num_patches
165+
existing_result.h5_path,
166+
self.config.features.extractors,
167+
expected_total=existing_result.num_patches,
137168
)
138-
if not missing:
139-
logger.info("Skipping %s (features complete).", slide.path.name)
140-
if progress:
141-
progress.update(1)
142-
return True
143-
144-
results.append(existing_result)
145169
logger.info(
146170
"Reusing existing patches for %s; missing features: %s",
147171
slide.path.name,

atlas_patch/services/feature_embedding.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ def embed_all(
275275
progress.update(completed_units)
276276

277277
for name in self.extractor_names:
278+
if not any(name in missing_for_slide for missing_for_slide in pending.values()):
279+
continue
278280
try:
279281
extractor = self.registry.create(name)
280282
except Exception as e: # noqa: BLE001

0 commit comments

Comments
 (0)