From ca9fcf5201fbedd0d86727bed38f82342e05ca26 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Mon, 28 Apr 2025 16:13:37 -0400 Subject: [PATCH 1/8] Add test for file descriptors --- .../common/socket/TunnelingJdkSocketTest.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 74cca0d4bd1..937250a5f57 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -6,6 +6,7 @@ import datadog.trace.api.Config; import java.io.IOException; import java.io.InputStream; +import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.net.SocketException; import java.net.StandardProtocolFamily; @@ -15,6 +16,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -119,6 +122,59 @@ public void testBufferSizes() throws Exception { isServerRunning.set(false); } + @Test + public void testFileDescriptorLeak() throws Exception { + if (!Config.get().isJdkSocketEnabled()) { + System.out.println( + "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); + return; + } + long initialCount = getFileDescriptorCount(); + System.out.println("Initial file descriptor count: " + initialCount); + + Path socketPath = getSocketPath(); + UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath); + startServer(socketAddress); + TunnelingJdkSocket clientSocket = createClient(socketPath); + + List streams = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + InputStream inputStream = clientSocket.getInputStream(); + streams.add(inputStream); + long currentCount = getFileDescriptorCount(); + System.out.println("Iteration " + i + ", File descriptor count: " + currentCount); + assertTrue(currentCount <= initialCount + 7); // why +7? + } + + clientSocket.close(); + streams.clear(); + isServerRunning.set(false); + + long finalCount = getFileDescriptorCount(); + System.out.println("Final file descriptor count: " + finalCount); + assertTrue(finalCount <= initialCount + 3); // why +3? + } + + private long getFileDescriptorCount() { + try { + Process process = Runtime.getRuntime().exec("lsof -p " + getPid()); + int count = 0; + try (java.io.BufferedReader reader = + new java.io.BufferedReader(new java.io.InputStreamReader(process.getInputStream()))) { + while (reader.readLine() != null) { + count++; + } + } + return count; + } catch (IOException e) { + throw new RuntimeException("Failed to get file descriptor count", e); + } + } + + private String getPid() { + return ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; + } + private Path getSocketPath() throws IOException { Path socketPath = Files.createTempFile("testSocket", null); Files.delete(socketPath); From a098bb0eec445f8bf917400bbdb0805e737456bc Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Mon, 28 Apr 2025 16:14:45 -0400 Subject: [PATCH 2/8] Fix closing technique --- .../common/socket/TunnelingJdkSocket.java | 44 +++++++++++++++---- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index 063cd64c740..b4385fad36b 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -29,6 +29,8 @@ final class TunnelingJdkSocket extends Socket { private InetSocketAddress inetSocketAddress; private SocketChannel unixSocketChannel; + private Selector selector; + private boolean wasBlocking; private int timeout; private boolean shutIn; @@ -196,14 +198,16 @@ public InputStream getInputStream() throws IOException { throw new SocketException("Socket input is shutdown"); } + if (selector == null) { + selector = Selector.open(); + wasBlocking = unixSocketChannel.isBlocking(); + unixSocketChannel.configureBlocking(false); + unixSocketChannel.register(selector, SelectionKey.OP_READ); + } + return new InputStream() { private final ByteBuffer buffer = ByteBuffer.allocate(getStreamBufferSize()); - private final Selector selector = Selector.open(); - - { - unixSocketChannel.configureBlocking(false); - unixSocketChannel.register(selector, SelectionKey.OP_READ); - } + private boolean isClosed = false; @Override public int read() throws IOException { @@ -213,6 +217,9 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { + if (isClosed) { + throw new IOException("Stream is closed."); + } buffer.clear(); int readyChannels = selector.select(timeout); @@ -241,7 +248,15 @@ public int read(byte[] b, int off, int len) throws IOException { @Override public void close() throws IOException { + if (isClosed) { + return; + } + isClosed = true; selector.close(); + selector = null; + if (unixSocketChannel != null) { + unixSocketChannel.configureBlocking(wasBlocking); + } } }; } @@ -254,11 +269,13 @@ public OutputStream getOutputStream() throws IOException { if (!isConnected()) { throw new SocketException("Socket is not connected"); } - if (isInputShutdown()) { + if (isOutputShutdown()) { throw new SocketException("Socket output is shutdown"); } return new OutputStream() { + private boolean isClosed = false; + @Override public void write(int b) throws IOException { byte[] array = ByteBuffer.allocate(4).putInt(b).array(); @@ -267,8 +284,10 @@ public void write(int b) throws IOException { @Override public void write(byte[] b, int off, int len) throws IOException { + if (isClosed) { + throw new IOException("Stream is closed"); + } ByteBuffer buffer = ByteBuffer.wrap(b, off, len); - while (buffer.hasRemaining()) { unixSocketChannel.write(buffer); } @@ -316,7 +335,14 @@ public void close() throws IOException { if (isClosed()) { return; } - if (null != unixSocketChannel) { + if (selector != null) { + selector.close(); + selector = null; + if (unixSocketChannel != null) { + unixSocketChannel.configureBlocking(wasBlocking); + } + } + if (unixSocketChannel != null) { unixSocketChannel.close(); } closed = true; From bfcb079c386c9d9cb9f60c67c6fc51b5c18b0087 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 1 May 2025 15:24:00 -0400 Subject: [PATCH 3/8] Add tests for socket and stream connection and closing --- .../common/socket/TunnelingJdkSocketTest.java | 155 +++++++++++++++--- 1 file changed, 129 insertions(+), 26 deletions(-) diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 937250a5f57..057cc03c937 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -6,6 +6,7 @@ import datadog.trace.api.Config; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.net.SocketException; @@ -16,8 +17,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -26,7 +25,7 @@ public class TunnelingJdkSocketTest { private static final AtomicBoolean isServerRunning = new AtomicBoolean(false); @Test - public void testTimeout() throws Exception { + public void testSocketConnect() throws Exception { if (!Config.get().isJdkSocketEnabled()) { System.out.println( "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); @@ -36,7 +35,120 @@ public void testTimeout() throws Exception { Path socketPath = getSocketPath(); UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath); startServer(socketAddress); - TunnelingJdkSocket clientSocket = createClient(socketPath); + TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); + + assertFalse(clientSocket.isConnected()); + assertFalse(clientSocket.isClosed()); + clientSocket.connect(new InetSocketAddress("localhost", 0)); + assertTrue(clientSocket.isConnected()); + assertFalse(clientSocket.isClosed()); + assertFalse(clientSocket.isInputShutdown()); + assertFalse(clientSocket.isOutputShutdown()); + assertThrows( + SocketException.class, () -> clientSocket.connect(new InetSocketAddress("localhost", 0))); + clientSocket.close(); + assertFalse(clientSocket.isConnected()); + assertTrue(clientSocket.isClosed()); + assertTrue(clientSocket.isInputShutdown()); + assertTrue(clientSocket.isOutputShutdown()); + clientSocket.close(); + + isServerRunning.set(false); + } + + @Test + public void testSocketClose() throws Exception { + if (!Config.get().isJdkSocketEnabled()) { + System.out.println( + "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); + return; + } + + TunnelingJdkSocket clientSocket = createClient(); + InputStream inputStream = clientSocket.getInputStream(); + OutputStream outputStream = clientSocket.getOutputStream(); + + assertFalse(clientSocket.isClosed()); + assertFalse(clientSocket.isInputShutdown()); + assertFalse(clientSocket.isOutputShutdown()); + assertTrue(clientSocket.isConnected()); + clientSocket.close(); + assertTrue(clientSocket.isClosed()); + assertTrue(clientSocket.isInputShutdown()); + assertTrue(clientSocket.isOutputShutdown()); + assertFalse(clientSocket.isConnected()); + assertThrows(IOException.class, () -> inputStream.read()); + assertThrows(IOException.class, () -> outputStream.write(1)); + assertThrows(SocketException.class, () -> clientSocket.getInputStream()); + assertThrows(SocketException.class, () -> clientSocket.getOutputStream()); + clientSocket.close(); + + isServerRunning.set(false); + } + + @Test + public void testInputStreamClose() throws Exception { + if (!Config.get().isJdkSocketEnabled()) { + System.out.println( + "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); + return; + } + + TunnelingJdkSocket clientSocket = createClient(); + InputStream inputStream = clientSocket.getInputStream(); + OutputStream outputStream = clientSocket.getOutputStream(); + + assertFalse(clientSocket.isClosed()); + assertFalse(clientSocket.isInputShutdown()); + assertFalse(clientSocket.isOutputShutdown()); + inputStream.close(); + assertTrue(clientSocket.isClosed()); + assertTrue(clientSocket.isInputShutdown()); + assertTrue(clientSocket.isOutputShutdown()); + assertThrows(IOException.class, () -> inputStream.read()); + assertThrows(IOException.class, () -> outputStream.write(1)); + assertThrows(SocketException.class, () -> clientSocket.getInputStream()); + assertThrows(SocketException.class, () -> clientSocket.getOutputStream()); + + isServerRunning.set(false); + } + + @Test + public void testOutputStreamClose() throws Exception { + if (!Config.get().isJdkSocketEnabled()) { + System.out.println( + "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); + return; + } + + TunnelingJdkSocket clientSocket = createClient(); + InputStream inputStream = clientSocket.getInputStream(); + OutputStream outputStream = clientSocket.getOutputStream(); + + assertFalse(clientSocket.isClosed()); + assertFalse(clientSocket.isInputShutdown()); + assertFalse(clientSocket.isOutputShutdown()); + outputStream.close(); + assertTrue(clientSocket.isClosed()); + assertTrue(clientSocket.isInputShutdown()); + assertTrue(clientSocket.isOutputShutdown()); + assertThrows(IOException.class, () -> inputStream.read()); + assertThrows(IOException.class, () -> outputStream.write(1)); + assertThrows(SocketException.class, () -> clientSocket.getInputStream()); + assertThrows(SocketException.class, () -> clientSocket.getOutputStream()); + + isServerRunning.set(false); + } + + @Test + public void testTimeout() throws Exception { + if (!Config.get().isJdkSocketEnabled()) { + System.out.println( + "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); + return; + } + + TunnelingJdkSocket clientSocket = createClient(); InputStream inputStream = clientSocket.getInputStream(); int testTimeout = 1000; @@ -86,10 +198,7 @@ public void testBufferSizes() throws Exception { return; } - Path socketPath = getSocketPath(); - UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath); - startServer(socketAddress); - TunnelingJdkSocket clientSocket = createClient(socketPath); + TunnelingJdkSocket clientSocket = createClient(); assertEquals(TunnelingJdkSocket.DEFAULT_BUFFER_SIZE, clientSocket.getSendBufferSize()); assertEquals(TunnelingJdkSocket.DEFAULT_BUFFER_SIZE, clientSocket.getReceiveBufferSize()); @@ -130,28 +239,19 @@ public void testFileDescriptorLeak() throws Exception { return; } long initialCount = getFileDescriptorCount(); - System.out.println("Initial file descriptor count: " + initialCount); - Path socketPath = getSocketPath(); - UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath); - startServer(socketAddress); - TunnelingJdkSocket clientSocket = createClient(socketPath); + TunnelingJdkSocket clientSocket = createClient(); - List streams = new ArrayList<>(); for (int i = 0; i < 100; i++) { InputStream inputStream = clientSocket.getInputStream(); - streams.add(inputStream); long currentCount = getFileDescriptorCount(); - System.out.println("Iteration " + i + ", File descriptor count: " + currentCount); assertTrue(currentCount <= initialCount + 7); // why +7? } clientSocket.close(); - streams.clear(); isServerRunning.set(false); long finalCount = getFileDescriptorCount(); - System.out.println("Final file descriptor count: " + finalCount); assertTrue(finalCount <= initialCount + 3); // why +3? } @@ -175,13 +275,6 @@ private String getPid() { return ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; } - private Path getSocketPath() throws IOException { - Path socketPath = Files.createTempFile("testSocket", null); - Files.delete(socketPath); - socketPath.toFile().deleteOnExit(); - return socketPath; - } - private static void startServer(UnixDomainSocketAddress socketAddress) { Thread serverThread = new Thread( @@ -215,7 +308,17 @@ private static void startServer(UnixDomainSocketAddress socketAddress) { } } - private TunnelingJdkSocket createClient(Path socketPath) throws IOException { + private Path getSocketPath() throws IOException { + Path socketPath = Files.createTempFile("testSocket", null); + Files.delete(socketPath); + socketPath.toFile().deleteOnExit(); + return socketPath; + } + + private TunnelingJdkSocket createClient() throws IOException { + Path socketPath = getSocketPath(); + UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath); + startServer(socketAddress); TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); clientSocket.connect(new InetSocketAddress("localhost", 0)); return clientSocket; From d12cf3f990a682b8131c9321d547542d6f1b9cd9 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 1 May 2025 15:26:44 -0400 Subject: [PATCH 4/8] Fix closing technique again --- .../common/socket/TunnelingJdkSocket.java | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index b4385fad36b..ce5b1335b33 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -30,7 +30,6 @@ final class TunnelingJdkSocket extends Socket { private SocketChannel unixSocketChannel; private Selector selector; - private boolean wasBlocking; private int timeout; private boolean shutIn; @@ -200,14 +199,12 @@ public InputStream getInputStream() throws IOException { if (selector == null) { selector = Selector.open(); - wasBlocking = unixSocketChannel.isBlocking(); unixSocketChannel.configureBlocking(false); unixSocketChannel.register(selector, SelectionKey.OP_READ); } return new InputStream() { private final ByteBuffer buffer = ByteBuffer.allocate(getStreamBufferSize()); - private boolean isClosed = false; @Override public int read() throws IOException { @@ -217,8 +214,8 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { - if (isClosed) { - throw new IOException("Stream is closed."); + if (isInputShutdown()) { + throw new IOException("Stream closed"); } buffer.clear(); @@ -248,15 +245,7 @@ public int read(byte[] b, int off, int len) throws IOException { @Override public void close() throws IOException { - if (isClosed) { - return; - } - isClosed = true; - selector.close(); - selector = null; - if (unixSocketChannel != null) { - unixSocketChannel.configureBlocking(wasBlocking); - } + TunnelingJdkSocket.this.close(); } }; } @@ -274,8 +263,6 @@ public OutputStream getOutputStream() throws IOException { } return new OutputStream() { - private boolean isClosed = false; - @Override public void write(int b) throws IOException { byte[] array = ByteBuffer.allocate(4).putInt(b).array(); @@ -284,14 +271,19 @@ public void write(int b) throws IOException { @Override public void write(byte[] b, int off, int len) throws IOException { - if (isClosed) { - throw new IOException("Stream is closed"); + if (isOutputShutdown()) { + throw new IOException("Stream closed"); } ByteBuffer buffer = ByteBuffer.wrap(b, off, len); while (buffer.hasRemaining()) { unixSocketChannel.write(buffer); } } + + @Override + public void close() throws IOException { + TunnelingJdkSocket.this.close(); + } }; } @@ -335,15 +327,19 @@ public void close() throws IOException { if (isClosed()) { return; } + if (!isInputShutdown()) { + shutdownInput(); + } + if (!isOutputShutdown()) { + shutdownOutput(); + } if (selector != null) { selector.close(); selector = null; - if (unixSocketChannel != null) { - unixSocketChannel.configureBlocking(wasBlocking); - } } if (unixSocketChannel != null) { unixSocketChannel.close(); + unixSocketChannel = null; } closed = true; } From 1f7c3dff60de6d2a67ec8984c54c4ffeb55e5a1c Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 1 May 2025 18:57:11 -0400 Subject: [PATCH 5/8] Clean implementation and tests to match expected Socket behavior --- .../common/socket/TunnelingJdkSocket.java | 35 ++++++++++------ .../common/socket/TunnelingJdkSocketTest.java | 42 ++++++------------- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index ce5b1335b33..f7b35321a74 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -91,6 +91,9 @@ public synchronized int getSoTimeout() throws SocketException { @Override public void connect(final SocketAddress endpoint) throws IOException { + if (endpoint == null) { + throw new IllegalArgumentException("Endpoint cannot be null"); + } if (isClosed()) { throw new SocketException("Socket is closed"); } @@ -106,6 +109,12 @@ public void connect(final SocketAddress endpoint) throws IOException { // https://github.com/jnr/jnr-unixsocket/blob/master/src/main/java/jnr/unixsocket/UnixSocket.java#L89-L97 @Override public void connect(final SocketAddress endpoint, final int timeout) throws IOException { + if (endpoint == null) { + throw new IllegalArgumentException("Endpoint cannot be null"); + } + if (timeout < 0) { + throw new IllegalArgumentException("Timeout cannot be negative"); + } if (isClosed()) { throw new SocketException("Socket is closed"); } @@ -123,17 +132,17 @@ public SocketChannel getChannel() { @Override public void setSendBufferSize(int size) throws SocketException { + if (size <= 0) { + throw new IllegalArgumentException("Invalid send buffer size"); + } if (isClosed()) { throw new SocketException("Socket is closed"); } - if (size < 0) { - throw new IllegalArgumentException("Invalid send buffer size"); - } + sendBufferSize = size; try { unixSocketChannel.setOption(java.net.StandardSocketOptions.SO_SNDBUF, size); - sendBufferSize = size; } catch (IOException e) { - throw new SocketException("Failed to set send buffer size"); + throw new SocketException("Failed to set send buffer size socket option"); } } @@ -150,17 +159,17 @@ public int getSendBufferSize() throws SocketException { @Override public void setReceiveBufferSize(int size) throws SocketException { + if (size <= 0) { + throw new IllegalArgumentException("Invalid receive buffer size"); + } if (isClosed()) { throw new SocketException("Socket is closed"); } - if (size < 0) { - throw new IllegalArgumentException("Invalid receive buffer size"); - } + receiveBufferSize = size; try { unixSocketChannel.setOption(java.net.StandardSocketOptions.SO_RCVBUF, size); - receiveBufferSize = size; } catch (IOException e) { - throw new SocketException("Failed to set receive buffer size"); + throw new SocketException("Failed to set receive buffer size socket option"); } } @@ -215,7 +224,7 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { if (isInputShutdown()) { - throw new IOException("Stream closed"); + return -1; } buffer.clear(); @@ -319,6 +328,9 @@ public void shutdownOutput() throws IOException { @Override public InetAddress getInetAddress() { + if (!isConnected()) { + return null; + } return inetSocketAddress.getAddress(); } @@ -339,7 +351,6 @@ public void close() throws IOException { } if (unixSocketChannel != null) { unixSocketChannel.close(); - unixSocketChannel = null; } closed = true; } diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 057cc03c937..df2410fe728 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -25,7 +25,7 @@ public class TunnelingJdkSocketTest { private static final AtomicBoolean isServerRunning = new AtomicBoolean(false); @Test - public void testSocketConnect() throws Exception { + public void testSocketConnectAndClose() throws Exception { if (!Config.get().isJdkSocketEnabled()) { System.out.println( "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); @@ -39,45 +39,25 @@ public void testSocketConnect() throws Exception { assertFalse(clientSocket.isConnected()); assertFalse(clientSocket.isClosed()); + clientSocket.connect(new InetSocketAddress("localhost", 0)); + InputStream inputStream = clientSocket.getInputStream(); + OutputStream outputStream = clientSocket.getOutputStream(); + assertTrue(clientSocket.isConnected()); assertFalse(clientSocket.isClosed()); assertFalse(clientSocket.isInputShutdown()); assertFalse(clientSocket.isOutputShutdown()); assertThrows( SocketException.class, () -> clientSocket.connect(new InetSocketAddress("localhost", 0))); - clientSocket.close(); - assertFalse(clientSocket.isConnected()); - assertTrue(clientSocket.isClosed()); - assertTrue(clientSocket.isInputShutdown()); - assertTrue(clientSocket.isOutputShutdown()); - clientSocket.close(); - - isServerRunning.set(false); - } - @Test - public void testSocketClose() throws Exception { - if (!Config.get().isJdkSocketEnabled()) { - System.out.println( - "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); - return; - } - - TunnelingJdkSocket clientSocket = createClient(); - InputStream inputStream = clientSocket.getInputStream(); - OutputStream outputStream = clientSocket.getOutputStream(); + clientSocket.close(); - assertFalse(clientSocket.isClosed()); - assertFalse(clientSocket.isInputShutdown()); - assertFalse(clientSocket.isOutputShutdown()); assertTrue(clientSocket.isConnected()); - clientSocket.close(); assertTrue(clientSocket.isClosed()); assertTrue(clientSocket.isInputShutdown()); assertTrue(clientSocket.isOutputShutdown()); - assertFalse(clientSocket.isConnected()); - assertThrows(IOException.class, () -> inputStream.read()); + assertEquals(-1, inputStream.read()); assertThrows(IOException.class, () -> outputStream.write(1)); assertThrows(SocketException.class, () -> clientSocket.getInputStream()); assertThrows(SocketException.class, () -> clientSocket.getOutputStream()); @@ -101,11 +81,13 @@ public void testInputStreamClose() throws Exception { assertFalse(clientSocket.isClosed()); assertFalse(clientSocket.isInputShutdown()); assertFalse(clientSocket.isOutputShutdown()); + inputStream.close(); + assertTrue(clientSocket.isClosed()); assertTrue(clientSocket.isInputShutdown()); assertTrue(clientSocket.isOutputShutdown()); - assertThrows(IOException.class, () -> inputStream.read()); + assertEquals(-1, inputStream.read()); assertThrows(IOException.class, () -> outputStream.write(1)); assertThrows(SocketException.class, () -> clientSocket.getInputStream()); assertThrows(SocketException.class, () -> clientSocket.getOutputStream()); @@ -128,11 +110,13 @@ public void testOutputStreamClose() throws Exception { assertFalse(clientSocket.isClosed()); assertFalse(clientSocket.isInputShutdown()); assertFalse(clientSocket.isOutputShutdown()); + outputStream.close(); + assertTrue(clientSocket.isClosed()); assertTrue(clientSocket.isInputShutdown()); assertTrue(clientSocket.isOutputShutdown()); - assertThrows(IOException.class, () -> inputStream.read()); + assertEquals(-1, inputStream.read()); assertThrows(IOException.class, () -> outputStream.write(1)); assertThrows(SocketException.class, () -> clientSocket.getInputStream()); assertThrows(SocketException.class, () -> clientSocket.getOutputStream()); From 3133a7b8d2d4a3ba82e1d50c589086048cf88813 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 2 May 2025 08:42:17 -0400 Subject: [PATCH 6/8] Remove comments --- .../java/datadog/common/socket/TunnelingJdkSocketTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index df2410fe728..76362accb1e 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -229,14 +229,14 @@ public void testFileDescriptorLeak() throws Exception { for (int i = 0; i < 100; i++) { InputStream inputStream = clientSocket.getInputStream(); long currentCount = getFileDescriptorCount(); - assertTrue(currentCount <= initialCount + 7); // why +7? + assertTrue(currentCount <= initialCount + 7); } clientSocket.close(); isServerRunning.set(false); long finalCount = getFileDescriptorCount(); - assertTrue(finalCount <= initialCount + 3); // why +3? + assertTrue(finalCount <= initialCount + 3); } private long getFileDescriptorCount() { From 4e6eb278b86d91f63cc4d8070eadd6a0af12e73f Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 2 May 2025 09:39:21 -0400 Subject: [PATCH 7/8] Change default JDK_SOCKET_ENABLED to true --- internal-api/src/main/java/datadog/trace/api/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index d6526b29716..6f3041ca7d8 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -2022,7 +2022,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) this.apmTracingEnabled = configProvider.getBoolean(GeneralConfig.APM_TRACING_ENABLED, true); - this.jdkSocketEnabled = configProvider.getBoolean(JDK_SOCKET_ENABLED, false); + this.jdkSocketEnabled = configProvider.getBoolean(JDK_SOCKET_ENABLED, true); log.debug("New instance: {}", this); } From c089869839de9294b650cd1e82681fb272eec61d Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 6 May 2025 15:45:21 -0400 Subject: [PATCH 8/8] Address PR comments about socket closing and buffer exceptions --- .../common/socket/TunnelingJdkSocket.java | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index f7b35321a74..4037252ede4 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -142,7 +142,9 @@ public void setSendBufferSize(int size) throws SocketException { try { unixSocketChannel.setOption(java.net.StandardSocketOptions.SO_SNDBUF, size); } catch (IOException e) { - throw new SocketException("Failed to set send buffer size socket option"); + SocketException se = new SocketException("Failed to set send buffer size socket option"); + se.initCause(e); + throw se; } } @@ -169,7 +171,9 @@ public void setReceiveBufferSize(int size) throws SocketException { try { unixSocketChannel.setOption(java.net.StandardSocketOptions.SO_RCVBUF, size); } catch (IOException e) { - throw new SocketException("Failed to set receive buffer size socket option"); + SocketException se = new SocketException("Failed to set receive buffer size socket option"); + se.initCause(e); + throw se; } } @@ -339,18 +343,31 @@ public void close() throws IOException { if (isClosed()) { return; } - if (!isInputShutdown()) { - shutdownInput(); + // Ignore possible exceptions so that we continue closing the socket + try { + if (!isInputShutdown()) { + shutdownInput(); + } + } catch (IOException e) { } - if (!isOutputShutdown()) { - shutdownOutput(); + try { + if (!isOutputShutdown()) { + shutdownOutput(); + } + } catch (IOException e) { } - if (selector != null) { - selector.close(); - selector = null; + try { + if (selector != null) { + selector.close(); + selector = null; + } + } catch (IOException e) { } - if (unixSocketChannel != null) { - unixSocketChannel.close(); + try { + if (unixSocketChannel != null) { + unixSocketChannel.close(); + } + } catch (IOException e) { } closed = true; }