From 94d73d3d26240b05944f44bf6d7f01a23412620d Mon Sep 17 00:00:00 2001 From: Ryan Schmitt Date: Sat, 28 Jun 2025 13:12:14 -0700 Subject: [PATCH] Enable JEP 380 UDS support for sync client This change enables the sync client to use standard library Unix-domain sockets on Java 16+ without a JUnixSocket dependency, as is already supported by the async client. JEP 380 only exposes Unix-domain socket support through the `SocketChannel` API, but the sync client is tightly coupled to the legacy `java.net.Socket` API. The idea behind this change is to wrap the UDS `SocketChannel` in a subclass of `java.net.Socket` that acts as a translation layer between the legacy socket API and `SocketChannel`. In particular, the adapter adds support for socket timeouts by translating blocking reads to non-blocking reads and supplying the socket timeout value to the `select()` call. After this change, HttpClient will use the Java standard library by default. JUnixSocket will only be loaded on older versions of Java that lack standard library support for UDS, and even then only for the synchronous client, as the JUnixSocket-provided `SocketChannel` cannot be used with the JDK-provided `Selector` used by `IOReactor`. --- .../socket/Jep380SocketChannelAdapter.java | 95 +++++++ .../Jep380SocketChannelImplAdapter.java | 232 ++++++++++++++++++ .../http/socket/UnixDomainSocketFactory.java | 31 ++- 3 files changed, 347 insertions(+), 11 deletions(-) create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelAdapter.java create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelImplAdapter.java diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelAdapter.java b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelAdapter.java new file mode 100644 index 0000000000..b124e6bb14 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelAdapter.java @@ -0,0 +1,95 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +final class Jep380SocketChannelAdapter extends Socket { + private final SocketChannel channel; + private final Jep380SocketChannelImplAdapter adapter; + + Jep380SocketChannelAdapter(final SocketChannel channel) throws IOException { + this(channel, new Jep380SocketChannelImplAdapter(channel)); + } + + private Jep380SocketChannelAdapter(final SocketChannel channel, final Jep380SocketChannelImplAdapter adapter) { + this.channel = channel; + this.adapter = adapter; + } + + @Override + public void connect(final SocketAddress endpoint, final int timeout) throws IOException { + channel.connect(endpoint); + } + + @Override + public void connect(final SocketAddress endpoint) throws IOException { + channel.connect(endpoint); + } + + @Override + public boolean isConnected() { + return channel.isConnected(); + } + + @Override + public InputStream getInputStream() throws IOException { + return adapter.getInputStream(); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return adapter.getOutputStream(); + } + + @Override + public boolean isClosed() { + return !channel.isOpen(); + } + + @Override + public void close() throws IOException { + adapter.close(); + } + + @Override + public int getSoTimeout() throws SocketException { + return adapter.soTimeoutMs; + } + + @Override + public void setSoTimeout(final int timeout) throws SocketException { + adapter.soTimeoutMs = timeout; + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelImplAdapter.java b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelImplAdapter.java new file mode 100644 index 0000000000..c35a491dd7 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/Jep380SocketChannelImplAdapter.java @@ -0,0 +1,232 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketImpl; +import java.net.SocketOptions; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; + +final class Jep380SocketChannelImplAdapter extends SocketImpl { + private final SocketChannel channel; + volatile int soTimeoutMs = 0; + + public Jep380SocketChannelImplAdapter(final SocketChannel channel) throws IOException { + this.channel = channel; + channel.configureBlocking(false); + } + + @Override + protected void close() throws IOException { + channel.close(); + } + + @Override + protected InputStream getInputStream() throws IOException { + return new InputStreamAdapter(); + } + + @Override + protected OutputStream getOutputStream() throws IOException { + return new OutputStreamAdapter(); + } + + @Override + public Object getOption(final int optID) throws SocketException { + try { + switch (optID) { + case SocketOptions.SO_TIMEOUT: + return soTimeoutMs; + case SocketOptions.SO_RCVBUF: + return channel.getOption(StandardSocketOptions.SO_RCVBUF); + case SocketOptions.SO_SNDBUF: + return channel.getOption(StandardSocketOptions.SO_SNDBUF); + } + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + throw new UnsupportedOperationException("getOption: " + optID); + } + + @Override + public void setOption(final int optID, final Object value) throws SocketException { + try { + switch (optID) { + case SocketOptions.SO_TIMEOUT: + soTimeoutMs = (Integer) value; + return; + case SocketOptions.SO_RCVBUF: + channel.setOption(StandardSocketOptions.SO_RCVBUF, (Integer) value); + return; + case SocketOptions.SO_SNDBUF: + channel.setOption(StandardSocketOptions.SO_SNDBUF, (Integer) value); + return; + } + } catch (final IOException ex) { + throw new RuntimeException(ex); + } + throw new UnsupportedOperationException(); + } + + @Override + protected void accept(final SocketImpl s) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected int available() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void bind(final InetAddress host, final int port) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void connect(final String host, final int port) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void connect(final InetAddress address, final int port) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void connect(final SocketAddress address, final int timeout) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void create(final boolean stream) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void listen(final int backlog) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void sendUrgentData(final int data) throws IOException { + throw new UnsupportedOperationException(); + } + + private class InputStreamAdapter extends InputStream { + private final Selector sel; + private final SelectionKey key; + + private InputStreamAdapter() throws IOException { + this.sel = Selector.open(); + this.key = channel.register(sel, SelectionKey.OP_READ); + } + + @Override + public int read() throws IOException { + final byte[] b = new byte[1]; + final int n = read(b, 0, 1); + return (n == -1) ? -1 : (b[0] & 0xFF); + } + + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + final ByteBuffer buf = ByteBuffer.wrap(b, off, len); + if (sel.select(soTimeoutMs) == 0) { + throw new SocketTimeoutException(); + } + final int read = channel.read(buf); + sel.selectedKeys().clear(); + return read; + } + + @Override + public void close() throws IOException { + key.cancel(); + sel.close(); + channel.close(); + } + } + + private class OutputStreamAdapter extends OutputStream { + private final Selector sel; + private final SelectionKey key; + + private OutputStreamAdapter() throws IOException { + this.sel = Selector.open(); + this.key = channel.register(sel, SelectionKey.OP_WRITE); + } + + @Override + public void write(final int b) throws IOException { + write(new byte[]{ (byte) b}); + } + + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + final ByteBuffer buf = ByteBuffer.wrap(b, off, len); + while (buf.hasRemaining()) { + final int n = channel.write(buf); + if (n == 0) { + if (sel.select(60_000) == 0) { + throw new SocketTimeoutException("write timed out"); + } + sel.selectedKeys().clear(); + } + } + } + + @Override + public void close() throws IOException { + key.cancel(); + sel.close(); + channel.close(); + } + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java index bd8a4a2d92..f7ae4196fe 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/socket/UnixDomainSocketFactory.java @@ -37,8 +37,11 @@ import java.io.IOException; import java.lang.reflect.Method; +import java.net.ProtocolFamily; import java.net.Socket; import java.net.SocketAddress; +import java.net.StandardProtocolFamily; +import java.nio.channels.SocketChannel; import java.nio.file.Path; /** @@ -70,14 +73,14 @@ private enum Implementation { private static Implementation detectImplementation() { try { - Class.forName(JUNIXSOCKET_SOCKET_CLASS); - LOG.debug("Using JUnixSocket Unix Domain Socket implementation"); - return Implementation.JUNIXSOCKET; + Class.forName(JDK_UNIX_SOCKET_ADDRESS_CLASS); + LOG.debug("Using JDK Unix Domain Socket implementation"); + return Implementation.JDK; } catch (final ClassNotFoundException e) { try { - Class.forName(JDK_UNIX_SOCKET_ADDRESS_CLASS); - LOG.debug("Using JDK Unix Domain Socket implementation"); - return Implementation.JDK; + Class.forName(JUNIXSOCKET_SOCKET_CLASS); + LOG.debug("Using JUnixSocket Unix Domain Socket implementation"); + return Implementation.JUNIXSOCKET; } catch (final ClassNotFoundException e2) { LOG.debug("No Unix Domain Socket implementation found"); return Implementation.NONE; @@ -138,11 +141,17 @@ public Socket createSocket() throws IOException { try { if (IMPLEMENTATION == Implementation.JDK) { - // Java 16+ only supports UDS through the SocketChannel API, but the sync client is coupled - // to the legacy Socket API. In order to use Java sockets, we first need to write an - // adapter, similar to the one provided by JUnixSocket. - throw new UnsupportedOperationException("JEP 380 Unix domain sockets are not supported; use " - + "JUnixSocket"); + // Java 16+ only supports UDS through the SocketChannel API, but the sync client is coupled to the + // legacy Socket API. To facilitate this, we use an adapter, similar to the one provided by JUnixSocket. + try { + final SocketChannel channel = (SocketChannel) SocketChannel.class.getMethod("open", + ProtocolFamily.class) + .invoke(null, StandardProtocolFamily.valueOf("UNIX")); + return new Jep380SocketChannelAdapter(channel); + } catch (final ReflectiveOperationException ex) { + throw new UnsupportedOperationException("JEP 380 Unix domain sockets are not supported; use " + + "JUnixSocket", ex); + } } else { // JUnixSocket implementation final Class socketClass = Class.forName(JUNIXSOCKET_SOCKET_CLASS);