Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ configuration for [pre-commit](https://pre-commit.com). Install the hook with
pipenv shell
```

From this shell, run `pip install -e .` to create the `launchable` wrapper script from the workspace.
This is useful for testing the CLI against a real or local server.

## Run CLI tests

```shell
Expand Down
Binary file modified launchable/jar/exe_deploy.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.apache.http.entity.ContentProducer;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
Expand All @@ -12,7 +11,7 @@
/**
* Accepts T, buffers them, and writes them out as a batch.
*/
abstract class ChunkStreamer<T> implements FlushableConsumer<T>, Closeable {
abstract class ChunkStreamer<T> implements FlushableConsumer<T> {
/**
* Encapsulation of how batches are sent.
*/
Expand Down
296 changes: 178 additions & 118 deletions src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,34 @@
* Receives {@link GitFile}, buffers them, and writes them out in a gzipped tar file.
*/
final class FileChunkStreamer extends ChunkStreamer<VirtualFile> {
FileChunkStreamer(IOConsumer<ContentProducer> sender, int chunkSize) {
private final VirtualFile header;

FileChunkStreamer(VirtualFile header, IOConsumer<ContentProducer> sender, int chunkSize) {
super(sender, chunkSize);
this.header = header;
}

@Override
protected void writeTo(List<VirtualFile> files, OutputStream os) throws IOException {
try (TarArchiveOutputStream tar = new TarArchiveOutputStream(os, "UTF-8")) {
tar.setLongFileMode(LONGFILE_POSIX);

if (header!=null) {
write(header, tar);
}

for (VirtualFile f : files) {
TarArchiveEntry e = new TarArchiveEntry(f.path());
e.setSize(f.size());
e.setGroupName(f.blob().name()); // HACK - reuse the group name field to store the blob ID
tar.putArchiveEntry(e);
f.writeTo(tar);
tar.closeArchiveEntry();
write(f, tar);
}
}
}

private static void write(VirtualFile f, TarArchiveOutputStream tar) throws IOException {
TarArchiveEntry e = new TarArchiveEntry(f.path());
e.setSize(f.size());
e.setGroupName(f.blob().name()); // HACK - reuse the group name field to store the blob ID
tar.putArchiveEntry(e);
f.writeTo(tar);
tar.closeArchiveEntry();
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.launchableinc.ingest.commits;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;

/**
* Consumers that spool items it accepts and process them in bulk.
*/
public interface FlushableConsumer<T> extends Consumer<T> {
public interface FlushableConsumer<T> extends Consumer<T>, Closeable {
/**
* Process all items that have been accepted so far.
*/
Expand All @@ -15,7 +16,12 @@ public interface FlushableConsumer<T> extends Consumer<T> {
static <T> FlushableConsumer<T> of(Consumer<T> c) {
return new FlushableConsumer<T>() {
@Override
public void flush() throws IOException {
public void flush() {
// noop
}

@Override
public void close() {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.launchableinc.ingest.commits;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static java.time.Instant.now;

/**
 * Given multiple concurrent slow {@link Consumer}s, each going over a large
 * number of items in parallel, provide a progress report to show that the
 * work is still in progress.
 */
class ProgressReporter<T> {
  /** Renders an item into a human-readable form for the progress line. */
  private final Function<T, String> printer;
  /** Minimum time between two consecutive progress lines. */
  private final Duration reportInterval;
  /** Earliest time the next progress line may be printed. Guarded by {@code this}. */
  private Instant nextReportTime;

  /**
   * Number of items that need to be processed, across all consumers.
   */
  private final AtomicInteger workload = new AtomicInteger();
  /**
   * Number of items that have already been processed, across all consumers.
   */
  private final AtomicInteger completed = new AtomicInteger();

  ProgressReporter(Function<T, String> printer, Duration reportInterval) {
    this.printer = printer;
    this.reportInterval = reportInterval;
    this.nextReportTime = now().plus(reportInterval);
  }

  /**
   * Deals with one serial stream of work.
   */
  class Consumer implements FlushableConsumer<T> {
    private final FlushableConsumer<T> base;
    /** Items accepted but not yet handed to {@link #base}. */
    private final List<T> pool = new ArrayList<>();

    Consumer(FlushableConsumer<T> base) {
      this.base = base;
    }

    @Override
    public void accept(T t) {
      pool.add(t);
      workload.incrementAndGet();
    }

    @Override
    public void flush() throws IOException {
      for (T x : pool) {
        // Throttle output: at most one progress line per reportInterval across all threads.
        synchronized (ProgressReporter.this) {
          if (now().isAfter(nextReportTime)) {
            print(completed.get(), workload.get(), x);
            nextReportTime = now().plus(reportInterval);
          }
        }
        base.accept(x);
        completed.incrementAndGet();
      }
      pool.clear();
      base.flush();
    }

    @Override
    public void close() throws IOException {
      // NOTE(review): intentionally flushes but does not close 'base' — callers appear to
      // own its lifecycle; confirm.
      flush();
    }
  }

  Consumer newConsumer(FlushableConsumer<T> base) {
    return new Consumer(base);
  }

  /**
   * Prints one progress line to stderr. Overridable (e.g. for tests).
   *
   * @param c number of items completed so far
   * @param w total number of items accepted so far
   * @param x the item about to be processed
   */
  protected void print(int c, int w, T x) {
    int width = String.valueOf(w).length();
    System.err.printf("%s/%d: %s%n", pad(c, width), w, printer.apply(x));
  }

  /** Left-pads {@code i} with spaces to at least {@code width} characters. */
  static String pad(int i, int width) {
    // "%<width>d" right-aligns the number in a field of the given width;
    // like the original loop, it never truncates a wider number.
    return String.format("%" + width + "d", i);
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
MainTest.class,
FileChunkStreamerTest.class,
SSLBypassTest.class,
ProgressReportingConsumerTest.class
ProgressReporterTest.class
})
public class AllTests {}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void bareRepo() throws Exception {
try (Repository r = Git.open(barerepoDir).getRepository()) {
CommitGraphCollector cgc = collectCommit(r, ImmutableList.of());
assertThat(cgc.getCommitsSent()).isEqualTo(1);
assertThat(cgc.getFilesSent()).isEqualTo(2); // header + .gitmodules
assertThat(cgc.getFilesSent()).isEqualTo(1); // .gitmodules
}
}

Expand Down Expand Up @@ -121,7 +121,7 @@ public void chunking() throws Exception {
2);
}
assertThat(councCommitChunks[0]).isEqualTo(2);
assertThat(countFilesChunks[0]).isEqualTo(3); // header, a, .gitmodules, and header, sub/x, 5 files, 3 chunks
assertThat(countFilesChunks[0]).isEqualTo(2); // a, .gitmodules, and sub/x, 3 files, 2 chunks
}

private void assertValidTar(ContentProducer content) throws IOException {
Expand Down Expand Up @@ -176,14 +176,8 @@ public void header() throws Exception {

CommitGraphCollector cgc = new CommitGraphCollector("test", mainrepo.getRepository());
cgc.collectFiles(true);
cgc.new ByRepository(mainrepo.getRepository(), "main")
.transfer(Collections.emptyList(), c -> {},
new PassThroughTreeReceiverImpl(),
FlushableConsumer.of(files::add));

// header for the main repo, 'gitmodules', header for the sub repo, 'a', and 'x' in the sub repo
assertThat(files).hasSize(5);
VirtualFile header = files.get(2);
VirtualFile header = cgc.new ByRepository(mainrepo.getRepository(), "main").buildHeader();
assertThat(header.path()).isEqualTo(CommitGraphCollector.HEADER_FILE);
JsonNode tree = assertValidJson(header::writeTo).get("tree");
assertThat(tree.isArray()).isTrue();
Expand All @@ -193,7 +187,7 @@ cgc.new ByRepository(mainrepo.getRepository(), "main")
paths.add(i.get("path").asText());
}

assertThat(paths).containsExactly("a", "x");
assertThat(paths).containsExactly(".gitmodules", "sub");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@
public class FileChunkStreamerTest {
@Test
public void no_op_if_no_content() throws Exception {
try (FileChunkStreamer fs = new FileChunkStreamer(content -> fail(), 2)) {
try (FileChunkStreamer fs = new FileChunkStreamer(null, content -> fail(), 2)) {
// no write
}
}

@Test
public void basics() throws Exception {
int[] count = new int[1];
try (FileChunkStreamer fs = new FileChunkStreamer(content -> {
try (FileChunkStreamer fs = new FileChunkStreamer(new VirtualFileImpl("header.txt"), content -> {
switch(count[0]++) {
case 0:
assertThat(readEntries(content)).containsExactly("foo.txt", "bar.txt").inOrder();
assertThat(readEntries(content)).containsExactly("header.txt", "foo.txt", "bar.txt").inOrder();
break;
case 1:
assertThat(readEntries(content)).containsExactly("zot.txt").inOrder();
assertThat(readEntries(content)).containsExactly("header.txt", "zot.txt").inOrder();
break;
default:
fail();
Expand Down
Loading
Loading