diff --git a/core/src/main/java/org/rnorth/tcpunixsocketproxy/NamedPipeSocket.java b/core/src/main/java/org/rnorth/tcpunixsocketproxy/NamedPipeSocket.java new file mode 100644 index 00000000000..274d345e738 --- /dev/null +++ b/core/src/main/java/org/rnorth/tcpunixsocketproxy/NamedPipeSocket.java @@ -0,0 +1,177 @@ +/* +MariaDB Client for Java + +Copyright (c) 2012-2014 Monty Program Ab. +Copyright (c) 2015-2016 MariaDB Ab. + +This library is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License as published by the Free +Software Foundation; either version 2.1 of the License, or (at your option) +any later version. + +This library is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License +for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this library; if not, write to Monty Program Ab info@montyprogram.com. + +This particular MariaDB Client for Java file is work +derived from a Drizzle-JDBC. Drizzle-JDBC file which is covered by subject to +the following copyright and notice provisions: + +Copyright (c) 2009-2011, Marcus Eriksson + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: +Redistributions of source code must retain the above copyright notice, this list +of conditions and the following disclaimer. + +Redistributions in binary form must reproduce the above copyright notice, this +list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +Neither the name of the driver nor the names of its contributors may not be +used to endorse or promote products derived from this software without specific +prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT +NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY +OF SUCH DAMAGE. +*/ + +package org.rnorth.tcpunixsocketproxy; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.net.Socket; +import java.net.SocketAddress; + +public class NamedPipeSocket extends Socket { + + String host; + String name; + + RandomAccessFile file; + InputStream is; + OutputStream os; + + public NamedPipeSocket(String host, String name) { + this.host = host; + this.name = name; + } + + @Override + public void close() throws IOException { + if (file != null) { + file.close(); + file = null; + } + } + + @Override + public void connect(SocketAddress endpoint) throws IOException { + connect(endpoint, 0); + } + + /** + * Name pipe connection. + * @param endpoint endPoint + * @param timeout timeout + * @throws IOException exception + */ + public void connect(SocketAddress endpoint, int timeout) throws IOException { + String filename; + if (host == null || host.equals("localhost")) { + filename = "\\\\.\\pipe\\" + name; + } else { + filename = "\\\\" + host + "\\pipe\\" + name; + } + file = new RandomAccessFile(filename, "rw"); + + is = new InputStream() { + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + return file.read(bytes, off, len); + } + + @Override + public int read() throws IOException { + return file.read(); + } + + @Override + public int read(byte[] bytes) throws IOException { + return file.read(bytes); + } + }; + + os = new OutputStream() { + @Override + public void write(byte[] bytes, int off, int len) throws IOException { + file.write(bytes, off, len); + } + + @Override + public void write(int value) throws IOException { + file.write(value); + } + + @Override + public void write(byte[] bytes) throws IOException { + file.write(bytes); + } + }; + } + + public InputStream getInputStream() { + return is; + } + + public OutputStream getOutputStream() { + return os; + } + + public void setTcpNoDelay(boolean bool) { + //do nothing + } + + public void setKeepAlive(boolean bool) { + //do nothing + } + + public void setReceiveBufferSize(int size) { + //do nothing + } + + public void setSendBufferSize(int size) { + //do nothing + } + + public void setSoLinger(boolean bool, int value) { + //do nothing + } + + @Override + public void setSoTimeout(int timeout) { + //do nothing + } + + public void shutdownInput() { + //do nothing + } + + public void shutdownOutput() { + //do nothing + } +} diff --git a/core/src/main/java/org/rnorth/tcpunixsocketproxy/TcpToNamedPipeSocketProxy.java b/core/src/main/java/org/rnorth/tcpunixsocketproxy/TcpToNamedPipeSocketProxy.java new file mode 100644 index 00000000000..226a77245d0 --- /dev/null +++ b/core/src/main/java/org/rnorth/tcpunixsocketproxy/TcpToNamedPipeSocketProxy.java @@ -0,0 +1,51 @@ +package org.rnorth.tcpunixsocketproxy; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; + +@Slf4j +@RequiredArgsConstructor +public class TcpToNamedPipeSocketProxy { + + private final String pipeName; + + private ServerSocket listenSocket; + + private Thread acceptThread; + + public InetSocketAddress start() throws IOException { + + listenSocket = new ServerSocket(); + listenSocket.bind(new InetSocketAddress("localhost", 0)); + + log.debug("Listening on {} and proxying to {}", listenSocket.getLocalSocketAddress(), pipeName); + + acceptThread = new Thread(() -> { + while (!Thread.interrupted()) { + try { + Socket incomingSocket = listenSocket.accept(); + log.debug("Accepting incoming connection from {}", incomingSocket.getRemoteSocketAddress()); + + new ProxyThread(incomingSocket, new NamedPipeSocket(null, pipeName)); + } catch (IOException ignored) { + } + } + }); + acceptThread.start(); + + return (InetSocketAddress) listenSocket.getLocalSocketAddress(); + } + + public void stop() { + acceptThread.interrupt(); + try { + listenSocket.close(); + } catch (IOException ignored) { + } + } +} diff --git a/core/src/main/java/org/testcontainers/dockerclient/NamedPipeSocketClientProviderStrategy.java b/core/src/main/java/org/testcontainers/dockerclient/NamedPipeSocketClientProviderStrategy.java index 6a4f7731fa5..8ecca51f914 100644 --- a/core/src/main/java/org/testcontainers/dockerclient/NamedPipeSocketClientProviderStrategy.java +++ b/core/src/main/java/org/testcontainers/dockerclient/NamedPipeSocketClientProviderStrategy.java @@ -2,47 +2,36 @@ import com.github.dockerjava.core.DefaultDockerClientConfig; import com.github.dockerjava.core.DockerClientConfig; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.IOUtils; import org.jetbrains.annotations.NotNull; +import org.rnorth.tcpunixsocketproxy.TcpToNamedPipeSocketProxy; -import java.io.*; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.URI; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.io.File; public class NamedPipeSocketClientProviderStrategy extends DockerClientProviderStrategy { - private static final String NAMED_PIPE_FILE_NAME = "\\\\.\\pipe\\docker_engine"; + private static final String NAMED_PIPE_NAME = "docker_engine"; + private static final String NAMED_PIPE_FILE_NAME = "\\\\.\\pipe\\" + NAMED_PIPE_NAME; private static final int PING_TIMEOUT_DEFAULT = 5; private static final String PING_TIMEOUT_PROPERTY_NAME = "testcontainers.namedpipesocketprovider.timeout"; @Override public void test() throws InvalidConfigurationException { - File file = new File(NAMED_PIPE_FILE_NAME); - - if (!file.exists()) { + if (!new File(NAMED_PIPE_FILE_NAME).exists()) { throw new InvalidConfigurationException("this strategy only works with named pipe file"); } - NamedPipeProxy proxy = new NamedPipeProxy(file); + TcpToNamedPipeSocketProxy proxy = new TcpToNamedPipeSocketProxy(NAMED_PIPE_NAME); try { int proxyPort = proxy.start().getPort(); config = tryConfiguration("tcp://localhost:" + proxyPort); - LOGGER.info("Accessing named pipe socket via TCP proxy (" + file + " via localhost:" + proxyPort + ")"); + LOGGER.info("Accessing unix domain socket via TCP proxy (" + NAMED_PIPE_FILE_NAME + " via localhost:" + proxyPort + ")"); } catch (Exception e) { proxy.stop(); throw new InvalidConfigurationException("ping failed", e); - } finally { - Runtime.getRuntime().addShutdownHook(new Thread(proxy::stop)); } } @@ -64,121 +53,4 @@ protected DockerClientConfig tryConfiguration(String dockerHost) { return config; } - - @Slf4j - @RequiredArgsConstructor - static class NamedPipeProxy { - - final ExecutorService executorService = Executors.newCachedThreadPool(); - - final File file; - - InetSocketAddress start() throws IOException { - ServerSocket listenSocket = new ServerSocket(); - listenSocket.bind(new InetSocketAddress("localhost", 0)); - log.debug("Listening on {} and proxying to {}", listenSocket.getLocalSocketAddress(), file); - - executorService.submit(() -> { - try { - while (!Thread.interrupted()) { - try { - Socket incomingSocket = listenSocket.accept(); - log.debug("Accepting incoming connection from {}", incomingSocket.getRemoteSocketAddress()); - new Thread(() -> { - try ( - RandomAccessFile randomAccessFile = new RandomAccessFile(NAMED_PIPE_FILE_NAME, "rw"); - ) { - - log.info("Created RandomAccessFile from " + NAMED_PIPE_FILE_NAME); - - Thread threadB = new Thread(() -> { - log.info("Redirecting input"); - try ( - InputStream socketIn = incomingSocket.getInputStream(); - FileOutputStream fileOut = new FileOutputStream(randomAccessFile.getFD()); - ) { - final byte[] buffer = new byte[1024 * 4]; - int n; - while (true) { - if (IOUtils.EOF == (n = socketIn.read(buffer))) { - break; - } - synchronized (randomAccessFile) { - fileOut.write(buffer, 0, n); - } - System.out.println("Sent " + n + " bytes to a file. Value: " + new String(buffer, 0, n)); - } - log.info("Done redirecting input"); - IOUtils.closeQuietly(incomingSocket); - IOUtils.closeQuietly(randomAccessFile); - } catch (Exception e) { - log.warn("input", e); - } - }); - - log.info("Redirecting output"); - try ( - OutputStream socketOut = incomingSocket.getOutputStream(); - FileInputStream fileIn = new FileInputStream(randomAccessFile.getFD()); - ) { - threadB.start(); - - final byte[] buffer = new byte[1024 * 4]; - int n; - while (true) { - synchronized (randomAccessFile) { - if (IOUtils.EOF == (n = fileIn.read(buffer))) { - break; - } - } - System.out.println("Read " + n + " bytes from a file. Value: " + new String(buffer, 0, n)); - socketOut.write(buffer, 0, n); - socketOut.flush(); - } - log.info("Done redirecting output"); - IOUtils.closeQuietly(incomingSocket); - IOUtils.closeQuietly(randomAccessFile); - } catch (Exception e) { - log.warn("output", e); - } - - threadB.join(); - log.info("all futures completed"); - } catch (Exception e) { - log.warn("", e); - } - }).start(); - - } catch (IOException e) { - log.warn("", e); - } - } - - log.info("Thread was interrupted. I'm done."); - } catch (Exception e) { - log.warn("", e); - } finally { - listenSocket.close(); - } - log.info("Done."); - return null; - }); - InetSocketAddress inetSocketAddress = (InetSocketAddress) listenSocket.getLocalSocketAddress(); - - log.info("Pinging..."); - log.info("Ping: " + IOUtils.toString(URI.create("http://localhost:" + inetSocketAddress.getPort() + "/v1.25/_ping"))); - log.info("Ping: " + IOUtils.toString(URI.create("http://localhost:" + inetSocketAddress.getPort() + "/v1.25/_ping"))); - log.info("Ping: " + IOUtils.toString(URI.create("http://localhost:" + inetSocketAddress.getPort() + "/v1.25/_ping"))); - - return inetSocketAddress; - } - - public void stop() { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("", e); - } - } - } }