Skip to content

Commit 97b5215

Browse files
committed
Add workaround to attempt to recover out-of-sync cluster jobs
1 parent 48815be commit 97b5215

File tree

2 files changed

+89
-9
lines changed

2 files changed

+89
-9
lines changed

cluster/src/org/labkey/cluster/ClusterController.java

Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
package org.labkey.cluster;
1818

1919
import org.apache.commons.lang3.StringUtils;
20+
import org.apache.log4j.Logger;
2021
import org.labkey.api.action.ConfirmAction;
2122
import org.labkey.api.action.SpringActionController;
2223
import org.labkey.api.pipeline.PipelineJob;
24+
import org.labkey.api.pipeline.PipelineJobException;
2325
import org.labkey.api.pipeline.PipelineJobService;
2426
import org.labkey.api.pipeline.PipelineService;
2527
import org.labkey.api.pipeline.PipelineStatusFile;
@@ -32,10 +34,13 @@
3234
import org.labkey.api.util.PageFlowUtil;
3335
import org.labkey.api.util.URLHelper;
3436
import org.labkey.api.view.HtmlView;
37+
import org.labkey.cluster.pipeline.AbstractClusterExecutionEngine;
3538
import org.springframework.validation.BindException;
3639
import org.springframework.validation.Errors;
3740
import org.springframework.web.servlet.ModelAndView;
3841

42+
import java.io.File;
43+
import java.io.IOException;
3944
import java.util.ArrayList;
4045
import java.util.List;
4146

@@ -44,6 +49,8 @@ public class ClusterController extends SpringActionController
4449
private static final DefaultActionResolver _actionResolver = new DefaultActionResolver(ClusterController.class);
4550
public static final String NAME = "cluster";
4651

52+
private static final Logger _log = Logger.getLogger(ClusterController.class);
53+
4754
public ClusterController()
4855
{
4956
setActionResolver(_actionResolver);
@@ -82,26 +89,26 @@ public boolean handlePost(Object form, BindException errors) throws Exception
8289
}
8390

8491
@RequiresSiteAdmin
85-
public class ForcePipelineCancelAction extends ConfirmAction<ForcePipelineCancelForm>
92+
public class ForcePipelineCancelAction extends ConfirmAction<JobIdsForm>
8693
{
87-
public void validateCommand(ForcePipelineCancelForm form, Errors errors)
94+
public void validateCommand(JobIdsForm form, Errors errors)
8895
{
8996

9097
}
9198

92-
public URLHelper getSuccessURL(ForcePipelineCancelForm form)
99+
public URLHelper getSuccessURL(JobIdsForm form)
93100
{
94101
return PageFlowUtil.urlProvider(PipelineStatusUrls.class).urlBegin(getContainer());
95102
}
96103

97-
public ModelAndView getConfirmView(ForcePipelineCancelForm form, BindException errors) throws Exception
104+
public ModelAndView getConfirmView(JobIdsForm form, BindException errors) throws Exception
98105
{
99106
return new HtmlView(HtmlString.unsafe("This will change the status of the pipeline job with the provided ID to Cancelled. It is intended to help the situation when the normal UI leave a job in a perpetual 'Cancelling' state." +
100107
"To continue, enter a comma-delimited list of Job IDs and hit submit:<br><br>" +
101108
"<label>Enter Job ID(s): </label><input name=\"jobIds\"><br>"));
102109
}
103110

104-
public boolean handlePost(ForcePipelineCancelForm form, BindException errors) throws Exception
111+
public boolean handlePost(JobIdsForm form, BindException errors) throws Exception
105112
{
106113
String jobIDs = StringUtils.trimToNull(form.getJobIds());
107114
if (jobIDs == null)
@@ -139,7 +146,7 @@ public boolean handlePost(ForcePipelineCancelForm form, BindException errors) th
139146
}
140147
}
141148

142-
public static class ForcePipelineCancelForm
149+
public static class JobIdsForm
143150
{
144151
private String _jobIds;
145152

@@ -153,4 +160,77 @@ public void setJobIds(String jobIds)
153160
_jobIds = jobIds;
154161
}
155162
}
163+
164+
@RequiresSiteAdmin
165+
public class RecoverCompletedJobsAction extends ConfirmAction<JobIdsForm>
166+
{
167+
public void validateCommand(JobIdsForm form, Errors errors)
168+
{
169+
170+
}
171+
172+
public URLHelper getSuccessURL(JobIdsForm form)
173+
{
174+
return PageFlowUtil.urlProvider(PipelineStatusUrls.class).urlBegin(getContainer());
175+
}
176+
177+
public ModelAndView getConfirmView(JobIdsForm form, BindException errors) throws Exception
178+
{
179+
return new HtmlView(HtmlString.unsafe("This will attempt to re-queue existing pipeline jobs using their serialized JSON text files. It is intended as a workaround for the situation where a job has been marked complete." +
180+
"To continue, enter a comma-delimited list of Job IDs and hit submit:<br><br>" +
181+
"<label>Enter Job ID(s): </label><input name=\"jobIds\"><br>"));
182+
}
183+
184+
public boolean handlePost(JobIdsForm form, BindException errors) throws Exception
185+
{
186+
String jobIDs = StringUtils.trimToNull(form.getJobIds());
187+
if (jobIDs == null)
188+
{
189+
errors.reject(ERROR_MSG, "No JobIds provided");
190+
return false;
191+
}
192+
193+
List<PipelineStatusFile> sfs = new ArrayList<>();
194+
for (String id : jobIDs.split(","))
195+
{
196+
int jobId = Integer.parseInt(StringUtils.trimToNull(id));
197+
PipelineStatusFile sf = PipelineService.get().getStatusFile(jobId);
198+
if (sf == null)
199+
{
200+
errors.reject(ERROR_MSG, "Unable to find job: " + id);
201+
return false;
202+
}
203+
204+
if (!PipelineJob.TaskStatus.running.name().equalsIgnoreCase(sf.getStatus()))
205+
{
206+
errors.reject(ERROR_MSG, "This cannot be used on actively running jobs. Status was: " + sf.getStatus());
207+
return false;
208+
}
209+
210+
sfs.add(sf);
211+
}
212+
213+
sfs.forEach(sf -> {
214+
215+
File log = new File(sf.getFilePath());
216+
File json = AbstractClusterExecutionEngine.getSerializedJobFile(log);
217+
if (!json.exists())
218+
{
219+
return;
220+
}
221+
222+
try
223+
{
224+
PipelineJob job = PipelineJob.readFromFile(json);
225+
job.setStatus(job.getActiveTaskStatus());
226+
}
227+
catch (PipelineJobException | IOException e)
228+
{
229+
_log.error(e);
230+
}
231+
});
232+
233+
return true;
234+
}
235+
}
156236
}

cluster/src/org/labkey/cluster/pipeline/AbstractClusterExecutionEngine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
/**
5454
* Created by bimber on 7/11/2017.
5555
*/
56-
abstract class AbstractClusterExecutionEngine<ConfigType extends PipelineJobService.RemoteExecutionEngineConfig> implements RemoteClusterEngine, RemoteExecutionEngine<ConfigType>
56+
abstract public class AbstractClusterExecutionEngine<ConfigType extends PipelineJobService.RemoteExecutionEngineConfig> implements RemoteClusterEngine, RemoteExecutionEngine<ConfigType>
5757
{
5858
private Logger _log;
5959
public static final String PREPARING = "PREPARING";
@@ -177,7 +177,7 @@ public void runTestJob(Container c, User u) throws PipelineJobException
177177

178178
abstract protected Pair<String, String> getStatusForJob(ClusterJob job, Container c);
179179

180-
private File getSerializedJobFile(File jobLogFile)
180+
public static File getSerializedJobFile(File jobLogFile)
181181
{
182182
if (jobLogFile == null)
183183
{
@@ -769,7 +769,7 @@ protected void checkForCompletedJob(ClusterJob job)
769769
filter.addCondition(FieldKey.fromString("status"), PipelineJob.TaskStatus.complete.name().toUpperCase());
770770
if (new TableSelector(ti, filter, null).exists())
771771
{
772-
_log.error("unable to find record of job from condor; however, the submissions table indicates it was marked as complete. this might indicate a lost JMS message to update the job's status.", new Exception());
772+
_log.error("unable to find record of job from cluster; however, the submissions table indicates it was marked as complete. this might indicate a lost JMS message to update the job's status.", new Exception());
773773
//return PipelineJob.TaskStatus.complete.name().toUpperCase();
774774
}
775775
}

0 commit comments

Comments
 (0)