diff --git a/primeseq/src/org/labkey/primeseq/PrimeseqController.java b/primeseq/src/org/labkey/primeseq/PrimeseqController.java index bcd58996..df0b0a40 100644 --- a/primeseq/src/org/labkey/primeseq/PrimeseqController.java +++ b/primeseq/src/org/labkey/primeseq/PrimeseqController.java @@ -43,6 +43,7 @@ import org.labkey.api.pipeline.PipelineService; import org.labkey.api.pipeline.PipelineStatusFile; import org.labkey.api.pipeline.PipelineUrls; +import org.labkey.api.query.DetailsURL; import org.labkey.api.security.RequiresPermission; import org.labkey.api.security.RequiresSiteAdmin; import org.labkey.api.security.permissions.ReadPermission; @@ -58,6 +59,7 @@ import org.labkey.api.view.HtmlView; import org.labkey.api.writer.PrintWriters; import org.labkey.primeseq.pipeline.MhcCleanupPipelineJob; +import org.labkey.primeseq.pipeline.RemoteEtlDebugPipelineJob; import org.springframework.validation.BindException; import org.springframework.validation.Errors; import org.springframework.web.servlet.ModelAndView; @@ -72,6 +74,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; public class PrimeseqController extends SpringActionController @@ -318,6 +321,7 @@ private SQLFragment getSql(UpdateFilePathsForm form) return sql; } + @Override public boolean handlePost(UpdateFilePathsForm form, BindException errors) throws Exception { @@ -646,7 +650,8 @@ public boolean handlePost(PerformMhcCleanupForm o, BindException errors) throws { PipeRoot pipelineRoot = PipelineService.get().findPipelineRoot(getContainer()); MhcCleanupPipelineJob job = new MhcCleanupPipelineJob(getContainer(), getUser(), getViewContext().getActionURL(), pipelineRoot, o.isPerformDeletes(), o.getMinAnalysisId()); - if (o.isDeleteMultiLineage()) { + if (o.isDeleteMultiLineage()) + { job.setDropMultiLineageMHC(o.isDeleteMultiLineage()); } @@ -798,4 +803,35 @@ public void setRestartJobs(boolean restartJobs) _restartJobs = restartJobs; } } + 
+ @UtilityAction(label = "Initiate Remote ETL Debug", description = "This starts a pipeline job designed to debug remote ETL queries") + @RequiresSiteAdmin + public static class DoRemoteEtlDebugAction extends ConfirmAction + { + @Override + public ModelAndView getConfirmView(Object o, BindException errors) throws Exception + { + return new HtmlView(HtmlString.of("This starts a pipeline job designed to simulate remote ETL queries, pausing periodically to simulate a commit. Do you want to continue?")); + } + + @Override + public boolean handlePost(Object o, BindException errors) throws Exception + { + PipelineService.get().queueJob(RemoteEtlDebugPipelineJob.create(getContainer(), getUser())); + + return true; + } + + @Override + public void validateCommand(Object o, Errors errors) + { + + } + + @Override + public @NotNull URLHelper getSuccessURL(Object o) + { + return Objects.requireNonNull(DetailsURL.fromString("/pipeline-status/showList.view", getContainer()).getActionURL()); + } + } } \ No newline at end of file diff --git a/primeseq/src/org/labkey/primeseq/PrimeseqModule.java b/primeseq/src/org/labkey/primeseq/PrimeseqModule.java index 7bf969e3..5af95d8c 100644 --- a/primeseq/src/org/labkey/primeseq/PrimeseqModule.java +++ b/primeseq/src/org/labkey/primeseq/PrimeseqModule.java @@ -40,6 +40,7 @@ import org.labkey.primeseq.pipeline.DeleteJobCheckpointButton; import org.labkey.primeseq.pipeline.ExacloudResourceSettings; import org.labkey.primeseq.pipeline.MhcCleanupPipelineJob; +import org.labkey.primeseq.pipeline.RemoteEtlDebugPipelineJob; import org.labkey.primeseq.pipeline.SequenceJobResourceAllocator; import org.labkey.primeseq.query.PerformMhcCleanupButton; import org.labkey.primeseq.query.UpdateResourcesButton; @@ -80,6 +81,7 @@ protected void doStartupAfterSpringConfig(ModuleContext moduleContext) ClusterService.get().registerResourceAllocator(new SequenceJobResourceAllocator.Factory()); PipelineService.get().registerPipelineProvider(new 
package org.labkey.primeseq.pipeline;

import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.Logger;
import org.labkey.api.assay.AssayFileWriter;
import org.labkey.api.data.Container;
import org.labkey.api.data.DbScope;
import org.labkey.api.data.TableInfo;
import org.labkey.api.dataiterator.DataIterator;
import org.labkey.api.dataiterator.DataIteratorContext;
import org.labkey.api.dataiterator.WrapperDataIterator;
import org.labkey.api.di.DataIntegrationService;
import org.labkey.api.module.Module;
import org.labkey.api.pipeline.PipeRoot;
import org.labkey.api.pipeline.PipelineDirectory;
import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineProvider;
import org.labkey.api.pipeline.PipelineService;
import org.labkey.api.query.QueryService;
import org.labkey.api.query.QueryUpdateService;
import org.labkey.api.query.UserSchema;
import org.labkey.api.reader.JSONDataLoader;
import org.labkey.api.security.User;
import org.labkey.api.util.FileUtil;
import org.labkey.api.util.URLHelper;
import org.labkey.api.view.ViewBackgroundInfo;
import org.labkey.api.view.ViewContext;
import org.labkey.remoteapi.CommandException;
import org.labkey.remoteapi.Connection;
import org.labkey.remoteapi.query.SelectRowsCommand;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Pipeline job intended purely for debugging remote-API/ETL behavior: it streams rows from the
 * remote data source named "PRIMe" (study.weight) into the local study.weight table, sleeping
 * between batches to simulate periodic commits.
 */
public class RemoteEtlDebugPipelineJob extends PipelineJob
{
    public static class Provider extends PipelineProvider
    {
        public static final String NAME = "etlDebugPipeline";

        public Provider(Module owningModule)
        {
            super(NAME, owningModule);
        }

        @Override
        public void updateFileProperties(ViewContext context, PipeRoot pr, PipelineDirectory directory, boolean includeAll)
        {
            // Intentionally empty: this provider exposes no file-based actions.
        }
    }

    // Default constructor for serialization
    protected RemoteEtlDebugPipelineJob()
    {
    }

    private RemoteEtlDebugPipelineJob(String providerName, Container c, User u, PipeRoot pipeRoot)
    {
        super(providerName, new ViewBackgroundInfo(c, u, null), pipeRoot);
    }

    /**
     * Creates a job rooted at the container's pipeline root, with a uniquely named log file.
     */
    public static RemoteEtlDebugPipelineJob create(Container c, User u)
    {
        PipeRoot pipeRoot = PipelineService.get().getPipelineRootSetting(c);
        RemoteEtlDebugPipelineJob job = new RemoteEtlDebugPipelineJob(Provider.NAME, c, u, pipeRoot);
        job.setLogFile(AssayFileWriter.findUniqueFileName(FileUtil.makeLegalName("etlDebugJob.log"), pipeRoot.getRootPath()).toPath());

        return job;
    }

    @Override
    public URLHelper getStatusHref()
    {
        return null;
    }

    @Override
    public String getDescription()
    {
        return "Designed to debug remote API calls and mimic ETLs";
    }

    @Override
    public void run()
    {
        try
        {
            setStatus(TaskStatus.running);
            getLogger().info("Starting");

            DataIntegrationService.RemoteConnection rc = DataIntegrationService.get().getRemoteConnection("PRIMe", getContainer(), getLogger());
            if (rc == null)
            {
                throw new IllegalStateException("Unable to find remote data source named PRIMe in this container");
            }

            getLogger().info("Querying remote server");

            SelectRowsCommand2 sr = new SelectRowsCommand2("study", "weight");
            sr.setColumns(Arrays.asList("Id", "date", "objectid", "QCState/Label"));

            final int BATCH_SIZE = 1000;
            final int PAUSE_LENGTH = 10;  // seconds to sleep between batches
            AtomicInteger al = new AtomicInteger();
            DataIterator di = sr.getWrapperDataIterator(rc.connection, rc.remoteContainer, getContainer(), getLogger());

            UserSchema us = QueryService.get().getUserSchema(getUser(), getContainer(), "study");
            if (us == null)
            {
                throw new IllegalStateException("Unable to find user schema for study");
            }

            TableInfo weights = us.getTable("weight");
            if (weights == null)
            {
                throw new IllegalStateException("Unable to find weights");
            }

            QueryUpdateService qus = weights.getUpdateService();
            qus.setBulkLoad(true);

            try (DbScope.Transaction transaction = DbScope.getLabKeyScope().ensureTransaction())
            {
                final Date lastValue = new Date();
                di.stream().forEach(row -> {
                    try
                    {
                        al.getAndIncrement();
                        qus.insertRows(getUser(), getContainer(), Arrays.asList(row), null, null, null);

                        if (al.get() % BATCH_SIZE == 0)
                        {
                            Date now = new Date();
                            long duration = now.getTime() - lastValue.getTime();
                            lastValue.setTime(now.getTime());
                            getLogger().info("Imported " + al.get() + " rows. Duration (including sleep): " + (duration / 1000));

                            try
                            {
                                getLogger().info("Sleeping for " + PAUSE_LENGTH + " seconds.");
                                Thread.sleep(PAUSE_LENGTH * 1000);
                                getLogger().info("Done sleeping");
                            }
                            catch (InterruptedException e)
                            {
                                // BUGFIX: restore the interrupt flag so the pipeline can still cancel this job.
                                Thread.currentThread().interrupt();
                                getLogger().error(e.getMessage(), e);
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        // NOTE(review): failed rows are logged and skipped (best-effort import);
                        // confirm this is intended rather than aborting the whole batch.
                        getLogger().error(e.getMessage(), e);
                    }
                });

                // BUGFIX: without an explicit commit, the transaction rolls back when the
                // try-with-resources closes, silently discarding every inserted row.
                transaction.commit();
            }

            getLogger().info("Done!");
            // Mark the job finished; otherwise the pipeline status remains RUNNING.
            setStatus(TaskStatus.complete);
        }
        catch (Exception e)
        {
            setStatus(TaskStatus.error);
            throw new RuntimeException(e);
        }
    }

    // This is a hack to access the protected _execute() so we can stream the raw response
    // through JSONDataLoader instead of materializing all rows in memory.
    private static class SelectRowsCommand2 extends SelectRowsCommand
    {
        public SelectRowsCommand2(String schemaName, String queryName)
        {
            super(schemaName, queryName);
        }

        /**
         * Executes the remote query and wraps its response stream in a DataIterator.
         * The returned iterator's close() releases the InputStream and HTTP connection.
         */
        public WrapperDataIterator getWrapperDataIterator(Connection connection, String remoteContainer, Container localContainer, Logger logger)
        {
            Response response;
            try
            {
                response = _execute(connection, remoteContainer);
                logger.info("response headers:");
                for (String key : Arrays.asList("Connection", "Transfer-Encoding", "Cache-Control", "Keep-Alive", "Pragma"))
                {
                    logger.info(key + ": " + response.getHeaderValue(key));
                }
            }
            catch (CommandException | IOException e)
            {
                throw new RuntimeException("Failed to execute remote query", e);
            }

            try
            {
                final InputStream is = response.getInputStream();
                try
                {
                    final JSONDataLoader loader = new JSONDataLoader(is, localContainer);
                    DataIteratorContext context = new DataIteratorContext();
                    WrapperDataIterator wrapper = new WrapperDataIterator(loader.getDataIterator(context))
                    {
                        @Override
                        public void close() throws IOException
                        {
                            logger.info("Calling close() in WrapperDataIterator");

                            // Close the InputStream and http connection.
                            // closeQuietly() is null-safe, so no explicit null check is needed.
                            IOUtils.closeQuietly(is);
                            response.close();

                            // close the JSONDataLoader
                            super.close();
                        }
                    };
                    wrapper.setDebugName("SelectRows:JSONDataLoader");

                    return wrapper;
                }
                catch (Exception e)
                {
                    // BUGFIX: don't leak the response stream if iterator wiring fails.
                    IOUtils.closeQuietly(is);
                    throw e;
                }
            }
            catch (Exception e)
            {
                logger.info(e.getMessage());
                logger.info("status code: " + response.getStatusCode());
                logger.error("Failed to create DataIterator from remote response", e);

                throw new RuntimeException(e);
            }
        }
    }
}