diff --git a/.editorconfig b/.editorconfig index f7255ecf7..4e0c3a2c1 100644 --- a/.editorconfig +++ b/.editorconfig @@ -8,4 +8,4 @@ charset=utf-8 indent_style=space [*.java] -indent_size = 4 +indent_size = 2 diff --git a/WORKSPACE b/WORKSPACE index 3fedc788f..903fea22d 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -9,9 +9,11 @@ http_archive( ) load("@rules_jvm_external//:repositories.bzl", "rules_jvm_external_deps") + rules_jvm_external_deps() load("@rules_jvm_external//:setup.bzl", "rules_jvm_external_setup") + rules_jvm_external_setup() load("@rules_jvm_external//:defs.bzl", "maven_install") @@ -25,6 +27,7 @@ maven_install( "com.fasterxml.jackson.core:jackson-core:2.18.2", "com.fasterxml.jackson.core:jackson-databind:2.18.2", "com.google.guava:guava:33.3.1-jre", + "org.apache.commons:commons-compress:1.27.1", "org.apache.httpcomponents:httpclient:4.5.14", # This is the last release that produce Java 8 class files. "org.eclipse.jgit:org.eclipse.jgit:5.13.3.202401111512-r", @@ -45,10 +48,10 @@ maven_install( version = "1.4.4", ), ], + fetch_sources = True, maven_install_json = "//src:maven_install.json", repositories = ["https://repo1.maven.org/maven2"], version_conflict_policy = "pinned", - fetch_sources = True, ) load("@maven//:defs.bzl", "pinned_maven_install") diff --git a/launchable/commands/record/commit.py b/launchable/commands/record/commit.py index 5cd831ae2..16128f101 100644 --- a/launchable/commands/record/commit.py +++ b/launchable/commands/record/commit.py @@ -65,10 +65,12 @@ def commit(ctx, source: str, executable: bool, max_days: int, scrub_pii: bool, i # Commit messages are not collected in the default. 
is_collect_message = False + is_collect_files = False try: res = client.request("get", "commits/collect/options") res.raise_for_status() is_collect_message = res.json().get("commitMessage", False) + is_collect_files = res.json().get("files", False) except Exception as e: tracking_client.send_error_event( event_name=Tracking.ErrorEvent.INTERNAL_CLI_ERROR, @@ -79,7 +81,7 @@ def commit(ctx, source: str, executable: bool, max_days: int, scrub_pii: bool, i cwd = os.path.abspath(source) try: - exec_jar(cwd, max_days, ctx.obj, is_collect_message) + exec_jar(cwd, max_days, ctx.obj, is_collect_message, is_collect_files) except Exception as e: if os.getenv(REPORT_ERROR_KEY): raise e @@ -89,7 +91,7 @@ def commit(ctx, source: str, executable: bool, max_days: int, scrub_pii: bool, i "If not, please set a directory use by --source option.\nerror: {}".format(cwd, e)) -def exec_jar(source: str, max_days: int, app: Application, is_collect_message: bool): +def exec_jar(source: str, max_days: int, app: Application, is_collect_message: bool, is_collect_files: bool): java = get_java_command() if not java: @@ -120,6 +122,8 @@ def exec_jar(source: str, max_days: int, app: Application, is_collect_message: b command.append("-skip-cert-verification") if is_collect_message: command.append("-commit-message") + if is_collect_files: + command.append("-files") if os.getenv(COMMIT_TIMEOUT): command.append("-enable-timeout") command.append(cygpath(source)) diff --git a/launchable/jar/exe_deploy.jar b/launchable/jar/exe_deploy.jar index 4282159ff..78d3546f3 100755 Binary files a/launchable/jar/exe_deploy.jar and b/launchable/jar/exe_deploy.jar differ diff --git a/src/main/java/com/launchableinc/ingest/commits/BUILD b/src/main/java/com/launchableinc/ingest/commits/BUILD index 5b16fbd7d..80576782f 100644 --- a/src/main/java/com/launchableinc/ingest/commits/BUILD +++ b/src/main/java/com/launchableinc/ingest/commits/BUILD @@ -1,5 +1,5 @@ package( - default_visibility = ["//visibility:public"] + 
default_visibility = ["//visibility:public"], ) java_library( @@ -11,6 +11,7 @@ java_library( "@maven//:com_fasterxml_jackson_core_jackson_core", "@maven//:com_fasterxml_jackson_core_jackson_databind", "@maven//:com_google_guava_guava", + "@maven//:org_apache_commons_commons_compress", "@maven//:org_apache_httpcomponents_httpclient", "@maven//:org_apache_httpcomponents_httpcore", "@maven//:org_eclipse_jgit_org_eclipse_jgit", diff --git a/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java b/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java new file mode 100644 index 000000000..bf065dc68 --- /dev/null +++ b/src/main/java/com/launchableinc/ingest/commits/ChunkStreamer.java @@ -0,0 +1,59 @@ +package com.launchableinc.ingest.commits; + +import org.apache.http.entity.ContentProducer; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * Accepts T, buffers them, and writes them out as a batch. + */ +abstract class ChunkStreamer implements Consumer, Closeable { + /** + * Encapsulation of how batches are sent. 
+ */ + private final IOConsumer sender; + private final int chunkSize; + private final List spool = new ArrayList<>(); + + ChunkStreamer(IOConsumer sender, int chunkSize) { + this.sender = sender; + this.chunkSize = chunkSize; + } + + @Override + public void accept(T f) { + spool.add(f); + if (spool.size() >= chunkSize) { + try { + flush(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + @Override + public void close() throws IOException { + flush(); + } + + private void flush() throws IOException { + if (spool.isEmpty()) { + return; + } + + try { + sender.accept(os -> writeTo(spool,os)); + } finally { + spool.clear(); + } + } + + protected abstract void writeTo(List spool, OutputStream os) throws IOException; +} diff --git a/src/main/java/com/launchableinc/ingest/commits/CommitChunkStreamer.java b/src/main/java/com/launchableinc/ingest/commits/CommitChunkStreamer.java new file mode 100644 index 000000000..570f5db52 --- /dev/null +++ b/src/main/java/com/launchableinc/ingest/commits/CommitChunkStreamer.java @@ -0,0 +1,37 @@ +package com.launchableinc.ingest.commits; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.http.entity.ContentProducer; +import org.eclipse.jgit.revwalk.RevCommit; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.function.Consumer; + +/** + * {@link Consumer} that groups commits into chunks and write them as JSON, using streams supplied + * by the factory. 
+ */ +final class CommitChunkStreamer extends ChunkStreamer { + CommitChunkStreamer(IOConsumer sender, int chunkSize) { + super(sender, chunkSize); + } + + @Override + protected void writeTo(List spool, OutputStream os) throws IOException { + JsonGenerator w = new JsonFactory().createGenerator(os).useDefaultPrettyPrinter(); + w.setCodec(CommitGraphCollector.objectMapper); + w.writeStartObject(); + w.writeArrayFieldStart("commits"); + + for (JSCommit commit : spool) { + w.writeObject(commit); + } + + w.writeEndArray(); + w.writeEndObject(); + w.close(); + } +} diff --git a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java index ac1a53e03..6c0df383f 100644 --- a/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java +++ b/src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java @@ -1,19 +1,21 @@ package com.launchableinc.ingest.commits; import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.io.CharStreams; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.http.Header; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentProducer; import org.apache.http.entity.EntityTemplate; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; @@ 
-22,6 +24,7 @@ import org.eclipse.jgit.errors.InvalidObjectIdException; import org.eclipse.jgit.errors.MissingObjectException; import org.eclipse.jgit.lib.ConfigConstants; +import org.eclipse.jgit.lib.FileMode; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.ObjectReader; import org.eclipse.jgit.lib.PersonIdent; @@ -32,15 +35,14 @@ import org.eclipse.jgit.revwalk.filter.CommitTimeRevFilter; import org.eclipse.jgit.revwalk.filter.OrRevFilter; import org.eclipse.jgit.submodule.SubmoduleWalk; +import org.eclipse.jgit.treewalk.TreeWalk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.Closeable; import java.io.IOException; import java.io.InputStreamReader; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.URL; import java.nio.charset.StandardCharsets; @@ -63,7 +65,7 @@ */ public class CommitGraphCollector { private static final Logger logger = LoggerFactory.getLogger(CommitGraphCollector.class); - private static final ObjectMapper objectMapper = new ObjectMapper(); + static final ObjectMapper objectMapper = new ObjectMapper(); private static final int HTTP_TIMEOUT_MILLISECONDS = 15_000; /** @@ -73,9 +75,9 @@ public class CommitGraphCollector { */ private final Repository root; - private int commitsSent; + private int commitsSent, filesSent; - private boolean collectCommitMessage; + private boolean collectCommitMessage, collectFiles; private int maxDays; @@ -105,6 +107,10 @@ public int getCommitsSent() { return commitsSent; } + public int getFilesSent() { + return filesSent; + } + private String dumpHeaderAsJson(Header[] headers) throws JsonProcessingException { ObjectNode header = objectMapper.createObjectNode(); for (Header h : headers) { @@ -115,7 +121,7 @@ private String dumpHeaderAsJson(Header[] headers) throws JsonProcessingException /** Transfers the commits to the remote endpoint. 
*/ public void transfer(URL service, Authenticator authenticator, boolean enableTimeout) throws IOException { - URL url; + URL latestUrl; HttpClientBuilder builder = HttpClientBuilder.create() .useSystemProperties() @@ -128,52 +134,83 @@ public void transfer(URL service, Authenticator authenticator, boolean enableTim builder.setDefaultRequestConfig(config); } try (CloseableHttpClient client = builder.build()) { - url = new URL(service, "latest"); + latestUrl = new URL(service, "latest"); if (outputAuditLog()) { System.err.printf( - "AUDIT:launchable:%ssend request method:get path: %s%n", dryRunPrefix(), url); + "AUDIT:launchable:%ssend request method:get path: %s%n", dryRunPrefix(), latestUrl); } - CloseableHttpResponse latestResponse = client.execute(new HttpGet(url.toExternalForm())); - ImmutableList advertised = getAdvertisedRefs(handleError(url, latestResponse)); + CloseableHttpResponse latestResponse = client.execute(new HttpGet(latestUrl.toExternalForm())); + ImmutableList advertised = getAdvertisedRefs(handleError(latestUrl, latestResponse)); honorMaxDaysHeader(latestResponse); // every time a new stream is needed, supply ByteArrayOutputStream, and when the data is all // written, turn around and ship that over transfer( advertised, - () -> { + (ContentProducer commits) -> { try { - return new GZIPOutputStream( - new ByteArrayOutputStream() { - @Override - public void close() throws IOException { - URL url = new URL(service, "collect"); - HttpPost request = new HttpPost(url.toExternalForm()); - request.setHeader("Content-Type", "application/json"); - request.setHeader("Content-Encoding", "gzip"); - request.setEntity(new EntityTemplate(this::writeTo)); - - if (outputAuditLog()) { - InputStreamReader gzip = - new InputStreamReader( - new GZIPInputStream(new ByteArrayInputStream(toByteArray())), - StandardCharsets.UTF_8); - String json = CharStreams.toString(gzip); - System.err.printf( - "AUDIT:launchable:%ssend request method:post path:%s headers:%s" - + " 
args:%s%n", -                          dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders()), json); -                    } -                    if (dryRun) { -                      return; -                    } -                    handleError(url, client.execute(request)); -                  } -                }); +          URL url = new URL(service, "collect"); +          HttpPost request = new HttpPost(url.toExternalForm()); +          request.setHeader("Content-Type", "application/json"); +          request.setHeader("Content-Encoding", "gzip"); +          request.setEntity(new EntityTemplate(os -> commits.writeTo(new GZIPOutputStream(os)))); + +          if (outputAuditLog()) { +            System.err.printf( +                "AUDIT:launchable:%ssend request method:post path:%s headers:%s" +                    + " args:", +                dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); +            commits.writeTo(System.err); +            System.err.println(); +          } +          if (dryRun) { +            return; +          } +          handleError(url, client.execute(request)); +        } catch (IOException e) { +          throw new UncheckedIOException(e); +        } +      }, +      (ContentProducer files) -> { +        try { +          URL url = new URL(service, "collect/files"); +          HttpPost request = new HttpPost(url.toExternalForm()); +          request.setHeader("Content-Type", "application/octet-stream"); +          // no content encoding, since .tar.gz is considered content +          request.setEntity(new EntityTemplate(os -> files.writeTo(new GZIPOutputStream(os)))); + +          if (outputAuditLog()) { +            System.err.printf( +                "AUDIT:launchable:%ssend request method:post path:%s headers:%s args:", +                dryRunPrefix(), url, dumpHeaderAsJson(request.getAllHeaders())); + +            // TODO: inefficient to buffer everything in memory just to read it back +            ByteArrayOutputStream baos = new ByteArrayOutputStream(); +            files.writeTo(baos); +            TarArchiveInputStream tar = +                new TarArchiveInputStream( +                    new ByteArrayInputStream(baos.toByteArray()), +                    "UTF-8"); +            TarArchiveEntry entry; +            boolean first = true; +            while ((entry = tar.getNextTarEntry()) != null) { +              if (first) { +                first = false; +              } else { +                System.err.print(", "); +              } +              System.err.print(entry.getName()); +            } +            System.err.println(); +          } +          if (dryRun) { +            return; +          } +          handleError(url,
client.execute(request)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, 256); } } @@ -210,67 +247,16 @@ private ImmutableList getAdvertisedRefs(HttpResponse response) throws /** * Writes delta between local commits to the advertised to JSON stream. * - * @param streams Commits are written to streams provided by this {@link Supplier}, in the given + * @param commitSender Commits are written to streams provided by this {@link Supplier}, in the given * chunk size. */ public void transfer( - Collection advertised, Supplier streams, int chunkSize) + Collection advertised, IOConsumer commitSender, IOConsumer fileSender, int chunkSize) throws IOException { - try (ChunkStreamer cs = new ChunkStreamer(streams, chunkSize)) { - new ByRepository(root).transfer(advertised, cs); - } - } - - /** - * {@link Consumer} that groups commits into chunks and write them as JSON, using streams supplied - * by the factory. - */ - private static final class ChunkStreamer implements Consumer, Closeable { - - private final Supplier streams; - private JsonGenerator w; - /** Count # of items we wrote to this stream. 
*/ - private int count; - - private final int chunkSize; - - ChunkStreamer(Supplier streams, int chunkSize) { - this.streams = streams; - this.chunkSize = chunkSize; - } - - @Override - public void accept(JSCommit commit) { - try { - if (w == null) { - open(); - } - w.writeObject(commit); - if (++count >= chunkSize) { - close(); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - public void open() throws IOException { - w = new JsonFactory().createGenerator(streams.get()).useDefaultPrettyPrinter(); - w.setCodec(objectMapper); - w.writeStartObject(); - w.writeArrayFieldStart("commits"); - } - - @Override - public void close() throws IOException { - if (w == null) { - return; // already closed - } - w.writeEndArray(); - w.writeEndObject(); - w.close(); - w = null; - count = 0; + ByRepository r = new ByRepository(root); + try (CommitChunkStreamer cs = new CommitChunkStreamer(commitSender, chunkSize); + FileChunkStreamer fs = new FileChunkStreamer(fileSender, chunkSize)) { + r.transfer(advertised, cs, fs); } } @@ -307,6 +293,10 @@ public void setDryRun(boolean dryRun) { this.dryRun = dryRun; } + public void collectFiles(boolean collectFiles) { + this.collectFiles = collectFiles; + } + /** Process commits per repository. */ final class ByRepository implements AutoCloseable { @@ -324,11 +314,11 @@ final class ByRepository implements AutoCloseable { /** * Writes delta between local commits to the advertised to JSON stream. * - * @param receiver Receives commits that should be sent, one by one. + * @param commitReceiver Receives commits that should be sent, one by one. 
*/ - public void transfer(Collection advertised, Consumer receiver) + public void transfer(Collection advertised, Consumer commitReceiver, Consumer fileReceiver) throws IOException { - try (RevWalk walk = new RevWalk(git)) { + try (RevWalk walk = new RevWalk(git); TreeWalk treeWalk = new TreeWalk(git)) { // walk reverse topological order, so that older commits get added to the server earlier. // This way, the connectivity of the commit graph will be always maintained walk.sort(RevSort.TOPO); @@ -340,7 +330,9 @@ public void transfer(Collection advertised, Consumer receive walk.sort(RevSort.COMMIT_TIME_DESC, true); ObjectId headId = git.resolve("HEAD"); - walk.markStart(walk.parseCommit(headId)); + RevCommit start = walk.parseCommit(headId); + walk.markStart(start); + treeWalk.addTree(start.getTree()); // don't walk commits too far back. // for our purpose of computing CUT, these are unlikely to contribute meaningfully @@ -352,11 +344,11 @@ public void transfer(Collection advertised, Consumer receive CommitTimeRevFilter.after(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(maxDays)), new ObjectRevFilter(headId))); - for (ObjectId id : advertised) { try { RevCommit c = walk.parseCommit(id); walk.markUninteresting(c); + treeWalk.addTree(c.getTree()); } catch (MissingObjectException e) { // it's possible that the server advertises a commit we don't have. 
// @@ -369,10 +361,11 @@ public void transfer(Collection advertised, Consumer receive } } - // walk the commits, transform them, and send them to the receiver + collectFiles(treeWalk, fileReceiver); + + // walk the commits, transform them, and send them to the commitReceiver for (RevCommit c : walk) { - JSCommit d = transform(c); - receiver.accept(d); + commitReceiver.accept(transform(c)); commitsSent++; } } @@ -398,7 +391,7 @@ That is, find submodules that are available in the working tree (thus `!isBare() try (Repository subRepo = swalk.getRepository()) { if (subRepo != null) { try (ByRepository br = new ByRepository(subRepo)) { - br.transfer(advertised, receiver); + br.transfer(advertised, commitReceiver, fileReceiver); } } } @@ -407,6 +400,39 @@ That is, find submodules that are available in the working tree (thus `!isBare() } } + private void collectFiles(TreeWalk treeWalk, Consumer receiver) throws IOException { + if (!collectFiles) return; + + int c = treeWalk.getTreeCount(); + + OUTER: + while (treeWalk.next()) { + ObjectId head = treeWalk.getObjectId(0); + for (int i=1; i { + FileChunkStreamer(IOConsumer sender, int chunkSize) { + super(sender, chunkSize); + } + + @Override + protected void writeTo(List files, OutputStream os) throws IOException { + try (TarArchiveOutputStream tar = new TarArchiveOutputStream(os, "UTF-8")) { + tar.setLongFileMode(LONGFILE_POSIX); + + for (VirtualFile f : files) { + TarArchiveEntry e = new TarArchiveEntry(f.path()); + e.setSize(f.size()); + tar.putArchiveEntry(e); + f.writeTo(tar); + tar.closeArchiveEntry(); + } + } + } +} diff --git a/src/main/java/com/launchableinc/ingest/commits/GitFile.java b/src/main/java/com/launchableinc/ingest/commits/GitFile.java new file mode 100644 index 000000000..be5aeb71f --- /dev/null +++ b/src/main/java/com/launchableinc/ingest/commits/GitFile.java @@ -0,0 +1,38 @@ +package com.launchableinc.ingest.commits; + +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectReader; + 
+import java.io.IOException; +import java.io.OutputStream; + +import static org.eclipse.jgit.lib.Constants.*; + +/** + * Represents a file in a Git repository, and encapsulates the read access for convenience. + */ +final class GitFile implements VirtualFile { + final String path; + final ObjectId blob; + private final ObjectReader objectReader; + + public GitFile(String path, ObjectId blob, ObjectReader objectReader) { + this.path = path; + this.blob = blob; + this.objectReader = objectReader; + } + + @Override + public String path() { + return path; + } + + public long size() throws IOException { + return objectReader.open(blob, OBJ_BLOB).getSize(); + } + + @Override + public void writeTo(OutputStream os) throws IOException { + objectReader.open(blob, OBJ_BLOB).copyTo(os); + } +} diff --git a/src/main/java/com/launchableinc/ingest/commits/IOConsumer.java b/src/main/java/com/launchableinc/ingest/commits/IOConsumer.java new file mode 100644 index 000000000..d7f34a7c3 --- /dev/null +++ b/src/main/java/com/launchableinc/ingest/commits/IOConsumer.java @@ -0,0 +1,7 @@ +package com.launchableinc.ingest.commits; + +import java.io.IOException; + +public interface IOConsumer { + void accept(T t) throws IOException; +} diff --git a/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java b/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java new file mode 100644 index 000000000..bee881cb5 --- /dev/null +++ b/src/main/java/com/launchableinc/ingest/commits/VirtualFile.java @@ -0,0 +1,10 @@ +package com.launchableinc.ingest.commits; + +import java.io.IOException; +import java.io.OutputStream; + +public interface VirtualFile { + String path(); + long size() throws IOException; + void writeTo(OutputStream os) throws IOException; +} diff --git a/src/maven_install.json b/src/maven_install.json index 42453d3d6..ed92dd600 100644 --- a/src/maven_install.json +++ b/src/maven_install.json @@ -1,7 +1,7 @@ { "__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": 
"THERE_IS_NO_DATA_ONLY_ZUUL", - "__INPUT_ARTIFACTS_HASH": -567104162, - "__RESOLVED_ARTIFACTS_HASH": -1847404694, + "__INPUT_ARTIFACTS_HASH": 945199548, + "__RESOLVED_ARTIFACTS_HASH": -603343145, "artifacts": { "args4j:args4j": { "shasums": { @@ -124,10 +124,17 @@ }, "commons-codec:commons-codec": { "shasums": { - "jar": "e599d5318e97aa48f42136a2927e6dfa4e8881dff0e6c8e3109ddbbff51d7b7d", - "sources": "901cb5d1f7c2877017c95d3c5efd5a497738d0162ef72cdf58e9cb13f50b2e9c" + "jar": "f9f6cb103f2ddc3c99a9d80ada2ae7bf0685111fd6bffccb72033d1da4e6ff23", + "sources": "2283aff7f425dff23ebdb7a8fc0f03ae21b4ced7a43aacba47cedae126dc5d4a" }, - "version": "1.11" + "version": "1.17.1" + }, + "commons-io:commons-io": { + "shasums": { + "jar": "f41f7baacd716896447ace9758621f62c1c6b0a91d89acee488da26fc477c84f", + "sources": "fcfe84e39fb44e38a0ea0ab0815b53adea6fff89c7b72535bc42495f400cb9a1" + }, + "version": "2.16.1" }, "commons-logging:commons-logging": { "shasums": { @@ -157,6 +164,20 @@ }, "version": "1.7.1" }, + "org.apache.commons:commons-compress": { + "shasums": { + "jar": "293d80f54b536b74095dcd7ea3cf0a29bbfc3402519281332495f4420d370d16", + "sources": "90839ce19b8afb8ee50db75c84aa7555ebc054ba1bd43fdb2202cbf094c77f81" + }, + "version": "1.27.1" + }, + "org.apache.commons:commons-lang3": { + "shasums": { + "jar": "08709dd74d602b705ce4017d26544210056a4ba583d5b20c09373406fe7a00f8", + "sources": "268058f1ce33558da48f07b5f5dae4ae6433357bf07e1bbd0f1d864adda1269c" + }, + "version": "3.16.0" + }, "org.apache.httpcomponents:httpclient": { "shasums": { "jar": "c8bc7e1c51a6d4ce72f40d2ebbabf1c4b68bfe76e732104b04381b493478e9d6", @@ -423,6 +444,11 @@ "org.apache.ant:ant": [ "org.apache.ant:ant-launcher" ], + "org.apache.commons:commons-compress": [ + "commons-codec:commons-codec", + "commons-io:commons-io", + "org.apache.commons:commons-lang3" + ], "org.apache.httpcomponents:httpclient": [ "commons-codec:commons-codec", "commons-logging:commons-logging", @@ -681,6 +707,23 @@ 
"org.apache.commons.codec.language.bm", "org.apache.commons.codec.net" ], + "commons-io:commons-io": [ + "org.apache.commons.io", + "org.apache.commons.io.build", + "org.apache.commons.io.channels", + "org.apache.commons.io.charset", + "org.apache.commons.io.comparator", + "org.apache.commons.io.file", + "org.apache.commons.io.file.attribute", + "org.apache.commons.io.file.spi", + "org.apache.commons.io.filefilter", + "org.apache.commons.io.function", + "org.apache.commons.io.input", + "org.apache.commons.io.input.buffer", + "org.apache.commons.io.monitor", + "org.apache.commons.io.output", + "org.apache.commons.io.serialization" + ], "commons-logging:commons-logging": [ "org.apache.commons.logging", "org.apache.commons.logging.impl" @@ -753,6 +796,64 @@ "org.apache.ant:ant-launcher": [ "org.apache.tools.ant.launch" ], + "org.apache.commons:commons-compress": [ + "org.apache.commons.compress", + "org.apache.commons.compress.archivers", + "org.apache.commons.compress.archivers.ar", + "org.apache.commons.compress.archivers.arj", + "org.apache.commons.compress.archivers.cpio", + "org.apache.commons.compress.archivers.dump", + "org.apache.commons.compress.archivers.examples", + "org.apache.commons.compress.archivers.jar", + "org.apache.commons.compress.archivers.sevenz", + "org.apache.commons.compress.archivers.tar", + "org.apache.commons.compress.archivers.zip", + "org.apache.commons.compress.changes", + "org.apache.commons.compress.compressors", + "org.apache.commons.compress.compressors.brotli", + "org.apache.commons.compress.compressors.bzip2", + "org.apache.commons.compress.compressors.deflate", + "org.apache.commons.compress.compressors.deflate64", + "org.apache.commons.compress.compressors.gzip", + "org.apache.commons.compress.compressors.lz4", + "org.apache.commons.compress.compressors.lz77support", + "org.apache.commons.compress.compressors.lzma", + "org.apache.commons.compress.compressors.lzw", + "org.apache.commons.compress.compressors.pack200", + 
"org.apache.commons.compress.compressors.snappy", + "org.apache.commons.compress.compressors.xz", + "org.apache.commons.compress.compressors.z", + "org.apache.commons.compress.compressors.zstandard", + "org.apache.commons.compress.harmony", + "org.apache.commons.compress.harmony.archive.internal.nls", + "org.apache.commons.compress.harmony.pack200", + "org.apache.commons.compress.harmony.unpack200", + "org.apache.commons.compress.harmony.unpack200.bytecode", + "org.apache.commons.compress.harmony.unpack200.bytecode.forms", + "org.apache.commons.compress.java.util.jar", + "org.apache.commons.compress.parallel", + "org.apache.commons.compress.utils" + ], + "org.apache.commons:commons-lang3": [ + "org.apache.commons.lang3", + "org.apache.commons.lang3.arch", + "org.apache.commons.lang3.builder", + "org.apache.commons.lang3.compare", + "org.apache.commons.lang3.concurrent", + "org.apache.commons.lang3.concurrent.locks", + "org.apache.commons.lang3.event", + "org.apache.commons.lang3.exception", + "org.apache.commons.lang3.function", + "org.apache.commons.lang3.math", + "org.apache.commons.lang3.mutable", + "org.apache.commons.lang3.reflect", + "org.apache.commons.lang3.stream", + "org.apache.commons.lang3.text", + "org.apache.commons.lang3.text.translate", + "org.apache.commons.lang3.time", + "org.apache.commons.lang3.tuple", + "org.apache.commons.lang3.util" + ], "org.apache.httpcomponents:httpclient": [ "org.apache.http.auth", "org.apache.http.auth.params", @@ -2184,6 +2285,8 @@ "commons-cli:commons-cli:jar:sources", "commons-codec:commons-codec", "commons-codec:commons-codec:jar:sources", + "commons-io:commons-io", + "commons-io:commons-io:jar:sources", "commons-logging:commons-logging", "commons-logging:commons-logging:jar:sources", "junit:junit", @@ -2191,6 +2294,10 @@ "org.apache.ant:ant", "org.apache.ant:ant-launcher", "org.apache.ant:ant:jar:sources", + "org.apache.commons:commons-compress", + "org.apache.commons:commons-compress:jar:sources", + 
"org.apache.commons:commons-lang3", + "org.apache.commons:commons-lang3:jar:sources", "org.apache.httpcomponents:httpclient", "org.apache.httpcomponents:httpclient:jar:sources", "org.apache.httpcomponents:httpcore", diff --git a/src/test/java/com/launchableinc/ingest/commits/AllTests.java b/src/test/java/com/launchableinc/ingest/commits/AllTests.java index eb2934e43..96a930c8d 100644 --- a/src/test/java/com/launchableinc/ingest/commits/AllTests.java +++ b/src/test/java/com/launchableinc/ingest/commits/AllTests.java @@ -8,6 +8,7 @@ @SuiteClasses({ CommitGraphCollectorTest.class, CommitIngesterTest.class, + FileChunkStreamerTest.class, SSLBypassTest.class }) public class AllTests {} diff --git a/src/test/java/com/launchableinc/ingest/commits/BUILD b/src/test/java/com/launchableinc/ingest/commits/BUILD index c4bf189dd..be6cd4499 100644 --- a/src/test/java/com/launchableinc/ingest/commits/BUILD +++ b/src/test/java/com/launchableinc/ingest/commits/BUILD @@ -3,9 +3,13 @@ java_test( srcs = glob(["*.java"]), deps = [ "//src/main/java/com/launchableinc/ingest/commits", + "@maven//:com_fasterxml_jackson_core_jackson_databind", "@maven//:com_google_guava_guava", "@maven//:com_google_truth_truth", + "@maven//:commons_io_commons_io", "@maven//:junit_junit", + "@maven//:org_apache_commons_commons_compress", + "@maven//:org_apache_httpcomponents_httpcore", "@maven//:org_eclipse_jgit_org_eclipse_jgit", "@maven//:org_mock_server_mockserver_junit_rule_no_dependencies", ], @@ -16,6 +20,6 @@ sh_test( srcs = ["java8-compat.sh"], data = [ "//src/main/java/com/launchableinc/ingest/commits:exe_deploy.jar", - "@maven//:org_jvnet_animal_sniffer" - ] + "@maven//:org_jvnet_animal_sniffer", + ], ) diff --git a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java index a5e6ea4b0..ad8b4a435 100644 --- a/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java +++ 
b/src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java @@ -1,15 +1,11 @@ package com.launchableinc.ingest.commits; -import static com.google.common.truth.Truth.assertThat; - +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.List; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.NullOutputStream; +import org.apache.http.entity.ContentProducer; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.PersonIdent; @@ -23,6 +19,17 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.List; + +import static com.google.common.truth.Truth.*; + @RunWith(JUnit4.class) public class CommitGraphCollectorTest { @@ -79,14 +86,15 @@ public void bareRepo() throws Exception { try (Repository r = Git.open(barerepoDir).getRepository()) { CommitGraphCollector cgc = collectCommit(r, ImmutableList.of()); assertThat(cgc.getCommitsSent()).isEqualTo(1); + assertThat(cgc.getFilesSent()).isEqualTo(1); } } /** Tests the chunking behavior. 
*/ @Test public void chunking() throws Exception { - int[] countStreams = new int[1]; - int[] countClose = new int[1]; + int[] countCommitChunks = new int[1]; + int[] countFilesChunks = new int[1]; // Create 3 commits setupRepos(); @@ -94,23 +102,39 @@ public void chunking() throws Exception { addCommitInSubRepo(mainrepo); CommitGraphCollector cgc = new CommitGraphCollector(mainrepo.getRepository()); cgc.setMaxDays(30); + cgc.collectFiles(true); cgc.transfer( ImmutableList.of(), - () -> { - countStreams[0]++; - return new ByteArrayOutputStream() { - @Override - public void close() throws IOException { - super.close(); - assertThat(size()).isGreaterThan(0); - countClose[0]++; - } - }; + (ContentProducer commits) -> { + countCommitChunks[0]++; + assertValidJson(commits); + }, + (ContentProducer files) -> { + countFilesChunks[0]++; + assertValidTar(files); }, 2); } - assertThat(countStreams[0]).isEqualTo(2); - assertThat(countClose[0]).isEqualTo(2); + assertThat(countCommitChunks[0]).isEqualTo(2); + assertThat(countFilesChunks[0]).isEqualTo(1); // a and sub/x, 2 files, 1 chunk + } + + private void assertValidTar(ContentProducer content) throws IOException { + try (TarArchiveInputStream tar = new TarArchiveInputStream(read(content))) { + while (tar.getNextEntry() != null) { + IOUtils.copy(tar, NullOutputStream.INSTANCE); + } + } + } + + private void assertValidJson(ContentProducer content) throws IOException { + new ObjectMapper().readTree(read(content)); + } + + private InputStream read(ContentProducer content) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + content.writeTo(baos); + return new ByteArrayInputStream(baos.toByteArray()); } @Test @@ -121,7 +145,7 @@ public void scrubPii() throws Exception { addCommitInSubRepo(mainrepo); CommitGraphCollector cgc = new CommitGraphCollector(mainrepo.getRepository()); cgc.setMaxDays(30); - cgc.transfer(ImmutableList.of(), () -> baos, Integer.MAX_VALUE); + cgc.transfer(ImmutableList.of(), c
-> c.writeTo(baos), f -> {}, Integer.MAX_VALUE); } String requestBody = baos.toString(StandardCharsets.UTF_8); assertThat(requestBody).doesNotContain(committer.getEmailAddress()); @@ -132,14 +156,8 @@ private CommitGraphCollector collectCommit(Repository r, List<ObjectId> advertis throws IOException { CommitGraphCollector cgc = new CommitGraphCollector(r); cgc.setMaxDays(30); - cgc.transfer( - advertised, - () -> - new FilterOutputStream(System.err) { - @Override - public void close() {} - }, - 3); + cgc.collectFiles(true); + cgc.transfer(advertised, c -> {}, f -> {}, 3); return cgc; } diff --git a/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java b/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java new file mode 100644 index 000000000..e64cebbf4 --- /dev/null +++ b/src/test/java/com/launchableinc/ingest/commits/FileChunkStreamerTest.java @@ -0,0 +1,86 @@ +package com.launchableinc.ingest.commits; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.NullOutputStream; +import org.apache.http.entity.ContentProducer; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +import static com.google.common.truth.Truth.*; +import static org.junit.Assert.fail; + +public class FileChunkStreamerTest { + @Test + public void no_op_if_no_content() throws Exception { + try (FileChunkStreamer fs = new FileChunkStreamer(content -> fail(), 2)) { + // no write + } + } + + @Test + public void basics() throws Exception { + int[] count = new int[1]; + try (FileChunkStreamer fs = new FileChunkStreamer(content -> { + switch(count[0]++) { + case 0: + assertThat(readEntries(content)).containsExactly("foo.txt", "bar.txt").inOrder(); + break; + case 1: 
assertThat(readEntries(content)).containsExactly("zot.txt").inOrder(); + break; + default: + fail(); + } + }, 2)) { + fs.accept(new VirtualFileImpl("foo.txt")); + fs.accept(new VirtualFileImpl("bar.txt")); + fs.accept(new VirtualFileImpl("zot.txt")); + } + assertThat(count[0]).isEqualTo(2); + } + + private List<String> readEntries(ContentProducer content) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + content.writeTo(baos); + + try (TarArchiveInputStream tar = new TarArchiveInputStream(new ByteArrayInputStream(baos.toByteArray()))) { + List<String> entries = new java.util.ArrayList<>(); + TarArchiveEntry entry; + while ((entry = tar.getNextTarEntry()) != null) { + entries.add(entry.getName()); + IOUtils.copy(tar, NullOutputStream.INSTANCE); + } + return entries; + } + } + + private static class VirtualFileImpl implements VirtualFile { + private final String path; + + VirtualFileImpl(String path) { + this.path = path; + } + + @Override + public String path() { + return path; + } + + @Override + public long size() { + return path.getBytes().length; + } + + @Override + public void writeTo(OutputStream os) throws IOException { + os.write(path.getBytes()); + } + } +}