Skip to content

Commit fb48e50

Browse files
committed
Fixed aggregate wall-time reporting bug: wall_seconds is now always taken from the wrapper-observed elapsed time, while the GNU time log is still used for CPU and RSS metrics.
1 parent e3149d9 commit fb48e50

File tree

2 files changed

+120
-5
lines changed

2 files changed

+120
-5
lines changed

test/test_vcf_rdfizer_unit.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,76 @@ def test_parse_time_log_metrics_reads_gnu_time_fields(self):
197197
self.assertEqual(parsed["sys_seconds"], 0.45)
198198
self.assertEqual(parsed["max_rss_kb"], 12345)
199199

200+
def test_run_tsv_conversion_uses_wrapper_elapsed_for_wall_seconds(self):
    """Wall time for TSV conversion comes from the wrapper clock, not the inner time log.

    The stubbed runner writes a GNU ``time`` log whose elapsed field reads
    0:00.19, while the patched ``perf_counter`` yields 100.0 and then 460.0.
    The test expects ``wall_seconds`` to be 360.0, with the CPU and RSS
    figures matching the values written to the fake time log.
    """
    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        input_mount_dir = root / "input"
        tsv_dir = root / "tsv"
        metrics_dir = root / "metrics"
        for directory in (input_mount_dir, tsv_dir, metrics_dir):
            directory.mkdir(parents=True, exist_ok=True)
        (input_mount_dir / "sample.vcf").write_text("##fileformat=VCFv4.2\n")

        # Time log whose elapsed value deliberately disagrees with the
        # wrapper-observed 360 seconds.
        fake_time_log = (
            "User time (seconds): 11.00\n"
            "System time (seconds): 1.50\n"
            "Elapsed (wall clock) time (h:mm:ss or m:ss): 0:00.19\n"
            "Maximum resident set size (kbytes): 2048\n"
        )
        # Output files the stubbed runner drops into the TSV mount.
        fake_tsv_files = {
            "sample.records.tsv": "SOURCE_FILE\tROW_ID\nsample.vcf\t1\n",
            "sample.header_lines.tsv": "SOURCE_FILE\tLINE\nsample.vcf\t##x\n",
            "sample.file_metadata.tsv": "SOURCE_FILE\tKEY\tVALUE\nsample.vcf\tk\tv\n",
        }

        def fake_run(cmd, cwd=None, env=None):
            def host_side_of(container_dir):
                # Host half of the first "host:container" argument that
                # targets the given container directory, or None.
                suffix = ":" + container_dir
                for arg in cmd:
                    if isinstance(arg, str) and arg.endswith(suffix):
                        return arg.split(":", 1)[0]
                return None

            metrics_host = host_side_of("/data/metrics")
            tsv_host = host_side_of("/data/tsv")

            script = "" if not cmd else str(cmd[-1])
            log_match = re.search(
                r"-o\s+(/data/metrics/raw_metrics/tsv_time/[^\s;]+)", script
            )
            if metrics_host and log_match:
                # Map the container-side log path back onto the host mount.
                relative_log = log_match.group(1).replace("/data/metrics/", "", 1)
                time_log = Path(metrics_host) / relative_log
                time_log.parent.mkdir(parents=True, exist_ok=True)
                time_log.write_text(fake_time_log)

            if tsv_host:
                tsv_root = Path(tsv_host)
                for file_name, content in fake_tsv_files.items():
                    (tsv_root / file_name).write_text(content)
            return 0

        with mock.patch.object(vcf_rdfizer, "run", side_effect=fake_run):
            with mock.patch(
                "vcf_rdfizer.time.perf_counter", side_effect=[100.0, 460.0]
            ):
                metrics = vcf_rdfizer.run_tsv_conversion_with_metrics(
                    input_mount_dir=input_mount_dir,
                    container_input="/data/in/sample.vcf",
                    tsv_dir=tsv_dir,
                    metrics_dir=metrics_dir,
                    image_ref="example/vcf-rdfizer:latest",
                    run_id="run-tsv-elapsed",
                    timestamp="2026-03-16T10:00:00",
                    prefix="sample",
                )

        expected = {
            "wall_seconds": 360.0,
            "user_seconds": 11.0,
            "sys_seconds": 1.5,
            "max_rss_kb": 2048,
        }
        self.assertEqual({key: metrics[key] for key in expected}, expected)

        # The persisted raw-metrics JSON must carry the wrapper wall time too.
        raw_json = (
            metrics_dir / "raw_metrics" / "tsv_metrics" / "sample" / "run-tsv-elapsed.json"
        )
        payload = json.loads(raw_json.read_text())
        self.assertEqual(payload["timing"]["wall_seconds"], 360.0)
269+
200270
def test_run_compression_methods_persists_raw_metrics_and_time_logs(self):
201271
"""Per-file compression timing/metrics are retained under raw_metrics."""
202272
with tempfile.TemporaryDirectory() as td:
@@ -283,6 +353,49 @@ def fake_run(cmd, cwd=None, env=None):
283353
self.assertEqual(payload["methods"]["hdt"]["exit_code"], 0)
284354
self.assertEqual(payload["methods"]["gzip"]["exit_code"], 0)
285355

356+
def test_run_compression_methods_use_wrapper_elapsed_for_wall_seconds(self):
    """Wall time for compression comes from the wrapper clock, not the inner time log.

    The stubbed runner writes a GNU ``time`` log whose elapsed field reads
    0:05.06, while the patched ``perf_counter`` yields 1000.0 and then 3700.0.
    The test expects the ``hdt`` result to report ``wall_seconds`` of 2700.0,
    with the CPU and RSS figures matching the values written to the fake log.
    """
    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        out_dir = root / "out" / "sample"
        out_dir.mkdir(parents=True, exist_ok=True)
        rdf_path = out_dir / "sample.nt"
        rdf_path.write_text("<s> <p> <o> .\n")

        # Time log whose elapsed value deliberately disagrees with the
        # wrapper-observed 2700 seconds.
        fake_time_log = (
            "User time (seconds): 123.00\n"
            "System time (seconds): 4.50\n"
            "Elapsed (wall clock) time (h:mm:ss or m:ss): 0:05.06\n"
            "Maximum resident set size (kbytes): 8192\n"
        )

        def fake_run(cmd, cwd=None, env=None):
            script = "" if not cmd else str(cmd[-1])
            log_match = re.search(r"-o\s+(/data/out/[^\s;]+)", script)
            if log_match is not None:
                # Map the container-side log path back onto the host out dir.
                relative_log = log_match.group(1).replace("/data/out/", "", 1)
                time_log = out_dir / relative_log
                time_log.parent.mkdir(parents=True, exist_ok=True)
                time_log.write_text(fake_time_log)
            if 'HDT_BIN="${RDF2HDT_BIN' in script:
                # Stand in for the HDT artifact when the script is the HDT one.
                (out_dir / "sample.hdt").write_text("hdt\n")
            return 0

        with mock.patch.object(vcf_rdfizer, "run", side_effect=fake_run):
            with mock.patch(
                "vcf_rdfizer.time.perf_counter", side_effect=[1000.0, 3700.0]
            ):
                ok, method_results = vcf_rdfizer.run_compression_methods_for_rdf(
                    rdf_path=rdf_path,
                    out_dir=out_dir,
                    image_ref="example/vcf-rdfizer:latest",
                    methods=["hdt"],
                    wrapper_log_path=root / "wrapper.log",
                    status_indent=None,
                )

        self.assertTrue(ok)
        hdt_result = method_results["hdt"]
        expected = {
            "wall_seconds": 2700.0,
            "user_seconds": 123.0,
            "sys_seconds": 4.5,
            "max_rss_kb": 8192,
        }
        self.assertEqual({key: hdt_result[key] for key in expected}, expected)
398+
286399
def test_run_compression_methods_records_implicit_hdt_in_raw_metrics(self):
287400
"""Compound HDT methods include the implicit HDT stage in raw metrics JSON."""
288401
with tempfile.TemporaryDirectory() as td:

vcf_rdfizer.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1853,8 +1853,10 @@ def run_tsv_conversion_with_metrics(
18531853
elapsed = time.perf_counter() - started
18541854

18551855
timing = parse_time_log_metrics(time_log_host)
1856-
if timing.get("wall_seconds") is None:
1857-
timing["wall_seconds"] = elapsed
1856+
# Use the wrapper-observed docker runtime as the authoritative wall clock.
1857+
# GNU `time` remains useful for CPU/RSS, but its elapsed field has proven
1858+
# unreliable for long-running conversion/compression workloads.
1859+
timing["wall_seconds"] = elapsed
18581860

18591861
output_paths, output_size_bytes = summarize_tsv_outputs(tsv_dir, prefix)
18601862
write_tsv_metrics_artifacts(
@@ -2178,9 +2180,9 @@ def run_container_command(*, method: str, artifact_name: str, command: str):
21782180
output_path = target_out_dir / artifact_name
21792181
method_results[method] = {
21802182
"exit_code": exit_code,
2181-
"wall_seconds": timing.get("wall_seconds")
2182-
if timing.get("wall_seconds") is not None
2183-
else elapsed,
2183+
# Prefer the wrapper-observed docker runtime over the inner
2184+
# `/usr/bin/time` elapsed field for long-running jobs.
2185+
"wall_seconds": elapsed,
21842186
"user_seconds": timing.get("user_seconds"),
21852187
"sys_seconds": timing.get("sys_seconds"),
21862188
"max_rss_kb": timing.get("max_rss_kb"),

0 commit comments

Comments
 (0)