Skip to content

Commit cee20ba

Browse files
committed
mapping file and tsv intermediate changes
1 parent 195706d commit cee20ba

10 files changed

Lines changed: 1035 additions & 58 deletions

File tree

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ In `full` mode with multiple VCF inputs, failures are isolated per input:
8181
- `-r, --rules` mapping rules file (`.ttl`)
8282
- default: `rules/default_rules.ttl`
8383
- `-l, --rdf-layout {aggregate,batch}` required in full mode
84+
- `-P, --spark-partitions` optional Spark partition hint (positive integer)
85+
- low-cost way to reduce output part count by setting `spark.default.parallelism` and `spark.sql.shuffle.partitions`
8486
- `-k, --keep-tsv` keep hidden TSV intermediates
8587
- `-R, --keep-rdf` keep raw `.nt` after compression
8688
- `-e, --estimate-size` preflight size estimate
@@ -121,6 +123,18 @@ vcf-rdfizer \
121123
--rdf-layout batch \
122124
--compression hdt \
123125
--out ./results
126+
127+
Full pipeline with low-cost partition cap (helps avoid too many tiny batch files):
128+
129+
```bash
130+
vcf-rdfizer \
131+
--mode full \
132+
--input ./vcf_files \
133+
--rdf-layout batch \
134+
--spark-partitions 8 \
135+
--compression hdt \
136+
--out ./results
137+
```
124138
```
125139

126140
Full pipeline with custom rules + keep RDF:

rules/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,17 @@ This directory contains RML mappings used by the conversion pipeline.
1111
- `/data/tsv/file_metadata.tsv`
1212
- `/data/tsv/header_lines.tsv`
1313
- `/data/tsv/records.tsv`
14+
- `/data/tsv/sample_calls.tsv`
15+
- `/data/tsv/sample_format_values.tsv`
16+
- `sample_calls.tsv` and `sample_format_values.tsv` are derived by the Python wrapper
17+
from `records.tsv` at runtime so FORMAT fields (e.g., `GT:DP:AD`) can be
18+
mapped to per-sample values consistently.
1419
- The Python wrapper rewrites these template paths per input VCF to:
1520
- `/data/tsv/<sample>.file_metadata.tsv`
1621
- `/data/tsv/<sample>.header_lines.tsv`
1722
- `/data/tsv/<sample>.records.tsv`
23+
- `/data/tsv/<sample>.sample_calls.tsv`
24+
- `/data/tsv/<sample>.sample_format_values.tsv`
1825

1926
## How To Create A Custom Mapping
2027

rules/default_rules.ttl

Lines changed: 150 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
# - /data/tsv/<sample>.file_metadata.tsv
1212
# - /data/tsv/<sample>.header_lines.tsv
1313
# - /data/tsv/<sample>.records.tsv
14+
# - /data/tsv/<sample>.sample_calls.tsv (derived by wrapper)
15+
# - /data/tsv/<sample>.sample_format_values.tsv (derived by wrapper)
1416
# The wrapper rewrites the template paths below for each input sample.
1517
#
1618
# Output format:
@@ -31,19 +33,12 @@
3133
rml:referenceFormulation ql:CSV
3234
] ;
3335
rr:subjectMap [
34-
rr:template "https://w3id.org/vcf-rdfizer/resource/vcf-file/{SOURCE_FILE}" ;
36+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}" ;
3537
rr:class vcfr:VCFFile
3638
] ;
3739
rr:predicateObjectMap [
3840
rr:predicate vcfr:hasHeader ;
39-
rr:objectMap [ rr:template "https://w3id.org/vcf-rdfizer/resource/vcf-header/{SOURCE_FILE}" ]
40-
] ;
41-
rr:predicateObjectMap [
42-
rr:predicate vcfr:hasRecord ;
43-
rr:objectMap [
44-
rr:parentTriplesMap <#VCFRecordMap> ;
45-
rr:joinCondition [ rr:child "SOURCE_FILE" ; rr:parent "SOURCE_FILE" ]
46-
]
41+
rr:objectMap [ rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/header" ]
4742
] ;
4843
rr:predicateObjectMap [
4944
rr:predicate vcfr:fileFormat ;
@@ -73,14 +68,59 @@
7368
rml:referenceFormulation ql:CSV
7469
] ;
7570
rr:subjectMap [
76-
rr:template "https://w3id.org/vcf-rdfizer/resource/vcf-header/{SOURCE_FILE}" ;
71+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/header" ;
7772
rr:class vcfr:VCFHeader
7873
] ;
74+
.
75+
76+
# NOTE:
77+
# Use explicit link maps (template-based) instead of parentTriplesMap joins.
78+
# In distributed RMLStreamer execution, joins can duplicate parent map triples.
79+
<#VCFFileToRecordLinkMap>
80+
a rr:TriplesMap ;
81+
rml:logicalSource [
82+
rml:source [
83+
a csvw:Table ;
84+
csvw:url "/data/tsv/records.tsv" ;
85+
csvw:dialect [
86+
a csvw:Dialect ;
87+
csvw:delimiter "\t" ;
88+
csvw:headerRowCount 1
89+
]
90+
] ;
91+
rml:referenceFormulation ql:CSV
92+
] ;
93+
rr:subjectMap [
94+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}"
95+
] ;
96+
rr:predicateObjectMap [
97+
rr:predicate vcfr:hasRecord ;
98+
rr:objectMap [
99+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/record/var{ROW_ID}"
100+
]
101+
] .
102+
103+
<#VCFHeaderToHeaderLineLinkMap>
104+
a rr:TriplesMap ;
105+
rml:logicalSource [
106+
rml:source [
107+
a csvw:Table ;
108+
csvw:url "/data/tsv/header_lines.tsv" ;
109+
csvw:dialect [
110+
a csvw:Dialect ;
111+
csvw:delimiter "\t" ;
112+
csvw:headerRowCount 1
113+
]
114+
] ;
115+
rml:referenceFormulation ql:CSV
116+
] ;
117+
rr:subjectMap [
118+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/header"
119+
] ;
79120
rr:predicateObjectMap [
80121
rr:predicate vcfr:hasHeaderLine ;
81122
rr:objectMap [
82-
rr:parentTriplesMap <#HeaderLineMap> ;
83-
rr:joinCondition [ rr:child "SOURCE_FILE" ; rr:parent "SOURCE_FILE" ]
123+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/header/line/{HEADER_INDEX}"
84124
]
85125
] .
86126

@@ -99,7 +139,7 @@
99139
rml:referenceFormulation ql:CSV
100140
] ;
101141
rr:subjectMap [
102-
rr:template "https://w3id.org/vcf-rdfizer/resource/header-line/{SOURCE_FILE}/{HEADER_INDEX}" ;
142+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/header/line/{HEADER_INDEX}" ;
103143
rr:class vcfr:HeaderLine
104144
] ;
105145
rr:predicateObjectMap [
@@ -126,7 +166,7 @@
126166
rml:referenceFormulation ql:CSV
127167
] ;
128168
rr:subjectMap [
129-
rr:template "https://w3id.org/vcf-rdfizer/resource/record/{SOURCE_FILE}/{ROW_ID}" ;
169+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/record/var{ROW_ID}" ;
130170
rr:class vcfr:VCFRecord
131171
] ;
132172
rr:predicateObjectMap [
@@ -151,7 +191,7 @@
151191
] ;
152192
rr:predicateObjectMap [
153193
rr:predicate vcfr:hasCall ;
154-
rr:objectMap [ rr:template "https://w3id.org/vcf-rdfizer/resource/call/{SOURCE_FILE}/{ROW_ID}" ]
194+
rr:objectMap [ rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/call/var{ROW_ID}" ]
155195
] .
156196

157197
<#VariantCallMap>
@@ -169,7 +209,7 @@
169209
rml:referenceFormulation ql:CSV
170210
] ;
171211
rr:subjectMap [
172-
rr:template "https://w3id.org/vcf-rdfizer/resource/call/{SOURCE_FILE}/{ROW_ID}" ;
212+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/call/var{ROW_ID}" ;
173213
rr:class vcfr:VariantCall
174214
] ;
175215
rr:predicateObjectMap [
@@ -184,3 +224,97 @@
184224
rr:predicate vcfr:formatRaw ;
185225
rr:objectMap [ rml:reference "FORMAT" ]
186226
] .
227+
228+
<#VariantCallToSampleLinkMap>
229+
a rr:TriplesMap ;
230+
rml:logicalSource [
231+
rml:source [
232+
a csvw:Table ;
233+
csvw:url "/data/tsv/sample_calls.tsv" ;
234+
csvw:dialect [
235+
a csvw:Dialect ;
236+
csvw:delimiter "\t" ;
237+
csvw:headerRowCount 1
238+
]
239+
] ;
240+
rml:referenceFormulation ql:CSV
241+
] ;
242+
rr:subjectMap [
243+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/call/var{ROW_ID}"
244+
] ;
245+
rr:predicateObjectMap [
246+
rr:predicate vcfr:hasSampleCall ;
247+
rr:objectMap [
248+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/sample/var{ROW_ID}/{SAMPLE_URI_ID}"
249+
]
250+
] .
251+
252+
<#SampleCallMap>
253+
a rr:TriplesMap ;
254+
rml:logicalSource [
255+
rml:source [
256+
a csvw:Table ;
257+
csvw:url "/data/tsv/sample_calls.tsv" ;
258+
csvw:dialect [
259+
a csvw:Dialect ;
260+
csvw:delimiter "\t" ;
261+
csvw:headerRowCount 1
262+
]
263+
] ;
264+
rml:referenceFormulation ql:CSV
265+
] ;
266+
rr:subjectMap [
267+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/sample/var{ROW_ID}/{SAMPLE_URI_ID}" ;
268+
rr:class vcfr:SampleCall
269+
] ;
270+
rr:predicateObjectMap [
271+
rr:predicate vcfr:sampleId ;
272+
rr:objectMap [ rml:reference "SAMPLE_ID" ]
273+
] .
274+
275+
<#SampleCallToFormatValueLinkMap>
276+
a rr:TriplesMap ;
277+
rml:logicalSource [
278+
rml:source [
279+
a csvw:Table ;
280+
csvw:url "/data/tsv/sample_format_values.tsv" ;
281+
csvw:dialect [
282+
a csvw:Dialect ;
283+
csvw:delimiter "\t" ;
284+
csvw:headerRowCount 1
285+
]
286+
] ;
287+
rml:referenceFormulation ql:CSV
288+
] ;
289+
rr:subjectMap [
290+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/sample/var{ROW_ID}/{SAMPLE_URI_ID}"
291+
] ;
292+
rr:predicateObjectMap [
293+
rr:predicate vcfr:hasFormatValue ;
294+
rr:objectMap [
295+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/sample/var{ROW_ID}/{SAMPLE_URI_ID}/fmt/{FORMAT_KEY}"
296+
]
297+
] .
298+
299+
<#FormatFieldValueMap>
300+
a rr:TriplesMap ;
301+
rml:logicalSource [
302+
rml:source [
303+
a csvw:Table ;
304+
csvw:url "/data/tsv/sample_format_values.tsv" ;
305+
csvw:dialect [
306+
a csvw:Dialect ;
307+
csvw:delimiter "\t" ;
308+
csvw:headerRowCount 1
309+
]
310+
] ;
311+
rml:referenceFormulation ql:CSV
312+
] ;
313+
rr:subjectMap [
314+
rr:template "https://w3id.org/vcf-rdfizer/vcf/{SOURCE_FILE}/sample/var{ROW_ID}/{SAMPLE_URI_ID}/fmt/{FORMAT_KEY}" ;
315+
rr:class vcfr:FormatFieldValue
316+
] ;
317+
rr:predicateObjectMap [
318+
rr:predicate vcfr:fieldValue ;
319+
rr:objectMap [ rml:reference "FORMAT_VALUE" ]
320+
] .

src/run_conversion.sh

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,42 @@ stat_size() {
7272
fi
7373
return
7474
fi
75+
76+
echo 0
77+
}
78+
79+
# Report comparable input VCF bytes.
80+
# - .vcf -> on-disk bytes
81+
# - .vcf.gz -> decompressed bytes
82+
# - dir -> sum of normalized sizes for contained .vcf/.vcf.gz files
83+
normalized_vcf_size() {
84+
local path="$1"
85+
local total=0
86+
87+
if [[ -f "$path" ]]; then
88+
if [[ "$path" == *.vcf.gz ]]; then
89+
gzip -dc "$path" | wc -c | tr -d ' '
90+
return
91+
fi
92+
stat_size "$path"
93+
return
94+
fi
95+
96+
if [[ -d "$path" ]]; then
97+
shopt -s nullglob
98+
for file in "$path"/*.vcf "$path"/*.vcf.gz; do
99+
if [[ ! -f "$file" ]]; then
100+
continue
101+
fi
102+
size=$(normalized_vcf_size "$file")
103+
total=$((total + size))
104+
done
105+
shopt -u nullglob
106+
echo "$total"
107+
return
108+
fi
109+
110+
echo 0
75111
}
76112

77113
have_gnu_time() { [[ -x /usr/bin/time ]] && /usr/bin/time --version >/dev/null 2>&1; }
@@ -140,7 +176,22 @@ JAVA_VERSION=$(java -version 2>&1 | head -n1 | sed 's/"/\\"/g')
140176
# or for Java 8: GC_OPTS="-Xloggc:$LOGDIR/gc-$RUN_ID.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
141177
GC_OPTS=${GC_OPTS:-}
142178

143-
JAVA_CMD=(java -jar "$JAR" toFile -m "$IN" -o "$OUT_DIR/$OUT_NAME")
179+
# Optional low-cost Spark partition hint for RMLStreamer execution.
180+
# When set, this caps Spark default parallelism and shuffle partitions to
181+
# reduce tiny output-part overproduction without introducing expensive
182+
# repartition/shuffle stages in the pipeline.
183+
SPARK_PARTITIONS=${SPARK_PARTITIONS:-}
184+
JAVA_SPARK_OPTS=()
185+
if [[ -n "$SPARK_PARTITIONS" ]]; then
186+
if [[ "$SPARK_PARTITIONS" =~ ^[1-9][0-9]*$ ]]; then
187+
JAVA_SPARK_OPTS+=("-Dspark.default.parallelism=$SPARK_PARTITIONS")
188+
JAVA_SPARK_OPTS+=("-Dspark.sql.shuffle.partitions=$SPARK_PARTITIONS")
189+
else
190+
echo "WARNING: ignoring invalid SPARK_PARTITIONS='$SPARK_PARTITIONS' (expected positive integer)." >&2
191+
fi
192+
fi
193+
194+
JAVA_CMD=(java "${JAVA_SPARK_OPTS[@]}" -jar "$JAR" toFile -m "$IN" -o "$OUT_DIR/$OUT_NAME")
144195

145196
# Ensure repeated runs with the same OUT_NAME do not accumulate old artifacts.
146197
if [[ -d "$OUT_DIR/$OUT_NAME" ]]; then
@@ -149,7 +200,7 @@ fi
149200

150201
# ---------- Pre-run ----------
151202
IN_SIZE=$(stat_size "$IN")
152-
VCF_SIZE=$(stat_size "$IN_VCF")
203+
VCF_SIZE=$(normalized_vcf_size "$IN_VCF")
153204

154205
# ---------- Run RMLStreamer with timing ----------
155206
EXIT_CODE=0

0 commit comments

Comments (0)