Skip to content

Commit 1fe33ea

Browse files
committed
changed failure behavior and error handling
1 parent 2466874 commit 1fe33ea

File tree

3 files changed

+738
-78
lines changed

3 files changed

+738
-78
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,10 @@ inside this directory.
6060
- `compress`: compress an existing `.nt`
6161
- `decompress`: decompress `.nt.gz`, `.nt.br`, or `.hdt`
6262

63+
In `full` mode with multiple VCF inputs, failures are isolated per input:
64+
- the run continues with remaining files
65+
- failed inputs are summarized in `run_metrics/<RUN_ID>/failed_inputs.csv`
66+
6367
## Main Flags (Most Used)
6468

6569
- `-m, --mode {full,compress,decompress}`
@@ -165,6 +169,21 @@ Given `--out ./results`:
165169
Intermediates are hidden by default.
166170
Raw `.nt` files are removed after compression unless `--keep-rdf` is provided.
167171

172+
## Metrics
173+
174+
For each run, VCF-RDFizer writes:
175+
176+
- `run_metrics/<RUN_ID>/metrics.csv`
177+
- `run_metrics/<RUN_ID>/wrapper_execution_times.csv`
178+
- `run_metrics/<RUN_ID>/progress.log`
179+
180+
Compression metrics now include per-method:
181+
182+
- `wall_seconds_*`
183+
- `user_seconds_*`
184+
- `sys_seconds_*`
185+
- `max_rss_kb_*`
186+
168187
## Rules
169188

170189
- default rules file: `rules/default_rules.ttl`
@@ -176,6 +195,14 @@ If Docker permission issues occur, rerun with a Docker-allowed user (or configur
176195

177196
If HDT compression fails on very large `.nt` files, use batch layout and/or non-HDT compression methods.
178197

198+
Safe termination:
199+
200+
- Press `Ctrl+C` to interrupt a run.
201+
- The wrapper exits with code `130`, writes progress to `run_metrics/<RUN_ID>/progress.log`, and performs best-effort cleanup of tracked intermediates.
202+
- Raw RDF cleanup on interrupt follows `--keep-rdf`:
203+
- with `--keep-rdf`, raw `.nt` files are preserved
204+
- without `--keep-rdf`, tracked raw `.nt` files are removed during interrupt cleanup
205+
179206
## Citation
180207

181208
If you use VCF-RDFizer in a publication, please cite:

test/test_vcf_rdfizer_unit.py

Lines changed: 216 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -73,9 +73,10 @@ def output_name_from_command(cmd):
7373
if isinstance(part, str) and part.startswith("OUT_NAME="):
7474
return part.split("=", 1)[1]
7575
if isinstance(cmd, list) and cmd and isinstance(cmd[-1], str):
76-
match = re.search(r"/data/out/(?:([^/]+)/)?([^/]+)\.hdt", cmd[-1])
77-
if match:
78-
return match.group(1) or match.group(2)
76+
matches = re.findall(r"/data/out/(?:([^/]+)/)?([^/]+)\.hdt(?!\.time)", cmd[-1])
77+
if matches:
78+
group1, group2 = matches[-1]
79+
return group1 or group2
7980
return None
8081

8182

@@ -145,12 +146,18 @@ def test_update_metrics_csv_keeps_raw_and_hdt_compound_metrics_separate(self):
145146
"output_size_bytes": 40,
146147
"exit_code": 0,
147148
"wall_seconds": 1.25,
149+
"user_seconds": 1.10,
150+
"sys_seconds": 0.10,
151+
"max_rss_kb": 2048,
148152
"source": "existing",
149153
},
150154
"hdt_gzip": {
151155
"output_size_bytes": 12,
152156
"exit_code": 0,
153157
"wall_seconds": 0.50,
158+
"user_seconds": 0.30,
159+
"sys_seconds": 0.05,
160+
"max_rss_kb": 512,
154161
},
155162
},
156163
)
@@ -163,6 +170,94 @@ def test_update_metrics_csv_keeps_raw_and_hdt_compound_metrics_separate(self):
163170
self.assertEqual(row["exit_code_gzip"], "0")
164171
self.assertEqual(row["exit_code_gzip_on_hdt"], "0")
165172
self.assertEqual(row["hdt_source"], "existing")
173+
self.assertEqual(row["user_seconds_hdt"], "1.100000")
174+
self.assertEqual(row["sys_seconds_hdt"], "0.100000")
175+
self.assertEqual(row["max_rss_kb_hdt"], "2048")
176+
self.assertEqual(row["user_seconds_gzip_on_hdt"], "0.300000")
177+
self.assertEqual(row["sys_seconds_gzip_on_hdt"], "0.050000")
178+
self.assertEqual(row["max_rss_kb_gzip_on_hdt"], "512")
179+
180+
def test_parse_time_log_metrics_reads_gnu_time_fields(self):
181+
"""GNU time logs are parsed for wall/user/sys/max RSS values."""
182+
with tempfile.TemporaryDirectory() as td:
183+
tmp_path = Path(td)
184+
time_log = tmp_path / "time.log"
185+
time_log.write_text(
186+
"User time (seconds): 1.23\n"
187+
"System time (seconds): 0.45\n"
188+
"Elapsed (wall clock) time (h:mm:ss or m:ss): 0:02.50\n"
189+
"Maximum resident set size (kbytes): 12345\n"
190+
)
191+
parsed = vcf_rdfizer.parse_time_log_metrics(time_log)
192+
193+
self.assertEqual(parsed["wall_seconds"], 2.5)
194+
self.assertEqual(parsed["user_seconds"], 1.23)
195+
self.assertEqual(parsed["sys_seconds"], 0.45)
196+
self.assertEqual(parsed["max_rss_kb"], 12345)
197+
198+
def test_cleanup_interrupted_full_run_removes_intermediates(self):
199+
"""Interrupt cleanup removes tracked intermediate and raw RDF files."""
200+
with tempfile.TemporaryDirectory() as td:
201+
tmp_path = Path(td)
202+
out_root = tmp_path / "out"
203+
metrics_dir = out_root / "run_metrics" / "20260304T120000"
204+
tsv_file = out_root / ".intermediate" / "tsv" / "sample.records.tsv"
205+
raw_rdf = out_root / "sample" / "sample.nt"
206+
tsv_file.parent.mkdir(parents=True, exist_ok=True)
207+
raw_rdf.parent.mkdir(parents=True, exist_ok=True)
208+
tsv_file.write_text("dummy\n")
209+
raw_rdf.write_text("<s> <p> <o> .\n")
210+
211+
tracker = vcf_rdfizer.RunTracker(metrics_dir / "progress.log")
212+
tracker.track_intermediate(tsv_file.parent)
213+
tracker.track_raw_rdf(raw_rdf)
214+
removed, failed = vcf_rdfizer.cleanup_interrupted_full_run(
215+
run_tracker=tracker,
216+
out_root=out_root,
217+
image_ref=None,
218+
keep_rdf=False,
219+
wrapper_log_path=metrics_dir / "wrapper.log",
220+
)
221+
tracker.close()
222+
223+
self.assertGreaterEqual(removed, 2)
224+
self.assertEqual(failed, 0)
225+
self.assertFalse(tsv_file.parent.exists())
226+
self.assertFalse(raw_rdf.exists())
227+
228+
def test_aggregate_method_results_includes_all_timing_types(self):
229+
"""Batch aggregation keeps wall/user/sys and max_rss metrics for each method."""
230+
aggregated = vcf_rdfizer.aggregate_method_results_across_files(
231+
{
232+
"part1.nt": {
233+
"gzip": {
234+
"exit_code": 0,
235+
"wall_seconds": 1.0,
236+
"user_seconds": 0.7,
237+
"sys_seconds": 0.1,
238+
"max_rss_kb": 100,
239+
"output_size_bytes": 10,
240+
}
241+
},
242+
"part2.nt": {
243+
"gzip": {
244+
"exit_code": 0,
245+
"wall_seconds": 2.0,
246+
"user_seconds": 1.2,
247+
"sys_seconds": 0.2,
248+
"max_rss_kb": 150,
249+
"output_size_bytes": 20,
250+
}
251+
},
252+
}
253+
)
254+
255+
gzip = aggregated["gzip"]
256+
self.assertEqual(gzip["wall_seconds"], 3.0)
257+
self.assertAlmostEqual(gzip["user_seconds"], 1.9, places=6)
258+
self.assertAlmostEqual(gzip["sys_seconds"], 0.3, places=6)
259+
self.assertEqual(gzip["max_rss_kb"], 150)
260+
self.assertEqual(gzip["output_size_bytes"], 30)
166261

167262
def test_help_flag_prints_usage_guide(self):
168263
"""Help flag exits cleanly and prints mode usage examples."""
@@ -616,6 +711,44 @@ def test_main_full_mode_requires_rdf_layout_argument(self):
616711
)
617712
self.assertEqual(rc, 2)
618713

714+
def test_main_full_mode_keyboard_interrupt_returns_130_and_writes_progress_log(self):
715+
"""Keyboard interrupt exits with 130 and records interruption in progress log."""
716+
with tempfile.TemporaryDirectory() as td:
717+
tmp_path = Path(td)
718+
input_dir, rules_path = prepare_inputs(tmp_path)
719+
out_dir = tmp_path / "out"
720+
721+
old_cwd = os.getcwd()
722+
os.chdir(tmp_path)
723+
try:
724+
with mock.patch.object(vcf_rdfizer, "check_docker", return_value=True), mock.patch.object(
725+
vcf_rdfizer, "docker_image_exists", return_value=True
726+
), mock.patch.object(
727+
vcf_rdfizer, "run_full_mode", side_effect=KeyboardInterrupt()
728+
):
729+
rc = invoke_main(
730+
[
731+
"--mode",
732+
"full",
733+
"--input",
734+
str(input_dir),
735+
"--rules",
736+
str(rules_path),
737+
"--rdf-layout",
738+
"aggregate",
739+
"--out",
740+
str(out_dir),
741+
]
742+
)
743+
finally:
744+
os.chdir(old_cwd)
745+
746+
self.assertEqual(rc, 130)
747+
run_metrics_dir = latest_metrics_run_dir(out_dir / "run_metrics")
748+
progress_log = run_metrics_dir / "progress.log"
749+
self.assertTrue(progress_log.exists())
750+
self.assertIn("Run interrupted by user signal", progress_log.read_text())
751+
619752
def test_main_compress_mode_none_skips_compression_commands(self):
620753
"""Compression mode with method none performs no compression runs."""
621754
with tempfile.TemporaryDirectory() as td:
@@ -1010,6 +1143,86 @@ def fake_run(cmd, cwd=None, env=None):
10101143
self.assertIn("rdf2hdt", commands[9][-1])
10111144
self.assertIn("/data/out/sample_b.hdt", commands[9][-1])
10121145

1146+
def test_main_multiple_inputs_continue_after_one_failure_and_write_failure_report(self):
1147+
"""Multi-input full mode continues after one input fails and writes failed_inputs.csv."""
1148+
with tempfile.TemporaryDirectory() as td:
1149+
tmp_path = Path(td)
1150+
input_dir = tmp_path / "input"
1151+
input_dir.mkdir()
1152+
(input_dir / "sample_a.vcf").write_text("##fileformat=VCFv4.2\n#CHROM\tPOS\n1\t10\n")
1153+
(input_dir / "sample_b.vcf").write_text("##fileformat=VCFv4.2\n#CHROM\tPOS\n1\t20\n")
1154+
rules_path = tmp_path / "rules.ttl"
1155+
rules_path.write_text("@prefix ex: <http://example.org/> .\n")
1156+
out_dir = tmp_path / "out"
1157+
commands = []
1158+
1159+
multi_triplets = [
1160+
{
1161+
"prefix": "sample_a",
1162+
"records": Path("sample_a.records.tsv"),
1163+
"headers": Path("sample_a.header_lines.tsv"),
1164+
"metadata": Path("sample_a.file_metadata.tsv"),
1165+
},
1166+
{
1167+
"prefix": "sample_b",
1168+
"records": Path("sample_b.records.tsv"),
1169+
"headers": Path("sample_b.header_lines.tsv"),
1170+
"metadata": Path("sample_b.file_metadata.tsv"),
1171+
},
1172+
]
1173+
1174+
def fake_run(cmd, cwd=None, env=None):
1175+
commands.append(cmd)
1176+
if "/opt/vcf-rdfizer/run_conversion.sh" in cmd:
1177+
output_name = output_name_from_command(cmd) or "sample"
1178+
if output_name == "sample_a":
1179+
return 1
1180+
out_sample_dir = out_dir / output_name
1181+
out_sample_dir.mkdir(parents=True, exist_ok=True)
1182+
(out_sample_dir / f"{output_name}.nt").write_text("<s> <p> <o> .\n")
1183+
return 0
1184+
if isinstance(cmd, list) and cmd and "rdf2hdt" in cmd[-1]:
1185+
output_name = output_name_from_command(cmd) or "sample"
1186+
out_sample_dir = out_dir / output_name
1187+
out_sample_dir.mkdir(parents=True, exist_ok=True)
1188+
(out_sample_dir / f"{output_name}.hdt").write_text("fake-hdt\n")
1189+
return 0
1190+
1191+
old_cwd = os.getcwd()
1192+
os.chdir(tmp_path)
1193+
try:
1194+
with mock.patch.object(vcf_rdfizer, "run", side_effect=fake_run), mock.patch.object(
1195+
vcf_rdfizer, "check_docker", return_value=True
1196+
), mock.patch.object(
1197+
vcf_rdfizer, "docker_image_exists", return_value=True
1198+
), mock.patch.object(
1199+
vcf_rdfizer, "discover_tsv_triplets", return_value=multi_triplets
1200+
):
1201+
rc = invoke_main(
1202+
[
1203+
"--input",
1204+
str(input_dir),
1205+
"--rules",
1206+
str(rules_path),
1207+
"--out",
1208+
str(out_dir),
1209+
"--keep-tsv",
1210+
]
1211+
)
1212+
finally:
1213+
os.chdir(old_cwd)
1214+
1215+
self.assertEqual(rc, 1)
1216+
self.assertTrue(any("/data/in/sample_b.vcf" in str(cmd) for cmd in commands))
1217+
self.assertTrue((out_dir / "sample_b" / "sample_b.hdt").exists())
1218+
1219+
run_metrics_dir = latest_metrics_run_dir(out_dir / "run_metrics")
1220+
failed_report = run_metrics_dir / "failed_inputs.csv"
1221+
self.assertTrue(failed_report.exists())
1222+
report_text = failed_report.read_text()
1223+
self.assertIn("sample_a", report_text)
1224+
self.assertIn("rdf-conversion", report_text)
1225+
10131226
def test_main_full_mode_deletes_nt_after_compression_by_default(self):
10141227
"""Full mode removes merged .nt outputs after successful compression unless --keep-rdf is set."""
10151228
with tempfile.TemporaryDirectory() as td:

0 commit comments

Comments (0)