Skip to content

Commit cc4f1e5

Browse files
authored
Merge pull request #91 from LabKey/fb_merge_discvr-20.11
Merge discvr 20.11 to develop
2 parents 3392a20 + 50015c5 commit cc4f1e5

File tree

133 files changed

+5109
-2690
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

133 files changed

+5109
-2690
lines changed
Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
package org.labkey.api.sequenceanalysis.pipeline;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import org.apache.logging.log4j.Logger;
5+
import org.labkey.api.pipeline.PipelineJob;
6+
import org.labkey.api.pipeline.PipelineJobException;
7+
import org.labkey.api.pipeline.RecordedAction;
8+
import org.labkey.api.sequenceanalysis.SequenceOutputFile;
9+
import org.labkey.api.util.Pair;
10+
11+
import java.io.BufferedInputStream;
12+
import java.io.File;
13+
import java.io.FileInputStream;
14+
import java.io.IOException;
15+
import java.io.Serializable;
16+
import java.lang.reflect.InvocationTargetException;
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.LinkedHashSet;
20+
import java.util.List;
21+
import java.util.Map;
22+
23+
/**
24+
* Created by bimber on 4/2/2017.
25+
*/
26+
abstract public class AbstractResumer implements Serializable
27+
{
28+
transient Logger _log;
29+
transient File _localWorkDir;
30+
31+
protected TaskFileManager _fileManager;
32+
protected LinkedHashSet<RecordedAction> _recordedActions = null;
33+
protected boolean _isResume = false;
34+
protected Map<File, File> _copiedInputs = new HashMap<>();
35+
36+
protected List<Pair<File, File>> _filesCopiedLocally = new ArrayList<>();
37+
38+
//for serialization
39+
protected AbstractResumer()
40+
{
41+
42+
}
43+
44+
protected AbstractResumer(File localWorkDir, Logger log, TaskFileManager fileManager)
45+
{
46+
_localWorkDir = localWorkDir;
47+
_log = log;
48+
_fileManager = fileManager;
49+
_recordedActions = new LinkedHashSet<>();
50+
_filesCopiedLocally = new ArrayList<>();
51+
}
52+
53+
public static File getSerializedJson(File outdir, String jsonName)
54+
{
55+
return new File(outdir, jsonName);
56+
}
57+
58+
public static <RESUMER extends AbstractResumer> RESUMER readFromJson(File json, Class<RESUMER> clazz) throws PipelineJobException
59+
{
60+
try (BufferedInputStream bus = new BufferedInputStream(new FileInputStream(json)))
61+
{
62+
ObjectMapper objectMapper = PipelineJob.createObjectMapper();
63+
return objectMapper.readValue(bus, clazz);
64+
}
65+
catch (IOException e)
66+
{
67+
throw new PipelineJobException(e);
68+
}
69+
}
70+
71+
protected void writeToJson() throws PipelineJobException
72+
{
73+
writeToJson(_localWorkDir);
74+
}
75+
76+
abstract protected String getJsonName();
77+
78+
protected void logInfoBeforeSave()
79+
{
80+
_log.debug("total actions: " + _recordedActions.size());
81+
_log.debug("total sequence outputs: " + getFileManager().getOutputsToCreate().size());
82+
}
83+
84+
public void writeToJson(File outDir) throws PipelineJobException
85+
{
86+
_log.debug("saving job checkpoint to file");
87+
logInfoBeforeSave();
88+
89+
if (outDir == null)
90+
{
91+
throw new PipelineJobException("output directory was null");
92+
}
93+
94+
File output = getSerializedJson(outDir, getJsonName());
95+
_log.debug("using file: " + output.getPath());
96+
try
97+
{
98+
ObjectMapper objectMapper = PipelineJob.createObjectMapper();
99+
objectMapper.writeValue(output, this);
100+
}
101+
catch (Throwable e)
102+
{
103+
throw new PipelineJobException(e);
104+
}
105+
}
106+
107+
public void markComplete()
108+
{
109+
markComplete(true);
110+
}
111+
112+
public void markComplete(boolean deleteFile)
113+
{
114+
File file = getSerializedJson(_localWorkDir, getJsonName());
115+
if (file.exists())
116+
{
117+
_log.info("closing job resumer");
118+
if (deleteFile)
119+
file.delete();
120+
else
121+
_log.debug("delete of file will be deferred: " + file.getPath());
122+
}
123+
}
124+
125+
public void saveState() throws PipelineJobException
126+
{
127+
writeToJson();
128+
}
129+
130+
public TaskFileManager getFileManager()
131+
{
132+
return _fileManager;
133+
}
134+
135+
public void setFileManager(TaskFileManager fileManager)
136+
{
137+
_fileManager = fileManager;
138+
}
139+
140+
public boolean isResume()
141+
{
142+
return _isResume;
143+
}
144+
145+
public void setResume(boolean resume)
146+
{
147+
_isResume = resume;
148+
}
149+
150+
public LinkedHashSet<RecordedAction> getRecordedActions()
151+
{
152+
return _recordedActions;
153+
}
154+
155+
public void setRecordedActions(LinkedHashSet<RecordedAction> recordedActions)
156+
{
157+
_recordedActions = recordedActions;
158+
}
159+
160+
public File getLocalWorkDir()
161+
{
162+
return _localWorkDir;
163+
}
164+
165+
public void setLocalWorkDir(File localWorkDir)
166+
{
167+
_localWorkDir = localWorkDir;
168+
}
169+
170+
public Logger getLogger()
171+
{
172+
return _log;
173+
}
174+
175+
public void setLogger(Logger log)
176+
{
177+
_log = log;
178+
}
179+
180+
public Map<File, File> getCopiedInputs()
181+
{
182+
return _copiedInputs;
183+
}
184+
185+
public void setCopiedInputs(Map<File, File> copiedInputs)
186+
{
187+
_copiedInputs = copiedInputs;
188+
}
189+
190+
public List<Pair<File, File>> getFilesCopiedLocally()
191+
{
192+
return _filesCopiedLocally;
193+
}
194+
195+
public void setFilesCopiedLocally(List<Pair<File, File>> filesCopiedLocally)
196+
{
197+
_filesCopiedLocally = filesCopiedLocally;
198+
}
199+
200+
public void addFileCopiedLocally(File orig, File copied)
201+
{
202+
_filesCopiedLocally.add(Pair.of(orig, copied));
203+
}
204+
205+
public static <T extends AbstractResumer> T create(SequenceOutputHandler.JobContext ctx, String jsonName, Class<T> clazz) throws PipelineJobException
206+
{
207+
if (!(ctx instanceof SequenceOutputHandler.MutableJobContext))
208+
{
209+
throw new IllegalArgumentException("Expected JobContext to be instance of MutableJobContext");
210+
}
211+
212+
T ret;
213+
File json = getSerializedJson(ctx.getSourceDirectory(), jsonName);
214+
if (!json.exists())
215+
{
216+
try
217+
{
218+
ret = clazz.getDeclaredConstructor(SequenceOutputHandler.JobContext.class).newInstance(ctx);
219+
}
220+
catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e)
221+
{
222+
throw new PipelineJobException(e);
223+
}
224+
}
225+
else
226+
{
227+
ret = readFromJson(json, clazz);
228+
ret.setResume(true);
229+
ret.setLogger(ctx.getLogger());
230+
ret.setLocalWorkDir(ctx.getWorkDir().getDir());
231+
ret.getFileManager().onResume(ctx.getJob(), ctx.getWorkDir());
232+
233+
ctx.getLogger().debug("FileManagers initially equal: " + ctx.getFileManager().equals(ret.getFileManager()));
234+
235+
ctx.getLogger().debug("Replacing fileManager on JobContext");
236+
((SequenceOutputHandler.MutableJobContext)ctx).setFileManager(ret.getFileManager());
237+
try
238+
{
239+
if (!ret.getCopiedInputs().isEmpty())
240+
{
241+
for (File orig : ret.getCopiedInputs().keySet())
242+
{
243+
ctx.getWorkDir().inputFile(orig, ret._copiedInputs.get(orig), false);
244+
}
245+
}
246+
}
247+
catch (IOException e)
248+
{
249+
throw new PipelineJobException(e);
250+
}
251+
252+
//debugging:
253+
ctx.getLogger().debug("loaded from file. total recorded actions: " + ret.getRecordedActions().size());
254+
ctx.getLogger().debug("total sequence outputs: " + ret.getFileManager().getOutputsToCreate().size());
255+
ctx.getLogger().debug("total intermediate files: " + ret.getFileManager().getIntermediateFiles().size());
256+
for (RecordedAction a : ret.getRecordedActions())
257+
{
258+
ctx.getLogger().debug("action: " + a.getName() + ", inputs: " + a.getInputs().size() + ", outputs: " + a.getOutputs().size());
259+
}
260+
261+
if (ret._recordedActions == null)
262+
{
263+
throw new PipelineJobException("Job read from file, but did not have any saved actions. This indicates a problem w/ serialization.");
264+
}
265+
}
266+
267+
if (ret.isResume())
268+
{
269+
ctx.getLogger().info("resuming previous job");
270+
271+
}
272+
273+
boolean fmEqual = ctx.getFileManager().equals(ret._fileManager);
274+
ctx.getLogger().debug("FileManagers on resumer and JobContext equal: " + fmEqual);
275+
276+
return ret;
277+
}
278+
279+
public void markComplete(SequenceOutputHandler.JobContext ctx)
280+
{
281+
// NOTE: due to the way the resumer is set up, the FileManager used by the Resumer is a different
282+
// instance than the JobContext, meaning we need to manually pass information back to the primary FileManager
283+
ctx.getLogger().debug("total sequence outputs tracked in resumer: " + getFileManager().getOutputsToCreate().size());
284+
for (SequenceOutputFile so : getFileManager().getOutputsToCreate())
285+
{
286+
ctx.addSequenceOutput(so);
287+
}
288+
289+
ctx.getLogger().debug("total actions tracked in resumer: " + getRecordedActions().size());
290+
for (RecordedAction a : getRecordedActions())
291+
{
292+
ctx.addActions(a);
293+
}
294+
295+
ctx.getFileManager().addIntermediateFiles(getFileManager().getIntermediateFiles());
296+
297+
markComplete();
298+
}
299+
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/AbstractSequenceTaskFactory.java renamed to SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/AbstractSequenceTaskFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.labkey.sequenceanalysis.pipeline;
16+
package org.labkey.api.sequenceanalysis.pipeline;
1717

1818
import org.labkey.api.pipeline.AbstractTaskFactory;
1919
import org.labkey.api.pipeline.AbstractTaskFactorySettings;

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/DefaultPipelineStepOutput.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.File;
2323
import java.util.ArrayList;
24+
import java.util.Collection;
2425
import java.util.Collections;
2526
import java.util.List;
2627

@@ -78,7 +79,7 @@ public List<File> getIntermediateFiles()
7879
}
7980

8081
@Override
81-
public void removeIntermediateFiles(File toRemove)
82+
public void removeIntermediateFile(File toRemove)
8283
{
8384
_intermediateFiles.remove(toRemove);
8485
}
@@ -134,6 +135,12 @@ public void addIntermediateFile(File file, String role)
134135
_intermediateFiles.add(file);
135136
}
136137

138+
@Override
139+
public void addIntermediateFiles(Collection<File> files)
140+
{
141+
_intermediateFiles.addAll(files);
142+
}
143+
137144
public void addPicardMetricsFile(Readset rs, File metricFile, File inputFile)
138145
{
139146
_picardMetricsFiles.add(new PipelineStepOutput.PicardMetricsOutput(metricFile, inputFile, rs.getRowId()));
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.labkey.api.sequenceanalysis.pipeline;
2+
3+
import org.jetbrains.annotations.Nullable;
4+
5+
import java.io.File;
6+
import java.util.Collection;
7+
8+
public interface PipelineOutputTracker
9+
{
10+
/**
11+
* Add an intermediate file. If the user selected 'delete intermediates', this will be deleted on job success.
12+
* @param file
13+
*/
14+
public void addIntermediateFile(File file);
15+
16+
public void addIntermediateFiles(Collection<File> files);
17+
18+
/**
19+
* Add a SequenceOutputFile for this job. These files are tracked and displayed through the browser UI.
20+
* @param file
21+
* @param label
22+
* @param category
23+
* @param readsetId
24+
* @param analysisId
25+
* @param genomeId
26+
* @param description
27+
*/
28+
public void addSequenceOutput(File file, String label, String category, @Nullable Integer readsetId, @Nullable Integer analysisId, @Nullable Integer genomeId, @Nullable String description);
29+
30+
/**
31+
* Remove a previously added intermediate file
32+
* @param toRemove The file to remove
33+
*/
34+
public void removeIntermediateFile(File toRemove);
35+
36+
}

0 commit comments

Comments (0)