33import au .com .bytecode .opencsv .CSVWriter ;
44import htsjdk .samtools .util .IOUtil ;
55import htsjdk .samtools .util .Interval ;
6+ import htsjdk .variant .vcf .VCFFileReader ;
7+ import htsjdk .variant .vcf .VCFHeader ;
68import org .apache .commons .lang3 .StringUtils ;
79import org .apache .logging .log4j .Logger ;
810import org .json .JSONObject ;
11+ import org .labkey .api .pipeline .PipelineJob ;
912import org .labkey .api .pipeline .PipelineJobException ;
13+ import org .labkey .api .reader .Readers ;
14+ import org .labkey .api .sequenceanalysis .SequenceOutputFile ;
15+ import org .labkey .api .sequenceanalysis .model .Readset ;
1016import org .labkey .api .sequenceanalysis .pipeline .AbstractVariantProcessingStepProvider ;
1117import org .labkey .api .sequenceanalysis .pipeline .CommandLineParam ;
1218import org .labkey .api .sequenceanalysis .pipeline .PipelineContext ;
1319import org .labkey .api .sequenceanalysis .pipeline .PipelineStepProvider ;
1420import org .labkey .api .sequenceanalysis .pipeline .ReferenceGenome ;
21+ import org .labkey .api .sequenceanalysis .pipeline .SequenceAnalysisJobSupport ;
1522import org .labkey .api .sequenceanalysis .pipeline .SequencePipelineService ;
1623import org .labkey .api .sequenceanalysis .pipeline .ToolParameterDescriptor ;
1724import org .labkey .api .sequenceanalysis .pipeline .VariantProcessingStep ;
1825import org .labkey .api .sequenceanalysis .pipeline .VariantProcessingStepOutputImpl ;
1926import org .labkey .api .sequenceanalysis .run .AbstractCommandPipelineStep ;
2027import org .labkey .api .sequenceanalysis .run .AbstractCommandWrapper ;
28+ import org .labkey .api .util .FileUtil ;
29+ import org .labkey .api .writer .PrintWriters ;
2130
2231import javax .annotation .Nullable ;
32+ import java .io .BufferedReader ;
2333import java .io .File ;
2434import java .io .IOException ;
35+ import java .io .PrintWriter ;
2536import java .util .ArrayList ;
2637import java .util .Arrays ;
38+ import java .util .HashMap ;
2739import java .util .List ;
40+ import java .util .Map ;
2841
2942public class PlinkPcaStep extends AbstractCommandPipelineStep <PlinkPcaStep .PlinkWrapper > implements VariantProcessingStep
3043{
31- public PlinkPcaStep (PipelineStepProvider provider , PipelineContext ctx )
44+ public PlinkPcaStep (PipelineStepProvider <?> provider , PipelineContext ctx )
3245 {
3346 super (provider , ctx , new PlinkPcaStep .PlinkWrapper (ctx .getLogger ()));
3447 }
@@ -44,11 +57,13 @@ public Provider()
4457 ToolParameterDescriptor .createCommandLineParam (CommandLineParam .create ("--const-fid" ), "constFid" , "Constant FID" , "Converts sample IDs to within-family IDs while setting all family IDs to a single value (default '0')." , "checkbox" , new JSONObject (){{
4558 put ("checked" , true );
4659 }}, true ),
60+ ToolParameterDescriptor .create ("splitByApplication" , "Split by Application" , "If checked, one iteration of PCA will be performed for each application (defined by the readset)." , "checkbox" , null , true ),
4761 ToolParameterDescriptor .create (SelectSamplesStep .SAMPLE_INCLUDE , "Sample(s) Include" , "Only the following samples will be included in the analysis." , "sequenceanalysis-trimmingtextarea" , null , null ),
4862 ToolParameterDescriptor .create (SelectSamplesStep .SAMPLE_EXCLUDE , "Samples(s) To Exclude" , "The following samples will be excluded from the analysis." , "sequenceanalysis-trimmingtextarea" , null , null )
4963 ), Arrays .asList ("sequenceanalysis/field/TrimmingTextArea.js" ), "https://zzz.bwh.harvard.edu/plink/" );
5064 }
5165
66+ @ Override
5267 public PlinkPcaStep create (PipelineContext ctx )
5368 {
5469 return new PlinkPcaStep (this , ctx );
@@ -84,41 +99,129 @@ public Output processVariants(File inputVCF, File outputDirectory, ReferenceGeno
8499 {
85100 VariantProcessingStepOutputImpl output = new VariantProcessingStepOutputImpl ();
86101
102+ boolean splitByApplication = getProvider ().getParameterByName ("splitByApplication" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , true );
103+ if (splitByApplication )
104+ {
105+ Map <String , List <String >> applicationToSample = new HashMap <>();
106+ try (BufferedReader reader = Readers .getReader (getSampleMapFile ()))
107+ {
108+ String line ;
109+ while ((line = reader .readLine ()) != null )
110+ {
111+ String [] tokens = line .split ("\t " );
112+ if (!applicationToSample .containsKey (tokens [1 ]))
113+ {
114+ applicationToSample .put (tokens [1 ], new ArrayList <>());
115+ }
116+
117+ applicationToSample .get (tokens [1 ]).add (tokens [0 ]);
118+ }
119+ }
120+ catch (IOException e )
121+ {
122+ throw new PipelineJobException (e );
123+ }
124+
125+ for (String application : applicationToSample .keySet ())
126+ {
127+ getPipelineCtx ().getLogger ().info ("Running PCA for: " + application );
128+ runBatch (inputVCF , outputDirectory , output , genome , applicationToSample .get (application ), application );
129+ }
130+ }
131+ else
132+ {
133+ runBatch (inputVCF , outputDirectory , output , genome , null , null );
134+ }
135+
136+ output .addInput (inputVCF , "Input VCF" );
137+ output .addInput (genome .getWorkingFastaFile (), "Reference Genome" );
138+
139+ return output ;
140+ }
141+
142+ private void runBatch (File inputVCF , File outputDirectory , VariantProcessingStepOutputImpl output , ReferenceGenome genome , @ Nullable List <String > sampleList , @ Nullable String setName ) throws PipelineJobException
143+ {
87144 List <String > args = new ArrayList <>();
88145 args .add (getWrapper ().getExe ().getPath ());
89146 args .add ("--pca" );
90147 args .add ("--allow-extra-chr" );
91148
92149 String samplesToInclude = getProvider ().getParameterByName (SelectSamplesStep .SAMPLE_INCLUDE ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), String .class );
93- addSubjectSelectOptions (samplesToInclude , args , "--keep" , new File (outputDirectory , "samplesToKeep.txt" ), output );
150+ addSubjectSelectOptions (sampleList != null ? StringUtils . join ( sampleList , ";" ) : samplesToInclude , args , "--keep" , new File (outputDirectory , "samplesToKeep.txt" ), output );
94151
95152 String samplesToExclude = getProvider ().getParameterByName (SelectSamplesStep .SAMPLE_EXCLUDE ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), String .class );
96153 addSubjectSelectOptions (samplesToExclude , args , "--exclude" , new File (outputDirectory , "samplesToExclude.txt" ), output );
97154
98155 args .add ("--vcf" );
99156 args .add (inputVCF .getPath ());
100157
101- File outPrefix = new File (outputDirectory , "plink" );
158+ File outPrefix ;
159+ if (setName != null )
160+ {
161+ outPrefix = new File (outputDirectory , "plink." + FileUtil .makeLegalName (setName ));
162+ }
163+ else
164+ {
165+ outPrefix = new File (outputDirectory , "plink" );
166+ }
167+
102168 args .add ("--out" );
103169 args .add (outPrefix .getPath ());
104170
105171 args .addAll (getClientCommandArgs ());
106172
107173 getWrapper ().execute (args );
108174
109- output .addInput (inputVCF , "Input VCF" );
110- output .addInput (genome .getWorkingFastaFile (), "Reference Genome" );
111-
112175 File outputFile = new File (outPrefix .getPath () + ".eigenvec" );
113176 if (!outputFile .exists ())
114177 {
115178 throw new PipelineJobException ("Unable to find expected file: " + outputFile );
116179 }
117180
118181 output .addOutput (outputFile , "PLink PCA" );
119- output .addSequenceOutput (outputFile , "PLink PCA for: " + inputVCF .getName (), "PLink PCA" , null , null , genome .getGenomeId (), null );
182+ output .addSequenceOutput (outputFile , "PLink PCA for: " + inputVCF .getName () + (setName == null ? "" : ", for: " + setName ), "PLink PCA" , null , null , genome .getGenomeId (), null );
183+ }
120184
121- return output ;
185+ @ Override
186+ public void init (PipelineJob job , SequenceAnalysisJobSupport support , List <SequenceOutputFile > inputFiles ) throws PipelineJobException
187+ {
188+ try (PrintWriter writer = PrintWriters .getPrintWriter (getSampleMapFile ()))
189+ {
190+ getPipelineCtx ().getLogger ().info ("Writing Sample Map" );
191+ for (SequenceOutputFile so : inputFiles )
192+ {
193+ if (so .getReadset () == null )
194+ {
195+ throw new PipelineJobException ("This step requires all inputs to have a readset" );
196+ }
197+
198+ Readset rs = support .getCachedReadset (so .getReadset ());
199+
200+ try (VCFFileReader reader = new VCFFileReader (so .getFile ()))
201+ {
202+ VCFHeader header = reader .getFileHeader ();
203+ if (header .getSampleNamesInOrder ().isEmpty ())
204+ {
205+ throw new PipelineJobException ("Expected VCF to have samples: " + so .getFile ().getPath ());
206+ }
207+ else if (header .getSampleNamesInOrder ().size () != 1 )
208+ {
209+ throw new PipelineJobException ("Expected VCF to a single sample: " + so .getFile ().getPath ());
210+ }
211+
212+ writer .println (header .getSampleNamesInOrder ().get (0 ) + "\t " + rs .getApplication ());
213+ }
214+ }
215+ }
216+ catch (IOException e )
217+ {
218+ throw new PipelineJobException (e );
219+ }
220+ }
221+
222+ private File getSampleMapFile ()
223+ {
224+ return new File (getPipelineCtx ().getJob ().isSplitJob () ? getPipelineCtx ().getSourceDirectory ().getParentFile () : getPipelineCtx ().getSourceDirectory (), "sampleMap.txt" );
122225 }
123226
124227 public static class PlinkWrapper extends AbstractCommandWrapper
0 commit comments