diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelAdapter.java b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelAdapter.java new file mode 100644 index 0000000000..b124e6bb14 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelAdapter.java @@ -0,0 +1,95 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +final class Jep380SocketChannelAdapter extends Socket { + private final SocketChannel channel; + private final Jep380SocketChannelImplAdapter adapter; + + Jep380SocketChannelAdapter(final SocketChannel channel) throws IOException { + this(channel, new Jep380SocketChannelImplAdapter(channel)); + } + + private Jep380SocketChannelAdapter(final SocketChannel channel, final Jep380SocketChannelImplAdapter adapter) { + this.channel = channel; + this.adapter = adapter; + } + + @Override + public void connect(final SocketAddress endpoint, final int timeout) throws IOException { + channel.connect(endpoint); + } + + @Override + public void connect(final SocketAddress endpoint) throws IOException { + channel.connect(endpoint); + } + + @Override + public boolean isConnected() { + return channel.isConnected(); + } + + @Override + public InputStream getInputStream() throws IOException { + return adapter.getInputStream(); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return adapter.getOutputStream(); + } + + @Override + public boolean isClosed() { + return !channel.isOpen(); + } + + @Override + public void close() throws IOException { + adapter.close(); + } + + @Override + public int getSoTimeout() throws SocketException { + return adapter.soTimeoutMs; + } + + @Override + public void setSoTimeout(final int timeout) throws SocketException { + adapter.soTimeoutMs = timeout; + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelImplAdapter.java b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelImplAdapter.java new file mode 100644 index 0000000000..c35a491dd7 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelImplAdapter.java @@ -0,0 +1,232 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketImpl; +import java.net.SocketOptions; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; + +final class Jep380SocketChannelImplAdapter extends SocketImpl { + private final SocketChannel channel; + volatile int soTimeoutMs = 0; + + public Jep380SocketChannelImplAdapter(final SocketChannel channel) throws IOException { + this.channel = channel; + channel.configureBlocking(false); + } + + @Override + protected void close() throws IOException { + channel.close(); + } + + @Override + protected InputStream getInputStream() throws IOException { + return new InputStreamAdapter(); + } + + @Override + protected OutputStream getOutputStream() throws IOException { + return new OutputStreamAdapter(); + } + + @Override + public Object getOption(final int optID) throws SocketException { + try { + switch (optID) { + case SocketOptions.SO_TIMEOUT: + return soTimeoutMs; + case SocketOptions.SO_RCVBUF: + return channel.getOption(StandardSocketOptions.SO_RCVBUF); + case SocketOptions.SO_SNDBUF: + return channel.getOption(StandardSocketOptions.SO_SNDBUF); + } + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + throw new UnsupportedOperationException("getOption: " + optID); + } + + @Override + public void setOption(final int optID, final Object value) throws SocketException { + try { + switch (optID) { + case SocketOptions.SO_TIMEOUT: + soTimeoutMs = (Integer) value; + return; + case SocketOptions.SO_RCVBUF: + channel.setOption(StandardSocketOptions.SO_RCVBUF, (Integer) value); + return; + case SocketOptions.SO_SNDBUF: + channel.setOption(StandardSocketOptions.SO_SNDBUF, (Integer) value); + return; + } + } catch (final IOException ex) { + throw new RuntimeException(ex); + } + throw new UnsupportedOperationException(); + } + + @Override + protected void accept(final SocketImpl s) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected int available() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void bind(final InetAddress host, final int port) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void connect(final String host, final int port) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void connect(final InetAddress address, final int port) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void connect(final SocketAddress address, final int timeout) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void create(final boolean stream) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void listen(final int backlog) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void sendUrgentData(final int data) throws IOException { + throw new UnsupportedOperationException(); + } + + private class InputStreamAdapter extends InputStream { + private final Selector sel; + private final SelectionKey key; + + private InputStreamAdapter() throws IOException { + this.sel = Selector.open(); + this.key = channel.register(sel, SelectionKey.OP_READ); + } + + @Override + public int read() throws IOException { + final byte[] b = new byte[1]; + final int n = read(b, 0, 1); + return (n == -1) ? -1 : (b[0] & 0xFF); + } + + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + final ByteBuffer buf = ByteBuffer.wrap(b, off, len); + if (sel.select(soTimeoutMs) == 0) { + throw new SocketTimeoutException(); + } + final int read = channel.read(buf); + sel.selectedKeys().clear(); + return read; + } + + @Override + public void close() throws IOException { + key.cancel(); + sel.close(); + channel.close(); + } + } + + private class OutputStreamAdapter extends OutputStream { + private final Selector sel; + private final SelectionKey key; + + private OutputStreamAdapter() throws IOException { + this.sel = Selector.open(); + this.key = channel.register(sel, SelectionKey.OP_WRITE); + } + + @Override + public void write(final int b) throws IOException { + write(new byte[]{ (byte) b}); + } + + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + final ByteBuffer buf = ByteBuffer.wrap(b, off, len); + while (buf.hasRemaining()) { + final int n = channel.write(buf); + if (n == 0) { + if (sel.select(60_000) == 0) { + throw new SocketTimeoutException("write timed out"); + } + sel.selectedKeys().clear(); + } + } + } + + @Override + public void close() throws IOException { + key.cancel(); + sel.close(); + channel.close(); + } + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java index bd8a4a2d92..f7ae4196fe 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java @@ -37,8 +37,11 @@ import java.io.IOException; import java.lang.reflect.Method; +import java.net.ProtocolFamily; import java.net.Socket; import java.net.SocketAddress; +import java.net.StandardProtocolFamily; +import java.nio.channels.SocketChannel; import java.nio.file.Path; /** @@ -70,14 +73,14 @@ private enum Implementation { private static Implementation detectImplementation() { try { - Class.forName(JUNIXSOCKET_SOCKET_CLASS); - LOG.debug("Using JUnixSocket Unix Domain Socket implementation"); - return Implementation.JUNIXSOCKET; + Class.forName(JDK_UNIX_SOCKET_ADDRESS_CLASS); + LOG.debug("Using JDK Unix Domain Socket implementation"); + return Implementation.JDK; } catch (final ClassNotFoundException e) { try { - Class.forName(JDK_UNIX_SOCKET_ADDRESS_CLASS); - LOG.debug("Using JDK Unix Domain Socket implementation"); - return Implementation.JDK; + Class.forName(JUNIXSOCKET_SOCKET_CLASS); + LOG.debug("Using JUnixSocket Unix Domain Socket implementation"); + return Implementation.JUNIXSOCKET; } catch (final ClassNotFoundException e2) { LOG.debug("No Unix Domain Socket implementation found"); return Implementation.NONE; @@ -138,11 +141,17 @@ public Socket createSocket() throws IOException { try { if (IMPLEMENTATION == Implementation.JDK) { - // Java 16+ only supports UDS through the SocketChannel API, but the sync client is coupled - // to the legacy Socket API. In order to use Java sockets, we first need to write an - // adapter, similar to the one provided by JUnixSocket. - throw new UnsupportedOperationException("JEP 380 Unix domain sockets are not supported; use " - + "JUnixSocket"); + // Java 16+ only supports UDS through the SocketChannel API, but the sync client is coupled to the + // legacy Socket API. To facilitate this, we use an adapter, similar to the one provided by JUnixSocket. + try { + final SocketChannel channel = (SocketChannel) SocketChannel.class.getMethod("open", + ProtocolFamily.class) + .invoke(null, StandardProtocolFamily.valueOf("UNIX")); + return new Jep380SocketChannelAdapter(channel); + } catch (final ReflectiveOperationException ex) { + throw new UnsupportedOperationException("JEP 380 Unix domain sockets are not supported; use " + + "JUnixSocket", ex); + } } else { // JUnixSocket implementation final Class socketClass = Class.forName(JUNIXSOCKET_SOCKET_CLASS);