diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml index ed2f8f4dcd61..0c64c8ffede9 100644 --- a/hadoop-hdds/container-service/pom.xml +++ b/hadoop-hdds/container-service/pom.xml @@ -115,6 +115,18 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> org.glassfish.jaxb jaxb-runtime + + io.netty + netty-transport + + + io.netty + netty-codec + + + io.netty + netty-handler + diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java new file mode 100644 index 000000000000..69af0ae40ff5 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Streaming binaries to single directory. + */ +public class DirectoryServerDestination implements StreamingDestination { + + private Path root; + + public DirectoryServerDestination(Path path) { + root = path; + } + + @Override + public Path mapToDestination(String name) { + return root.resolve(Paths.get(name)); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java new file mode 100644 index 000000000000..8759d09f7f3d --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; + +/** + * Streaming files from single directory. + */ +public class DirectoryServerSource implements StreamingSource { + + private Path root; + + public DirectoryServerSource(Path root) { + this.root = root; + } + + @Override + /** + * Return logicalNames and real file path to replicate. + * + * @param id name of the subdirectory to replitace relative to root. + */ + public Map getFilesToStream(String id) + throws InterruptedException { + Map files = new HashMap<>(); + try { + Files.walk(root.resolve(id)) + .filter(Files::isRegularFile) + .forEach(path -> { + files.put(root.relativize(path).toString(), path); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + return files; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java new file mode 100644 index 000000000000..c0e530abbe3b --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ByteProcessor; +import io.netty.util.ReferenceCountUtil; + +/** + * Protocol definition from streaming binary files. + * + * Format of the protocol (TCP/IP): + * + * LOGICAL_NAME SIZE + * ... (binary content) + * LOGICAL_NAME SIZE + * ... (binary content) + * END 0 + */ +public class DirstreamClientHandler extends ChannelInboundHandlerAdapter { + + private final StreamingDestination destination; + private boolean headerMode = true; + private StringBuilder currentFileName = new StringBuilder(); + private RandomAccessFile destFile; + + private FileChannel destFileChannel; + + private long remaining; + + public DirstreamClientHandler(StreamingDestination streamingDestination) { + this.destination = streamingDestination; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws IOException { + try { + ByteBuf buffer = (ByteBuf) msg; + doRead(ctx, buffer); + } finally { + ReferenceCountUtil.release(msg); + } + } + + public void doRead(ChannelHandlerContext ctx, ByteBuf buffer) + throws IOException { + if (headerMode) { + int eolPosition = buffer.forEachByte(ByteProcessor.FIND_LF) - buffer + .readerIndex(); + if (eolPosition > 0) { + headerMode = false; + final ByteBuf name = buffer.readBytes(eolPosition); + currentFileName.append(name + .toString(StandardCharsets.UTF_8)); + name.release(); + buffer.skipBytes(1); + String[] parts = currentFileName.toString().split(" ", 2); + remaining = Long.parseLong(parts[0]); + Path destFilePath = destination.mapToDestination(parts[1]); + final Path destfileParent = destFilePath.getParent(); + if (destfileParent == null) { + throw new IllegalArgumentException("Streaming destination " + + "provider return with invalid path: " + destFilePath); + } + Files.createDirectories(destfileParent); + this.destFile = + new RandomAccessFile(destFilePath.toFile(), "rw"); + destFileChannel = this.destFile.getChannel(); + + } else { + currentFileName + .append(buffer.toString(StandardCharsets.UTF_8)); + } + } + if (!headerMode) { + final int readableBytes = buffer.readableBytes(); + if (remaining >= readableBytes) { + remaining -= + buffer.readBytes(destFileChannel, readableBytes); + } else { + remaining -= buffer.readBytes(destFileChannel, (int) remaining); + currentFileName = new StringBuilder(); + headerMode = true; + destFile.close(); + if (readableBytes > 0) { + doRead(ctx, buffer); + } + } + } + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + try { + if (destFile != null) { + destFile.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + try { + destFileChannel.close(); + destFile.close(); + } catch (IOException e) { + e.printStackTrace(); + } + ctx.close(); + } + + public String getCurrentFileName() { + return currentFileName.toString(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java new file mode 100644 index 000000000000..c609c2c85d62 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.DefaultFileRegion; +import io.netty.util.ByteProcessor; + +/** + * Protocol definition of the streaming. + */ +public class DirstreamServerHandler extends ChannelInboundHandlerAdapter { + + public static final String END_MARKER = "0 END"; + + private final StringBuilder id = new StringBuilder(); + + private StreamingSource source; + + private boolean headerProcessed = false; + + public DirstreamServerHandler(StreamingSource source) { + this.source = source; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (!headerProcessed) { + ByteBuf buffer = (ByteBuf) msg; + int eolPosition = buffer.forEachByte(ByteProcessor.FIND_LF) - buffer + .readerIndex(); + if (eolPosition > 0) { + headerProcessed = true; + id.append(buffer.toString(Charset.defaultCharset())); + } else { + id.append(buffer.toString(0, eolPosition, Charset.defaultCharset())); + } + buffer.release(); + } + + if (headerProcessed) { + final List> entriesToWrite = new ArrayList<>( + source.getFilesToStream(id.toString().trim()).entrySet()); + + writeOneElement(ctx, entriesToWrite, 0); + + } + } + + public void writeOneElement( + ChannelHandlerContext ctx, + List> entriesToWrite, + int i + ) + throws IOException { + final Entry entryToWrite = entriesToWrite.get(i); + Path file = entryToWrite.getValue(); + String name = entryToWrite.getKey(); + long fileSize = Files.size(file); + String identifier = fileSize + " " + name + "\n"; + ByteBuf identifierBuf = + Unpooled.wrappedBuffer(identifier.getBytes( + StandardCharsets.UTF_8)); + + final int currentIndex = i; + + ChannelFuture lastFuture = ctx.writeAndFlush(identifierBuf); + lastFuture.addListener(f -> { + ChannelFuture nextFuture = ctx.writeAndFlush( + new DefaultFileRegion(file.toFile(), 0, fileSize)); + if (currentIndex == entriesToWrite.size() - 1) { + nextFuture.addListener(a -> + ctx.writeAndFlush( + Unpooled.wrappedBuffer( + END_MARKER.getBytes(StandardCharsets.UTF_8))) + .addListener(b -> { + ctx.channel().close(); + })); + } else { + nextFuture.addListener( + a -> writeOneElement(ctx, entriesToWrite, + currentIndex + 1)); + } + }); + + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + cause.printStackTrace(); + if (ctx.channel().isActive()) { + ctx.writeAndFlush("ERR: " + + cause.getClass().getSimpleName() + ": " + + cause.getMessage() + '\n').addListener( + ChannelFutureListener.CLOSE); + } + ctx.close(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java new file mode 100644 index 000000000000..8669fd870782 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone.container.stream.DirstreamServerHandler.END_MARKER; + +/** + * Client to stream huge binaries from a streamling server. + */ +public class StreamingClient implements AutoCloseable { + + private final Bootstrap bootstrap; + private final DirstreamClientHandler dirstreamClientHandler; + private EventLoopGroup group; + private int port; + private String host; + + public StreamingClient( + String host, + int port, + StreamingDestination streamingDestination + ) throws InterruptedException { + this.port = port; + this.host = host; + + group = new NioEventLoopGroup(100); + dirstreamClientHandler = new DirstreamClientHandler(streamingDestination); + bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_RCVBUF, 1024 * 1024) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new StringEncoder(CharsetUtil.UTF_8), + dirstreamClientHandler + ); + } + }); + + } + + + public void stream(String id) { + stream(id, 200L, TimeUnit.SECONDS); + } + + public void stream(String id, long timeout, TimeUnit unit) { + try { + Channel channel = bootstrap.connect(host, port).sync().channel(); + channel.writeAndFlush(id + "\n") + .await(timeout, unit); + channel.closeFuture().await(timeout, unit); + if (!dirstreamClientHandler.getCurrentFileName().equals(END_MARKER)) { + throw new RuntimeException("Streaming is failed. Not all files " + + "are streamed. Please check the log of the server." + + " Last (partial?) streamed file: " + + dirstreamClientHandler.getCurrentFileName()); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + + @Override + public void close() { + group.shutdownGracefully(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java new file mode 100644 index 000000000000..f91d99dce2dd --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.nio.file.Path; + +/** + * Interface defines the mapping to the destination. + */ +public interface StreamingDestination { + + /** + * Returns destination path to each logical name. + */ + Path mapToDestination(String name); + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java new file mode 100644 index 000000000000..4ceb1569bec0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.net.InetSocketAddress; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Netty based streaming server to replicate files from a directory. + */ +public class StreamingServer implements AutoCloseable { + + private static final Logger LOG = + LoggerFactory.getLogger(StreamingServer.class); + + private int port; + + private StreamingSource source; + + private EventLoopGroup bossGroup; + + private EventLoopGroup workerGroup; + + public StreamingServer( + StreamingSource source, int port + ) { + this.port = port; + this.source = source; + } + + public void start() throws InterruptedException { + ServerBootstrap b = new ServerBootstrap(); + bossGroup = new NioEventLoopGroup(100); + workerGroup = new NioEventLoopGroup(100); + + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new ChunkedWriteHandler(), + new DirstreamServerHandler(source)); + } + }); + + ChannelFuture f = b.bind(port).sync(); + final InetSocketAddress socketAddress = + (InetSocketAddress) f.channel().localAddress(); + port = socketAddress.getPort(); + LOG.info("Started streaming server on " + port); + } + + public void stop() { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + + public int getPort() { + return port; + } + + @Override + public void close() { + stop(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java new file mode 100644 index 000000000000..5fdfc931b99c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.nio.file.Path; +import java.util.Map; + +/** + * Interface to define which files should be replicated for a given id. + */ +public interface StreamingSource { + + /** + * + * @param id: custom identifier + * + * @return map of files which should be copied (logical name -> real path) + */ + Map getFilesToStream(String id) throws InterruptedException; + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/package-info.java new file mode 100644 index 000000000000..8c8ed4e02319 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Streaming API: client and server to move raw binary data. + */ +package org.apache.hadoop.ozone.container.stream; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java new file mode 100644 index 000000000000..6453477fa9ac --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.TimeUnit; + + +/** + * Testing stream server. + */ +public class TestStreamingServer { + + private static final String SUBDIR = "test1"; + + private static final byte[] CONTENT = "Stream it if you can" + .getBytes(StandardCharsets.UTF_8); + + @Test + public void simpleStream() throws Exception { + Path sourceDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Path destDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Files.createDirectories(sourceDir.resolve(SUBDIR)); + Files.createDirectories(destDir.resolve(SUBDIR)); + + //GIVEN: generate file + Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT); + + //WHEN: stream subdir + streamDir(sourceDir, destDir, SUBDIR); + + //THEN: compare the files + final byte[] targetContent = Files + .readAllBytes(destDir.resolve(SUBDIR).resolve("file1")); + Assert.assertArrayEquals(CONTENT, targetContent); + + } + + + @Test(expected = RuntimeException.class) + public void failedStream() throws Exception { + Path sourceDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Path destDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Files.createDirectories(sourceDir.resolve(SUBDIR)); + Files.createDirectories(destDir.resolve(SUBDIR)); + + //GIVEN: generate file + Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT); + + //WHEN: stream subdir + streamDir(sourceDir, destDir, "NO_SUCH_ID"); + + //THEN: compare the files + //exception is expected + + } + + @Test(expected = RuntimeException.class) + public void timeout() throws Exception { + Path sourceDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Path destDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Files.createDirectories(sourceDir.resolve(SUBDIR)); + Files.createDirectories(destDir.resolve(SUBDIR)); + + //GIVEN: generate file + Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT); + + //WHEN: stream subdir + try (StreamingServer server = + new StreamingServer(new DirectoryServerSource(sourceDir) { + @Override + public Map getFilesToStream(String id) + throws InterruptedException { + Thread.sleep(3000L); + return super.getFilesToStream(id); + } + }, 0)) { + server.start(); + try (StreamingClient client = + new StreamingClient("localhost", server.getPort(), + new DirectoryServerDestination( + destDir))) { + client.stream(SUBDIR, 1L, TimeUnit.SECONDS); + } + } + + //THEN: compare the files + //exception is expected + + } + + private void streamDir(Path sourceDir, Path destDir, String subdir) + throws InterruptedException { + try (StreamingServer server = new StreamingServer( + new DirectoryServerSource(sourceDir), 0)) { + server.start(); + try (StreamingClient client = + new StreamingClient("localhost", server.getPort(), + new DirectoryServerDestination( + destDir))) { + client.stream(subdir); + } + } + } +} \ No newline at end of file diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index 21714c9602c9..d09937caa547 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -59,7 +59,8 @@ GeneratorOm.class, GeneratorScm.class, GeneratorDatanode.class, - ClosedContainerReplicator.class}, + ClosedContainerReplicator.class, + StreamingGenerator.class}, versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true) public class Freon extends GenericCli { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java new file mode 100644 index 000000000000..2cf251510aa5 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.freon; + +import com.codahale.metrics.Timer; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.ozone.container.stream.DirectoryServerDestination; +import org.apache.hadoop.ozone.container.stream.DirectoryServerSource; +import org.apache.hadoop.ozone.container.stream.StreamingClient; +import org.apache.hadoop.ozone.container.stream.StreamingServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.Callable; + +@CommandLine.Command(name = "strmg", + aliases = "streaming-generator", + description = + "Create directory structure and stream them multiple times.", + versionProvider = HddsVersionProvider.class, + mixinStandardHelpOptions = true, + showDefaultValues = true) +public class StreamingGenerator extends BaseFreonGenerator + implements Callable { + + private static final Logger LOG = + LoggerFactory.getLogger(StreamingGenerator.class); + + @CommandLine.Option(names = {"--root-dir"}, + description = "Directory where the working directories are created", + defaultValue = "/tmp/ozone-streaming") + private Path testRoot; + + @CommandLine.Option(names = {"--files"}, + description = "Number of the files in the test directory " + + "to be generated.", + defaultValue = "50") + private int numberOfFiles; + + @CommandLine.Option(names = {"--size"}, + description = "Size of the generated files.", + defaultValue = "104857600") + private int fileSize; + + + private int port = 1234; + + private String subdir = "dir1"; + private Timer timer; + + + @Override + public Void call() throws Exception { + init(); + + generateBaseData(); + + timer = getMetrics().timer("streaming"); + setThreadNo(1); + runTests(this::copyDir); + + return null; + } + + private void generateBaseData() throws IOException { + Path sourceDir = testRoot.resolve("streaming-0"); + if (Files.exists(sourceDir)) { + deleteDirRecursive(sourceDir); + } + Path subDir = sourceDir.resolve(subdir); + Files.createDirectories(subDir); + ContentGenerator contentGenerator = new ContentGenerator(fileSize, + 1024); + + for (int i = 0; i < numberOfFiles; i++) { + try (FileOutputStream out = new FileOutputStream( + subDir.resolve("file-" + i).toFile()) + ) { + contentGenerator.write(out); + } + } + } + + private void copyDir(long l) { + Path sourceDir = testRoot.resolve("streaming-" + l); + Path destinationDir = testRoot.resolve("streaming-" + (l + 1)); + + try (StreamingServer server = + new StreamingServer(new DirectoryServerSource(sourceDir), + 1234)) { + try { + server.start(); + LOG.info("Starting streaming server on port {} to publish dir {}", + port, sourceDir); + + try (StreamingClient client = + new StreamingClient("localhost", port, + new DirectoryServerDestination( + destinationDir))) { + + timer.time(() -> client.stream(subdir)); + + } + LOG.info("Replication has been finished to {}", sourceDir); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + + deleteDirRecursive(sourceDir); + + } + } + + private void deleteDirRecursive(Path destinationDir) { + try { + FileUtils.forceDelete(destinationDir.toFile()); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/pom.xml b/pom.xml index f74df1798a23..f1ba32f6fa14 100644 --- a/pom.xml +++ b/pom.xml @@ -1031,6 +1031,21 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs netty-all ${netty.version} + + io.netty + netty-transport + ${netty.version} + + + io.netty + netty-codec + ${netty.version} + + + io.netty + netty-handler + ${netty.version} + commons-io