diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 000000000..596b3eef1
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,2 @@
+- In Java code, use JUnit 4 & Google Truth. Tests need to be added to AllTests.java or else they won't run.
+- See README.md & CONTRIBUTING.md
diff --git a/README.md b/README.md
index d613e0d4c..029e74558 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,9 @@ configuration for [pre-commit](https://pre-commit.com). Install the hook with
 pipenv shell
 ```
+From this shell, run `pip install -e .` to create the `launchable` wrapper script from the workspace.
+This is useful for testing the CLI against a real/local server.
+
 ## Run tests cli
 ```shell
diff --git a/launchable/jar/exe_deploy.jar b/launchable/jar/exe_deploy.jar
index 5800a9055..875c78408 100755
Binary files a/launchable/jar/exe_deploy.jar and b/launchable/jar/exe_deploy.jar differ
diff --git a/src/main/java/com/launchableinc/ingest/commits/BackgroundWorkStatus.java b/src/main/java/com/launchableinc/ingest/commits/BackgroundWorkStatus.java
new file mode 100644
index 000000000..3a3708473
--- /dev/null
+++ b/src/main/java/com/launchableinc/ingest/commits/BackgroundWorkStatus.java
@@ -0,0 +1,8 @@
+package com.launchableinc.ingest.commits;
+
+public enum BackgroundWorkStatus {
+  IN_PROGRESS,
+  SUCCEEDED,
+  FAILED,
+  ABANDONED
+}
diff --git a/src/main/java/com/launchableinc/ingest/commits/BoundedExecutorService.java b/src/main/java/com/launchableinc/ingest/commits/BoundedExecutorService.java
new file mode 100644
index 000000000..d2d745d05
--- /dev/null
+++ b/src/main/java/com/launchableinc/ingest/commits/BoundedExecutorService.java
@@ -0,0 +1,73 @@
+package com.launchableinc.ingest.commits;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link ExecutorService} decorator that limits the number of concurrent tasks,
+ * and makes the caller block when the limit is reached.
+ */
+class BoundedExecutorService extends AbstractExecutorService {
+  private final ExecutorService delegate;
+  private final Semaphore semaphore;
+
+  BoundedExecutorService(int limit) {
+    this(Executors.newFixedThreadPool(limit), limit);
+  }
+
+  BoundedExecutorService(ExecutorService delegate, int limit) {
+    this.delegate = delegate;
+    this.semaphore = new Semaphore(limit);
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    try {
+      semaphore.acquire();
+    } catch (InterruptedException e) {
+      throw new RejectedExecutionException(e);
+    }
+    try {
+      delegate.execute(() -> {
+        try {
+          command.run();
+        } finally {
+          semaphore.release();
+        }
+      });
+    } catch (RejectedExecutionException e) {
+      semaphore.release();
+      throw e;
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    delegate.shutdown();
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    return delegate.shutdownNow();
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return delegate.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return delegate.isTerminated();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return delegate.awaitTermination(timeout, unit);
+  }
+}
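For readers unfamiliar with this pattern: the semaphore is acquired on the *submitting* thread, so `execute()` itself blocks once `limit` tasks are in flight — that is what produces the back-pressure used later in this change. A minimal sketch of that behavior (the demo class and the sleep-based workload are illustrative, not part of this PR; `BoundedExecutorService` is package-private, so this assumes a caller in the same package):

```java
package com.launchableinc.ingest.commits;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class BoundedExecutorDemo {
  public static void main(String[] args) throws InterruptedException {
    // At most 2 tasks run at once; the 3rd execute() call blocks in semaphore.acquire().
    ExecutorService pool = new BoundedExecutorService(2);
    for (int i = 0; i < 6; i++) {
      final int n = i;
      pool.execute(() -> {
        try {
          Thread.sleep(100); // simulate slow work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        System.out.println("done " + n);
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```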
diff --git a/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java b/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java
index 7be425e2a..29605ffc1 100644
--- a/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java
+++ b/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java
@@ -2,7 +2,6 @@
 
 import org.apache.http.entity.ContentProducer;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UncheckedIOException;
@@ -12,13 +11,13 @@
 /**
  * Accepts T, buffers them, and writes them out as a batch.
  */
-abstract class ChunkStreamer<T> implements FlushableConsumer<T>, Closeable {
+abstract class ChunkStreamer<T> implements FlushableConsumer<T> {
   /**
    * Encapsulation of how batches are sent.
   */
   private final IOConsumer<ContentProducer> sender;
   private final int chunkSize;
-  private final List<T> spool = new ArrayList<>();
+  private List<T> spool = new ArrayList<>();
 
   ChunkStreamer(IOConsumer<ContentProducer> sender, int chunkSize) {
     this.sender = sender;
     this.chunkSize = chunkSize;
   }
@@ -50,6 +49,7 @@ public void flush() throws IOException {
     try {
       sender.accept(os -> writeTo(spool, os));
+      // let sender own the list -- do not reuse
     } finally {
       spool.clear();
     }
diff --git a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java
index 3ac660f2b..db40eb6a9 100644
--- a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java
+++ b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java
@@ -52,19 +52,26 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.zip.GZIPOutputStream;
 
-import static com.google.common.collect.ImmutableList.*;
-import static java.util.Arrays.*;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Arrays.stream;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 /**
  * Compares what commits the local repository and the remote repository have, then send delta over.
@@ -77,6 +84,7 @@ public class CommitGraphCollector {
    * Repository header is sent using this reserved file name
    */
   static final String HEADER_FILE = ".launchable";
+  private static final String APPLICATION_JSON = "application/json";
 
   private final String rootName;
 
@@ -87,7 +95,7 @@ public class CommitGraphCollector {
    */
   private final Repository root;
 
-  private int commitsSent, filesSent;
+  private final AtomicInteger commitsSent = new AtomicInteger(), filesSent = new AtomicInteger();
 
   private boolean collectCommitMessage, collectFiles;
 
@@ -119,11 +127,11 @@ public CommitGraphCollector(String name, Repository git) {
   /** How many commits did we transfer? */
   public int getCommitsSent() {
-    return commitsSent;
+    return commitsSent.get();
   }
 
   public int getFilesSent() {
-    return filesSent;
+    return filesSent.get();
   }
 
   private String dumpHeaderAsJson(Header[] headers) throws JsonProcessingException {
@@ -165,7 +173,7 @@ public void transfer(URL service, Authenticator authenticator, boolean enableTim
           (ContentProducer commits) -> sendCommits(service, client, commits),
           new TreeReceiverImpl(service, client),
           (ContentProducer files) -> sendFiles(service, client, files),
-          256);
+          1024);
     }
   }
@@ -194,6 +202,7 @@ private void sendFiles(URL service, CloseableHttpClient client, ContentProducer
     URL url = new URL(service, "collect/files");
     HttpPost request = new HttpPost(url.toExternalForm());
     request.setHeader("Content-Type", "application/octet-stream");
+    request.setHeader("Accept", "application/json; mode=async");
 
     // no content encoding, since .tar.gz is considered content
     request.setEntity(new EntityTemplate(os -> files.writeTo(new GZIPOutputStream(os))));
@@ -224,7 +233,37 @@ private void sendFiles(URL service, CloseableHttpClient client, ContentProducer
     if (dryRun) {
       return;
     }
-    handleError(url, client.execute(request)).close();
+
+    int workId = readResponse(handleError(url, client.execute(request)), JSAsyncFileCollectionResponse.class).workId;
+    URL workUrl = new URL(service, "collect/files/work/" + workId);
+    while (true) {
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        // not expecting this to happen; sufficient to fail
+        throw new IOException("interrupted while polling for file collection status", e);
+      }
+      // TODO: utilize filesProcessed for a progress report
+      HttpGet get = new HttpGet(workUrl.toExternalForm());
+      JSAsyncFileCollectionProgress status = readResponse(handleError(workUrl, client.execute(get)), JSAsyncFileCollectionProgress.class);
+      switch (status.status) {
+        case IN_PROGRESS:
+          break; // keep polling
+        case SUCCEEDED:
+          return;
+        case FAILED:
+        case ABANDONED:
+          throw new IOException("File collection failed: " + status.status);
+      }
+    }
+  }
+
+  private <T> T readResponse(CloseableHttpResponse response, Class<T> type) throws IOException {
+    try (JsonParser parser = new JsonFactory().createParser(response.getEntity().getContent())) {
+      return objectMapper.readValue(parser, type);
+    } finally {
+      response.close();
+    }
   }
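The file upload above is now asynchronous: the POST returns a small JSON handle, and the client polls a per-work status resource until it reaches a terminal `BackgroundWorkStatus`. A minimal sketch of the two payloads using the `JS*` classes added in this diff (the literal JSON values here are illustrative, not actual server output):

```java
package com.launchableinc.ingest.commits;

import com.fasterxml.jackson.databind.ObjectMapper;

public class AsyncProtocolSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper om = new ObjectMapper();

    // POST collect/files with "Accept: application/json; mode=async" returns a work handle:
    JSAsyncFileCollectionResponse handle =
        om.readValue("{\"workId\": 42}", JSAsyncFileCollectionResponse.class);

    // GET collect/files/work/<workId> is then polled until a terminal status:
    JSAsyncFileCollectionProgress progress =
        om.readValue("{\"status\": \"IN_PROGRESS\", \"filesProcessed\": 120}",
            JSAsyncFileCollectionProgress.class);

    // IN_PROGRESS -> keep polling; SUCCEEDED -> done; FAILED/ABANDONED -> IOException
    System.out.println("work " + handle.workId + " is " + progress.status);
  }
}
```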
 
   private void honorControlHeaders(HttpResponse response) {
@@ -244,9 +283,8 @@ private void honorControlHeaders(HttpResponse response) {
     }
   }
 
-  private ImmutableList<ObjectId> getAdvertisedRefs(HttpResponse response) throws IOException {
-    JsonParser parser = new JsonFactory().createParser(response.getEntity().getContent());
-    String[] ids = objectMapper.readValue(parser, String[].class);
+  private ImmutableList<ObjectId> getAdvertisedRefs(CloseableHttpResponse response) throws IOException {
+    String[] ids = readResponse(response, String[].class);
     return stream(ids)
         .map(
             s -> {
@@ -271,10 +309,66 @@ public void transfer(
       Collection<ObjectId> advertised, IOConsumer<ContentProducer> commitSender, TreeReceiver treeReceiver,
       IOConsumer<ContentProducer> fileSender, int chunkSize)
       throws IOException {
     ByRepository r = new ByRepository(root, rootName);
-    try (CommitChunkStreamer cs = new CommitChunkStreamer(commitSender, chunkSize);
-        FileChunkStreamer fs = new FileChunkStreamer(fileSender, chunkSize);
-        ProgressReportingConsumer<VirtualFile> fsr = new ProgressReportingConsumer<>(fs, VirtualFile::path, Duration.ofSeconds(3))) {
-      r.transfer(advertised, cs, treeReceiver, fsr);
+
+    /*
+      Concurrency design
+      ==================
+
+      The work ahead of us is:
+        - for each repository
+          - send all the files we need to send
+          - then record commits
+
+      Commit recording has to happen after file sending, to ensure the server is in possession of all the files
+      relevant to the commits being recorded. This constraint allows for two places of parallelism:
+        1. process multiple repositories in parallel
+        2. within a repository, send (or "collect") files in parallel
+
+      We exploit both of them, by using two pools. The "scan" pool is used to parallelize the outer loop of
+      "for each repository". The "transfer" pool is used to parallelize file collection network I/O with the server.
+
+      Those are matched M:N -- any scan thread can hand over work to any available transfer thread. This way,
+      the user gets a speed boost whether it's a single massive repo or a lot of small repos.
+      The separate transfer pool limit caps the number of concurrent server connections, to avoid overwhelming the server.
+
+      Both thread pools are bounded, meaning the work producer gets blocked until the work consumer catches up.
+      This creates natural work throttling, keeping overall memory consumption in check.
+    */
+
+    ExecutorService scanPool = new BoundedExecutorService(4);
+//  ExecutorService scanPool = MoreExecutors.newDirectExecutorService(); // for debugging
+    ExecutorService transferPool = new BoundedExecutorService(4);
+
+    ProgressReporter<VirtualFile> pr = new ProgressReporter<>(VirtualFile::path, Duration.ofSeconds(3));
+    try {
+      r.forEachSubModule(scanPool, br -> {
+        if (collectFiles) {
+          // record all the necessary BLOBs first, before attempting to record its commit.
+          // this way, if the file collection fails, the server won't see this commit, so the future
+          // "record commit" invocation will retry the file collection, thereby making the behavior idempotent.
+
+          // ConcurrentConsumer parallelizes file sending within a repository. When it leaves the try block,
+          // it ensures all the submissions have completed.
+          try (ConcurrentConsumer<ContentProducer> parallel = new ConcurrentConsumer<>(fileSender, transferPool);
+              FileChunkStreamer fs = new FileChunkStreamer(r.buildHeader(), parallel, chunkSize);
+              ProgressReporter<VirtualFile>.Consumer fsr = pr.newConsumer(fs)) {
+            br.collectFiles(advertised, treeReceiver, fsr);
+          }
+        }
+
+        // we need to send commits in the topological order, so any parallelization within a repository
+        // is probably not worth the effort.
+        // TODO: if we process a repository that doesn't create enough commits
+        // to form a full chunk, it makes sense to concatenate them with commits from other repositories.
+        // Even when # of repos is large, an incremental transfer typically only produces a small number of commits
+        // per repo, so this would considerably reduce the connection setup / tear down overhead.
+        try (CommitChunkStreamer cs = new CommitChunkStreamer(commitSender, chunkSize)) {
+          br.collectCommits(advertised, cs);
+        }
+      });
+    } finally {
+      scanPool.shutdown();
+      transferPool.shutdown();
    }
  }
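The two-pool shape described in that comment, reduced to a skeleton (repository and file names are placeholders, and `upload()` stands in for the real network send; `ConcurrentConsumer` is the helper added later in this diff):

```java
package com.launchableinc.ingest.commits;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class TwoPoolSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService scanPool = new BoundedExecutorService(4);     // outer loop: repositories
    ExecutorService transferPool = new BoundedExecutorService(4); // inner loop: network sends

    for (String repo : Arrays.asList("repo-a", "repo-b", "repo-c")) {
      scanPool.execute(() -> {
        // any scan thread hands uploads to any free transfer thread (M:N)
        try (ConcurrentConsumer<String> uploads =
                 new ConcurrentConsumer<>(TwoPoolSketch::upload, transferPool)) {
          for (int i = 0; i < 5; i++) {
            uploads.accept(repo + "/file-" + i);
          }
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        // commits for this repo would be recorded here, only after all uploads completed
      });
    }

    scanPool.shutdown();
    scanPool.awaitTermination(1, TimeUnit.MINUTES);
    transferPool.shutdown();
  }

  private static void upload(String file) throws IOException {
    System.out.println("sending " + file);
  }
}
```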
@@ -323,83 +417,39 @@ final class ByRepository implements AutoCloseable {
   private final ObjectReader objectReader;
   private final Set<ObjectId> shallowCommits;
+  private final ObjectId headId;
 
   ByRepository(Repository git, String name) throws IOException {
     this.name = name;
     this.git = git;
     this.objectReader = git.newObjectReader();
     this.shallowCommits = objectReader.getShallowCommits();
+    this.headId = git.resolve("HEAD");
   }
 
-  /**
-   * Writes delta between local commits to the advertised to JSON stream.
-   *
-   * @param commitReceiver Receives commits that should be sent, one by one.
-   */
-  public void transfer(Collection<ObjectId> advertised, Consumer<JSCommit> commitReceiver, TreeReceiver treeReceiver, FlushableConsumer<VirtualFile> fileReceiver)
-      throws IOException {
-    try (RevWalk walk = new RevWalk(git); TreeWalk treeWalk = new TreeWalk(git)) {
-      // walk reverse topological order, so that older commits get added to the server earlier.
-      // This way, the connectivity of the commit graph will be always maintained
-      walk.sort(RevSort.TOPO);
-      walk.sort(RevSort.REVERSE, true);
-      // also combine this with commit time based ordering, so that we can stop walking when we
-      // find old enough commits AFAICT, this is no-op in JGit and it always sorts things in
-      // commit time order, but it is in the contract, so I'm assuming we shouldn't rely on the
-      // implementation optimization that's currently enabling this all the time
-      walk.sort(RevSort.COMMIT_TIME_DESC, true);
-
-      ObjectId headId = git.resolve("HEAD");
-      RevCommit start = walk.parseCommit(headId);
-      walk.markStart(start);
-      treeWalk.addTree(start.getTree());
-
-      // don't walk commits too far back.
-      // for our purpose of computing CUT, these are unlikely to contribute meaningfully
-      // and it drastically cuts down the initial commit consumption of a new large repository.
-      // ... except we do want to capture the head commit, as that makes it easier to spot integration problems
-      // when `record build` and `record commit` are separated.
-
-      // two RevFilters are order sensitive. This is because CommitTimeRevFilter.after doesn't return false to
-      // filter things out, it throws StopWalkException to terminate the walk, never giving a chance for the other
-      // branch of OR to be evaluated. So we need to put ObjectRevFilter first.
-      walk.setRevFilter(
-          OrRevFilter.create(
-              new ObjectRevFilter(headId),
-              CommitTimeRevFilter.after(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(maxDays))));
-
-      for (ObjectId id : advertised) {
-        try {
-          RevCommit c = walk.parseCommit(id);
-          walk.markUninteresting(c);
-          if (!reportAllFiles) {
-            treeWalk.addTree(c.getTree());
-          }
-        } catch (MissingObjectException e) {
-          // it's possible that the server advertises a commit we don't have.
-          //
-          // TODO: how does git-push handles the case when the client doesn't recognize commits?
-          // Unless it tries to negotiate further what commits they have in common,
-          // git-upload-pack can end up creating a big pack with lots of redundant objects
-          //
-          // think about a case when a client is pushing a new branch against
-          // the master branch that moved on the server.
-        }
-      }
-
-      // record all the necessary BLOBs first, before attempting to record its commit.
-      // this way, if the file collection fails, the server won't see this commit, so the future
-      // "record commit" invocation will retry the file collection, thereby making the behavior idempotent.
-      collectFiles(start, treeWalk, treeReceiver, fileReceiver);
-      fileReceiver.flush();
-
-      // walk the commits, transform them, and send them to the commitReceiver
-      for (RevCommit c : walk) {
-        commitReceiver.accept(transform(c));
-        commitsSent++;
-      }
-    }
-  }
+  void forEachSubModule(ExecutorService threadPool, IOConsumer<ByRepository> consumer) throws IOException {
+    for (Future<Void> f : forEachSubModuleAsync(threadPool, consumer)) {
+      try {
+        f.get();
+      } catch (Exception e) {
+        throw new IOException("Failed to process a repository", e);
+      }
+    }
+  }
+
+  /**
+   * Recursively iterate all the sub-modules and apply the given consumer to them asynchronously, using the given
+   * thread pool.
+   *
+   * <p>
+   * The way this function mixes (1) a synchronous call to the consumer with {@code this} with (2) using the thread
+   * pool to recursively process submodules might be a bit hard to follow. This was motivated by the fact that the
+   * {@link ByRepository} instances for sub-modules need to be closed, while {@code this} shouldn't be.
+   *
+   * @return all the async jobs that are forked off, to allow the caller to wait for their completion.
+   */
+  Collection<Future<Void>> forEachSubModuleAsync(ExecutorService threadPool, IOConsumer<ByRepository> consumer) throws IOException {
+    Vector<Future<Void>> jobs = new Vector<>();
 
     /*
       Git submodule support
       =====================
@@ -418,73 +468,105 @@ That is, find submodules that are available in the working tree (thus `!isBare()
     if (!git.isBare()) {
       try (SubmoduleWalk swalk = SubmoduleWalk.forIndex(git)) {
         while (swalk.next()) {
-          try (Repository subRepo = swalk.getRepository()) {
-            if (subRepo != null) {
-              try {
-                try (ByRepository br = new ByRepository(subRepo, name + "/" + swalk.getModulesPath())) {
-                  br.transfer(advertised, commitReceiver, treeReceiver, fileReceiver);
+          Repository subRepo = swalk.getRepository();
+          if (subRepo != null) {
+            try {
+              ByRepository br = new ByRepository(subRepo, name + "/" + swalk.getModulesPath());
+              jobs.add(threadPool.submit(() -> {
+                try {
+                  jobs.addAll(br.forEachSubModuleAsync(threadPool, consumer));
+                  return null;
+                } finally {
+                  br.close();
+                  subRepo.close();
                 }
-              } catch (ConfigInvalidException e) {
-                throw new IOException("Invalid Git submodule configuration: " + git.getDirectory(), e);
-              }
+              }));
+            } catch (ConfigInvalidException e) {
+              throw new IOException("Invalid Git submodule configuration: " + git.getDirectory(), e);
             }
           }
         }
       }
     }
+
+    consumer.accept(this);
+
+    return jobs;
+  }
+
+  private void parseEachCommit(RevWalk walk, Collection<ObjectId> advertised, IOConsumer<RevCommit> consumer) throws IOException {
+    for (ObjectId id : advertised) {
+      try {
+        RevCommit c = walk.parseCommit(id);
+        consumer.accept(c);
+      } catch (MissingObjectException e) {
+        // it's possible that the server advertises a commit we don't have.
+        //
+        // TODO: how does git-push handle the case when the client doesn't recognize commits?
+        // Unless it tries to negotiate further what commits they have in common,
+        // git-upload-pack can end up creating a big pack with lots of redundant objects
+        //
+        // think about a case when a client is pushing a new branch against
+        // the master branch that moved on the server.
+      }
+    }
+  }
 
   /**
-   * treeWalk contains the HEAD (the interesting commit) at the 0th position, then all the commits
-   * the server advertised in the 1st, 2nd, ...
-   * Our goal here is to find all the files that the server hasn't seen yet. We'll send them to the tree receiver,
-   * which further responds with the actual files we need to send to the server.
+   * Records all the necessary BLOBs first.
    */
-  private void collectFiles(RevCommit start, TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer<VirtualFile> fileReceiver) throws IOException {
-    if (!collectFiles) {
-      return;
-    }
+  void collectFiles(Collection<ObjectId> advertised, TreeReceiver treeReceiver, FlushableConsumer<VirtualFile> fileReceiver) throws IOException {
+    try (TreeWalk treeWalk = new TreeWalk(git)) {
+      RevCommit start = git.parseCommit(headId);
+      treeWalk.addTree(start.getTree());
+
+      if (!reportAllFiles) {
+        // to optimize data transfer, skip files that the server has already seen,
+        // i.e., files that are present in any of the advertised commits.
+        // if the reportAllFiles flag is on, then skip this optimization on the client side.
+        // treeReceiver will still provide an opportunity for the server to be selective.
+        try (RevWalk walk = new RevWalk(git)) {
+          parseEachCommit(walk, advertised, c -> treeWalk.addTree(c.getTree()));
+        }
+      }
 
-    int c = treeWalk.getTreeCount();
+      int c = treeWalk.getTreeCount();
 
-    OUTER:
-    while (treeWalk.next()) {
-      ObjectId head = treeWalk.getObjectId(0);
-      for (int i = 1; i < c; i++) {
-        if (head.equals(treeWalk.getObjectId(i))) {
-          // file at the head is identical to one of the uninteresting commits,
-          // meaning we have already seen this file/directory on the server.
-          // if it is a dir, there's no need to visit this whole subtree, so skip over
-          continue OUTER;
+      OUTER:
+      while (treeWalk.next()) {
+        ObjectId head = treeWalk.getObjectId(0);
+        for (int i = 1; i < c; i++) {
+          if (head.equals(treeWalk.getObjectId(i))) {
+            // file at the head is identical to one of the uninteresting commits,
+            // meaning we have already seen this file/directory on the server.
+            // if it is a dir, there's no need to visit this whole subtree, so skip over
+            continue OUTER;
+          }
         }
-      }
-      if (treeWalk.isSubtree()) {
-        treeWalk.enterSubtree();
-      } else {
-        if ((treeWalk.getFileMode(0).getBits() & FileMode.TYPE_MASK) == FileMode.TYPE_FILE) {
-          GitFile f = new GitFile(name, treeWalk.getPathString(), head, objectReader);
-          // to avoid excessive data transfer, skip files that are too big
-          if (f.size() < 1024 * 1024 && f.isText() && !f.path.equals(HEADER_FILE)) {
-            treeReceiver.accept(f);
+        if (treeWalk.isSubtree()) {
+          treeWalk.enterSubtree();
+        } else {
+          if ((treeWalk.getFileMode(0).getBits() & FileMode.TYPE_MASK) == FileMode.TYPE_FILE) {
+            GitFile f = new GitFile(name, treeWalk.getPathString(), head, objectReader);
+            // to avoid excessive data transfer, skip files that are too big
+            if (f.size() < 1024 * 1024 && f.isText() && !f.path.equals(HEADER_FILE)) {
+              treeReceiver.accept(f);
+            }
           }
         }
       }
-    }
 
-    // Note(Konboi): To balance the order, since words like "test" and "spec" tend to appear
-    // toward the end in alphabetical sorting.
-    List<VirtualFile> files = new ArrayList<>(treeReceiver.response());
-    if (!files.isEmpty()) {
-      fileReceiver.accept(buildHeader(start));
-      filesSent++;
+      // Now let the server select the files it actually wants to see
+      Collection<VirtualFile> files = treeReceiver.response();
 
-      Collections.shuffle(files);
       for (VirtualFile f : files) {
         fileReceiver.accept(f);
-        filesSent++;
+        filesSent.incrementAndGet();
       }
+
+      fileReceiver.flush();
     }
   }
 
@@ -492,7 +574,7 @@
   /**
    * Creates a per repository "header" file as a {@link VirtualFile}.
    * Currently, this is just the list of files in the repository.
    */
-  private VirtualFile buildHeader(RevCommit start) throws IOException {
+  VirtualFile buildHeader() throws IOException {
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     try (JsonGenerator w = new JsonFactory().createGenerator(os)) {
       w.setCodec(objectMapper);
@@ -500,7 +582,7 @@
       w.writeArrayFieldStart("tree");
 
       try (TreeWalk tw = new TreeWalk(git)) {
-        tw.addTree(start.getTree());
+        tw.addTree(git.parseCommit(headId).getTree());
         tw.setRecursive(true);
 
         while (tw.next()) {
@@ -516,6 +598,43 @@
     return VirtualFile.from(name, HEADER_FILE, ObjectId.zeroId(), os.toByteArray());
   }
 
+  void collectCommits(Collection<ObjectId> advertised, Consumer<JSCommit> commitReceiver) throws IOException {
+    try (RevWalk walk = new RevWalk(git)) {
+      // walk reverse topological order, so that older commits get added to the server earlier.
+      // This way, the connectivity of the commit graph will always be maintained
+      walk.sort(RevSort.TOPO);
+      walk.sort(RevSort.REVERSE, true);
+      // also combine this with commit time based ordering, so that we can stop walking when we
+      // find old enough commits. AFAICT, this is a no-op in JGit and it always sorts things in
+      // commit time order, but it is in the contract, so I'm assuming we shouldn't rely on the
+      // implementation optimization that's currently enabling this all the time
+      walk.sort(RevSort.COMMIT_TIME_DESC, true);
+
+      walk.markStart(walk.parseCommit(headId));
+
+      // don't walk commits too far back.
+      // for our purpose of computing CUT, these are unlikely to contribute meaningfully
+      // and it drastically cuts down the initial commit consumption of a new large repository.
+      // ... except we do want to capture the head commit, as that makes it easier to spot integration problems
+      // when `record build` and `record commit` are separated.
+
+      // the two RevFilters are order sensitive. This is because CommitTimeRevFilter.after doesn't return false to
+      // filter things out, it throws StopWalkException to terminate the walk, never giving a chance for the other
+      // branch of the OR to be evaluated. So we need to put ObjectRevFilter first.
+      walk.setRevFilter(
+          OrRevFilter.create(
+              new ObjectRevFilter(headId),
+              CommitTimeRevFilter.after(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(maxDays))));
+
+      parseEachCommit(walk, advertised, walk::markUninteresting);
+
+      // walk the commits, transform them, and send them to the commitReceiver
+      for (RevCommit c : walk) {
+        commitReceiver.accept(transform(c));
+        commitsSent.incrementAndGet();
+      }
+    }
+  }
 
   private JSCommit transform(RevCommit r) throws IOException {
     JSCommit c = new JSCommit();
@@ -622,7 +741,7 @@ public Collection<String> response() {
     try {
       URL url = new URL(service, "collect/tree");
       HttpPost request = new HttpPost(url.toExternalForm());
-      request.setHeader("Content-Type", "application/json");
+      request.setHeader("Content-Type", APPLICATION_JSON);
       request.setHeader("Content-Encoding", "gzip");
       request.setEntity(new EntityTemplate(raw -> {
         try (OutputStream os = new GZIPOutputStream(raw)) {
@@ -639,10 +758,7 @@
       }
 
       // even in dry run, this method needs to execute in order to show what files we'll be collecting
-      try (CloseableHttpResponse response = handleError(url, client.execute(request));
-          JsonParser parser = new JsonFactory().createParser(response.getEntity().getContent())) {
-        return select(objectMapper.readValue(parser, String[].class));
-      }
+      return select(readResponse(handleError(url, client.execute(request)), String[].class));
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     } finally {
diff --git a/src/main/java/com/launchableinc/ingest/commits/ConcurrentConsumer.java b/src/main/java/com/launchableinc/ingest/commits/ConcurrentConsumer.java
new file mode 100644
index 000000000..fccaa2b53
--- /dev/null
+++ b/src/main/java/com/launchableinc/ingest/commits/ConcurrentConsumer.java
@@ -0,0 +1,45 @@
+package com.launchableinc.ingest.commits;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * A decorator of {@link IOConsumer}/{@link Consumer} that concurrently/asynchronously processes accepted items.
+ *
+ * <p>
+ * The {@link #close()} method waits for all the processing to complete, and fails if any of the tasks threw an exception.
+ */
+class ConcurrentConsumer<T> implements IOConsumer<T>, Consumer<T>, Closeable {
+  private final IOConsumer<T> delegate;
+  private final ExecutorService executor;
+  private final List<Future<Void>> jobs = new ArrayList<>();
+
+  ConcurrentConsumer(IOConsumer<T> delegate, ExecutorService executor) {
+    this.delegate = delegate;
+    this.executor = executor;
+  }
+
+  @Override
+  public void accept(T t) {
+    jobs.add(executor.submit(() -> {
+      delegate.accept(t);
+      return null; // use the Callable interface so as not to wrap a checked exception
+    }));
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      for (Future<Void> job : jobs) {
+        job.get();
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+}
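The try-with-resources contract is the important part here: `accept()` only enqueues work, and `close()` is the synchronization point where failures surface. A small sketch of the failure path (the demo class and the exception message are illustrative):

```java
package com.launchableinc.ingest.commits;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentConsumerFailureDemo {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try (ConcurrentConsumer<Integer> c = new ConcurrentConsumer<>(i -> {
      if (i == 2) {
        throw new IOException("upload of item " + i + " failed"); // hypothetical failure
      }
    }, pool)) {
      c.accept(1);
      c.accept(2); // accept() returns immediately; the failure is only observed later
      c.accept(3);
    } catch (IOException e) {
      // close() waited for all three tasks and rethrew the failure as IOException
      System.err.println("caught: " + e.getMessage());
    } finally {
      pool.shutdown();
    }
  }
}
```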
diff --git a/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java b/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java
index 698c69438..fad3dab02 100644
--- a/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java
+++ b/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java
@@ -14,8 +14,11 @@
  * Receives {@link GitFile}, buffers them, and writes them out in a gzipped tar file.
  */
 final class FileChunkStreamer extends ChunkStreamer<VirtualFile> {
-  FileChunkStreamer(IOConsumer<ContentProducer> sender, int chunkSize) {
+  private final VirtualFile header;
+
+  FileChunkStreamer(VirtualFile header, IOConsumer<ContentProducer> sender, int chunkSize) {
     super(sender, chunkSize);
+    this.header = header;
   }
 
   @Override
@@ -23,14 +26,22 @@ protected void writeTo(List<VirtualFile> files, OutputStream os) throws IOExcept
     try (TarArchiveOutputStream tar = new TarArchiveOutputStream(os, "UTF-8")) {
       tar.setLongFileMode(LONGFILE_POSIX);
 
+      if (header != null) {
+        write(header, tar);
+      }
+
       for (VirtualFile f : files) {
-        TarArchiveEntry e = new TarArchiveEntry(f.path());
-        e.setSize(f.size());
-        e.setGroupName(f.blob().name()); // HACK - reuse the group name field to store the blob ID
-        tar.putArchiveEntry(e);
-        f.writeTo(tar);
-        tar.closeArchiveEntry();
+        write(f, tar);
       }
     }
   }
+
+  private static void write(VirtualFile f, TarArchiveOutputStream tar) throws IOException {
+    TarArchiveEntry e = new TarArchiveEntry(f.path());
+    e.setSize(f.size());
+    e.setGroupName(f.blob().name()); // HACK - reuse the group name field to store the blob ID
+    tar.putArchiveEntry(e);
+    f.writeTo(tar);
+    tar.closeArchiveEntry();
+  }
 }
diff --git a/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java b/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java
index 7eb8c701d..8e3272d14 100644
--- a/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java
+++ b/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java
@@ -1,12 +1,13 @@
 package com.launchableinc.ingest.commits;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.function.Consumer;
 
 /**
  * Consumers that spool items it accepts and process them in bulk.
  */
-public interface FlushableConsumer<T> extends Consumer<T> {
+public interface FlushableConsumer<T> extends Consumer<T>, Closeable {
   /**
    * Process all items that have been accepted so far.
   */
@@ -15,7 +16,12 @@
   static <T> FlushableConsumer<T> of(Consumer<T> c) {
     return new FlushableConsumer<T>() {
       @Override
-      public void flush() throws IOException {
+      public void flush() {
+        // noop
+      }
+
+      @Override
+      public void close() {
         // noop
       }
 
diff --git a/src/main/java/com/launchableinc/ingest/commits/JSAsyncFileCollectionProgress.java b/src/main/java/com/launchableinc/ingest/commits/JSAsyncFileCollectionProgress.java
new file mode 100644
index 000000000..ac736f8d7
--- /dev/null
+++ b/src/main/java/com/launchableinc/ingest/commits/JSAsyncFileCollectionProgress.java
@@ -0,0 +1,10 @@
+package com.launchableinc.ingest.commits;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class JSAsyncFileCollectionProgress {
+  @JsonProperty
+  public BackgroundWorkStatus status;
+  @JsonProperty
+  public int filesProcessed;
+}
diff --git a/src/main/java/com/launchableinc/ingest/commits/JSAsyncFileCollectionResponse.java b/src/main/java/com/launchableinc/ingest/commits/JSAsyncFileCollectionResponse.java
new file mode 100644
index 000000000..c506f5938
--- /dev/null
+++ b/src/main/java/com/launchableinc/ingest/commits/JSAsyncFileCollectionResponse.java
@@ -0,0 +1,8 @@
+package com.launchableinc.ingest.commits;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class JSAsyncFileCollectionResponse {
+  @JsonProperty
+  public int workId;
+}
diff --git a/src/main/java/com/launchableinc/ingest/commits/ProgressReporter.java b/src/main/java/com/launchableinc/ingest/commits/ProgressReporter.java
new file mode 100644
index 000000000..d5ba0e7de
--- /dev/null
+++ b/src/main/java/com/launchableinc/ingest/commits/ProgressReporter.java
@@ -0,0 +1,93 @@
+package com.launchableinc.ingest.commits;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+import static java.time.Instant.now;
+
+/**
+ * Given multiple concurrent slow {@link Consumer}s, each going over a large number of items in parallel,
+ * provide a progress report to show that the work is still in progress.
+ */
+class ProgressReporter<T> {
+  private final Function<T, String> printer;
+  private final Duration reportInterval;
+  private Instant nextReportTime;
+
+  /**
+   * Number of items that need to be processed, across all consumers.
+   */
+  private final AtomicInteger workload = new AtomicInteger();
+  /**
+   * Number of items that have already been processed, across all consumers.
+   */
+  private final AtomicInteger completed = new AtomicInteger();
+
+  ProgressReporter(Function<T, String> printer, Duration reportInterval) {
+    this.printer = printer;
+    this.reportInterval = reportInterval;
+    this.nextReportTime = now().plus(reportInterval);
+  }
+
+  /**
+   * Deals with one serial stream of work.
+   */
+  class Consumer implements FlushableConsumer<T> {
+    private final FlushableConsumer<T> base;
+    private final List<T> pool = new ArrayList<>();
+
+    Consumer(FlushableConsumer<T> base) {
+      this.base = base;
+    }
+
+    @Override
+    public void accept(T t) {
+      pool.add(t);
+      workload.incrementAndGet();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      for (T x : pool) {
+        synchronized (ProgressReporter.this) {
+          if (now().isAfter(nextReportTime)) {
+            print(completed.get(), workload.get(), x);
+            nextReportTime = now().plus(reportInterval);
+          }
+        }
+        base.accept(x);
+        completed.incrementAndGet();
+      }
+      pool.clear();
+      base.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      flush();
+    }
+  }
+
+  Consumer newConsumer(FlushableConsumer<T> base) {
+    return new Consumer(base);
+  }
+
+  protected void print(int c, int w, T x) {
+    int width = String.valueOf(w).length();
+    System.err.printf("%s/%d: %s%n", pad(c, width), w, printer.apply(x));
+  }
+
+  static String pad(int i, int width) {
+    String s = String.valueOf(i);
+    while (s.length() < width) {
+      s = " " + s;
+    }
+    return s;
+  }
+}
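Unlike the per-instance `ProgressReportingConsumer` it replaces (deleted below), one `ProgressReporter` is shared by all workers and each worker wraps its own downstream sink via `newConsumer()`, so the printed counters aggregate across threads. A minimal single-threaded sketch (the demo class is illustrative, not part of this change):

```java
package com.launchableinc.ingest.commits;

import java.io.IOException;
import java.time.Duration;

public class ProgressReporterDemo {
  public static void main(String[] args) throws IOException {
    // One reporter shared across all workers; the workload/completed counters are global.
    ProgressReporter<String> reporter = new ProgressReporter<>(item -> item, Duration.ofSeconds(1));

    // Each worker thread wraps its own downstream sink.
    try (ProgressReporter<String>.Consumer items =
             reporter.newConsumer(FlushableConsumer.of(s -> System.out.println(s)))) {
      for (int i = 0; i < 10; i++) {
        items.accept("item " + i); // counted into the shared workload
      }
    } // close() flushes: items are forwarded downstream, progress lines go to stderr
  }
}
```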
diff --git a/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java b/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java
deleted file mode 100644
index 858f63a47..000000000
--- a/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.launchableinc.ingest.commits;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static java.time.Instant.now;
-
-/**
- * Given a slow {@link Consumer} that goes over a large number of items,
- * provide a progress report to show that the work is still in progress.
- */
-class ProgressReportingConsumer<T> implements FlushableConsumer<T>, AutoCloseable {
-  private final FlushableConsumer<T> base;
-  private final List<T> pool = new ArrayList<>();
-  private final Function<T, String> printer;
-  private final Duration reportInterval;
-  private int round = 1;
-
-  ProgressReportingConsumer(FlushableConsumer<T> base, Function<T, String> printer, Duration reportInterval) {
-    this.base = base;
-    this.printer = printer;
-    this.reportInterval = reportInterval;
-  }
-
-  @Override
-  public void accept(T t) {
-    pool.add(t);
-  }
-
-  @Override
-  public void flush() throws IOException {
-    Instant nextReportTime = now().plus(reportInterval);
-    int width = String.valueOf(pool.size()).length();
-    int i = 0;
-    for (T x : pool) {
-      i++;
-      if (now().isAfter(nextReportTime)) {
-        System.err.printf("%s%s/%d: %s%n", round(), pad(i, width), pool.size(), printer.apply(x));
-        nextReportTime = now().plus(reportInterval);
-      }
-      base.accept(x);
-    }
-    pool.clear();
-    base.flush();
-    round++;
-  }
-
-  private String round() {
-    if (round == 1) {
-      // most of the time, there's only one round, so let's not bother
-      return "";
-    } else {
-      return String.format("#%d ", round);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    flush();
-  }
-
-  static String pad(int i, int width) {
-    String s = String.valueOf(i);
-    while (s.length() < width) {
-      s = " " + s;
-    }
-    return s;
-  }
-}
diff --git a/src/test/java/com/launchableinc/ingest/commits/AllTests.java b/src/test/java/com/launchableinc/ingest/commits/AllTests.java
index dc1315cab..1124da7df 100644
--- a/src/test/java/com/launchableinc/ingest/commits/AllTests.java
+++ b/src/test/java/com/launchableinc/ingest/commits/AllTests.java
@@ -6,10 +6,12 @@
 @RunWith(Suite.class)
 @SuiteClasses({
+  BoundedExecutorServiceTest.class,
   CommitGraphCollectorTest.class,
-  MainTest.class,
+  ConcurrentConsumerTest.class,
   FileChunkStreamerTest.class,
+  MainTest.class,
   SSLBypassTest.class,
-  ProgressReportingConsumerTest.class
+  ProgressReporterTest.class
 })
 public class AllTests {}
diff --git a/src/test/java/com/launchableinc/ingest/commits/BoundedExecutorServiceTest.java b/src/test/java/com/launchableinc/ingest/commits/BoundedExecutorServiceTest.java
new file mode 100644
index 000000000..2c45411ab
--- /dev/null
+++ b/src/test/java/com/launchableinc/ingest/commits/BoundedExecutorServiceTest.java
@@ -0,0 +1,93 @@
+package com.launchableinc.ingest.commits;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class BoundedExecutorServiceTest {
+  @Test
+  public void basicExecution() throws InterruptedException {
+    BoundedExecutorService executor = new BoundedExecutorService(2);
+    List<Integer> results = Collections.synchronizedList(new ArrayList<>());
+
+    try {
+      executor.execute(() -> results.add(1));
+      executor.execute(() -> results.add(2));
+      executor.execute(() -> results.add(3));
+    } finally {
+      executor.shutdown();
+      executor.awaitTermination(5, SECONDS);
+    }
+
+    assertThat(results).containsExactly(1, 2, 3);
+  }
+
+  @Test
+  public void enforcesConcurrencyLimit() throws InterruptedException {
+    int limit = 2;
+    BoundedExecutorService executor = new BoundedExecutorService(limit);
+    CountDownLatch startLatch = new CountDownLatch(limit);
+    CountDownLatch releaseLatch = new CountDownLatch(1);
+    AtomicInteger concurrentCount = new AtomicInteger(0);
+    AtomicInteger maxConcurrent = new AtomicInteger(0);
+    AtomicInteger submittedCount = new AtomicInteger(0);
+
+    Runnable task = () -> {
+      try {
+        int current = concurrentCount.incrementAndGet();
+        maxConcurrent.updateAndGet(max -> Math.max(max, current));
+        startLatch.countDown();
+        releaseLatch.await();
+        concurrentCount.decrementAndGet();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    // Use a separate thread to sequentially submit tasks
+    Thread submitter = new Thread(() -> {
+      for (int i = 0; i < 4; i++) {
+        executor.execute(task);
+        submittedCount.incrementAndGet();
+      }
+    });
+
+    try {
+      submitter.start();
+
+      // Wait for 'limit' tasks to start
+      startLatch.await(1, SECONDS);
+
+      // Should have exactly 'limit' tasks running
+      assertThat(concurrentCount.get()).isEqualTo(limit);
+
+      // The submitter thread should be blocked trying to submit the 3rd task;
+      // only 2 tasks should have been submitted successfully
+      assertThat(submittedCount.get()).isEqualTo(2);
+
+      // Release all tasks
+      releaseLatch.countDown();
+
+      // Wait for submitter to finish
+      submitter.join(5000);
+    } finally {
+      executor.shutdown();
+      executor.awaitTermination(5, SECONDS);
+    }
+
+    // Verify that concurrent execution never exceeded the limit
+    assertThat(maxConcurrent.get()).isAtMost(limit);
+    // All 4 tasks should have eventually been submitted
+    assertThat(submittedCount.get()).isEqualTo(4);
+  }
+}
diff --git a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java
index b069bee94..117e6ab85 100644
--- a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java
+++ b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java
@@ -90,7 +90,7 @@ public void bareRepo() throws Exception {
     try (Repository r = Git.open(barerepoDir).getRepository()) {
       CommitGraphCollector cgc = collectCommit(r, ImmutableList.of());
       assertThat(cgc.getCommitsSent()).isEqualTo(1);
-      assertThat(cgc.getFilesSent()).isEqualTo(2); // header + .gitmodules
+      assertThat(cgc.getFilesSent()).isEqualTo(1); // .gitmodules
     }
   }
 
@@ -121,7 +121,7 @@ public void chunking() throws Exception {
           2);
     }
     assertThat(councCommitChunks[0]).isEqualTo(2);
-    assertThat(countFilesChunks[0]).isEqualTo(3); // header, a, .gitmodules, and header, sub/x, 5 files, 3 chunks
+    assertThat(countFilesChunks[0]).isEqualTo(2); // a, .gitmodules, and sub/x, 3 files, 2 chunks
   }
 
   private void assertValidTar(ContentProducer content) throws IOException {
@@ -176,14 +176,8 @@ public void header() throws Exception {
     CommitGraphCollector cgc = new CommitGraphCollector("test", mainrepo.getRepository());
     cgc.collectFiles(true);
 
-    cgc.new ByRepository(mainrepo.getRepository(), "main")
-        .transfer(Collections.emptyList(), c -> {},
-            new PassThroughTreeReceiverImpl(),
-            FlushableConsumer.of(files::add));
-    // header for the main repo, 'gitmodules', header for the sub repo, 'a', and 'x' in the sub repo
-    assertThat(files).hasSize(5);
-    VirtualFile header = files.get(2);
+    VirtualFile header = cgc.new ByRepository(mainrepo.getRepository(), "main").buildHeader();
     assertThat(header.path()).isEqualTo(CommitGraphCollector.HEADER_FILE);
     JsonNode tree = assertValidJson(header::writeTo).get("tree");
     assertThat(tree.isArray()).isTrue();
@@ -193,7 +187,7 @@ cgc.new ByRepository(mainrepo.getRepository(), "main")
       paths.add(i.get("path").asText());
     }
 
-    assertThat(paths).containsExactly("a", "x");
+    assertThat(paths).containsExactly(".gitmodules", "sub");
   }
 }
diff --git a/src/test/java/com/launchableinc/ingest/commits/ConcurrentConsumerTest.java b/src/test/java/com/launchableinc/ingest/commits/ConcurrentConsumerTest.java
new file mode 100644
index 000000000..a448ab97e
--- /dev/null
+++ b/src/test/java/com/launchableinc/ingest/commits/ConcurrentConsumerTest.java
@@ -0,0 +1,124 @@
+package com.launchableinc.ingest.commits;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+
+public class ConcurrentConsumerTest {
+  @Test
+  public void basicProcessing() throws IOException {
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
+
+    try (ConcurrentConsumer<Integer> consumer = new ConcurrentConsumer<>(processed::add, executor)) {
+      consumer.accept(1);
+      consumer.accept(2);
+      consumer.accept(3);
+    } finally {
+      executor.shutdown();
+    }
+
+    assertThat(processed.size()).isEqualTo(3);
+    assertThat(processed).contains(1);
+    assertThat(processed).contains(2);
+    assertThat(processed).contains(3);
+  }
+
+  @Test
+  public void concurrentProcessing() throws IOException, InterruptedException {
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    CountDownLatch latch = new CountDownLatch(3);
+    AtomicInteger concurrentCount = new AtomicInteger(0);
+    AtomicInteger maxConcurrent = new AtomicInteger(0);
+
+    IOConsumer<Integer> slowConsumer = (i) -> {
+      try {
+        int current = concurrentCount.incrementAndGet();
+        maxConcurrent.updateAndGet(max -> Math.max(max, current));
+        latch.countDown();
+
+        // Wait for all three to be running concurrently
+        latch.await();
+        Thread.sleep(50);
+        concurrentCount.decrementAndGet();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    try (ConcurrentConsumer<Integer> consumer = new ConcurrentConsumer<>(slowConsumer, executor)) {
+      consumer.accept(1);
+      consumer.accept(2);
+      consumer.accept(3);
+    } finally {
+      executor.shutdown();
+    }
+
+    assertThat(maxConcurrent.get()).isEqualTo(3);
+  }
+
+  @Test
+  public void exceptionPropagation() {
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    IOConsumer<Integer> throwingConsumer = (i) -> {
+      if (i == 2) {
+        throw new IOException("Test exception");
+      }
+    };
+
+    ConcurrentConsumer<Integer> consumer = new ConcurrentConsumer<>(throwingConsumer, executor);
+    consumer.accept(1);
+    consumer.accept(2);
+    consumer.accept(3);
+
+    // The exception should be thrown on close
+    try {
+      consumer.close();
+      fail("Expected IOException was not thrown");
+    } catch (IOException e) {
+      assertThat(e.getMessage()).contains("Test exception");
+    } finally {
+      executor.shutdown();
+    }
+  }
+
+  @Test
+  public void closeWaitsForCompletion() throws IOException {
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    AtomicInteger completed = new AtomicInteger(0);
+
+    IOConsumer<Integer> slowConsumer = (i) -> {
+      try {
+        Thread.sleep(100);
+        completed.incrementAndGet();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    try (ConcurrentConsumer<Integer> consumer = new ConcurrentConsumer<>(slowConsumer, executor)) {
+      consumer.accept(1);
+      consumer.accept(2);
+      consumer.accept(3);
+
+      // Items should still be processing
+      assertThat(completed.get()).isLessThan(3);
+    } finally {
+      executor.shutdown();
+    }
+
+    // After close(), all items should be completed
+    assertThat(completed.get()).isEqualTo(3);
+  }
+}
diff --git a/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java b/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java
index 3d9cc7e70..01b946558 100644
--- a/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java
+++ b/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java
@@ -20,7 +20,7 @@ public class FileChunkStreamerTest {
 
   @Test
   public void no_op_if_no_content() throws Exception {
-    try (FileChunkStreamer fs = new FileChunkStreamer(content -> fail(), 2)) {
+    try (FileChunkStreamer fs = new FileChunkStreamer(null, content -> fail(), 2)) {
       // no write
     }
   }
 
@@ -28,13 +28,13 @@ public void no_op_if_no_content() throws Exception {
   @Test
   public void basics() throws Exception {
     int[] count = new int[1];
-    try (FileChunkStreamer fs = new FileChunkStreamer(content -> {
+    try (FileChunkStreamer fs = new FileChunkStreamer(new VirtualFileImpl("header.txt"), content -> {
       switch (count[0]++) {
         case 0:
-          assertThat(readEntries(content)).containsExactly("foo.txt", "bar.txt").inOrder();
+          assertThat(readEntries(content)).containsExactly("header.txt", "foo.txt", "bar.txt").inOrder();
           break;
         case 1:
-          assertThat(readEntries(content)).containsExactly("zot.txt").inOrder();
+          assertThat(readEntries(content)).containsExactly("header.txt", "zot.txt").inOrder();
           break;
         default:
           fail();
diff --git a/src/test/java/com/launchableinc/ingest/commits/ProgressReporterTest.java b/src/test/java/com/launchableinc/ingest/commits/ProgressReporterTest.java
new file mode 100644
index 000000000..b518c73d9
--- /dev/null
+++ b/src/test/java/com/launchableinc/ingest/commits/ProgressReporterTest.java
@@ -0,0 +1,95 @@
+package com.launchableinc.ingest.commits;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static com.google.common.truth.Truth.*;
+import static java.util.Collections.*;
+
+public class ProgressReporterTest {
+  ProgressReporter<String> pr = new ProgressReporter<String>(String::valueOf, Duration.ofMillis(100)) {
+    int cc = 0;
+    int ww = 0;
+
+    @Override
+    protected void print(int c, int w, String x) {
+      super.print(c, w, x);
+
+      // ensure numbers are monotonically increasing
+      assertThat(c).isAtLeast(cc);
+      assertThat(w).isAtLeast(ww);
+      cc = c;
+      ww = w;
+    }
+  };
+
+  /**
+   * Tests the most important bit -- that all items are processed.
+   */
+  @Test
+  public void serial() throws Exception {
+    List<String> done = new ArrayList<>();
+    try (ProgressReporter<String>.Consumer x = pr.newConsumer(FlushableConsumer.of(s -> {
+      done.add(s);
+      sleep();
+    }))) {
+      for (int i = 0; i < 100; i++) {
+        x.accept("item " + i);
+      }
+    }
+    assertThat(done.size()).isEqualTo(100);
+  }
+
+  /**
+   * Perform work in parallel and make sure it all gets processed.
+ */ + @Test + public void parallel() throws Exception { + Set done = synchronizedSet(new HashSet<>()); + + ExecutorService es = Executors.newFixedThreadPool(10); + List> all = new ArrayList<>(); + for (int i=0; i<10; i++) { + final int ii = i; + all.add(es.submit(() -> { + try (ProgressReporter.Consumer x = pr.newConsumer(FlushableConsumer.of(s -> {done.add(s);sleep();}))) { + for (int j = 0; j < 100; j++) { + x.accept("item " + (ii*100+j)); + } + return null; + } + })); + } + for (Future f : all) { + f.get(); + } + es.shutdown(); + + assertThat(done.size()).isEqualTo(1000); + for (int i=0; i<1000; i++) { + assertThat(done).contains("item " + i); + } + } + + private static void sleep() { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new UnsupportedOperationException(); + } + } + + @Test + public void pad() { + assertThat(ProgressReporter.pad(5,3)).isEqualTo(" 5"); + assertThat(ProgressReporter.pad(15,3)).isEqualTo(" 15"); + assertThat(ProgressReporter.pad(1234,3)).isEqualTo("1234"); + } +} diff --git a/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java b/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java deleted file mode 100644 index 3c655a95c..000000000 --- a/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.launchableinc.ingest.commits; - -import com.google.common.truth.Truth; -import org.junit.Test; - -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; - -public class ProgressReportingConsumerTest { - @Test - public void basic() throws IOException { - List done = new ArrayList<>(); - try (ProgressReportingConsumer x = new ProgressReportingConsumer<>(FlushableConsumer.of(s -> {done.add(s);sleep();}), String::valueOf, Duration.ofMillis(100))) { - for (int i = 0; i < 100; i++) { - x.accept("item " + i); - } - } - Truth.assertThat(done.size()).isEqualTo(100); - } - - private static void sleep() { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new UnsupportedOperationException(); - } - } -}