Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,26 @@ In order to remove this time restriction, jobs can be submitted with the line `q
**[gadi-nfcore-report.sh](Scripts/gadi_nfcore_report.sh)**

This script gathers the job requests and usage metrics from Gadi log files, same as [gadi-queuetime-report.pl](Scripts/gadi-queuetime-report.pl). However, this script loops through the Nextflow work directory to collect `.command.log` files and prints all output to a .tsv file: `gadi-nf-core-joblogs.tsv`

**[match_nf_logs.py](Scripts/match_nf_logs.py)**

This script merges Nextflow task metadata with the resource usage report by matching each task on the shared `work/XX/HASH` directory key.

Required input files:

* Nextflow log TSV (`-n/--nf-log`), typically generated with:
`nextflow log <run_name> -f 'name,status,native_id,work_dir' > nextflow_log.tsv`
* Gadi usage report TSV (`-l/--log-report`), generated by `gadi_nfcore_report.sh`.

Example usage:

```
python3 Scripts/match_nf_logs.py \
-n nextflow_log.tsv \
-l gadi-nf-core-joblogs.tsv
```

Output files:

* `matched_logs.tsv`: merged table for downstream analysis.
* `matched_logs.unmatched.tsv`: rows that could not be matched (`NF_LOG_ONLY`, `LOG_REPORT_ONLY`, `NF_NO_WORKDIR`) so no data is silently dropped.
10 changes: 5 additions & 5 deletions Scripts/gadi_nfcore_report.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# Tab-delimited summary of the resources requested and used for each job
# will be printed to tsv file: gadi-nf-core-joblogs.tsv.
#
# Date last modified: 08/08/23
# Date last modified: 13/02/2026
#
# If you use this script towards a publication, please acknowledge the
# Sydney Informatics Hub (or co-authorship, where appropriate).
Expand Down Expand Up @@ -63,13 +63,13 @@ find work -type f -name ".command.log" | while read -r log_file; do
if($0 ~ /Service Units/) service_units = $3
if($0 ~ /NCPUs Requested/) ncpus_requested = $3
if($0 ~ /NCPUs Used/) ncpus_used = $3
if($0 ~ /CPU Time Used/) cpu_time_used = $5
if($0 ~ /CPU Time Used/) cpu_time_used = $7
if($0 ~ /Memory Requested/) memory_requested = $3
if($0 ~ /Memory Used/) memory_used = $6
if($0 ~ /Walltime requested/) walltime_requested = $3
if($0 ~ /Walltime Requested/) walltime_requested = $3
if($0 ~ /Walltime Used/) walltime_used = $6
if($0 ~ /JobFS requested/) jobfs_requested = $3
if($0 ~ /JobFS used/) jobfs_used = $6
if($0 ~ /JobFS Requested/) jobfs_requested = $3
if($0 ~ /JobFS Used/) jobfs_used = $6
}
END {
print "'$file_name'", exit_status, service_units, ncpus_requested, ncpus_used, cpu_time_used, memory_requested, memory_used, walltime_requested, walltime_used, jobfs_requested, jobfs_used
Expand Down
217 changes: 217 additions & 0 deletions Scripts/match_nf_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
#!/usr/bin/env python3
"""
match_nf_logs.py
================
Joins two Nextflow-related log files on their shared work directory hash,
producing a single merged TSV ready for Excel.

Input files
-----------
-n / --nf-log Nextflow task log TSV, produced by:
nextflow log <run_name> -f 'name,status,native_id,work_dir'
Expected columns (tab-separated, no header):
name status native_id work_dir

-l / --log-report PBS resource-usage report TSV produced by the
gadi_nfcore_report / collect_nf_logs pipeline.
Expected first column: Log_path (e.g. work/XX/HASH/.command.log)
Remaining columns: Exit_status Service_units NCPUs_requested ...

Output
------
-o / --output Merged TSV (default: matched_logs.tsv).
Unmatched rows from either file are written to
<output>.unmatched.tsv so nothing is silently lost.

Usage
-----
python3 match_nf_logs.py -n nextflow_log.tsv -l log_report.tsv
python3 match_nf_logs.py -n nextflow_log.tsv -l log_report.tsv -o my_run.tsv
"""

import argparse
import re
import sys
from pathlib import Path
from typing import Optional


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def work_key(path: str) -> Optional[str]:
    """
    Extract a normalised 'XX/HASH' key from any string that contains a
    Nextflow work directory path. Works whether the path is:
      - a full absolute path (/scratch/.../work/9e/c258d1ff...)
      - a relative path (work/9e/c258d1ff.../.command.log)
      - a truncated hash (work/9e/c258d1...)
    Hex digits are matched case-insensitively and the returned key is
    lowercased, so mixed-case paths still join correctly.
    Returns None if no work-dir pattern is found.
    """
    # Nextflow's two-level work layout: work/<2-hex-prefix>/<hash, >=6 hex>.
    # [0-9a-fA-F] (not just [0-9a-f]) accepts uppercase hex digits; the
    # previous lowercase-only class made the trailing .lower() calls dead
    # code because any uppercase path simply failed to match.
    m = re.search(r'work/([0-9a-fA-F]{2})/([0-9a-fA-F]{6,})', path)
    if not m:
        return None
    # Normalise to lowercase so keys from either input file compare equal.
    return f"{m.group(1).lower()}/{m.group(2).lower()}"


def read_tsv(path: str) -> tuple[list[str], list[list[str]]]:
    """Read a TSV file and return (header_row, data_rows).

    The first line is treated as a header unless its first column looks
    like a work-dir path (starts with 'work/' or '/'), in which case the
    file is assumed to be headerless and header=[]. Blank lines are
    dropped. Returns ([], []) for an empty file.
    """
    # Explicit UTF-8 so parsing does not depend on the locale's default
    # encoding (the previous bare read_text() could fail or mis-decode
    # on non-UTF-8 locales).
    lines = Path(path).read_text(encoding='utf-8').splitlines()
    lines = [l for l in lines if l.strip()]
    if not lines:
        return [], []

    # Heuristic header detection: a real header's first column is a label
    # like 'Log_path' or 'name', never a filesystem path.
    first_col = lines[0].split('\t')[0]
    if first_col.startswith(('work/', '/')):
        return [], [l.split('\t') for l in lines]
    return lines[0].split('\t'), [l.split('\t') for l in lines[1:]]


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    """CLI entry point.

    Parses arguments, loads the Nextflow task log and the PBS usage
    report, joins them on the shared work/XX/HASH key, then writes the
    merged TSV plus (if needed) a companion .unmatched.tsv, and prints a
    short summary to stdout.
    """
    ap = argparse.ArgumentParser(
        description="Match a Nextflow task log with a PBS resource report on work-dir hash.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    ap.add_argument('-n', '--nf-log', required=True,
                    help='Nextflow log TSV (name, status, native_id, work_dir)')
    ap.add_argument('-l', '--log-report', required=True,
                    help='PBS resource report TSV (Log_path as first column)')
    ap.add_argument('-o', '--output', default='matched_logs.tsv',
                    help='Output TSV (default: matched_logs.tsv)')
    args = ap.parse_args()

    # -----------------------------------------------------------------------
    # 1. Load the Nextflow log
    # -----------------------------------------------------------------------
    nf_header, nf_rows = read_tsv(args.nf_log)

    # Build lookup: work_key -> row (list of strings)
    # We expect columns: name, status, native_id, work_dir
    # but accept 3-column files where work_dir is missing (native_id may be
    # the path for CACHED tasks with no PBS ID).
    NF_COLS = ['Task_name', 'NF_status', 'Native_id', 'Work_dir']

    nf_lookup: dict[str, list[str]] = {}
    nf_unmatched: list[list[str]] = []

    for row in nf_rows:
        # Pad to at least 4 columns
        row = row + [''] * (4 - len(row))
        task_name, nf_status, native_id, work_dir = row[:4]

        # Fallback: if work_dir is empty but native_id looks like a path
        if not work_dir and native_id.startswith('/'):
            work_dir = native_id
            native_id = 'UNKNOWN'

        key = work_key(work_dir) if work_dir else None

        # NOTE(review): duplicate keys overwrite silently — if two NF rows
        # share a work dir, only the last one is kept for matching.
        if key:
            nf_lookup[key] = [task_name, nf_status, native_id, work_dir]
        else:
            nf_unmatched.append(row)

    # -----------------------------------------------------------------------
    # 2. Load the PBS log report
    # -----------------------------------------------------------------------
    log_header, log_rows = read_tsv(args.log_report)

    # Default header if absent
    if not log_header:
        log_header = [
            'Log_path', 'Exit_status', 'Service_units',
            'NCPUs_requested', 'NCPUs_used', 'CPU_time_used',
            'Memory_requested', 'Memory_used',
            'Walltime_requested', 'Walltime_used',
            'JobFS_requested', 'JobFS_used',
        ]

    # -----------------------------------------------------------------------
    # 3. Merge
    # -----------------------------------------------------------------------
    # Merged rows carry the 4 NF columns first, then the full log report row.
    out_header = NF_COLS + log_header
    matched_rows: list[list[str]] = []
    log_unmatched: list[list[str]] = []

    # Tracks which NF keys were consumed, so leftovers can be reported below.
    matched_keys: set[str] = set()

    for row in log_rows:
        row = row + [''] * (len(log_header) - len(row))  # pad
        log_path = row[0]
        key = work_key(log_path)

        if key and key in nf_lookup:
            merged = nf_lookup[key] + row
            matched_rows.append(merged)
            matched_keys.add(key)
        else:
            log_unmatched.append(row)

    # Any NF tasks whose key never appeared in the log report
    nf_only_unmatched = [
        v for k, v in nf_lookup.items() if k not in matched_keys
    ]

    # -----------------------------------------------------------------------
    # 4. Write matched output
    # -----------------------------------------------------------------------
    out_path = Path(args.output)
    with out_path.open('w') as f:
        f.write('\t'.join(out_header) + '\n')
        for row in matched_rows:
            f.write('\t'.join(row) + '\n')

    # -----------------------------------------------------------------------
    # 5. Write unmatched rows (if any)
    # -----------------------------------------------------------------------
    # e.g. 'matched_logs.tsv' -> 'matched_logs.unmatched.tsv'
    unmatched_path = out_path.with_suffix('').with_name(
        out_path.stem + '.unmatched.tsv'
    )
    any_unmatched = nf_unmatched or log_unmatched or nf_only_unmatched
    if any_unmatched:
        with unmatched_path.open('w') as f:
            # Same columns as the matched file, plus a Source tag telling
            # which input the row came from; missing columns are padded with
            # empty strings so every row has the full width.
            f.write('Source\t' + '\t'.join(out_header) + '\n')
            for row in nf_only_unmatched:
                padded = row + [''] * (len(log_header))
                f.write('NF_LOG_ONLY\t' + '\t'.join(padded) + '\n')
            for row in log_unmatched:
                padded = [''] * len(NF_COLS) + row
                f.write('LOG_REPORT_ONLY\t' + '\t'.join(padded) + '\n')
            for row in nf_unmatched:
                padded = row + [''] * len(log_header)
                f.write('NF_NO_WORKDIR\t' + '\t'.join(padded) + '\n')

    # -----------------------------------------------------------------------
    # 6. Summary
    # -----------------------------------------------------------------------
    print("=" * 52)
    print(" match_nf_logs.py — complete")
    print("=" * 52)
    print(f" Matched rows written : {len(matched_rows)}")
    print(f" Output file : {out_path}")
    if any_unmatched:
        total_unmatched = len(nf_only_unmatched) + len(log_unmatched) + len(nf_unmatched)
        print(f" Unmatched rows : {total_unmatched}")
        print(f" Unmatched written to : {unmatched_path}")
    else:
        print(" Unmatched rows : 0")
    print("=" * 52)


# Standard guard: run the CLI when executed as a script, stay importable
# as a module (e.g. for testing work_key / read_tsv).
if __name__ == '__main__':
    main()