diff --git a/README.md b/README.md index 1b84ca6..6b432cb 100755 --- a/README.md +++ b/README.md @@ -27,3 +27,26 @@ In order to remove this time restriction, jobs can be submitted with the line `q **[gadi-nfcore-report.sh](Scripts/gadi_nfcore_report.sh)** This script gathers the job requests and usage metrics from Gadi log files, same as [gadi-queuetime-report.pl](Scripts/gadi-queuetime-report.pl). However, this script loops through the Nextflow work directory to collect `.commmand.log` files and prints all output to a .tsv file: `gadi-nf-core-joblogs.tsv` + +**[match_nf_logs.py](Scripts/match_nf_logs.py)** + +This script merges Nextflow task metadata with the resource usage report by matching each task on the shared `work/XX/HASH` directory key. + +Required input files: + +* Nextflow log TSV (`-n/--nf-log`), typically generated with: + `nextflow log -f 'name,status,native_id,work_dir' > nextflow_log.tsv` +* Gadi usage report TSV (`-l/--log-report`), generated by `gadi_nfcore_report.sh`. + +Example usage: + +``` +python3 Scripts/match_nf_logs.py \ + -n nextflow_log.tsv \ + -l gadi-nf-core-joblogs.tsv +``` + +Output files: + +* `matched_logs.tsv`: merged table for downstream analysis. +* `matched_logs.unmatched.tsv`: rows that could not be matched (`NF_LOG_ONLY`, `LOG_REPORT_ONLY`, `NF_NO_WORKDIR`) so no data is silently dropped. diff --git a/Scripts/gadi_nfcore_report.sh b/Scripts/gadi_nfcore_report.sh index d9ad5f0..0e5b1fa 100644 --- a/Scripts/gadi_nfcore_report.sh +++ b/Scripts/gadi_nfcore_report.sh @@ -18,7 +18,7 @@ # Tab-delimited summary of the resources requested and used for each job # will be printed to tsv file: gadi-nf-core-joblogs.tsv. # -# Date last modified: 08/08/23 +# Date last modified: 13/02/2026 # # If you use this script towards a publication, please acknowledge the # Sydney Informatics Hub (or co-authorship, where appropriate). 
@@ -63,13 +63,13 @@ find work -type f -name ".command.log" | while read -r log_file; do if($0 ~ /Service Units/) service_units = $3 if($0 ~ /NCPUs Requested/) ncpus_requested = $3 if($0 ~ /NCPUs Used/) ncpus_used = $3 - if($0 ~ /CPU Time Used/) cpu_time_used = $5 + if($0 ~ /CPU Time Used/) cpu_time_used = $7 if($0 ~ /Memory Requested/) memory_requested = $3 if($0 ~ /Memory Used/) memory_used = $6 - if($0 ~ /Walltime requested/) walltime_requested = $3 + if($0 ~ /Walltime Requested/) walltime_requested = $3 if($0 ~ /Walltime Used/) walltime_used = $6 - if($0 ~ /JobFS requested/) jobfs_requested = $3 - if($0 ~ /JobFS used/) jobfs_used = $6 + if($0 ~ /JobFS Requested/) jobfs_requested = $3 + if($0 ~ /JobFS Used/) jobfs_used = $6 } END { print "'$file_name'", exit_status, service_units, ncpus_requested, ncpus_used, cpu_time_used, memory_requested, memory_used, walltime_requested, walltime_used, jobfs_requested, jobfs_used diff --git a/Scripts/match_nf_logs.py b/Scripts/match_nf_logs.py new file mode 100755 index 0000000..4b57e6c --- /dev/null +++ b/Scripts/match_nf_logs.py @@ -0,0 +1,217 @@ +#!/usr/bin/env python3 +""" +match_nf_logs.py +================ +Joins two Nextflow-related log files on their shared work directory hash, +producing a single merged TSV ready for Excel. + +Input files +----------- +-n / --nf-log Nextflow task log TSV, produced by: + nextflow log -f 'name,status,native_id,work_dir' + Expected columns (tab-separated, no header): + name status native_id work_dir + +-l / --log-report PBS resource-usage report TSV produced by the + gadi_nfcore_report / collect_nf_logs pipeline. + Expected first column: Log_path (e.g. work/XX/HASH/.command.log) + Remaining columns: Exit_status Service_units NCPUs_requested ... + +Output +------ +-o / --output Merged TSV (default: matched_logs.tsv). + Unmatched rows from either file are written to + .unmatched.tsv so nothing is silently lost. 
#!/usr/bin/env python3
"""
match_nf_logs.py
================
Join a Nextflow task log with a PBS resource-usage report on the shared
Nextflow work-directory key ('XX/HASH'), producing a single merged TSV
ready for Excel.

Input files
-----------
-n / --nf-log     Nextflow task log TSV, produced by:
                      nextflow log <run_name> -f 'name,status,native_id,work_dir'
                  Expected columns (tab-separated, no header):
                      name  status  native_id  work_dir

-l / --log-report PBS resource-usage report TSV produced by the
                  gadi_nfcore_report / collect_nf_logs pipeline.
                  Expected first column: Log_path (e.g. work/XX/HASH/.command.log)
                  Remaining columns: Exit_status Service_units NCPUs_requested ...

Output
------
-o / --output     Merged TSV (default: matched_logs.tsv).
                  Unmatched rows from either file are written to
                  <stem>.unmatched.tsv so nothing is silently lost.

Usage
-----
    python3 match_nf_logs.py -n nextflow_log.tsv -l log_report.tsv
    python3 match_nf_logs.py -n nextflow_log.tsv -l log_report.tsv -o my_run.tsv
"""

import argparse
import re
from pathlib import Path
from typing import Optional

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# 'work/' followed by the two-hex-char prefix directory and at least six hex
# chars of the task hash. Upper-case hex is accepted here because the key is
# lowercased in work_key(); the original pattern only matched lowercase, so
# its .lower() normalisation could never apply to an uppercase path.
_WORK_DIR_RE = re.compile(r'work/([0-9a-fA-F]{2}/[0-9a-fA-F]{6,})')


def work_key(path: str) -> Optional[str]:
    """
    Extract a normalised, lowercase 'XX/HASH' key from any string that
    contains a Nextflow work-directory path.

    Works whether the path is:
      - a full absolute path (/scratch/.../work/9e/c258d1ff...)
      - a relative path (work/9e/c258d1ff.../.command.log)
      - a truncated hash (work/9e/c258d1...)

    Returns None if no work-dir pattern is found.
    """
    match = _WORK_DIR_RE.search(path)
    return match.group(1).lower() if match else None


def read_tsv(path: str) -> tuple[list[str], list[list[str]]]:
    """
    Read *path* as a TSV and return (header_row, data_rows).

    The first line is treated as a header unless its first cell looks like
    a work-directory path ('work/...' or an absolute '/...'), in which case
    the file is assumed to be headerless and header_row is [].
    Blank lines are skipped. Files are decoded as UTF-8 so behaviour does
    not depend on the platform's default encoding.
    """
    lines = [ln for ln in Path(path).read_text(encoding='utf-8').splitlines()
             if ln.strip()]
    if not lines:
        return [], []

    rows = [ln.split('\t') for ln in lines]
    first_col = rows[0][0]
    if first_col.startswith(('work/', '/')):
        # Headerless: every line is data.
        return [], rows
    return rows[0], rows[1:]


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Parse arguments, merge the two logs and write matched/unmatched TSVs."""
    ap = argparse.ArgumentParser(
        description="Match a Nextflow task log with a PBS resource report on work-dir hash.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    ap.add_argument('-n', '--nf-log', required=True,
                    help='Nextflow log TSV (name, status, native_id, work_dir)')
    ap.add_argument('-l', '--log-report', required=True,
                    help='PBS resource report TSV (Log_path as first column)')
    ap.add_argument('-o', '--output', default='matched_logs.tsv',
                    help='Output TSV (default: matched_logs.tsv)')
    args = ap.parse_args()

    # -----------------------------------------------------------------------
    # 1. Load the Nextflow log: build work-key -> [name, status, id, dir].
    #    Accept 3-column files where work_dir is missing (native_id may hold
    #    the path for CACHED tasks that never got a PBS job id).
    # -----------------------------------------------------------------------
    _nf_header, nf_rows = read_tsv(args.nf_log)

    NF_COLS = ['Task_name', 'NF_status', 'Native_id', 'Work_dir']

    nf_lookup: dict[str, list[str]] = {}
    nf_unmatched: list[list[str]] = []   # rows with no recognisable work dir

    for row in nf_rows:
        row = row + [''] * (4 - len(row))        # pad short rows to 4 columns
        task_name, nf_status, native_id, work_dir = row[:4]

        # Fallback: if work_dir is empty but native_id looks like a path.
        if not work_dir and native_id.startswith('/'):
            work_dir, native_id = native_id, 'UNKNOWN'

        key = work_key(work_dir) if work_dir else None
        if key:
            nf_lookup[key] = [task_name, nf_status, native_id, work_dir]
        else:
            nf_unmatched.append(row)

    # -----------------------------------------------------------------------
    # 2. Load the PBS log report; supply the canonical header when absent.
    # -----------------------------------------------------------------------
    log_header, log_rows = read_tsv(args.log_report)
    if not log_header:
        log_header = [
            'Log_path', 'Exit_status', 'Service_units',
            'NCPUs_requested', 'NCPUs_used', 'CPU_time_used',
            'Memory_requested', 'Memory_used',
            'Walltime_requested', 'Walltime_used',
            'JobFS_requested', 'JobFS_used',
        ]

    # -----------------------------------------------------------------------
    # 3. Merge on the work-dir key.
    # -----------------------------------------------------------------------
    out_header = NF_COLS + log_header
    matched_rows: list[list[str]] = []
    log_unmatched: list[list[str]] = []
    matched_keys: set[str] = set()

    for row in log_rows:
        row = row + [''] * (len(log_header) - len(row))   # pad short rows
        key = work_key(row[0])                            # row[0] == Log_path
        if key and key in nf_lookup:
            matched_rows.append(nf_lookup[key] + row)
            matched_keys.add(key)
        else:
            log_unmatched.append(row)

    # NF tasks whose key never appeared in the log report.
    nf_only_unmatched = [v for k, v in nf_lookup.items() if k not in matched_keys]

    # -----------------------------------------------------------------------
    # 4. Write matched output.
    # -----------------------------------------------------------------------
    out_path = Path(args.output)
    with out_path.open('w', encoding='utf-8') as f:
        f.write('\t'.join(out_header) + '\n')
        for row in matched_rows:
            f.write('\t'.join(row) + '\n')

    # -----------------------------------------------------------------------
    # 5. Write unmatched rows (if any), tagged by source, so no data is
    #    silently dropped. Note: with_name() replaces the whole file name,
    #    so the original with_suffix('') call was redundant and is removed.
    # -----------------------------------------------------------------------
    unmatched_path = out_path.with_name(out_path.stem + '.unmatched.tsv')
    any_unmatched = nf_unmatched or log_unmatched or nf_only_unmatched
    if any_unmatched:
        with unmatched_path.open('w', encoding='utf-8') as f:
            f.write('Source\t' + '\t'.join(out_header) + '\n')
            # Pad each side with blanks so every row spans the full header.
            for row in nf_only_unmatched:
                f.write('NF_LOG_ONLY\t' + '\t'.join(row + [''] * len(log_header)) + '\n')
            for row in log_unmatched:
                f.write('LOG_REPORT_ONLY\t' + '\t'.join([''] * len(NF_COLS) + row) + '\n')
            for row in nf_unmatched:
                f.write('NF_NO_WORKDIR\t' + '\t'.join(row + [''] * len(log_header)) + '\n')

    # -----------------------------------------------------------------------
    # 6. Summary for the operator.
    # -----------------------------------------------------------------------
    print("=" * 52)
    print(" match_nf_logs.py — complete")
    print("=" * 52)
    print(f" Matched rows written : {len(matched_rows)}")
    print(f" Output file : {out_path}")
    if any_unmatched:
        total_unmatched = len(nf_only_unmatched) + len(log_unmatched) + len(nf_unmatched)
        print(f" Unmatched rows : {total_unmatched}")
        print(f" Unmatched written to : {unmatched_path}")
    else:
        print(" Unmatched rows : 0")
    print("=" * 52)


if __name__ == '__main__':
    main()