1+ package org .labkey .sequenceanalysis .analysis ;
2+
3+ import htsjdk .samtools .SAMFileHeader ;
4+ import htsjdk .samtools .SAMFileWriter ;
5+ import htsjdk .samtools .SAMFileWriterFactory ;
6+ import htsjdk .samtools .SAMReadGroupRecord ;
7+ import htsjdk .samtools .SamReader ;
8+ import htsjdk .samtools .SamReaderFactory ;
9+ import htsjdk .samtools .util .FileExtensions ;
10+ import htsjdk .variant .vcf .VCFFileReader ;
11+ import htsjdk .variant .vcf .VCFHeader ;
12+ import htsjdk .variant .vcf .VCFReader ;
13+ import org .apache .commons .io .FileUtils ;
14+ import org .apache .logging .log4j .Logger ;
15+ import org .json .JSONObject ;
16+ import org .labkey .api .module .ModuleLoader ;
17+ import org .labkey .api .pipeline .PipelineJob ;
18+ import org .labkey .api .pipeline .PipelineJobException ;
19+ import org .labkey .api .pipeline .RecordedAction ;
20+ import org .labkey .api .sequenceanalysis .SequenceAnalysisService ;
21+ import org .labkey .api .sequenceanalysis .SequenceOutputFile ;
22+ import org .labkey .api .sequenceanalysis .model .Readset ;
23+ import org .labkey .api .sequenceanalysis .pipeline .AbstractParameterizedOutputHandler ;
24+ import org .labkey .api .sequenceanalysis .pipeline .BcftoolsRunner ;
25+ import org .labkey .api .sequenceanalysis .pipeline .SequenceAnalysisJobSupport ;
26+ import org .labkey .api .sequenceanalysis .pipeline .SequenceOutputHandler ;
27+ import org .labkey .api .sequenceanalysis .pipeline .SequencePipelineService ;
28+ import org .labkey .api .sequenceanalysis .run .PicardWrapper ;
29+ import org .labkey .api .util .FileType ;
30+ import org .labkey .api .writer .PrintWriters ;
31+ import org .labkey .sequenceanalysis .SequenceAnalysisModule ;
32+ import org .labkey .sequenceanalysis .util .SequenceUtil ;
33+
34+ import java .io .File ;
35+ import java .io .IOException ;
36+ import java .io .PrintWriter ;
37+ import java .nio .file .StandardCopyOption ;
38+ import java .nio .file .StandardOpenOption ;
39+ import java .util .ArrayList ;
40+ import java .util .Arrays ;
41+ import java .util .List ;
42+
43+ public class UpdateReadsetFilesHandler extends AbstractParameterizedOutputHandler <SequenceOutputHandler .SequenceOutputProcessor >
44+ {
45+ public UpdateReadsetFilesHandler ()
46+ {
47+ super (ModuleLoader .getInstance ().getModule (SequenceAnalysisModule .class ), "Update Sample/Header Information" , "This will re-header any BAM or gVCF files using the sample name from the source readset. All inputs must be single-sample and have a readset attached to the record" , null , List .of (
48+
49+ ));
50+ }
51+
52+ @ Override
53+ public boolean doRunRemote ()
54+ {
55+ return true ;
56+ }
57+
58+ @ Override
59+ public boolean doRunLocal ()
60+ {
61+ return false ;
62+ }
63+
64+ @ Override
65+ public boolean canProcess (SequenceOutputFile f )
66+ {
67+ return f .getFile () != null && (
68+ SequenceUtil .FILETYPE .gvcf .getFileType ().isType (f .getFile ()) ||
69+ SequenceUtil .FILETYPE .bamOrCram .getFileType ().isType (f .getFile ())
70+ );
71+ }
72+
73+ @ Override
74+ public boolean doSplitJobs ()
75+ {
76+ return true ;
77+ }
78+
79+ @ Override
80+ public SequenceOutputProcessor getProcessor ()
81+ {
82+ return new Processor ();
83+ }
84+
85+ public static class Processor implements SequenceOutputProcessor
86+ {
87+ @ Override
88+ public void init (JobContext ctx , List <SequenceOutputFile > inputFiles , List <RecordedAction > actions , List <SequenceOutputFile > outputsToCreate ) throws UnsupportedOperationException , PipelineJobException
89+ {
90+ if (inputFiles .size () > 1 )
91+ {
92+ throw new PipelineJobException ("This job expects a single input file!" );
93+ }
94+
95+ SequenceOutputFile so = inputFiles .get (0 );
96+ if (so .getReadset () == null )
97+ {
98+ throw new PipelineJobException ("All inputs must have a readset, missing: " + so .getRowid ());
99+ }
100+
101+ Readset rs = SequenceAnalysisService .get ().getReadset (so .getReadset (), ctx .getJob ().getUser ());
102+ String newRsName = SequenceUtil .getLegalReadGroupName (rs .getName ());
103+
104+ if (SequenceUtil .FILETYPE .bamOrCram .getFileType ().isType (so .getFile ()))
105+ {
106+ getAndValidateHeaderForBam (so , newRsName );
107+ }
108+ else if (SequenceUtil .FILETYPE .gvcf .getFileType ().isType (so .getFile ()) | SequenceUtil .FILETYPE .vcf .getFileType ().isType (so .getFile ()))
109+ {
110+ getAndValidateHeaderForVcf (so , newRsName );
111+ }
112+
113+ ctx .getSequenceSupport ().cacheObject ("readsetId" , newRsName );
114+ }
115+
116+ private SAMFileHeader getAndValidateHeaderForBam (SequenceOutputFile so , String newRsName ) throws PipelineJobException
117+ {
118+ SamReaderFactory samReaderFactory = SamReaderFactory .makeDefault ();
119+ try (SamReader reader = samReaderFactory .open (so .getFile ()))
120+ {
121+ SAMFileHeader header = reader .getFileHeader ().clone ();
122+ int nSamples = reader .getFileHeader ().getReadGroups ().size ();
123+ if (nSamples != 1 )
124+ {
125+ throw new PipelineJobException ("File has more than one read group, found: " + nSamples );
126+ }
127+
128+ List <SAMReadGroupRecord > rgs = header .getReadGroups ();
129+ String existingSample = rgs .get (0 ).getSample ();
130+ if (existingSample .equals (newRsName ))
131+ {
132+ throw new PipelineJobException ("Sample names match, aborting" );
133+ }
134+
135+ return header ;
136+ }
137+ catch (IOException e )
138+ {
139+ throw new PipelineJobException (e );
140+ }
141+ }
142+
143+ private VCFHeader getAndValidateHeaderForVcf (SequenceOutputFile so , String newRsName ) throws PipelineJobException
144+ {
145+ try (VCFReader reader = new VCFFileReader (so .getFile ()))
146+ {
147+ VCFHeader header = reader .getHeader ();
148+ int nSamples = header .getGenotypeSamples ().size ();
149+ if (nSamples != 1 )
150+ {
151+ throw new PipelineJobException ("File has more than one sample, found: " + nSamples );
152+ }
153+
154+ String existingSample = header .getGenotypeSamples ().get (0 );
155+ if (existingSample .equals (newRsName ))
156+ {
157+ throw new PipelineJobException ("Sample names match, aborting" );
158+ }
159+
160+ return header ;
161+ }
162+ catch (IOException e )
163+ {
164+ throw new PipelineJobException (e );
165+ }
166+ }
167+
168+ @ Override
169+ public void processFilesOnWebserver (PipelineJob job , SequenceAnalysisJobSupport support , List <SequenceOutputFile > inputFiles , JSONObject params , File outputDir , List <RecordedAction > actions , List <SequenceOutputFile > outputsToCreate ) throws UnsupportedOperationException , PipelineJobException
170+ {
171+
172+ }
173+
174+ @ Override
175+ public void processFilesRemote (List <SequenceOutputFile > inputFiles , JobContext ctx ) throws UnsupportedOperationException , PipelineJobException
176+ {
177+ String newRsName = ctx .getSequenceSupport ().getCachedObject ("readsetId" , String .class );
178+ if (newRsName == null )
179+ {
180+ throw new PipelineJobException ("Missing cached readsetId" );
181+ }
182+
183+ SequenceOutputFile so = inputFiles .get (0 );
184+ if (SequenceUtil .FILETYPE .bamOrCram .getFileType ().isType (so .getFile ()))
185+ {
186+ reheaderBamOrCram (so , ctx , newRsName );
187+ }
188+ else if (SequenceUtil .FILETYPE .gvcf .getFileType ().isType (so .getFile ()) | SequenceUtil .FILETYPE .vcf .getFileType ().isType (so .getFile ()))
189+ {
190+ reheaderVcf (so , ctx , newRsName );
191+ }
192+ }
193+
194+ private void reheaderVcf (SequenceOutputFile so , JobContext ctx , String newRsName ) throws PipelineJobException
195+ {
196+ VCFHeader header = getAndValidateHeaderForVcf (so , newRsName );
197+ String existingSample = header .getGenotypeSamples ().get (0 );
198+
199+ File sampleNamesFile = new File (ctx .getWorkingDirectory (), "sampleNames.txt" );
200+ try (PrintWriter writer = PrintWriters .getPrintWriter (sampleNamesFile , StandardOpenOption .APPEND ))
201+ {
202+ writer .println (newRsName );
203+ }
204+ catch (IOException e )
205+ {
206+ throw new PipelineJobException (e );
207+ }
208+ ctx .getFileManager ().addIntermediateFile (sampleNamesFile );
209+
210+ File outputVcf = new File (ctx .getWorkingDirectory (), so .getFile ().getName ());
211+
212+ BcftoolsRunner wrapper = new BcftoolsRunner (ctx .getLogger ());
213+ wrapper .execute (Arrays .asList (BcftoolsRunner .getBcfToolsPath ().getPath (), "reheader" , "-s" , sampleNamesFile .getPath (), "-o" , outputVcf .getPath (), so .getFile ().getPath ()));
214+
215+ try
216+ {
217+ File outputIdx = SequenceAnalysisService .get ().ensureVcfIndex (outputVcf , ctx .getLogger (), false );
218+ FileUtils .moveFile (outputVcf , so .getFile (), StandardCopyOption .REPLACE_EXISTING );
219+
220+ FileType gz = new FileType (".gz" );
221+ File inputIndex = gz .isType (so .getFile ()) ? new File (so .getFile ().getPath () + ".tbi" ) : new File (so .getFile ().getPath () + FileExtensions .TRIBBLE_INDEX );
222+ FileUtils .moveFile (outputIdx , inputIndex , StandardCopyOption .REPLACE_EXISTING );
223+
224+ addTracker (so , existingSample , newRsName );
225+ }
226+ catch (IOException e )
227+ {
228+ throw new PipelineJobException (e );
229+ }
230+ }
231+
232+ private void addTracker (SequenceOutputFile so , String existingSample , String newRsName ) throws IOException
233+ {
234+ File tracker = new File (so .getFile ().getParentFile (), "reheaderHistory.txt" );
235+ boolean preExisting = tracker .exists ();
236+ try (PrintWriter writer = PrintWriters .getPrintWriter (tracker , StandardOpenOption .APPEND ))
237+ {
238+ if (!preExisting )
239+ {
240+ writer .println ("OriginalSample\t NewSample" );
241+ }
242+
243+ writer .println (existingSample + "\t " + newRsName );
244+ }
245+ }
246+
247+ private void reheaderBamOrCram (SequenceOutputFile so , JobContext ctx , String newRsName ) throws PipelineJobException
248+ {
249+ try
250+ {
251+ SAMFileHeader header = getAndValidateHeaderForBam (so , newRsName );
252+
253+ List <SAMReadGroupRecord > rgs = header .getReadGroups ();
254+ String existingSample = rgs .get (0 ).getSample ();
255+ rgs .get (0 ).setSample (newRsName );
256+
257+ File headerBam = new File (ctx .getWorkingDirectory (), "header.bam" );
258+ try (SAMFileWriter writer = new SAMFileWriterFactory ().makeBAMWriter (header , false , headerBam ))
259+ {
260+
261+ }
262+ ctx .getFileManager ().addIntermediateFile (headerBam );
263+ ctx .getFileManager ().addIntermediateFile (SequencePipelineService .get ().getExpectedIndex (headerBam ));
264+
265+ File output = new File (ctx .getWorkingDirectory (), so .getFile ().getName ());
266+ new ReplaceSamHeaderWrapper (ctx .getLogger ()).execute (so .getFile (), output , headerBam );
267+ if (!output .exists ())
268+ {
269+ throw new PipelineJobException ("Missing file: " + output .getPath ());
270+ }
271+
272+ File outputIdx = SequencePipelineService .get ().ensureBamIndex (output , ctx .getLogger (), false );
273+
274+ FileUtils .moveFile (output , so .getFile (), StandardCopyOption .REPLACE_EXISTING );
275+ FileUtils .moveFile (outputIdx , SequenceAnalysisService .get ().getExpectedBamOrCramIndex (so .getFile ()), StandardCopyOption .REPLACE_EXISTING );
276+
277+ addTracker (so , existingSample , newRsName );
278+ }
279+ catch (IOException e )
280+ {
281+ throw new PipelineJobException (e );
282+ }
283+ }
284+
285+ private static class ReplaceSamHeaderWrapper extends PicardWrapper
286+ {
287+ public ReplaceSamHeaderWrapper (Logger log )
288+ {
289+ super (log );
290+ }
291+
292+ @ Override
293+ protected String getToolName ()
294+ {
295+ return "ReplaceSamHeader" ;
296+ }
297+
298+ public void execute (File input , File output , File headerBam ) throws PipelineJobException
299+ {
300+ List <String > params = new ArrayList <>(getBaseArgs ());
301+
302+ params .add ("--INPUT" );
303+ params .add (input .getPath ());
304+
305+ params .add ("--OUTPUT" );
306+ params .add (output .getPath ());
307+
308+ params .add ("--HEADER" );
309+ params .add (headerBam .getPath ());
310+
311+ execute (params );
312+ }
313+ }
314+ }
315+ }
0 commit comments