diff --git a/README.md b/README.md index d613e0d4c..029e74558 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,9 @@ configuration for [pre-commit](https://pre-commit.com). Install the hook with pipenv shell ``` +From this shell, run `pip install -e .` to create `launchable` wrapper script from the workspace. +This is useful to test the CLI with real/local server. + ## Run tests cli ```shell diff --git a/launchable/jar/exe_deploy.jar b/launchable/jar/exe_deploy.jar index 5800a9055..5d5e8a75e 100755 Binary files a/launchable/jar/exe_deploy.jar and b/launchable/jar/exe_deploy.jar differ diff --git a/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java b/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java index 7be425e2a..d6ef08fce 100644 --- a/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java +++ b/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java @@ -2,7 +2,6 @@ import org.apache.http.entity.ContentProducer; -import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; import java.io.UncheckedIOException; @@ -12,7 +11,7 @@ /** * Accepts T, buffers them, and writes them out as a batch. */ -abstract class ChunkStreamer implements FlushableConsumer, Closeable { +abstract class ChunkStreamer implements FlushableConsumer { /** * Encapsulation of how batches are sent. */ diff --git a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java index 3ac660f2b..30951ebb7 100644 --- a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java +++ b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java @@ -58,7 +58,12 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.zip.GZIPOutputStream; @@ -87,7 +92,7 @@ public class CommitGraphCollector { */ private final Repository root; - private int commitsSent, filesSent; + private final AtomicInteger commitsSent = new AtomicInteger(), filesSent = new AtomicInteger(); private boolean collectCommitMessage, collectFiles; @@ -119,11 +124,11 @@ public CommitGraphCollector(String name, Repository git) { /** How many commits did we transfer? */ public int getCommitsSent() { - return commitsSent; + return commitsSent.get(); } public int getFilesSent() { - return filesSent; + return filesSent.get(); } private String dumpHeaderAsJson(Header[] headers) throws JsonProcessingException { @@ -271,10 +276,37 @@ public void transfer( Collection advertised, IOConsumer commitSender, TreeReceiver treeReceiver, IOConsumer fileSender, int chunkSize) throws IOException { ByRepository r = new ByRepository(root, rootName); - try (CommitChunkStreamer cs = new CommitChunkStreamer(commitSender, chunkSize); - FileChunkStreamer fs = new FileChunkStreamer(fileSender, chunkSize); - ProgressReportingConsumer fsr = new ProgressReportingConsumer<>(fs, VirtualFile::path, Duration.ofSeconds(3))) { - r.transfer(advertised, cs, treeReceiver, fsr); + + ExecutorService es = Executors.newFixedThreadPool(4); + // for debugging +// ExecutorService es = MoreExecutors.newDirectExecutorService(); + + ProgressReporter pr = new ProgressReporter<>(VirtualFile::path, Duration.ofSeconds(3)); + try { + r.forEachSubModule(es, br -> { + if (collectFiles) { + // record all the necessary BLOBs first, before attempting to record its commit. + // this way, if the file collection fails, the server won't see this commit, so the future + // "record commit" invocation will retry the file collection, thereby making the behavior idempotent. + // TODO: file transfer can be parallelized more aggressively, where we send chunks in parallel + try (FileChunkStreamer fs = new FileChunkStreamer(r.buildHeader(), fileSender, chunkSize); + ProgressReporter.Consumer fsr = pr.newConsumer(fs)) { + br.collectFiles(advertised, treeReceiver, fsr); + } + } + + // we need to send commits in the topological order, so any parallelization within a repository + // is probably not worth the effort. + // TODO: If we process a repository and that doesn't create enough commits + // to form a full chunk, then it makes sense to concatenate them with other commits from other repositories. + // Even when # of repos is large, incremental transfer typically only produces a small amount of commits + // per repo, so this will considerably reduce the connection setup / tear down overhead. + try (CommitChunkStreamer cs = new CommitChunkStreamer(commitSender, chunkSize)) { + br.collectCommits(advertised, cs); + } + }); + } finally { + es.shutdown(); } } @@ -323,83 +355,39 @@ final class ByRepository implements AutoCloseable { private final ObjectReader objectReader; private final Set shallowCommits; + private final ObjectId headId; ByRepository(Repository git, String name) throws IOException { this.name = name; this.git = git; this.objectReader = git.newObjectReader(); this.shallowCommits = objectReader.getShallowCommits(); + this.headId = git.resolve("HEAD"); } - /** - * Writes delta between local commits to the advertised to JSON stream. - * - * @param commitReceiver Receives commits that should be sent, one by one. - */ - public void transfer(Collection advertised, Consumer commitReceiver, TreeReceiver treeReceiver, FlushableConsumer fileReceiver) - throws IOException { - try (RevWalk walk = new RevWalk(git); TreeWalk treeWalk = new TreeWalk(git)) { - // walk reverse topological order, so that older commits get added to the server earlier. - // This way, the connectivity of the commit graph will be always maintained - walk.sort(RevSort.TOPO); - walk.sort(RevSort.REVERSE, true); - // also combine this with commit time based ordering, so that we can stop walking when we - // find old enough commits AFAICT, this is no-op in JGit and it always sorts things in - // commit time order, but it is in the contract, so I'm assuming we shouldn't rely on the - // implementation optimization that's currently enabling this all the time - walk.sort(RevSort.COMMIT_TIME_DESC, true); - - ObjectId headId = git.resolve("HEAD"); - RevCommit start = walk.parseCommit(headId); - walk.markStart(start); - treeWalk.addTree(start.getTree()); - - // don't walk commits too far back. - // for our purpose of computing CUT, these are unlikely to contribute meaningfully - // and it drastically cuts down the initial commit consumption of a new large repository. - // ... except we do want to capture the head commit, as that makes it easier to spot integration problems - // when `record build` and `record commit` are separated. - - // two RevFilters are order sensitive. This is because CommitTimeRevFilter.after doesn't return false to - // filter things out, it throws StopWalkException to terminate the walk, never giving a chance for the other - // branch of OR to be evaluated. So we need to put ObjectRevFilter first. - walk.setRevFilter( - OrRevFilter.create( - new ObjectRevFilter(headId), - CommitTimeRevFilter.after(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(maxDays)))); - - for (ObjectId id : advertised) { - try { - RevCommit c = walk.parseCommit(id); - walk.markUninteresting(c); - if (!reportAllFiles) { - treeWalk.addTree(c.getTree()); - } - } catch (MissingObjectException e) { - // it's possible that the server advertises a commit we don't have. - // - // TODO: how does git-push handles the case when the client doesn't recognize commits? - // Unless it tries to negotiate further what commits they have in common, - // git-upload-pack can end up creating a big pack with lots of redundant objects - // - // think about a case when a client is pushing a new branch against - // the master branch that moved on the server. - } - } - - // record all the necessary BLOBs first, before attempting to record its commit. - // this way, if the file collection fails, the server won't see this commit, so the future - // "record commit" invocation will retry the file collection, thereby making the behavior idempotent. - collectFiles(start, treeWalk, treeReceiver, fileReceiver); - fileReceiver.flush(); - - // walk the commits, transform them, and send them to the commitReceiver - for (RevCommit c : walk) { - commitReceiver.accept(transform(c)); - commitsSent++; + void forEachSubModule(ExecutorService threadPool, IOConsumer consumer) throws IOException { + for (Future f : forEachSubModuleAsync(threadPool, consumer)) { + try { + f.get(); + } catch (Exception e) { + throw new IOException("Failed to process a repository", e); } } + } + /** + * Recursively iterate all the sub-modules and apply the given consumer to them asynchronously, using the given + * thread pool. + * + *

+ * The way this function mixes (1) synchronous call to consumer with {@code this}, (2) use thread pool to recursively + * process submodules might be a bit hard to follow. This was motivated by the fact that {@link ByRepository} for + * sub-modules need to be closed, while {@code this} shouldn't be closed. + * + * @return all the async jobs that are forked off, to allow the caller to wait for their completion. + */ + Collection> forEachSubModuleAsync(ExecutorService threadPool, IOConsumer consumer) throws IOException { + Vector> jobs = new Vector<>(); /* Git submodule support ===================== @@ -418,73 +406,108 @@ That is, find submodules that are available in the working tree (thus `!isBare() if (!git.isBare()) { try (SubmoduleWalk swalk = SubmoduleWalk.forIndex(git)) { while (swalk.next()) { - try (Repository subRepo = swalk.getRepository()) { - if (subRepo != null) { - try { - try (ByRepository br = new ByRepository(subRepo, name + "/" + swalk.getModulesPath())) { - br.transfer(advertised, commitReceiver, treeReceiver, fileReceiver); + Repository subRepo = swalk.getRepository(); + if (subRepo != null) { + try { + ByRepository br = new ByRepository(subRepo, name + "/" + swalk.getModulesPath()); + jobs.add(threadPool.submit(() -> { + try { + jobs.addAll(br.forEachSubModuleAsync(threadPool, consumer)); + return null; + } finally { + br.close(); + subRepo.close(); } - } catch (ConfigInvalidException e) { - throw new IOException("Invalid Git submodule configuration: " + git.getDirectory(), e); - } + })); + } catch (ConfigInvalidException e) { + throw new IOException("Invalid Git submodule configuration: " + git.getDirectory(), e); } } } } } + + consumer.accept(this); + + return jobs; + } + + private void parseEachCommit(RevWalk walk, Collection advertised, IOConsumer consumer) throws IOException { + for (ObjectId id : advertised) { + try { + RevCommit c = walk.parseCommit(id); + consumer.accept(c); + } catch (MissingObjectException e) { + // it's possible that the server advertises a commit we don't have. + // + // TODO: how does git-push handles the case when the client doesn't recognize commits? + // Unless it tries to negotiate further what commits they have in common, + // git-upload-pack can end up creating a big pack with lots of redundant objects + // + // think about a case when a client is pushing a new branch against + // the master branch that moved on the server. + } + } } /** - * treeWalk contains the HEAD (the interesting commit) at the 0th position, then all the commits - * the server advertised in the 1st, 2nd, ... - * Our goal here is to find all the files that the server hasn't seen yet. We'll send them to the tree receiver, - * which further responds with the actual files we need to send to the server. + * Records all the necessary BLOBs first */ - private void collectFiles(RevCommit start, TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer fileReceiver) throws IOException { - if (!collectFiles) { - return; - } + void collectFiles(Collection advertised, TreeReceiver treeReceiver, FlushableConsumer fileReceiver) throws IOException { + try (TreeWalk treeWalk = new TreeWalk(git)) { + RevCommit start = git.parseCommit(headId); + treeWalk.addTree(start.getTree()); + + if (!reportAllFiles) { + // to optimize data transfer, skip files that the server has already seen + // i.e., files that are present in any of the advertised commits + // if the reportAllFiles flag is on, then skip this optimization on the client side. + // treeReceiver will still provide an opportunity for the server to be selective. + try (RevWalk walk = new RevWalk(git)) { + parseEachCommit(walk, advertised, c -> treeWalk.addTree(c.getTree())); + } + } - int c = treeWalk.getTreeCount(); + int c = treeWalk.getTreeCount(); - OUTER: - while (treeWalk.next()) { - ObjectId head = treeWalk.getObjectId(0); - for (int i = 1; i < c; i++) { - if (head.equals(treeWalk.getObjectId(i))) { - // file at the head is identical to one of the uninteresting commits, - // meaning we have already seen this file/directory on the server. - // if it is a dir, there's no need to visit this whole subtree, so skip over - continue OUTER; + OUTER: + while (treeWalk.next()) { + ObjectId head = treeWalk.getObjectId(0); + for (int i = 1; i < c; i++) { + if (head.equals(treeWalk.getObjectId(i))) { + // file at the head is identical to one of the uninteresting commits, + // meaning we have already seen this file/directory on the server. + // if it is a dir, there's no need to visit this whole subtree, so skip over + continue OUTER; + } } - } - if (treeWalk.isSubtree()) { - treeWalk.enterSubtree(); - } else { - if ((treeWalk.getFileMode(0).getBits() & FileMode.TYPE_MASK) == FileMode.TYPE_FILE) { - GitFile f = new GitFile(name, treeWalk.getPathString(), head, objectReader); - // to avoid excessive data transfer, skip files that are too big - if (f.size() < 1024 * 1024 && f.isText() && !f.path.equals(HEADER_FILE)) { - treeReceiver.accept(f); + if (treeWalk.isSubtree()) { + treeWalk.enterSubtree(); + } else { + if ((treeWalk.getFileMode(0).getBits() & FileMode.TYPE_MASK) == FileMode.TYPE_FILE) { + GitFile f = new GitFile(name, treeWalk.getPathString(), head, objectReader); + // to avoid excessive data transfer, skip files that are too big + if (f.size() < 1024 * 1024 && f.isText() && !f.path.equals(HEADER_FILE)) { + treeReceiver.accept(f); + } } } } - } - // Note(Konboi): To balance the order, since words like "test" and "spec" tend to appear - // toward the end in alphabetical sorting. - List files = new ArrayList<>(treeReceiver.response()); - if (!files.isEmpty()) { - fileReceiver.accept(buildHeader(start)); - filesSent++; + // Now let the server select the files it actually wants to see + List files = new ArrayList<>(treeReceiver.response()); + // Note(Konboi): To balance the order, since words like "test" and "spec" tend to appear + // toward the end in alphabetical sorting. Collections.shuffle(files); for (VirtualFile f : files) { fileReceiver.accept(f); - filesSent++; + filesSent.incrementAndGet(); } + + fileReceiver.flush(); } } @@ -492,7 +515,7 @@ private void collectFiles(RevCommit start, TreeWalk treeWalk, TreeReceiver treeR * Creates a per repository "header" file as a {@link VirtualFile}. * Currently, this is just the list of files in the repository. */ - private VirtualFile buildHeader(RevCommit start) throws IOException { + VirtualFile buildHeader() throws IOException { ByteArrayOutputStream os = new ByteArrayOutputStream(); try (JsonGenerator w = new JsonFactory().createGenerator(os)) { w.setCodec(objectMapper); @@ -500,7 +523,7 @@ private VirtualFile buildHeader(RevCommit start) throws IOException { w.writeArrayFieldStart("tree"); try (TreeWalk tw = new TreeWalk(git)) { - tw.addTree(start.getTree()); + tw.addTree(git.parseCommit(headId).getTree()); tw.setRecursive(true); while (tw.next()) { @@ -516,6 +539,43 @@ private VirtualFile buildHeader(RevCommit start) throws IOException { return VirtualFile.from(name, HEADER_FILE, ObjectId.zeroId(), os.toByteArray()); } + void collectCommits(Collection advertised, Consumer commitReceiver) throws IOException { + try (RevWalk walk = new RevWalk(git)) { + // walk reverse topological order, so that older commits get added to the server earlier. + // This way, the connectivity of the commit graph will be always maintained + walk.sort(RevSort.TOPO); + walk.sort(RevSort.REVERSE, true); + // also combine this with commit time based ordering, so that we can stop walking when we + // find old enough commits AFAICT, this is no-op in JGit and it always sorts things in + // commit time order, but it is in the contract, so I'm assuming we shouldn't rely on the + // implementation optimization that's currently enabling this all the time + walk.sort(RevSort.COMMIT_TIME_DESC, true); + + walk.markStart(walk.parseCommit(headId)); + + // don't walk commits too far back. + // for our purpose of computing CUT, these are unlikely to contribute meaningfully + // and it drastically cuts down the initial commit consumption of a new large repository. + // ... except we do want to capture the head commit, as that makes it easier to spot integration problems + // when `record build` and `record commit` are separated. + + // two RevFilters are order sensitive. This is because CommitTimeRevFilter.after doesn't return false to + // filter things out, it throws StopWalkException to terminate the walk, never giving a chance for the other + // branch of OR to be evaluated. So we need to put ObjectRevFilter first. + walk.setRevFilter( + OrRevFilter.create( + new ObjectRevFilter(headId), + CommitTimeRevFilter.after(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(maxDays)))); + + parseEachCommit(walk, advertised, walk::markUninteresting); + + // walk the commits, transform them, and send them to the commitReceiver + for (RevCommit c : walk) { + commitReceiver.accept(transform(c)); + commitsSent.incrementAndGet(); + } + } + } private JSCommit transform(RevCommit r) throws IOException { JSCommit c = new JSCommit(); diff --git a/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java b/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java index 698c69438..fad3dab02 100644 --- a/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java +++ b/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java @@ -14,8 +14,11 @@ * Receives {@link GitFile}, buffers them, and writes them out in a gzipped tar file. */ final class FileChunkStreamer extends ChunkStreamer { - FileChunkStreamer(IOConsumer sender, int chunkSize) { + private final VirtualFile header; + + FileChunkStreamer(VirtualFile header, IOConsumer sender, int chunkSize) { super(sender, chunkSize); + this.header = header; } @Override @@ -23,14 +26,22 @@ protected void writeTo(List files, OutputStream os) throws IOExcept try (TarArchiveOutputStream tar = new TarArchiveOutputStream(os, "UTF-8")) { tar.setLongFileMode(LONGFILE_POSIX); + if (header!=null) { + write(header, tar); + } + for (VirtualFile f : files) { - TarArchiveEntry e = new TarArchiveEntry(f.path()); - e.setSize(f.size()); - e.setGroupName(f.blob().name()); // HACK - reuse the group name field to store the blob ID - tar.putArchiveEntry(e); - f.writeTo(tar); - tar.closeArchiveEntry(); + write(f, tar); } } } + + private static void write(VirtualFile f, TarArchiveOutputStream tar) throws IOException { + TarArchiveEntry e = new TarArchiveEntry(f.path()); + e.setSize(f.size()); + e.setGroupName(f.blob().name()); // HACK - reuse the group name field to store the blob ID + tar.putArchiveEntry(e); + f.writeTo(tar); + tar.closeArchiveEntry(); + } } diff --git a/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java b/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java index 7eb8c701d..8e3272d14 100644 --- a/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java +++ b/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java @@ -1,12 +1,13 @@ package com.launchableinc.ingest.commits; +import java.io.Closeable; import java.io.IOException; import java.util.function.Consumer; /** * Consumers that spool items it accepts and process them in bulk. */ -public interface FlushableConsumer extends Consumer { +public interface FlushableConsumer extends Consumer, Closeable { /** * Process all items that have been accepted so far. */ @@ -15,7 +16,12 @@ public interface FlushableConsumer extends Consumer { static FlushableConsumer of(Consumer c) { return new FlushableConsumer() { @Override - public void flush() throws IOException { + public void flush() { + // noop + } + + @Override + public void close() { // noop } diff --git a/src/main/java/com/launchableinc/ingest/commits/ProgressReporter.java b/src/main/java/com/launchableinc/ingest/commits/ProgressReporter.java new file mode 100644 index 000000000..d5ba0e7de --- /dev/null +++ b/src/main/java/com/launchableinc/ingest/commits/ProgressReporter.java @@ -0,0 +1,93 @@ +package com.launchableinc.ingest.commits; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static java.time.Instant.now; + +/** + * Given multiple concurrent slow {@link Consumer}s, each g oing over a large + * number of items in parallel, + * provide a progress report to show that the work is still in progress. + */ +class ProgressReporter { + private final Function printer; + private final Duration reportInterval; + private Instant nextReportTime; + + /** + * Number of items that need to be processed, across all consumers. + */ + private final AtomicInteger workload = new AtomicInteger(); + /** + * Number of items that have already been processed, across all consumers. + */ + private final AtomicInteger completed = new AtomicInteger(); + + ProgressReporter(Function printer, Duration reportInterval) { + this.printer = printer; + this.reportInterval = reportInterval; + this.nextReportTime = now().plus(reportInterval); + } + + /** + * Deals with one serial stream of work. + */ + class Consumer implements FlushableConsumer { + private final FlushableConsumer base; + private final List pool = new ArrayList<>(); + + Consumer(FlushableConsumer base) { + this.base = base; + } + + @Override + public void accept(T t) { + pool.add(t); + workload.incrementAndGet(); + } + + @Override + public void flush() throws IOException { + for (T x : pool) { + synchronized (ProgressReporter.this) { + if (now().isAfter(nextReportTime)) { + print(completed.get(), workload.get(), x); + nextReportTime = now().plus(reportInterval); + } + } + base.accept(x); + completed.incrementAndGet(); + } + pool.clear(); + base.flush(); + } + + @Override + public void close() throws IOException { + flush(); + } + } + + Consumer newConsumer(FlushableConsumer base) { + return new Consumer(base); + } + + protected void print(int c, int w, T x) { + int width = String.valueOf(w).length(); + System.err.printf("%s/%d: %s%n", pad(c, width), w, printer.apply(x)); + } + + static String pad(int i, int width) { + String s = String.valueOf(i); + while (s.length() < width) { + s = " " + s; + } + return s; + } +} diff --git a/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java b/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java deleted file mode 100644 index 858f63a47..000000000 --- a/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.launchableinc.ingest.commits; - -import java.io.IOException; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Function; - -import static java.time.Instant.now; - -/** - * Given a slow {@link Consumer} that goes over a large number of items, - * provide a progress report to show that the work is still in progress. - */ -class ProgressReportingConsumer implements FlushableConsumer, AutoCloseable { - private final FlushableConsumer base; - private final List pool = new ArrayList<>(); - private final Function printer; - private final Duration reportInterval; - private int round = 1; - - ProgressReportingConsumer(FlushableConsumer base, Function printer, Duration reportInterval) { - this.base = base; - this.printer = printer; - this.reportInterval = reportInterval; - } - - @Override - public void accept(T t) { - pool.add(t); - } - - @Override - public void flush() throws IOException { - Instant nextReportTime = now().plus(reportInterval); - int width = String.valueOf(pool.size()).length(); - int i = 0; - for (T x : pool) { - i++; - if (now().isAfter(nextReportTime)) { - System.err.printf("%s%s/%d: %s%n", round(), pad(i, width), pool.size(), printer.apply(x)); - nextReportTime = now().plus(reportInterval); - } - base.accept(x); - } - pool.clear(); - base.flush(); - round++; - } - - private String round() { - if (round==1) { - // most of the time, there's only one round, so let's not bother - return ""; - } else { - return String.format("#%d ", round); - } - } - - @Override - public void close() throws IOException { - flush(); - } - - static String pad(int i, int width) { - String s = String.valueOf(i); - while (s.length() < width) { - s = " " + s; - } - return s; - } -} diff --git a/src/test/java/com/launchableinc/ingest/commits/AllTests.java b/src/test/java/com/launchableinc/ingest/commits/AllTests.java index dc1315cab..e5d2426ef 100644 --- a/src/test/java/com/launchableinc/ingest/commits/AllTests.java +++ b/src/test/java/com/launchableinc/ingest/commits/AllTests.java @@ -10,6 +10,6 @@ MainTest.class, FileChunkStreamerTest.class, SSLBypassTest.class, - ProgressReportingConsumerTest.class + ProgressReporterTest.class }) public class AllTests {} diff --git a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java index b069bee94..117e6ab85 100644 --- a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java +++ b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java @@ -90,7 +90,7 @@ public void bareRepo() throws Exception { try (Repository r = Git.open(barerepoDir).getRepository()) { CommitGraphCollector cgc = collectCommit(r, ImmutableList.of()); assertThat(cgc.getCommitsSent()).isEqualTo(1); - assertThat(cgc.getFilesSent()).isEqualTo(2); // header + .gitmodules + assertThat(cgc.getFilesSent()).isEqualTo(1); // .gitmodules } } @@ -121,7 +121,7 @@ public void chunking() throws Exception { 2); } assertThat(councCommitChunks[0]).isEqualTo(2); - assertThat(countFilesChunks[0]).isEqualTo(3); // header, a, .gitmodules, and header, sub/x, 5 files, 3 chunks + assertThat(countFilesChunks[0]).isEqualTo(2); // a, .gitmodules, and sub/x, 3 files, 2 chunks } private void assertValidTar(ContentProducer content) throws IOException { @@ -176,14 +176,8 @@ public void header() throws Exception { CommitGraphCollector cgc = new CommitGraphCollector("test", mainrepo.getRepository()); cgc.collectFiles(true); - cgc.new ByRepository(mainrepo.getRepository(), "main") - .transfer(Collections.emptyList(), c -> {}, - new PassThroughTreeReceiverImpl(), - FlushableConsumer.of(files::add)); - // header for the main repo, 'gitmodules', header for the sub repo, 'a', and 'x' in the sub repo - assertThat(files).hasSize(5); - VirtualFile header = files.get(2); + VirtualFile header = cgc.new ByRepository(mainrepo.getRepository(), "main").buildHeader(); assertThat(header.path()).isEqualTo(CommitGraphCollector.HEADER_FILE); JsonNode tree = assertValidJson(header::writeTo).get("tree"); assertThat(tree.isArray()).isTrue(); @@ -193,7 +187,7 @@ cgc.new ByRepository(mainrepo.getRepository(), "main") paths.add(i.get("path").asText()); } - assertThat(paths).containsExactly("a", "x"); + assertThat(paths).containsExactly(".gitmodules", "sub"); } } diff --git a/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java b/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java index 3d9cc7e70..01b946558 100644 --- a/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java +++ b/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java @@ -20,7 +20,7 @@ public class FileChunkStreamerTest { @Test public void no_op_if_no_content() throws Exception { - try (FileChunkStreamer fs = new FileChunkStreamer(content -> fail(), 2)) { + try (FileChunkStreamer fs = new FileChunkStreamer(null, content -> fail(), 2)) { // no write } } @@ -28,13 +28,13 @@ public void no_op_if_no_content() throws Exception { @Test public void basics() throws Exception { int[] count = new int[1]; - try (FileChunkStreamer fs = new FileChunkStreamer(content -> { + try (FileChunkStreamer fs = new FileChunkStreamer(new VirtualFileImpl("header.txt"), content -> { switch(count[0]++) { case 0: - assertThat(readEntries(content)).containsExactly("foo.txt", "bar.txt").inOrder(); + assertThat(readEntries(content)).containsExactly("header.txt", "foo.txt", "bar.txt").inOrder(); break; case 1: - assertThat(readEntries(content)).containsExactly("zot.txt").inOrder(); + assertThat(readEntries(content)).containsExactly("header.txt", "zot.txt").inOrder(); break; default: fail(); diff --git a/src/test/java/com/launchableinc/ingest/commits/ProgressReporterTest.java b/src/test/java/com/launchableinc/ingest/commits/ProgressReporterTest.java new file mode 100644 index 000000000..b518c73d9 --- /dev/null +++ b/src/test/java/com/launchableinc/ingest/commits/ProgressReporterTest.java @@ -0,0 +1,95 @@ +package com.launchableinc.ingest.commits; + +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static com.google.common.truth.Truth.*; +import static java.util.Collections.*; + +public class ProgressReporterTest { + ProgressReporter pr = new ProgressReporter(String::valueOf, Duration.ofMillis(100)) { + int cc = 0; + int ww = 0; + @Override + protected void print(int c, int w, String x) { + super.print(c, w, x); + + // ensure numbers are monotonically increasing + assertThat(c).isAtLeast(cc); + assertThat(w).isAtLeast(ww); + cc = c; + ww = w; + } + }; + + /** + * Tests the most important bit -- that all items are processed. + */ + @Test + public void serial() throws Exception { + List done = new ArrayList<>(); + try (ProgressReporter.Consumer x = pr.newConsumer(FlushableConsumer.of(s -> { + done.add(s); + sleep(); + }))) { + for (int i = 0; i < 100; i++) { + x.accept("item " + i); + } + } + assertThat(done.size()).isEqualTo(100); + } + + /** + * Perform work in parallel and make sure they all do get processed. + */ + @Test + public void parallel() throws Exception { + Set done = synchronizedSet(new HashSet<>()); + + ExecutorService es = Executors.newFixedThreadPool(10); + List> all = new ArrayList<>(); + for (int i=0; i<10; i++) { + final int ii = i; + all.add(es.submit(() -> { + try (ProgressReporter.Consumer x = pr.newConsumer(FlushableConsumer.of(s -> {done.add(s);sleep();}))) { + for (int j = 0; j < 100; j++) { + x.accept("item " + (ii*100+j)); + } + return null; + } + })); + } + for (Future f : all) { + f.get(); + } + es.shutdown(); + + assertThat(done.size()).isEqualTo(1000); + for (int i=0; i<1000; i++) { + assertThat(done).contains("item " + i); + } + } + + private static void sleep() { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new UnsupportedOperationException(); + } + } + + @Test + public void pad() { + assertThat(ProgressReporter.pad(5,3)).isEqualTo(" 5"); + assertThat(ProgressReporter.pad(15,3)).isEqualTo(" 15"); + assertThat(ProgressReporter.pad(1234,3)).isEqualTo("1234"); + } +} diff --git a/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java b/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java deleted file mode 100644 index 3c655a95c..000000000 --- a/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.launchableinc.ingest.commits; - -import com.google.common.truth.Truth; -import org.junit.Test; - -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; - -public class ProgressReportingConsumerTest { - @Test - public void basic() throws IOException { - List done = new ArrayList<>(); - try (ProgressReportingConsumer x = new ProgressReportingConsumer<>(FlushableConsumer.of(s -> {done.add(s);sleep();}), String::valueOf, Duration.ofMillis(100))) { - for (int i = 0; i < 100; i++) { - x.accept("item " + i); - } - } - Truth.assertThat(done.size()).isEqualTo(100); - } - - private static void sleep() { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new UnsupportedOperationException(); - } - } -}