diff --git a/engine/src/main/java/de/gesellix/docker/engine/AttachConfig.java b/engine/src/main/java/de/gesellix/docker/engine/AttachConfig.java deleted file mode 100644 index d76099d5..00000000 --- a/engine/src/main/java/de/gesellix/docker/engine/AttachConfig.java +++ /dev/null @@ -1,119 +0,0 @@ -package de.gesellix.docker.engine; - -import okhttp3.Response; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.function.Function; -import java.util.function.Supplier; - -public class AttachConfig { - - private final boolean expectMultiplexedResponse; - private final Streams streams; - private final Callbacks callbacks; - - public AttachConfig() { - this(false); - } - - public AttachConfig(boolean expectMultiplexedResponse) { - this.expectMultiplexedResponse = expectMultiplexedResponse; - this.streams = new Streams(); - this.callbacks = new Callbacks(); - } - - public Streams getStreams() { - return streams; - } - - public Object onFailure(Exception e) { - return callbacks.onFailure.apply(e); - } - - public void setOnFailure(Function onFailure) { - callbacks.onFailure = onFailure; - } - - public Object onResponse(Response r) { - return callbacks.onResponse.apply(r); - } - - public void setOnResponse(Function onResponse) { - callbacks.onResponse = onResponse; - } - - public Object onSinkClosed(Response r) { - return callbacks.onSinkClosed.apply(r); - } - - public void setOnSinkClosed(Function onSinkClosed) { - callbacks.onSinkClosed = onSinkClosed; - } - - public Object onSinkWritten(Response r) { - return callbacks.onSinkWritten.apply(r); - } - - public void setOnSinkWritten(Function onSinkWritten) { - callbacks.onSinkWritten = onSinkWritten; - } - - public Object onSourceConsumed() { - return callbacks.onSourceConsumed.get(); - } - - public void setOnSourceConsumed(Supplier onSourceConsumed) { - callbacks.onSourceConsumed = onSourceConsumed; - } - - public boolean isExpectMultiplexedResponse() { - return expectMultiplexedResponse; - } - - public static class Streams { - - private InputStream stdin; - private OutputStream stdout; - private OutputStream stderr; - - public Streams() { - this.stdin = null; - this.stdout = System.out; - this.stderr = System.err; - } - - public InputStream getStdin() { - return stdin; - } - - public void setStdin(InputStream stdin) { - this.stdin = stdin; - } - - public OutputStream getStdout() { - return stdout; - } - - public void setStdout(OutputStream stdout) { - this.stdout = stdout; - } - - public OutputStream getStderr() { - return stderr; - } - - public void setStderr(OutputStream stderr) { - this.stderr = stderr; - } - } - - public static class Callbacks { - - private Function onFailure = (Exception e) -> null; - private Function onResponse = (Response r) -> null; - private Function onSinkClosed = (Response r) -> null; - private Function onSinkWritten = (Response r) -> null; - private Supplier onSourceConsumed = () -> null; - } -} diff --git a/engine/src/main/java/de/gesellix/docker/engine/EngineRequest.java b/engine/src/main/java/de/gesellix/docker/engine/EngineRequest.java index a0a41628..a9af2b33 100644 --- a/engine/src/main/java/de/gesellix/docker/engine/EngineRequest.java +++ b/engine/src/main/java/de/gesellix/docker/engine/EngineRequest.java @@ -19,7 +19,6 @@ public class EngineRequest { private int timeout = 0; private boolean async = false; - private AttachConfig attach = null; private OutputStream stdout; private String apiVersion = null; @@ -93,14 +92,6 @@ public void setAsync(boolean async) { this.async = async; } - public AttachConfig getAttach() { - return attach; - } - - public void setAttach(AttachConfig attach) { - this.attach = attach; - } - public OutputStream getStdout() { return stdout; } @@ -124,11 +115,11 @@ public boolean equals(Object o) { EngineRequest that = (EngineRequest) o; return timeout == that.timeout && async == that.async && method == that.method && Objects.equals(path, that.path) && Objects.equals(headers, that.headers) && Objects.equals(query, that.query) && Objects.equals(contentType, that.contentType) && Objects.equals(body, that.body) && - Objects.equals(attach, that.attach) && Objects.equals(stdout, that.stdout) && Objects.equals(apiVersion, that.apiVersion); + Objects.equals(stdout, that.stdout) && Objects.equals(apiVersion, that.apiVersion); } @Override public int hashCode() { - return Objects.hash(method, path, headers, query, contentType, body, timeout, async, attach, stdout, apiVersion); + return Objects.hash(method, path, headers, query, contentType, body, timeout, async, stdout, apiVersion); } } diff --git a/engine/src/main/java/de/gesellix/docker/engine/OkDockerClient.java b/engine/src/main/java/de/gesellix/docker/engine/OkDockerClient.java index ec6afb88..e66995dc 100644 --- a/engine/src/main/java/de/gesellix/docker/engine/OkDockerClient.java +++ b/engine/src/main/java/de/gesellix/docker/engine/OkDockerClient.java @@ -6,8 +6,6 @@ import de.gesellix.docker.client.filesocket.NamedPipeSocketFactory; import de.gesellix.docker.client.filesocket.UnixSocketFactory; import de.gesellix.docker.client.filesocket.UnixSocketFactorySupport; -import de.gesellix.docker.hijack.HijackingInterceptor; -import de.gesellix.docker.hijack.OkResponseCallback; import de.gesellix.docker.json.CustomObjectAdapterFactory; import de.gesellix.docker.rawstream.RawInputStream; import de.gesellix.docker.response.JsonContentHandler; @@ -148,60 +146,30 @@ public WebSocket webSocket(Map requestConfig, WebSocketListener public EngineResponse request(EngineRequest requestConfig) { EngineRequest config = ensureValidRequestConfig(requestConfig); - AttachConfig attachConfig = null; - if (config.getAttach() != null) { - Map headers = config.getHeaders(); - if (headers == null) { - headers = new HashMap<>(); - } - config.setHeaders(headers); - // https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach - // To hint potential proxies about connection hijacking, the Docker client sends connection upgrade headers. - headers.put("Upgrade", "tcp"); - headers.put("Connection", "Upgrade"); - attachConfig = config.getAttach(); - } -// boolean multiplexStreams = config.multiplexStreams - Request.Builder requestBuilder = prepareRequest(new Request.Builder(), config); final Request request = requestBuilder.build(); OkHttpClient.Builder clientBuilder = prepareClient(new OkHttpClient.Builder(), config.getTimeout()); - OkResponseCallback responseCallback = null; - if (attachConfig != null) { - clientBuilder.addNetworkInterceptor(new HijackingInterceptor( - attachConfig, - attachConfig.getStreams().getStdin() == null ? null : Okio.source(attachConfig.getStreams().getStdin()), - attachConfig.getStreams().getStdout() == null ? null : Okio.sink(attachConfig.getStreams().getStdout()))); - responseCallback = new OkResponseCallback(attachConfig); - } final OkHttpClient client = newClient(clientBuilder); log.debug(request.method() + " " + request.url() + " using proxy: " + client.proxy()); Call call = client.newCall(request); - if (responseCallback != null) { - call.enqueue(responseCallback); - log.debug("request enqueued"); - return new EngineResponse(); - } - else { - EngineResponse dockerResponse; - try { - Response response = call.execute(); - log.debug("response: " + response); - dockerResponse = handleResponse(response, config); - if (dockerResponse.getStream() == null) { + EngineResponse dockerResponse; + try { + Response response = call.execute(); + log.debug("response: " + response); + dockerResponse = handleResponse(response, config); + if (dockerResponse.getStream() == null) { // log.warn("closing response..."); - response.close(); - } - } - catch (Exception e) { - log.error("Request failed", e); - throw new RuntimeException("Request failed", e); + response.close(); } - return dockerResponse; } + catch (Exception e) { + log.error("Request failed", e); + throw new RuntimeException("Request failed", e); + } + return dockerResponse; } private Request.Builder prepareRequest(final Request.Builder builder, final EngineRequest config) { @@ -526,7 +494,6 @@ private EngineRequest ensureValidRequestConfig(final Map config, engineRequest.setBody(config.get("body")); engineRequest.setAsync(config.get("async") != null && (Boolean) config.get("async")); - engineRequest.setAttach((AttachConfig) config.get("attach")); engineRequest.setStdout((OutputStream) config.get("stdout")); engineRequest.setApiVersion((String) config.get("apiVersion")); diff --git a/engine/src/main/java/de/gesellix/docker/engine/TcpUpgradeVerificator.java b/engine/src/main/java/de/gesellix/docker/engine/TcpUpgradeVerificator.java deleted file mode 100644 index 94b511e8..00000000 --- a/engine/src/main/java/de/gesellix/docker/engine/TcpUpgradeVerificator.java +++ /dev/null @@ -1,31 +0,0 @@ -package de.gesellix.docker.engine; - -import okhttp3.Response; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.ProtocolException; - -public class TcpUpgradeVerificator { - - private static final Logger log = LoggerFactory.getLogger(TcpUpgradeVerificator.class); - - public static void ensureTcpUpgrade(final Response response) throws ProtocolException { - if (response.code() != 101) { - log.error("expected status 101, but got " + response.code() + " " + response.message()); - throw new ProtocolException("Expected HTTP 101 Connection Upgrade"); - } - - final String headerConnection = response.header("Connection"); - if (headerConnection == null || !headerConnection.equalsIgnoreCase("upgrade")) { - log.error("expected 'Connection: Upgrade', but got 'Connection: " + headerConnection + "'"); - throw new ProtocolException("Expected 'Connection: Upgrade'"); - } - - final String headerUpgrade = response.header("Upgrade"); - if (headerUpgrade == null || !headerUpgrade.equalsIgnoreCase("tcp")) { - log.error("expected 'Upgrade: tcp', but got 'Upgrade: " + headerUpgrade + "'"); - throw new ProtocolException("Expected 'Upgrade: tcp'"); - } - } -} diff --git a/engine/src/main/java/de/gesellix/docker/hijack/HijackingInterceptor.java b/engine/src/main/java/de/gesellix/docker/hijack/HijackingInterceptor.java deleted file mode 100644 index 13fbe02a..00000000 --- a/engine/src/main/java/de/gesellix/docker/hijack/HijackingInterceptor.java +++ /dev/null @@ -1,143 +0,0 @@ -package de.gesellix.docker.hijack; - -import de.gesellix.docker.engine.AttachConfig; -import de.gesellix.docker.rawstream.Frame; -import de.gesellix.docker.rawstream.FrameReader; -import okhttp3.Connection; -import okhttp3.Interceptor; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.internal.connection.RealConnection; -import okio.Buffer; -import okio.BufferedSink; -import okio.Okio; -import okio.Sink; -import okio.Source; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class HijackingInterceptor implements Interceptor { - - private static final Logger log = LoggerFactory.getLogger(HijackingInterceptor.class); - - private final AttachConfig attachConfig; - private final Source stdin; - private final Sink stdout; - - public HijackingInterceptor(AttachConfig attachConfig, Source stdin, Sink stdout) { - this.attachConfig = attachConfig; - this.stdin = stdin; - this.stdout = stdout; - } - - @Override - public Response intercept(Interceptor.Chain chain) throws IOException { - Connection connection = chain.connection(); - if (connection == null) { - throw new IllegalStateException("Connection is null. This one should only be used as a network interceptor, not as application interceptor."); - } - - Sink sink = Okio.sink(connection.socket()); - Source source = Okio.source(connection.socket()); - - Request originalRequest = chain.request(); - Request modifiedRequest = originalRequest; - if (stdin != null) { - modifiedRequest = originalRequest.newBuilder() - .method(originalRequest.method(), originalRequest.body()) - .header("transfer-encoding", "chunked") -// .tag(new HijackedSink(sink)) - .build(); - } - - Response response = chain.proceed(modifiedRequest); - - if (!(response.code() == 101 || response.isSuccessful()) || stdin == null) { - return response; - } -// TcpUpgradeVerificator.ensureTcpUpgrade(response); - - connection.socket().setSoTimeout(0); - ((RealConnection) connection).setNoNewExchanges(true); - chain.call().timeout().clearTimeout().clearDeadline(); - - // stdin -> sink - Thread stdin2sink = new Thread(() -> { - Buffer tmpBuffer = new Buffer(); - try (BufferedSink bufferedSink = Okio.buffer(sink)) { - long count = 0; - while (bufferedSink.isOpen()) { - long n = stdin.read(tmpBuffer, 1024); - if (n < 0) { - log.warn("finished after " + count + " bytes"); - attachConfig.onSinkWritten(response); - break; - } - count += n; - bufferedSink.write(tmpBuffer, n); - bufferedSink.flush(); -// attachConfig.onBytesWrittenToSink(n, count); - } - } - catch (Exception e) { - log.error("error", e); - attachConfig.onFailure(e); - throw new RuntimeException(e); - } - attachConfig.onSinkClosed(response); - }); - stdin2sink.setName("stdin2sink-" + System.identityHashCode(originalRequest)); - stdin2sink.setUncaughtExceptionHandler((thread, exception) -> log.error("", exception)); - stdin2sink.setDaemon(true); - stdin2sink.start(); - - // source -> stdout - Thread source2stdout = new Thread(() -> { - Buffer tmpBuffer = new Buffer(); - try (BufferedSink bufferedSink = Okio.buffer(stdout)) { - long count = 0; - - if (true || attachConfig.isExpectMultiplexedResponse()) { - FrameReader frameReader = new FrameReader(source, attachConfig.isExpectMultiplexedResponse()); - Frame frame; - while ((frame = frameReader.readNext(Frame.class)) != null) { -// while (bufferedSink.isOpen()) { -// frame = frameReader.readNext(Frame.class); - if (frame != null && frame.getPayload() != null) { - count += frame.getPayload().length; -// tmpBuffer.write(frame.getPayload()); - bufferedSink.write(frame.getPayload()); - bufferedSink.flush(); - } - } - } - else { - while (bufferedSink.isOpen()) { - long n = source.read(tmpBuffer, 1024); - if (n < 0) { - break; - } - count += n; - bufferedSink.write(tmpBuffer, n); - bufferedSink.flush(); - } - } - } - catch (Exception e) { - log.error("error", e); - attachConfig.onFailure(e); - throw new RuntimeException(e); - } - attachConfig.onSourceConsumed(); - }); - source2stdout.setName("source2stdout-" + System.identityHashCode(originalRequest)); - source2stdout.setUncaughtExceptionHandler((thread, exception) -> log.error("", exception)); - source2stdout.setDaemon(true); - source2stdout.start(); - - attachConfig.onResponse(response); - return response; - } -} diff --git a/engine/src/main/java/de/gesellix/docker/hijack/OkResponseCallback.java b/engine/src/main/java/de/gesellix/docker/hijack/OkResponseCallback.java deleted file mode 100644 index 322f894e..00000000 --- a/engine/src/main/java/de/gesellix/docker/hijack/OkResponseCallback.java +++ /dev/null @@ -1,35 +0,0 @@ -package de.gesellix.docker.hijack; - -import de.gesellix.docker.engine.AttachConfig; -import de.gesellix.docker.engine.TcpUpgradeVerificator; -import okhttp3.Call; -import okhttp3.Callback; -import okhttp3.Response; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class OkResponseCallback implements Callback { - - private static final Logger log = LoggerFactory.getLogger(OkResponseCallback.class); - - private final AttachConfig attachConfig; - - public OkResponseCallback(AttachConfig attachConfig) { - this.attachConfig = attachConfig; - } - - @Override - public void onFailure(Call call, final IOException e) { - log.error("connection failed: " + e.getMessage(), e); - attachConfig.onFailure(e); - } - - @Override - public void onResponse(final Call call, final Response response) throws IOException { - TcpUpgradeVerificator.ensureTcpUpgrade(response); - log.debug("Response content type: " + response.header("Content-Type")); - attachConfig.onResponse(response); - } -} diff --git a/engine/src/main/java/de/gesellix/docker/rawstream/Frame.java b/engine/src/main/java/de/gesellix/docker/rawstream/Frame.java deleted file mode 100644 index 1539d70b..00000000 --- a/engine/src/main/java/de/gesellix/docker/rawstream/Frame.java +++ /dev/null @@ -1,84 +0,0 @@ -package de.gesellix.docker.rawstream; - -import java.nio.charset.StandardCharsets; - -public class Frame { - - private final StreamType streamType; - private final byte[] payload; - - public Frame(StreamType streamType, byte[] payload) { - this.streamType = streamType; - this.payload = payload; - } - - public StreamType getStreamType() { - return streamType; - } - - public byte[] getPayload() { - return payload; - } - - public String getPayloadAsString() { - if (payload == null) { - return null; - } - return new String(payload, StandardCharsets.UTF_8).trim(); - } - - @Override - public String toString() { - return "Frame{" + - "streamType=" + streamType + - ", payload=" + getPayloadAsString() + - '}'; - } - - /** - * STREAM_TYPE can be: - *
    - *
  • 0: stdin (will be written on stdout)
  • - *
  • 1: stdout
  • - *
  • 2: stderr
  • - *
  • 3: systemerr
  • - *
- * See the paragraph _Stream format_ at https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach. - * Reference implementation: https://github.com/moby/moby/blob/master/pkg/stdcopy/stdcopy.go. - * Docker client GoDoc: https://godoc.org/github.com/moby/moby/client#Client.ContainerAttach. - */ - public enum StreamType { - - // special case for `container.tty == false` - RAW((byte) -1), - STDIN((byte) 0), - STDOUT((byte) 1), - STDERR((byte) 2), - SYSTEMERR((byte) 3); - - StreamType(Object streamTypeId) { - this.streamTypeId = ((byte) (streamTypeId)); - } - - public static StreamType valueOf(final byte b) { - switch (b) { - case 0: - return STDIN; - case 1: - return STDOUT; - case 2: - return STDERR; - case 3: - return SYSTEMERR; - default: - throw new IllegalArgumentException("no enum value for " + String.valueOf(b) + " found."); - } - } - - public byte getStreamTypeId() { - return streamTypeId; - } - - private final byte streamTypeId; - } -} diff --git a/engine/src/main/java/de/gesellix/docker/rawstream/FrameReader.java b/engine/src/main/java/de/gesellix/docker/rawstream/FrameReader.java deleted file mode 100644 index 79395c63..00000000 --- a/engine/src/main/java/de/gesellix/docker/rawstream/FrameReader.java +++ /dev/null @@ -1,78 +0,0 @@ -package de.gesellix.docker.rawstream; - -import de.gesellix.docker.response.Reader; -import okio.Buffer; -import okio.BufferedSource; -import okio.Okio; -import okio.Source; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; - -public class FrameReader implements Reader { - - private final static Logger log = LoggerFactory.getLogger(FrameReader.class); - - private final BufferedSource bufferedSource; - private boolean expectMultiplexedResponse; - - private final Buffer buffer = new Buffer(); - - public FrameReader(Source source) { - this(source, false); - } - - public FrameReader(Source source, boolean expectMultiplexedResponse) { - this.bufferedSource = Okio.buffer(source); - this.expectMultiplexedResponse = expectMultiplexedResponse; - } - - @Override - public Frame readNext(Class type) { - if (expectMultiplexedResponse) { - // See https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach for the stream format documentation. - // header := [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4} - - try { - Frame.StreamType streamType = Frame.StreamType.valueOf(bufferedSource.readByte()); - bufferedSource.skip(3); - int frameSize = bufferedSource.readInt(); - - return new Frame(streamType, bufferedSource.readByteArray(frameSize)); - } - catch (EOFException e) { - return null; - } - catch (IOException e) { - log.error("error reading multiplexed frames", e); - return null; - } - } - else { - long byteCount; -// buffer.clear(); - try { - byteCount = bufferedSource.read(buffer, 8192L); - if (byteCount < 0) { - return null; - } - return new Frame(Frame.StreamType.RAW, buffer.readByteArray(byteCount)); - } - catch (IOException e) { - return null; - } - } - } - - @Override - public boolean hasNext() { - try { - return !Thread.currentThread().isInterrupted() && bufferedSource.isOpen() && !bufferedSource.peek().exhausted(); - } - catch (Exception e) { - return false; - } - } -} diff --git a/engine/src/main/java/de/gesellix/docker/response/LineReader.java b/engine/src/main/java/de/gesellix/docker/response/LineReader.java deleted file mode 100644 index b42d287e..00000000 --- a/engine/src/main/java/de/gesellix/docker/response/LineReader.java +++ /dev/null @@ -1,26 +0,0 @@ -package de.gesellix.docker.response; - -import okio.BufferedSource; -import okio.Okio; -import okio.Source; - -import java.io.IOException; - -public class LineReader implements Reader { - - private final BufferedSource buffer; - - public LineReader(Source source) { - this.buffer = Okio.buffer(source); - } - - @Override - public String readNext(Class type) throws IOException { - return buffer.readUtf8Line(); - } - - @Override - public boolean hasNext() throws IOException { - return !Thread.currentThread().isInterrupted() && !buffer.exhausted(); - } -} diff --git a/engine/src/test/groovy/de/gesellix/docker/engine/OkDockerClientSpec.groovy b/engine/src/test/groovy/de/gesellix/docker/engine/OkDockerClientSpec.groovy index fe8a8c3d..c319f6fb 100644 --- a/engine/src/test/groovy/de/gesellix/docker/engine/OkDockerClientSpec.groovy +++ b/engine/src/test/groovy/de/gesellix/docker/engine/OkDockerClientSpec.groovy @@ -6,7 +6,6 @@ import de.gesellix.docker.ssl.DockerSslSocket import de.gesellix.docker.ssl.SslSocketConfigFactory import de.gesellix.util.IOUtils import groovy.util.logging.Slf4j -import okhttp3.Headers import okhttp3.Interceptor import okhttp3.MediaType import okhttp3.OkHttpClient @@ -25,8 +24,6 @@ import spock.lang.Unroll import javax.net.ssl.HostnameVerifier import javax.net.ssl.SSLSession import javax.net.ssl.SSLSocket -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit import static de.gesellix.docker.engine.RequestMethod.DELETE import static de.gesellix.docker.engine.RequestMethod.GET @@ -530,68 +527,6 @@ class OkDockerClientSpec extends Specification { mockWebServer.shutdown() } - def "async request with headers"() { - given: - def mockWebServer = new MockWebServer() - mockWebServer.enqueue(new MockResponse() - .setResponseCode(101) - .addHeader("Connection", "Upgrade") - .addHeader("Upgrade", "tcp")) - mockWebServer.start() - - def errors = [] - def hasVerified = false - Closure verifyRequest = { Interceptor.Chain chain -> - def expectedHeaders = Headers.of("header-a", "header-a-value") - expectedHeaders.names().each { String k -> - def v = expectedHeaders.get(k) - if (chain.request().headers().get(k) != v) { - errors << "expected header ${k} to be ${v}, got ${chain.request().headers().get(k)}" -// throw new AssertionError("expected header ${k} to be ${v}, got ${chain.request().headers().get(k)}") - } - } - hasVerified = true - log.info("verified: ${errors}") - true - } - def client = new OkDockerClient() { - - @Override - OkHttpClient newClient(OkHttpClient.Builder clientBuilder) { - clientBuilder - .addNetworkInterceptor(new TestInterceptor(verifyRequest, { r, c -> true })) - .build() - } - } - client.dockerClientConfig.apply(new DockerEnv( - dockerHost: mockWebServer.url("/").toString())) - - when: - def latch = new CountDownLatch(1) - def onResponse = { - latch.countDown() - } - - client.request(new EngineRequest(OPTIONS, "/a-resource") - .tap { - attach = new AttachConfig(onResponse: onResponse) - headers = [ - "header-a" : "header-a-value", - "Upgrade" : "tcp", - "Connection": "Upgrade" - ] - }) - then: - latch.await(10, TimeUnit.SECONDS) - and: - errors.empty - and: - hasVerified - - cleanup: - mockWebServer.shutdown() - } - def "connect via plain http connection"() { given: def mockWebServer = new MockWebServer() diff --git a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy index 341757ba..154045e7 100644 --- a/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy +++ b/integrationtest/src/test/groovy/de/gesellix/docker/engine/OkDockerClientIntegrationSpec.groovy @@ -1,19 +1,13 @@ package de.gesellix.docker.engine import groovy.util.logging.Slf4j -import okhttp3.Response import okio.Okio -import spock.lang.Ignore import spock.lang.IgnoreIf import spock.lang.Requires import spock.lang.Specification -import spock.lang.Unroll - -import java.util.concurrent.CountDownLatch import static de.gesellix.docker.engine.RequestMethod.GET import static de.gesellix.docker.engine.TestConstants.CONSTANTS -import static java.util.concurrent.TimeUnit.SECONDS @Slf4j @Requires({ LocalDocker.available() }) @@ -114,98 +108,4 @@ class OkDockerClientIntegrationSpec extends Specification { dockerHost.startsWith("npipe://") response.status.code == 200 } - - @Ignore("Attach is broken") - @Unroll - def "attach (openStdin: #openStdin, tty: #tty)"() { - given: - def client = new OkDockerClient() - - // pull image (ensure it exists locally) - client.post([path : "/images/create", - query: [fromImage: CONSTANTS.imageName]]) - // create container - def containerConfig = [ - Tty : tty, - OpenStdin : openStdin, - Image : CONSTANTS.imageName, - Entrypoint: ["/cat"] - ] - String containerId = client.post([path : "/containers/create".toString(), - query : [name: ""], - body : containerConfig, - requestContentType: "application/json"]).content.Id - // start container - client.post([path : "/containers/${containerId}/start".toString(), - requestContentType: "application/json"]) - // resize container TTY -// client.post([path : "/containers/${containerId}/attach/resize".toString(), -// query: [h: 46, w: 158]]) - // inspect container -// boolean multiplexStreams = !client.get([path: "/containers/${containerId}/json".toString()]).content.Config.Tty - - String content = "attach ${UUID.randomUUID()}" - String expectedOutput = containerConfig.Tty ? "$content\r\n$content\r\n" : "$content\n" - - def stdout = new ByteArrayOutputStream(expectedOutput.length()) - def stdin = new PipedOutputStream() - - def onSinkClosed = new CountDownLatch(1) - def onSinkWritten = new CountDownLatch(1) - def onSourceConsumed = new CountDownLatch(1) - - def attachConfig = new AttachConfig(!containerConfig.Tty) - attachConfig.streams.stdin = new PipedInputStream(stdin) -// attachConfig.streams.stdout = stdout - attachConfig.streams.stdout = new TeeOutputStream(stdout, System.out) - attachConfig.onResponse = { Response response -> - log.info("[attach (interactive)] got response") - } - attachConfig.onSinkClosed = { Response response -> - log.info("[attach (interactive)] sink closed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}") - onSinkClosed.countDown() - } - attachConfig.onSinkWritten = { Response response -> - log.info("[attach (interactive)] sink written (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}") - onSinkWritten.countDown() - } - attachConfig.onSourceConsumed = { - if (stdout.toString() == expectedOutput) { - log.info("[attach (interactive)] consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}") - onSourceConsumed.countDown() - } else { - log.info("[attach (interactive)] consumed (complete: ${stdout.toString() == expectedOutput})\n${stdout.toString()}") - } - } -// new OkDockerClient().post([ - client.post([ - path : "/containers/${containerId}/attach".toString(), - query : [logs: true, stream: true, stdin: true, stdout: true, stderr: true], - attach: attachConfig]) - - when: - stdin.write("$content\n".bytes) - stdin.flush() - stdin.close() - boolean sinkWritten = onSinkWritten.await(5, SECONDS) - boolean sinkClosed = onSinkClosed.await(5, SECONDS) - boolean sourceConsumed = onSourceConsumed.await(5, SECONDS) - - then: - sinkClosed - sinkWritten - sourceConsumed - stdout.size() > 0 - stdout.toByteArray() == expectedOutput.bytes - - cleanup: - client.post([path: "/containers/${containerId}/stop".toString(), query: [t: 10]]) - client.post([path: "/containers/${containerId}/wait".toString()]) - client.delete([path: "/containers/${containerId}".toString(), query: [:]]) - - where: - tty | openStdin - false | true -// true | true // TODO: fix on Windows - } }