diff --git a/launchable/jar/exe_deploy.jar b/launchable/jar/exe_deploy.jar index 31b53f9cc..e7af26047 100755 Binary files a/launchable/jar/exe_deploy.jar and b/launchable/jar/exe_deploy.jar differ diff --git a/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java b/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java index bf065dc68..7be425e2a 100644 --- a/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java +++ b/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java @@ -8,12 +8,11 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; -import java.util.function.Consumer; /** * Accepts T, buffers them, and writes them out as a batch. */ -abstract class ChunkStreamer implements Consumer, Closeable { +abstract class ChunkStreamer implements FlushableConsumer, Closeable { /** * Encapsulation of how batches are sent. */ @@ -43,7 +42,8 @@ public void close() throws IOException { flush(); } - private void flush() throws IOException { + @Override + public void flush() throws IOException { if (spool.isEmpty()) { return; } diff --git a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java index 4ef2d854c..2a8093262 100644 --- a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java +++ b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java @@ -1,6 +1,7 @@ package com.launchableinc.ingest.commits; import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -44,23 +45,25 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import java.util.zip.GZIPOutputStream; -import static com.google.common.collect.ImmutableList.*; -import static java.util.Arrays.*; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Arrays.stream; /** * Compares what commits the local repository and the remote repository have, then send delta over. @@ -171,11 +174,88 @@ public void transfer(URL service, Authenticator authenticator, boolean enableTim if (dryRun) { return; } - handleError(url, client.execute(request)); + handleError(url, client.execute(request)).close(); } catch (IOException e) { throw new UncheckedIOException(e); } }, + new TreeReceiver() { + private final List files = new ArrayList<>(); + + private void writeJsonTo(OutputStream os) throws IOException { + try (JsonGenerator w = new JsonFactory().createGenerator(os)) { + w.setCodec(objectMapper); + w.writeStartObject(); + w.writeArrayFieldStart("tree"); + + for (VirtualFile commit : files) { + w.writeStartObject(); + w.writeFieldName("path"); + w.writeString(commit.path()); + w.writeFieldName("blob"); + w.writeString(commit.blob().name()); + w.writeEndObject(); + } + + w.writeEndArray(); + w.writeEndObject(); + } + } + @Override + public Collection response() { + try { + URL url = new URL(service, "collect/tree"); + HttpPost request = new HttpPost(url.toExternalForm()); + request.setHeader("Content-Type", "application/json"); + request.setHeader("Content-Encoding", "gzip"); + request.setEntity(new EntityTemplate(raw -> { + try (OutputStream os = new GZIPOutputStream(raw)) { + writeJsonTo(os); + } + })); + + if (outputAuditLog()) { + System.err.printf( + "AUDIT:launchable:%ssend request method:post path:%s headers:%s args:", + dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); + writeJsonTo(System.err); + System.err.println(); + } + + // even in dry run, this method needs to execute in order to show what files we'll be collecting + try (CloseableHttpResponse response = handleError(url, client.execute(request)); + JsonParser parser = new JsonFactory().createParser(response.getEntity().getContent())) { + return select(objectMapper.readValue(parser, String[].class)); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + files.clear(); + } + } + + private List select(String[] response) { + Map filesByPath = new HashMap<>(); + for (VirtualFile f : files) { + filesByPath.put(f.path(), f); + } + + List selected = new ArrayList<>(); + for (String path : response) { + VirtualFile f = filesByPath.get(path); + if (f!=null) { + selected.add(f); + } + } + + return selected; + } + + @Override + public void accept(VirtualFile f) { + files.add(f); + } + }, (ContentProducer files) -> { try { URL url = new URL(service, "collect/files"); @@ -211,7 +291,7 @@ public void transfer(URL service, Authenticator authenticator, boolean enableTim if (dryRun) { return; } - handleError(url, client.execute(request)); + handleError(url, client.execute(request)).close(); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -256,13 +336,13 @@ private ImmutableList getAdvertisedRefs(HttpResponse response) throws * chunk size. */ public void transfer( - Collection advertised, IOConsumer commitSender, IOConsumer fileSender, int chunkSize) + Collection advertised, IOConsumer commitSender, TreeReceiver treeReceiver, IOConsumer fileSender, int chunkSize) throws IOException { ByRepository r = new ByRepository(root, rootName); try (CommitChunkStreamer cs = new CommitChunkStreamer(commitSender, chunkSize); FileChunkStreamer fs = new FileChunkStreamer(fileSender, chunkSize); ProgressReportingConsumer fsr = new ProgressReportingConsumer<>(fs, VirtualFile::path, Duration.ofSeconds(3))) { - r.transfer(advertised, cs, fsr); + r.transfer(advertised, cs, treeReceiver, fsr); } } @@ -323,7 +403,7 @@ final class ByRepository implements AutoCloseable { * * @param commitReceiver Receives commits that should be sent, one by one. */ - public void transfer(Collection advertised, Consumer commitReceiver, Consumer fileReceiver) + public void transfer(Collection advertised, Consumer commitReceiver, TreeReceiver treeReceiver, FlushableConsumer fileReceiver) throws IOException { try (RevWalk walk = new RevWalk(git); TreeWalk treeWalk = new TreeWalk(git)) { // walk reverse topological order, so that older commits get added to the server earlier. @@ -368,7 +448,11 @@ public void transfer(Collection advertised, Consumer commitR } } - collectFiles(treeWalk, fileReceiver); + // record all the necessary BLOBs first, before attempting to record its commit. + // this way, if the file collection fails, the server won't see this commit, so the future + // "record commit" invocation will retry the file collection, thereby making the behavior idempotent. + collectFiles(treeWalk, treeReceiver, fileReceiver); + fileReceiver.flush(); // walk the commits, transform them, and send them to the commitReceiver for (RevCommit c : walk) { @@ -399,7 +483,7 @@ That is, find submodules that are available in the working tree (thus `!isBare() if (subRepo != null) { try { try (ByRepository br = new ByRepository(subRepo, name + "/" + swalk.getModulesPath())) { - br.transfer(advertised, commitReceiver, fileReceiver); + br.transfer(advertised, commitReceiver, treeReceiver, fileReceiver); } } catch (ConfigInvalidException e) { throw new IOException("Invalid Git submodule configuration: " + git.getDirectory(), e); @@ -411,15 +495,24 @@ That is, find submodules that are available in the working tree (thus `!isBare() } } - private void collectFiles(TreeWalk treeWalk, Consumer receiver) throws IOException { - if (!collectFiles) return; + /** + * treeWalk contains the HEAD (the interesting commit) at the 0th position, then all the commits + * the server advertised in the 1st, 2nd, ... + * Our goal here is to find all the files that the server hasn't seen yet. We'll send them to the tree receiver, + * which further responds with the actual files we need to send to the server. + */ + private void collectFiles(TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer fileReceiver) throws IOException { + if (!collectFiles) { + return; + } + int c = treeWalk.getTreeCount(); OUTER: while (treeWalk.next()) { ObjectId head = treeWalk.getObjectId(0); - for (int i=1; i receiver) thr if (treeWalk.isSubtree()) { treeWalk.enterSubtree(); } else { - if ((treeWalk.getFileMode(0).getBits()&FileMode.TYPE_MASK)==FileMode.TYPE_FILE) { + if ((treeWalk.getFileMode(0).getBits() & FileMode.TYPE_MASK) == FileMode.TYPE_FILE) { GitFile f = new GitFile(name, treeWalk.getPathString(), head, objectReader); // to avoid excessive data transfer, skip files that are too big - if (f.size()<1024*1024 && f.isText()) { - receiver.accept(f); - filesSent++; + if (f.size() < 1024 * 1024 && f.isText()) { + treeReceiver.accept(f); } } } } + + for (VirtualFile f : treeReceiver.response()) { + fileReceiver.accept(f); + filesSent++; + } } diff --git a/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java b/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java index 5483adfeb..698c69438 100644 --- a/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java +++ b/src/main/java/com/launchableinc/ingest/commits/FileChunkStreamer.java @@ -26,6 +26,7 @@ protected void writeTo(List files, OutputStream os) throws IOExcept for (VirtualFile f : files) { TarArchiveEntry e = new TarArchiveEntry(f.path()); e.setSize(f.size()); + e.setGroupName(f.blob().name()); // HACK - reuse the group name field to store the blob ID tar.putArchiveEntry(e); f.writeTo(tar); tar.closeArchiveEntry(); diff --git a/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java b/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java new file mode 100644 index 000000000..104649ab7 --- /dev/null +++ b/src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java @@ -0,0 +1,14 @@ +package com.launchableinc.ingest.commits; + +import java.io.IOException; +import java.util.function.Consumer; + +/** + * Consumers that spool items it accepts and process them in bulk. + */ +public interface FlushableConsumer extends Consumer { + /** + * Process all items that have been accepted so far. + */ + void flush() throws IOException; +} diff --git a/src/main/java/com/launchableinc/ingest/commits/GitFile.java b/src/main/java/com/launchableinc/ingest/commits/GitFile.java index b3e9cc22e..332dab776 100644 --- a/src/main/java/com/launchableinc/ingest/commits/GitFile.java +++ b/src/main/java/com/launchableinc/ingest/commits/GitFile.java @@ -40,6 +40,11 @@ public String path() { return path; } + @Override + public ObjectId blob() { + return blob; + } + public long size() throws IOException { return open().getSize(); } diff --git a/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java b/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java index 8f39acc1a..858f63a47 100644 --- a/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java +++ b/src/main/java/com/launchableinc/ingest/commits/ProgressReportingConsumer.java @@ -1,5 +1,6 @@ package com.launchableinc.ingest.commits; +import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -13,44 +14,61 @@ * Given a slow {@link Consumer} that goes over a large number of items, * provide a progress report to show that the work is still in progress. */ -class ProgressReportingConsumer implements Consumer, AutoCloseable { - private final Consumer base; - private final List pool = new ArrayList<>(); - private final Function printer; - private final Duration reportInterval; - - ProgressReportingConsumer(Consumer base, Function printer, Duration reportInterval) { - this.base = base; - this.printer = printer; - this.reportInterval = reportInterval; - } +class ProgressReportingConsumer implements FlushableConsumer, AutoCloseable { + private final FlushableConsumer base; + private final List pool = new ArrayList<>(); + private final Function printer; + private final Duration reportInterval; + private int round = 1; + + ProgressReportingConsumer(FlushableConsumer base, Function printer, Duration reportInterval) { + this.base = base; + this.printer = printer; + this.reportInterval = reportInterval; + } - @Override - public void accept(T t) { - pool.add(t); + @Override + public void accept(T t) { + pool.add(t); + } + + @Override + public void flush() throws IOException { + Instant nextReportTime = now().plus(reportInterval); + int width = String.valueOf(pool.size()).length(); + int i = 0; + for (T x : pool) { + i++; + if (now().isAfter(nextReportTime)) { + System.err.printf("%s%s/%d: %s%n", round(), pad(i, width), pool.size(), printer.apply(x)); + nextReportTime = now().plus(reportInterval); + } + base.accept(x); } + pool.clear(); + base.flush(); + round++; + } - @Override - public void close() { - Instant nextReportTime = now().plus(reportInterval); - int width = String.valueOf(pool.size()).length(); - int i = 0; - for (T x : pool) { - i++; - if (now().isAfter(nextReportTime)) { - System.err.printf("%s/%d: %s%n", pad(i, width), pool.size(), printer.apply(x)); - nextReportTime = now().plus(reportInterval); - } - base.accept(x); - } - pool.clear(); + private String round() { + if (round==1) { + // most of the time, there's only one round, so let's not bother + return ""; + } else { + return String.format("#%d ", round); } + } + + @Override + public void close() throws IOException { + flush(); + } - static String pad(int i, int width) { - String s = String.valueOf(i); - while (s.length() < width) { - s = " " + s; - } - return s; + static String pad(int i, int width) { + String s = String.valueOf(i); + while (s.length() < width) { + s = " " + s; } + return s; + } } diff --git a/src/main/java/com/launchableinc/ingest/commits/TreeReceiver.java b/src/main/java/com/launchableinc/ingest/commits/TreeReceiver.java new file mode 100644 index 000000000..b6bd56484 --- /dev/null +++ b/src/main/java/com/launchableinc/ingest/commits/TreeReceiver.java @@ -0,0 +1,18 @@ +package com.launchableinc.ingest.commits; + +import java.util.Collection; +import java.util.function.Consumer; + +/** + * Used by {@link CommitGraphCollector} as the abstraction of a server endpoint that + * receives a list of paths in a Git repository and responds with which ones it wants to see. + */ +public interface TreeReceiver extends Consumer { + /** + * Receives the subset of {@link VirtualFile}s sent to the server thus far, which + * the server wants to see. + *

+ * This resets the spool. + */ + Collection response(); +} diff --git a/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java b/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java index 69fe3aeab..b8126eba5 100644 --- a/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java +++ b/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java @@ -1,5 +1,7 @@ package com.launchableinc.ingest.commits; +import org.eclipse.jgit.lib.ObjectId; + import java.io.IOException; import java.io.OutputStream; @@ -13,6 +15,12 @@ public interface VirtualFile { * Path to the file within the repository. */ String path(); + + /** + * Blob ID of the file content. + */ + ObjectId blob(); + long size() throws IOException; void writeTo(OutputStream os) throws IOException; } diff --git a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java index 9f51447aa..a03b3e75a 100644 --- a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java +++ b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.Collection; import java.util.List; import static com.google.common.truth.Truth.*; @@ -109,6 +110,7 @@ public void chunking() throws Exception { councCommitChunks[0]++; assertValidJson(commits); }, + new PassThroughTreeReceiverImpl(), (ContentProducer files) -> { countFilesChunks[0]++; assertValidTar(files); @@ -145,7 +147,7 @@ public void scrubPii() throws Exception { addCommitInSubRepo(mainrepo); CommitGraphCollector cgc = new CommitGraphCollector("test", mainrepo.getRepository()); cgc.setMaxDays(30); - cgc.transfer(ImmutableList.of(), c -> c.writeTo(baos), f -> {}, Integer.MAX_VALUE); + cgc.transfer(ImmutableList.of(), c -> c.writeTo(baos), new PassThroughTreeReceiverImpl(), f -> {}, Integer.MAX_VALUE); } String requestBody = baos.toString(StandardCharsets.UTF_8); assertThat(requestBody).doesNotContain(committer.getEmailAddress()); @@ -157,7 +159,7 @@ private CommitGraphCollector collectCommit(Repository r, List advertis CommitGraphCollector cgc = new CommitGraphCollector("test", r); cgc.setMaxDays(30); cgc.collectFiles(true); - cgc.transfer(advertised, c -> {}, f -> {}, 3); + cgc.transfer(advertised, c -> {}, new PassThroughTreeReceiverImpl(),f -> {}, 3); return cgc; } diff --git a/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java b/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java index d018b5dd4..3d9cc7e70 100644 --- a/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java +++ b/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java @@ -5,6 +5,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.NullOutputStream; import org.apache.http.entity.ContentProducer; +import org.eclipse.jgit.lib.ObjectId; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -78,6 +79,11 @@ public String path() { return path; } + @Override + public ObjectId blob() { + return ObjectId.zeroId(); + } + @Override public long size() { return path.getBytes().length; diff --git a/src/test/java/com/launchableinc/ingest/commits/PassThroughTreeReceiverImpl.java b/src/test/java/com/launchableinc/ingest/commits/PassThroughTreeReceiverImpl.java new file mode 100644 index 000000000..2ca14648d --- /dev/null +++ b/src/test/java/com/launchableinc/ingest/commits/PassThroughTreeReceiverImpl.java @@ -0,0 +1,20 @@ +package com.launchableinc.ingest.commits; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class PassThroughTreeReceiverImpl implements TreeReceiver { + private final List files = new ArrayList<>(); + @Override + public Collection response() { + List r = new ArrayList<>(files); + files.clear(); + return r; + } + + @Override + public void accept(VirtualFile f) { + files.add(f); + } +} diff --git a/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java b/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java index eccb34bfa..52b73d527 100644 --- a/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java +++ b/src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java @@ -3,15 +3,17 @@ import com.google.common.truth.Truth; import org.junit.Test; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; public class ProgressReportingConsumerTest { @Test - public void basic() { + public void basic() throws IOException { List done = new ArrayList<>(); - try (ProgressReportingConsumer x = new ProgressReportingConsumer<>(s -> {done.add(s);sleep();}, String::valueOf, Duration.ofMillis(100))) { + try (ProgressReportingConsumer x = new ProgressReportingConsumer<>(flushableConsumer(s -> {done.add(s);sleep();}), String::valueOf, Duration.ofMillis(100))) { for (int i = 0; i < 100; i++) { x.accept("item " + i); } @@ -26,4 +28,18 @@ private static void sleep() { throw new UnsupportedOperationException(); } } + + private FlushableConsumer flushableConsumer(Consumer c) { + return new FlushableConsumer() { + @Override + public void flush() throws IOException { + // noop + } + + @Override + public void accept(T t) { + c.accept(t); + } + }; + } }