diff --git a/core/pom.xml b/core/pom.xml index 18c337f8e6e0..96d457ac43fb 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -20,6 +20,11 @@ 14.0.1 compile + + io.netty + netty-all + 4.0.23.Final + org.eclipse.jetty jetty-jsp diff --git a/core/src/main/java/tachyon/client/RemoteBlockInStream.java b/core/src/main/java/tachyon/client/RemoteBlockInStream.java index 2e687b30ef62..b94688792dbd 100644 --- a/core/src/main/java/tachyon/client/RemoteBlockInStream.java +++ b/core/src/main/java/tachyon/client/RemoteBlockInStream.java @@ -17,7 +17,7 @@ import tachyon.thrift.NetAddress; import tachyon.util.CommonUtils; import tachyon.util.NetworkUtils; -import tachyon.worker.DataServerMessage; +import tachyon.worker.nio.DataServerMessage; /** * BlockInStream for remote block. diff --git a/core/src/main/java/tachyon/client/TachyonFile.java b/core/src/main/java/tachyon/client/TachyonFile.java index eaa4bf1fdfc9..a966888420b9 100644 --- a/core/src/main/java/tachyon/client/TachyonFile.java +++ b/core/src/main/java/tachyon/client/TachyonFile.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -23,7 +24,7 @@ import tachyon.thrift.NetAddress; import tachyon.util.CommonUtils; import tachyon.util.NetworkUtils; -import tachyon.worker.DataServerMessage; +import tachyon.worker.nio.DataServerMessage; /** * Tachyon File. diff --git a/core/src/main/java/tachyon/conf/MasterConf.java b/core/src/main/java/tachyon/conf/MasterConf.java index 78d52aa4564b..3ca2aa3caf6d 100644 --- a/core/src/main/java/tachyon/conf/MasterConf.java +++ b/core/src/main/java/tachyon/conf/MasterConf.java @@ -65,9 +65,8 @@ private MasterConf() { getIntProperty("tachyon.master.heartbeat.interval.ms", Constants.SECOND_MS); SELECTOR_THREADS = getIntProperty("tachyon.master.selector.threads", 3); QUEUE_SIZE_PER_SELECTOR = getIntProperty("tachyon.master.queue.size.per.selector", 3000); - SERVER_THREADS = - getIntProperty("tachyon.master.server.threads", 2 * Runtime.getRuntime() - .availableProcessors()); + SERVER_THREADS = getIntProperty("tachyon.master.server.threads", + 2 * Runtime.getRuntime().availableProcessors()); WORKER_TIMEOUT_MS = getIntProperty("tachyon.master.worker.timeout.ms", 10 * Constants.SECOND_MS); diff --git a/core/src/main/java/tachyon/conf/Utils.java b/core/src/main/java/tachyon/conf/Utils.java index bfe4ec2b4ed2..892d0767283c 100644 --- a/core/src/main/java/tachyon/conf/Utils.java +++ b/core/src/main/java/tachyon/conf/Utils.java @@ -25,6 +25,12 @@ public static boolean getBooleanProperty(String property, boolean defaultValue) return Boolean.valueOf(getProperty(property, defaultValue + "")); } + public static > T getEnumProperty(String property, T defaultValue) { + final String val = getProperty(property, null); + return null == val ? defaultValue + : Enum.valueOf(defaultValue.getDeclaringClass(), val); + } + public static int getIntProperty(String property) { return Integer.valueOf(getProperty(property)); } @@ -33,6 +39,14 @@ public static int getIntProperty(String property, int defaultValue) { return Integer.valueOf(getProperty(property, defaultValue + "")); } + public static Integer getIntegerProperty(String property, Integer defaultValue) { + try { + return Integer.valueOf(getProperty(property, null)); + } catch (NumberFormatException e) { + return defaultValue; + } + } + public static long getLongProperty(String property) { return Long.valueOf(getProperty(property)); } diff --git a/core/src/main/java/tachyon/conf/WorkerConf.java b/core/src/main/java/tachyon/conf/WorkerConf.java index 9d702f2fed88..1d6a81fbbf08 100644 --- a/core/src/main/java/tachyon/conf/WorkerConf.java +++ b/core/src/main/java/tachyon/conf/WorkerConf.java @@ -1,7 +1,11 @@ package tachyon.conf; +import com.google.common.base.Optional; import tachyon.Constants; import tachyon.util.CommonUtils; +import tachyon.worker.NetworkType; +import tachyon.worker.netty.ChannelType; +import tachyon.worker.netty.FileTransferType; import tachyon.util.NetworkUtils; public class WorkerConf extends Utils { @@ -41,6 +45,16 @@ public static synchronized WorkerConf get() { public final int WORKER_PER_THREAD_CHECKPOINT_CAP_MB_SEC; + public final NetworkType NETWORK_TYPE; + public final ChannelType NETTY_CHANNEL_TYPE; + public final FileTransferType NETTY_FILE_TRANSFER_TYPE; + + public final int NETTY_HIGH_WATER_MARK; + public final int NETTY_LOW_WATER_MARK; + public final Optional NETTY_BACKLOG; + public final Optional NETTY_SEND_BUFFER; + public final Optional NETTY_RECIEVE_BUFFER; + private WorkerConf() { MASTER_HOSTNAME = getProperty("tachyon.master.hostname", NetworkUtils.getLocalHostName()); MASTER_PORT = getIntProperty("tachyon.master.port", Constants.DEFAULT_MASTER_PORT); @@ -66,5 +80,22 @@ private WorkerConf() { WORKER_CHECKPOINT_THREADS = getIntProperty("tachyon.worker.checkpoint.threads", 1); WORKER_PER_THREAD_CHECKPOINT_CAP_MB_SEC = getIntProperty("tachyon.worker.per.thread.checkpoint.cap.mb.sec", Constants.SECOND_MS); + + NETWORK_TYPE = getEnumProperty("tachyon.worker.network.type", NetworkType.NETTY); + NETTY_CHANNEL_TYPE = + getEnumProperty("tachyon.worker.network.netty.channel", ChannelType.defaultType()); + NETTY_FILE_TRANSFER_TYPE = + getEnumProperty("tachyon.worker.network.netty.file.transfer", FileTransferType.MAPPED); + + NETTY_HIGH_WATER_MARK = + getIntProperty("tachyon.worker.network.netty.watermark.high", 32 * 1024); + NETTY_LOW_WATER_MARK = getIntProperty("tachyon.worker.network.netty.watermark.low", 8 * 1024); + NETTY_BACKLOG = + Optional.fromNullable(getIntegerProperty("tachyon.worker.network.netty.backlog", null)); + NETTY_SEND_BUFFER = + Optional.fromNullable(getIntegerProperty("tachyon.worker.network.netty.buffer.send", null)); + NETTY_RECIEVE_BUFFER = + Optional.fromNullable(getIntegerProperty("tachyon.worker.network.netty.buffer.recieve", + null)); } } diff --git a/core/src/main/java/tachyon/worker/DataServer.java b/core/src/main/java/tachyon/worker/DataServer.java index 6a6d38dcd4e5..65cbc0b8ed67 100644 --- a/core/src/main/java/tachyon/worker/DataServer.java +++ b/core/src/main/java/tachyon/worker/DataServer.java @@ -1,244 +1,12 @@ package tachyon.worker; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.channels.spi.SelectorProvider; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.apache.log4j.Logger; - -import com.google.common.base.Throwables; - -import tachyon.Constants; -import tachyon.Users; -import tachyon.conf.CommonConf; +import java.io.Closeable; /** - * The Server to serve data file read request from remote machines. The current implementation - * is based on non-blocking NIO. + * Defines how to interact with a server running the data protocol. */ -public class DataServer implements Runnable { - private static final Logger LOG = Logger.getLogger(Constants.LOGGER_TYPE); - - // The host:port combination to listen on - private InetSocketAddress mAddress; - - // The channel on which we will accept connections - private ServerSocketChannel mServerChannel; - - // The selector we will be monitoring. - private Selector mSelector; - - private Map mSendingData = Collections - .synchronizedMap(new HashMap()); - private Map mReceivingData = Collections - .synchronizedMap(new HashMap()); - - // The blocks locker manager. - private final BlocksLocker mBlocksLocker; - - private volatile boolean mShutdown = false; - private volatile boolean mShutdowned = false; - - /** - * Create a data server with direct access to worker storage. - * - * @param address - * The address of the data server. - * @param workerStorage - * The handler of the directly accessed worker storage. - */ - public DataServer(InetSocketAddress address, WorkerStorage workerStorage) { - LOG.info("Starting DataServer @ " + address); - CommonConf.assertValidPort(address); - mAddress = address; - mBlocksLocker = new BlocksLocker(workerStorage, Users.sDATASERVER_USER_ID); - try { - mSelector = initSelector(); - } catch (IOException e) { - LOG.error(e.getMessage() + mAddress, e); - throw Throwables.propagate(e); - } - } - - /** - * Gets the port listening on. - */ - int getPort() { - return mServerChannel.socket().getLocalPort(); - } - - private void accept(SelectionKey key) throws IOException { - // For an accept to be pending the channel must be a server socket channel - ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); - - // Accept the connection and make it non-blocking - SocketChannel socketChannel = serverSocketChannel.accept(); - socketChannel.configureBlocking(false); - - // Register the new SocketChannel with our Selector, indicating we'd like to be notified - // when there is data waiting to be read. - socketChannel.register(mSelector, SelectionKey.OP_READ); - } - - /** - * Close the data server. - * - * @throws IOException - */ - public void close() throws IOException { - mShutdown = true; - mServerChannel.close(); - mSelector.close(); - } - - private Selector initSelector() throws IOException { - // Create a new selector - Selector socketSelector = SelectorProvider.provider().openSelector(); - - // Create a new non-blocking server socket channel - mServerChannel = ServerSocketChannel.open(); - mServerChannel.configureBlocking(false); - - // Bind the server socket to the specified address and port - mServerChannel.socket().bind(mAddress); - - // Register the server socket channel, indicating an interest in accepting new connections. - mServerChannel.register(socketSelector, SelectionKey.OP_ACCEPT); - - return socketSelector; - } - - /** - * @return true if the server is closed, false otherwise - */ - public boolean isClosed() { - return mShutdowned; - } - - private void read(SelectionKey key) throws IOException { - SocketChannel socketChannel = (SocketChannel) key.channel(); - - DataServerMessage tMessage; - if (mReceivingData.containsKey(socketChannel)) { - tMessage = mReceivingData.get(socketChannel); - } else { - tMessage = DataServerMessage.createBlockRequestMessage(); - mReceivingData.put(socketChannel, tMessage); - } - - // Attempt to read off the channel - int numRead; - try { - numRead = tMessage.recv(socketChannel); - } catch (IOException e) { - // The remote forcibly closed the connection, cancel the selection key and close the channel. - key.cancel(); - socketChannel.close(); - mReceivingData.remove(socketChannel); - mSendingData.remove(socketChannel); - return; - } - - if (numRead == -1) { - // Remote entity shut the socket down cleanly. Do the same from our end and cancel the - // channel. - key.channel().close(); - key.cancel(); - mReceivingData.remove(socketChannel); - mSendingData.remove(socketChannel); - return; - } - - if (tMessage.isMessageReady()) { - if (tMessage.getBlockId() <= 0) { - LOG.error("Invalid block id " + tMessage.getBlockId()); - return; - } - - key.interestOps(SelectionKey.OP_WRITE); - LOG.info("Get request for " + tMessage.getBlockId()); - int lockId = mBlocksLocker.lock(tMessage.getBlockId()); - DataServerMessage tResponseMessage = - DataServerMessage.createBlockResponseMessage(true, tMessage.getBlockId(), - tMessage.getOffset(), tMessage.getLength()); - tResponseMessage.setLockId(lockId); - mSendingData.put(socketChannel, tResponseMessage); - } - } - - @Override - public void run() { - while (!mShutdown) { - try { - // Wait for an event one of the registered channels. - mSelector.select(); - if (mShutdown) { - break; - } - - // Iterate over the set of keys for which events are available - Iterator selectKeys = mSelector.selectedKeys().iterator(); - while (selectKeys.hasNext()) { - SelectionKey key = selectKeys.next(); - selectKeys.remove(); - - if (!key.isValid()) { - continue; - } - - // Check what event is available and deal with it. - // TODO These should be multi-thread. - if (key.isAcceptable()) { - accept(key); - } else if (key.isReadable()) { - read(key); - } else if (key.isWritable()) { - write(key); - } - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - if (mShutdown) { - break; - } - throw new RuntimeException(e); - } - } - mShutdowned = true; - } - - private void write(SelectionKey key) { - SocketChannel socketChannel = (SocketChannel) key.channel(); - - DataServerMessage sendMessage = mSendingData.get(socketChannel); - - boolean closeChannel = false; - try { - sendMessage.send(socketChannel); - } catch (IOException e) { - closeChannel = true; - LOG.error(e.getMessage()); - } +public interface DataServer extends Closeable { + int getPort(); - if (sendMessage.finishSending() || closeChannel) { - try { - key.channel().close(); - } catch (IOException e) { - LOG.error(e.getMessage()); - } - key.cancel(); - mReceivingData.remove(socketChannel); - mSendingData.remove(socketChannel); - sendMessage.close(); - mBlocksLocker.unlock(Math.abs(sendMessage.getBlockId()), sendMessage.getLockId()); - } - } -} \ No newline at end of file + boolean isClosed(); +} diff --git a/core/src/main/java/tachyon/worker/NetworkType.java b/core/src/main/java/tachyon/worker/NetworkType.java new file mode 100644 index 000000000000..bf2a6fc14e5e --- /dev/null +++ b/core/src/main/java/tachyon/worker/NetworkType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package tachyon.worker; + +/** + * Enum over the different networking implementations. + */ +public enum NetworkType { + NIO, + NETTY +} diff --git a/core/src/main/java/tachyon/worker/TachyonWorker.java b/core/src/main/java/tachyon/worker/TachyonWorker.java index a9a3cf2e03c8..54f5795f655f 100644 --- a/core/src/main/java/tachyon/worker/TachyonWorker.java +++ b/core/src/main/java/tachyon/worker/TachyonWorker.java @@ -13,6 +13,7 @@ import com.google.common.base.Throwables; import tachyon.Constants; +import tachyon.Users; import tachyon.Version; import tachyon.conf.CommonConf; import tachyon.conf.WorkerConf; @@ -22,6 +23,8 @@ import tachyon.thrift.WorkerService; import tachyon.util.CommonUtils; import tachyon.util.NetworkUtils; +import tachyon.worker.netty.NettyDataServer; +import tachyon.worker.nio.NIODataServer; /** * Entry point for a worker daemon. @@ -143,8 +146,6 @@ public static void main(String[] args) throws UnknownHostException { private final DataServer mDataServer; - private Thread mDataServerThread; - private Thread mHeartbeatThread; private volatile boolean mStop = false; @@ -188,10 +189,9 @@ private TachyonWorker(InetSocketAddress masterAddress, InetSocketAddress workerA // (any random free port). // In a production or any real deployment setup, port '0' should not be used as it will make // deployment more complicated. - mDataServer = - new DataServer(new InetSocketAddress(NetworkUtils.getFqdnHost(workerAddress), dataPort), - mWorkerStorage); - mDataServerThread = new Thread(mDataServer); + InetSocketAddress dataAddress = new InetSocketAddress(workerAddress.getHostName(), dataPort); + BlocksLocker blockLocker = new BlocksLocker(mWorkerStorage, Users.sDATASERVER_USER_ID); + mDataServer = createDataServer(dataAddress, blockLocker); mDataPort = mDataServer.getPort(); mHeartbeatThread = new Thread(this); @@ -216,6 +216,18 @@ private TachyonWorker(InetSocketAddress masterAddress, InetSocketAddress workerA mWorkerStorage.initialize(mWorkerAddress); } + private DataServer createDataServer(final InetSocketAddress dataAddress, + final BlocksLocker blockLocker) { + switch (WorkerConf.get().NETWORK_TYPE) { + case NIO: + return new NIODataServer(dataAddress, blockLocker); + case NETTY: + return new NettyDataServer(dataAddress, blockLocker); + default: + throw new AssertionError("Unknown network type: " + WorkerConf.get().NETWORK_TYPE); + } + } + /** * Gets the data port of the worker. For unit tests only. */ @@ -305,7 +317,6 @@ public void run() { * Start the data server thread and heartbeat thread of this TachyonWorker. */ public void start() { - mDataServerThread.start(); mHeartbeatThread.start(); LOG.info("The worker server started @ " + mWorkerAddress); diff --git a/core/src/main/java/tachyon/worker/netty/BlockRequest.java b/core/src/main/java/tachyon/worker/netty/BlockRequest.java new file mode 100644 index 000000000000..83e52d7b2c88 --- /dev/null +++ b/core/src/main/java/tachyon/worker/netty/BlockRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package tachyon.worker.netty; + +import java.util.List; + +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +/** + * Request from the client for a given block. To go from netty to this object, + * {@link tachyon.worker.netty.BlockRequest.Decoder} is used. + */ +public final class BlockRequest { + private final long mBlockId; + private final long mOffset; + private final long mLength; + + public BlockRequest(long blockId, long offset, long length) { + mBlockId = blockId; + mOffset = offset; + mLength = length; + } + + public long getBlockId() { + return mBlockId; + } + + public long getOffset() { + return mOffset; + } + + public long getLength() { + return mLength; + } + + /** + * Creates a new {@link tachyon.worker.netty.BlockRequest} from the user's request. + */ + public static final class Decoder extends ByteToMessageDecoder { + private static final int MESSAGE_LENGTH = Shorts.BYTES + Longs.BYTES * 3; + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List out) + throws Exception { + if (in.readableBytes() < MESSAGE_LENGTH) { + return; + } + + // read the type and ignore it. Currently only one type exists + in.readShort(); // == DataServerMessage.DATA_SERVER_REQUEST_MESSAGE; + long blockId = in.readLong(); + long offset = in.readLong(); + long length = in.readLong(); + + out.add(new BlockRequest(blockId, offset, length)); + + // remove this from the pipeline so it won't be called again for this connection + ctx.channel().pipeline().remove(this); + } + } +} diff --git a/core/src/main/java/tachyon/worker/netty/BlockResponse.java b/core/src/main/java/tachyon/worker/netty/BlockResponse.java new file mode 100644 index 000000000000..24d3149feee1 --- /dev/null +++ b/core/src/main/java/tachyon/worker/netty/BlockResponse.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package tachyon.worker.netty; + +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.List; + +import tachyon.conf.WorkerConf; +import tachyon.worker.nio.DataServerMessage; + +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.handler.codec.MessageToMessageEncoder; + +/** + * When a user sends a {@link tachyon.worker.netty.BlockRequest}, the response back is of this type. + *

+ * To serialize the response to network, {@link tachyon.worker.netty.BlockResponse.Encoder} is used. + */ +public final class BlockResponse { + private final long mBlockId; + private final long mOffset; + private final long mLength; + private final FileChannel mChannel; + + public BlockResponse(long blockId, long offset, long length, FileChannel channel) { + mBlockId = blockId; + mOffset = offset; + mLength = length; + mChannel = channel; + } + + /** + * Creates a {@link tachyon.worker.netty.BlockResponse} that represents a error case for the given + * block. + */ + public static BlockResponse createErrorResponse(final long blockId) { + return new BlockResponse(-blockId, 0, 0, null); + } + + public long getBlockId() { + return mBlockId; + } + + public long getOffset() { + return mOffset; + } + + public long getLength() { + return mLength; + } + + public FileChannel getChannel() { + return mChannel; + } + + /** + * Encodes a {@link tachyon.worker.netty.BlockResponse} to network. + */ + public static final class Encoder extends MessageToMessageEncoder { + private static final int MESSAGE_LENGTH = Shorts.BYTES + Longs.BYTES * 3; + + @Override + protected void encode(final ChannelHandlerContext ctx, final BlockResponse msg, + final List out) throws Exception { + out.add(createHeader(ctx, msg)); + if (msg.getChannel() != null) { + switch (WorkerConf.get().NETTY_FILE_TRANSFER_TYPE) { + case MAPPED: + MappedByteBuffer data = + msg.getChannel().map(FileChannel.MapMode.READ_ONLY, msg.getOffset(), + msg.getLength()); + out.add(Unpooled.wrappedBuffer(data)); + break; + case TRANSFER: + out.add(new DefaultFileRegion(msg.getChannel(), msg.getOffset(), msg.getLength())); + break; + default: + throw new AssertionError("Unknown file transfer type: " + + WorkerConf.get().NETTY_FILE_TRANSFER_TYPE); + } + } + } + + private ByteBuf createHeader(final ChannelHandlerContext ctx, final BlockResponse msg) { + ByteBuf header = ctx.alloc().buffer(MESSAGE_LENGTH); + header.writeShort(DataServerMessage.DATA_SERVER_RESPONSE_MESSAGE); + header.writeLong(msg.getBlockId()); + header.writeLong(msg.getOffset()); + header.writeLong(msg.getLength()); + return header; + } + } +} diff --git a/core/src/main/java/tachyon/worker/netty/ChannelType.java b/core/src/main/java/tachyon/worker/netty/ChannelType.java new file mode 100644 index 000000000000..bb2737f60fce --- /dev/null +++ b/core/src/main/java/tachyon/worker/netty/ChannelType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package tachyon.worker.netty; + +import io.netty.channel.epoll.Epoll; + +/** + * What type of netty channel to use. + */ +public enum ChannelType { + NIO, + /** + * Use Linux's epoll for channel api. Only works on linux + */ + EPOLL; + + /** + * Determines the default type to use based off the system. + *

+ * On linux based systems, epoll will be selected for more consistent performance, for everything + * else nio is returned. + */ + public static ChannelType defaultType() { + if (Epoll.isAvailable()) { + return ChannelType.EPOLL; + } else { + return ChannelType.NIO; + } + } +} diff --git a/core/src/main/java/tachyon/worker/netty/ClosableResourceChannelListener.java b/core/src/main/java/tachyon/worker/netty/ClosableResourceChannelListener.java new file mode 100644 index 000000000000..cf4f1dd1128b --- /dev/null +++ b/core/src/main/java/tachyon/worker/netty/ClosableResourceChannelListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package tachyon.worker.netty; + +import java.io.Closeable; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; + +/** + * A listener that will close the given resource when the operation completes. This class accepts + * null resources. + */ +final class ClosableResourceChannelListener implements ChannelFutureListener { + private final Closeable mResource; + + ClosableResourceChannelListener(Closeable resource) { + mResource = resource; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (mResource != null) { + mResource.close(); + } + } +} diff --git a/core/src/main/java/tachyon/worker/netty/DataServerHandler.java b/core/src/main/java/tachyon/worker/netty/DataServerHandler.java new file mode 100644 index 000000000000..804d1a4882db --- /dev/null +++ b/core/src/main/java/tachyon/worker/netty/DataServerHandler.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package tachyon.worker.netty; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +import org.apache.log4j.Logger; + +import tachyon.Constants; +import tachyon.conf.WorkerConf; +import tachyon.util.CommonUtils; +import tachyon.worker.BlocksLocker; + +import com.google.common.io.Closeables; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * Main logic for the read path. This class consumes {@link tachyon.worker.netty.BlockRequest} + * messages and returns {@link tachyon.worker.netty.BlockResponse} messages. + */ +@ChannelHandler.Sharable +public final class DataServerHandler extends ChannelInboundHandlerAdapter { + private static final Logger LOG = Logger.getLogger(Constants.LOGGER_TYPE); + + private final BlocksLocker mLocker; + + public DataServerHandler(BlocksLocker locker) { + mLocker = locker; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + // pipeline will make sure this is true + final BlockRequest req = (BlockRequest) msg; + + final long blockId = req.getBlockId(); + final long offset = req.getOffset(); + final long len = req.getLength(); + + final int lockId = mLocker.lock(blockId); + + RandomAccessFile file = null; + try { + validateInput(req); + + String filePath = CommonUtils.concat(WorkerConf.get().DATA_FOLDER, blockId); + LOG.info("Try to response remote request by reading from " + filePath); + + file = new RandomAccessFile(filePath, "r"); + long fileLength = file.length(); + validateBounds(req, fileLength); + + final long readLength = returnLength(offset, len, fileLength); + + FileChannel channel = file.getChannel(); + ChannelFuture future = + ctx.writeAndFlush(new BlockResponse(blockId, offset, readLength, channel)); + future.addListener(ChannelFutureListener.CLOSE); + future.addListener(new ClosableResourceChannelListener(file)); + LOG.info("Response remote request by reading from " + filePath + " preparation done."); + } catch (Exception e) { + // TODO This is a trick for now. The data may have been removed before remote retrieving. + LOG.error("The file is not here : " + e.getMessage(), e); + BlockResponse resp = BlockResponse.createErrorResponse(blockId); + ChannelFuture future = ctx.writeAndFlush(resp); + future.addListener(ChannelFutureListener.CLOSE); + if (file != null) { + Closeables.closeQuietly(file); + } + } finally { + mLocker.unlock(blockId, lockId); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.warn("Exception thrown while processing request", cause); + ctx.close(); + } + + /** + * Returns how much of a file to read. When {@code len} is {@code -1}, then + * {@code fileLength - offset} is used. + */ + private static long returnLength(final long offset, final long len, final long fileLength) { + if (len == -1) { + return fileLength - offset; + } else { + return len; + } + } + + private static void validateInput(final BlockRequest req) { + if (req.getOffset() < 0) { + throw new IllegalArgumentException("Offset can not be negative: " + req.getOffset()); + } + if (req.getLength() < 0 && req.getLength() != -1) { + String msg = "Length can not be negative except -1: " + req.getLength(); + throw new IllegalArgumentException(msg); + } + } + + private static void validateBounds(final BlockRequest req, final long fileLength) { + if (req.getOffset() > fileLength) { + String msg = + String.format("Offset(%d) is larger than file length(%d)", req.getOffset(), fileLength); + throw new IllegalArgumentException(msg); + } + if (req.getLength() != -1 && req.getOffset() + req.getLength() > fileLength) { + String msg = + String.format("Offset(%d) plus length(%d) is larger than file length(%d)", + req.getOffset(), req.getLength(), fileLength); + throw new IllegalArgumentException(msg); + } + } +} diff --git a/core/src/main/java/tachyon/worker/netty/FileTransferType.java b/core/src/main/java/tachyon/worker/netty/FileTransferType.java new file mode 100644 index 000000000000..721444fa8283 --- /dev/null +++ b/core/src/main/java/tachyon/worker/netty/FileTransferType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package tachyon.worker.netty; + +/** + * How a read response will transfer block data over the network. There is a difference in speed and + * memory consumption between the two. {@link #MAPPED} is the default since at larger sizes it out + * performs {@link #TRANSFER} + */ +public enum FileTransferType { + /** + * Uses a {@link java.nio.MappedByteBuffer} to transfer data over the network + */ + MAPPED, + + /** + * Uses + * {@link java.nio.channels.FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)} + * to transfer data over the network + */ + TRANSFER +} diff --git a/core/src/main/java/tachyon/worker/netty/NettyDataServer.java b/core/src/main/java/tachyon/worker/netty/NettyDataServer.java new file mode 100644 index 000000000000..bcea262ba9c1 --- /dev/null +++ b/core/src/main/java/tachyon/worker/netty/NettyDataServer.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package tachyon.worker.netty; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.ThreadFactory; + +import tachyon.conf.WorkerConf; +import tachyon.worker.BlocksLocker; +import tachyon.worker.DataServer; + +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +/** + * Runs a netty server that will response to block requests. + */ +public final class NettyDataServer implements DataServer { + private final ServerBootstrap mBootstrap; + private final ChannelFuture mChannelFuture; + + public NettyDataServer(final SocketAddress address, final BlocksLocker locker) { + mBootstrap = createBootstrap().childHandler(new PipelineHandler(locker)); + + try { + mChannelFuture = mBootstrap.bind(address).sync(); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void close() throws IOException { + mChannelFuture.channel().close().awaitUninterruptibly(); + mBootstrap.group().shutdownGracefully(); + mBootstrap.childGroup().shutdownGracefully(); + } + + /** + * Gets the port listening on. + */ + @Override + public int getPort() { + // according to the docs, a InetSocketAddress is returned and the user must down-cast + return ((InetSocketAddress) mChannelFuture.channel().localAddress()).getPort(); + } + + @Override + public boolean isClosed() { + return mBootstrap.group().isShutdown(); + } + + private static ServerBootstrap createBootstrap() { + final WorkerConf conf = WorkerConf.get(); + ServerBootstrap boot = new ServerBootstrap(); + boot = setupGroups(boot, conf.NETTY_CHANNEL_TYPE); + + // use pooled buffers + boot.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + boot.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + // set write buffer + // this is the default, but its recommended to set it in case of change in future netty. + boot.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, conf.NETTY_HIGH_WATER_MARK); + boot.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, conf.NETTY_LOW_WATER_MARK); + + // more buffer settings + if (conf.NETTY_BACKLOG.isPresent()) { + boot.option(ChannelOption.SO_BACKLOG, conf.NETTY_BACKLOG.get()); + } + if (conf.NETTY_SEND_BUFFER.isPresent()) { + boot.option(ChannelOption.SO_SNDBUF, conf.NETTY_SEND_BUFFER.get()); + } + if (conf.NETTY_RECIEVE_BUFFER.isPresent()) { + boot.option(ChannelOption.SO_RCVBUF, conf.NETTY_RECIEVE_BUFFER.get()); + } + return boot; + } + + /** + * Creates a default {@link io.netty.bootstrap.ServerBootstrap} where the channel and groups are + * preset. Current channel type supported are nio and epoll. + */ + private static ServerBootstrap setupGroups(final ServerBootstrap boot, final ChannelType type) { + ThreadFactory workerFactory = createThreadFactory("data-server-%d"); + EventLoopGroup bossGroup; + EventLoopGroup workerGroup; + switch (type) { + case EPOLL: + bossGroup = new EpollEventLoopGroup(0, workerFactory); + workerGroup = bossGroup; + boot.channel(EpollServerSocketChannel.class); + break; + default: + bossGroup = new NioEventLoopGroup(0, workerFactory); + workerGroup = bossGroup; + boot.channel(NioServerSocketChannel.class); + } + boot.group(bossGroup, workerGroup); + return boot; + } + + private static ThreadFactory createThreadFactory(final String nameFormat) { + return new ThreadFactoryBuilder().setNameFormat(nameFormat).build(); + } +} diff --git a/core/src/main/java/tachyon/worker/netty/PipelineHandler.java b/core/src/main/java/tachyon/worker/netty/PipelineHandler.java new file mode 100644 index 000000000000..dc2ab2818e38 --- /dev/null +++ b/core/src/main/java/tachyon/worker/netty/PipelineHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package tachyon.worker.netty; + +import tachyon.worker.BlocksLocker; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; + +/** + * Adds the block server's pipeline into the channel. + */ +public final class PipelineHandler extends ChannelInitializer { + private final BlocksLocker mLocker; + + public PipelineHandler(BlocksLocker locker) { + mLocker = locker; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("nioChunkedWriter", new ChunkedWriteHandler()); + pipeline.addLast("blockRequestDecoder", new BlockRequest.Decoder()); + pipeline.addLast("blockResponseEncoder", new BlockResponse.Encoder()); + pipeline.addLast("dataServerHandler", new DataServerHandler(mLocker)); + } +} diff --git a/core/src/main/java/tachyon/worker/DataServerMessage.java b/core/src/main/java/tachyon/worker/nio/DataServerMessage.java similarity index 99% rename from core/src/main/java/tachyon/worker/DataServerMessage.java rename to core/src/main/java/tachyon/worker/nio/DataServerMessage.java index 117e0a1e0e87..6756c573db7f 100644 --- a/core/src/main/java/tachyon/worker/DataServerMessage.java +++ b/core/src/main/java/tachyon/worker/nio/DataServerMessage.java @@ -1,4 +1,4 @@ -package tachyon.worker; +package tachyon.worker.nio; import java.io.IOException; import java.io.RandomAccessFile; diff --git a/core/src/main/java/tachyon/worker/nio/NIODataServer.java b/core/src/main/java/tachyon/worker/nio/NIODataServer.java new file mode 100644 index 000000000000..b4e35fc4d53d --- /dev/null +++ b/core/src/main/java/tachyon/worker/nio/NIODataServer.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package tachyon.worker.nio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.SelectorProvider; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import com.google.common.io.Closeables; +import org.apache.log4j.Logger; + +import com.google.common.base.Throwables; + +import tachyon.Constants; +import tachyon.conf.CommonConf; +import tachyon.worker.BlocksLocker; +import tachyon.worker.DataServer; + +/** + * The Server to serve data file read request from remote machines. The current implementation + * is based on non-blocking NIO. + */ +public class NIODataServer implements Runnable, DataServer { + private static final Logger LOG = Logger.getLogger(Constants.LOGGER_TYPE); + + // The host:port combination to listen on + private InetSocketAddress mAddress; + + // The channel on which we will accept connections + private ServerSocketChannel mServerChannel; + + // The selector we will be monitoring. + private Selector mSelector; + + private Map mSendingData = Collections + .synchronizedMap(new HashMap()); + private Map mReceivingData = Collections + .synchronizedMap(new HashMap()); + + // The blocks locker manager. + private final BlocksLocker mBlockLocker; + private final Thread mListenerThread; + + private volatile boolean mShutdown = false; + private volatile boolean mShutdowned = false; + + /** + * Create a data server with direct access to worker storage. + * + * @param address + * The address of the data server. + * @param locker + * The lock system for lock blocks. + */ + public NIODataServer(InetSocketAddress address, BlocksLocker locker) { + LOG.info("Starting DataServer @ " + address); + CommonConf.assertValidPort(address); + mAddress = address; + mBlockLocker = locker; + try { + mSelector = initSelector(); + mListenerThread = new Thread(this); + mListenerThread.start(); + } catch (IOException e) { + LOG.error(e.getMessage() + mAddress, e); + throw Throwables.propagate(e); + } + } + + /** + * Gets the port listening on. + */ + @Override + public int getPort() { + return mServerChannel.socket().getLocalPort(); + } + + private void accept(SelectionKey key) throws IOException { + // For an accept to be pending the channel must be a server socket channel + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); + + // Accept the connection and make it non-blocking + SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.configureBlocking(false); + + // Register the new SocketChannel with our Selector, indicating we'd like to be notified + // when there is data waiting to be read. + socketChannel.register(mSelector, SelectionKey.OP_READ); + } + + /** + * Close the data server. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + mShutdown = true; + mServerChannel.close(); + mSelector.close(); + } + + private Selector initSelector() throws IOException { + // Create a new selector + Selector socketSelector = SelectorProvider.provider().openSelector(); + + // Create a new non-blocking server socket channel + try { + mServerChannel = ServerSocketChannel.open(); + mServerChannel.configureBlocking(false); + + // Bind the server socket to the specified address and port + mServerChannel.socket().bind(mAddress); + + // Register the server socket channel, indicating an interest in accepting new connections. + mServerChannel.register(socketSelector, SelectionKey.OP_ACCEPT); + + return socketSelector; + } catch (IOException e) { + // we wan't to throw the original IO issue, not the close issue, so don't throw + // #close IOException. + Closeables.closeQuietly(socketSelector); + throw e; + } catch (RuntimeException e) { + // we wan't to throw the original IO issue, not the close issue, so don't throw + // #close IOException. + Closeables.closeQuietly(socketSelector); + throw e; + } + } + + /** + * @return true if the server is closed, false otherwise + */ + @Override + public boolean isClosed() { + return mShutdowned; + } + + private void read(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + + DataServerMessage tMessage; + if (mReceivingData.containsKey(socketChannel)) { + tMessage = mReceivingData.get(socketChannel); + } else { + tMessage = DataServerMessage.createBlockRequestMessage(); + mReceivingData.put(socketChannel, tMessage); + } + + // Attempt to read off the channel + int numRead; + try { + numRead = tMessage.recv(socketChannel); + } catch (IOException e) { + // The remote forcibly closed the connection, cancel the selection key and close the channel. + key.cancel(); + socketChannel.close(); + mReceivingData.remove(socketChannel); + mSendingData.remove(socketChannel); + return; + } + + if (numRead == -1) { + // Remote entity shut the socket down cleanly. Do the same from our end and cancel the + // channel. + key.channel().close(); + key.cancel(); + mReceivingData.remove(socketChannel); + mSendingData.remove(socketChannel); + return; + } + + if (tMessage.isMessageReady()) { + if (tMessage.getBlockId() <= 0) { + LOG.error("Invalid block id " + tMessage.getBlockId()); + return; + } + + key.interestOps(SelectionKey.OP_WRITE); + LOG.info("Get request for " + tMessage.getBlockId()); + int lockId = mBlockLocker.lock(tMessage.getBlockId()); + DataServerMessage tResponseMessage = + DataServerMessage.createBlockResponseMessage(true, tMessage.getBlockId(), + tMessage.getOffset(), tMessage.getLength()); + tResponseMessage.setLockId(lockId); + mSendingData.put(socketChannel, tResponseMessage); + } + } + + @Override + public void run() { + while (!mShutdown) { + try { + // Wait for an event one of the registered channels. + mSelector.select(); + if (mShutdown) { + break; + } + + // Iterate over the set of keys for which events are available + Iterator selectKeys = mSelector.selectedKeys().iterator(); + while (selectKeys.hasNext()) { + SelectionKey key = selectKeys.next(); + selectKeys.remove(); + + if (!key.isValid()) { + continue; + } + + // Check what event is available and deal with it. + // TODO These should be multi-thread. + if (key.isAcceptable()) { + accept(key); + } else if (key.isReadable()) { + read(key); + } else if (key.isWritable()) { + write(key); + } + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + if (mShutdown) { + break; + } + throw new RuntimeException(e); + } + } + mShutdowned = true; + } + + private void write(SelectionKey key) { + SocketChannel socketChannel = (SocketChannel) key.channel(); + + DataServerMessage sendMessage = mSendingData.get(socketChannel); + + boolean closeChannel = false; + try { + sendMessage.send(socketChannel); + } catch (IOException e) { + closeChannel = true; + LOG.error(e.getMessage()); + } + + if (sendMessage.finishSending() || closeChannel) { + try { + key.channel().close(); + } catch (IOException e) { + LOG.error(e.getMessage()); + } + key.cancel(); + mReceivingData.remove(socketChannel); + mSendingData.remove(socketChannel); + sendMessage.close(); + mBlockLocker.unlock(Math.abs(sendMessage.getBlockId()), sendMessage.getLockId()); + } + } +} \ No newline at end of file diff --git a/core/src/main/java/tachyon/worker/package-info.java b/core/src/main/java/tachyon/worker/package-info.java index 2688951dcd18..fe3ead241f7f 100644 --- a/core/src/main/java/tachyon/worker/package-info.java +++ b/core/src/main/java/tachyon/worker/package-info.java @@ -1,52 +1,57 @@ - /** * Worker process and utils for working with the worker remotely. - * - * Main entry point for the worker is {@link tachyon.worker.TachyonWorker#main(String[])} which - * gets started by the tachyon start scripts. The {@link tachyon.worker.TachyonWorker} class - * spins up the different RPC services (thrift, data) which are mostly wrappers around + * + * Main entry point for the worker is {@link tachyon.worker.TachyonWorker#main(String[])} which gets + * started by the tachyon start scripts. The {@link tachyon.worker.TachyonWorker} class spins up the + * different RPC services (thrift, data) which are mostly wrappers around * {@link tachyon.worker.WorkerStorage}. - * + * *

Services

- * + * *

Thrift

- * - * The thrift service is mostly responsible for metadata operations (with a few operations - * that effect the worker's cached memory). - * + * + * The thrift service is mostly responsible for metadata operations (with a few operations that + * effect the worker's cached memory). + * *

Checkpoint

- * - * The act of moving temporary data into accessible data on {@link tachyon.UnderFileSystem}. This - * is triggered by {@link tachyon.client.WriteType#isThrough()} operations. - * + * + * The act of moving temporary data into accessible data on {@link tachyon.UnderFileSystem}. This is + * triggered by {@link tachyon.client.WriteType#isThrough()} operations. + * * Implementation can be found at {@link tachyon.worker.WorkerStorage#addCheckpoint(long, int)} - * + * *

Cache Block

- * + * * Move's user generated blocks to the tachyon data directory. This operation expects that the * caller is a local (to the node) caller, and that the input are under the user directories. - * + * * Implementation can be found at {@link tachyon.worker.WorkerStorage#cacheBlock(long, long)} - * + * *

Lock / Unlock

- * - * Tachyon supports cacheing blocks to local disk (client side). When this happens, a lock is - * given to the client to protect it from the remote block changing. - * + * + * Tachyon supports cacheing blocks to local disk (client side). When this happens, a lock is given + * to the client to protect it from the remote block changing. + * * Implementation can be found at {@link tachyon.worker.WorkerStorage#lockBlock(long, long)} and * {@link tachyon.worker.WorkerStorage#unlockBlock(long, long)}. - * + * *

Data

- * + * * This service is the main interaction between users and reading blocks. Currently this service * only supports reading blocks (writing is to local disk). - * - * {@link tachyon.worker.DataServer} is the main entrypoint for the service to start and listen on. - * The service is implemented with raw NIO. Take a look at {@link tachyon.worker.DataServer#run()} - * to see what the server does, but to see what the data returned looks like, thats the - * {@link tachyon.worker.DataServerMessage}. This has two different types: read - * {@link tachyon.worker.DataServerMessage#createBlockRequestMessage(long, long, long)} and write - * {@link tachyon.worker.DataServerMessage#createBlockResponseMessage(boolean, long, long, long)} - * + * + * There are two different implementations of this layer: + * {@link tachyon.worker.netty.NettyDataServer} and {@link tachyon.worker.nio.NIODataServer}; netty + * is the default. To support both, a {@link tachyon.worker.DataServer} interface is used that just + * defines how to start/stop, and get port details; to start, object init is used. + * + * The current read protocol is defined in {@link tachyon.worker.nio.DataServerMessage}. This has + * two different types: read + * {@link tachyon.worker.nio.DataServerMessage#createBlockRequestMessage(long, long, long)} and + * write + * {@link tachyon.worker.nio.DataServerMessage#createBlockResponseMessage(boolean, long, long, long)} + * . Side note, the netty implementation does not use this class, but has defined two classes for + * the read and write case: {@link tachyon.worker.netty.BlockRequest}, + * {@link tachyon.worker.netty.BlockResponse}; theses classes are network compatible. */ -package tachyon.worker; \ No newline at end of file +package tachyon.worker; diff --git a/core/src/test/java/tachyon/worker/DataServerTest.java b/core/src/test/java/tachyon/worker/DataServerTest.java index 4e58f141d75b..abaec4ca9a29 100644 --- a/core/src/test/java/tachyon/worker/DataServerTest.java +++ b/core/src/test/java/tachyon/worker/DataServerTest.java @@ -2,116 +2,208 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import tachyon.TestUtils; import tachyon.client.TachyonFS; import tachyon.client.WriteType; import tachyon.master.LocalTachyonCluster; +import tachyon.thrift.ClientBlockInfo; import tachyon.thrift.FileAlreadyExistException; import tachyon.thrift.InvalidPathException; -import tachyon.thrift.NetAddress; +import tachyon.worker.nio.DataServerMessage; /** * Unit tests for tachyon.DataServer. */ +@RunWith(Parameterized.class) public class DataServerTest { private static final int WORKER_CAPACITY_BYTES = 1000; private static final int USER_QUOTA_UNIT_BYTES = 100; + private final NetworkType mType; private LocalTachyonCluster mLocalTachyonCluster = null; private TachyonFS mTFS = null; + public DataServerTest(NetworkType type) { + this.mType = type; + } + @After public final void after() throws Exception { mLocalTachyonCluster.stop(); System.clearProperty("tachyon.user.quota.unit.bytes"); + System.clearProperty("tachyon.worker.network.type"); } @Before public final void before() throws IOException { System.setProperty("tachyon.user.quota.unit.bytes", USER_QUOTA_UNIT_BYTES + ""); + System.setProperty("tachyon.worker.network.type", mType.toString()); mLocalTachyonCluster = new LocalTachyonCluster(WORKER_CAPACITY_BYTES); mLocalTachyonCluster.start(); mTFS = mLocalTachyonCluster.getClient(); } + @Parameterized.Parameters + public static Collection data() { + // creates a new instance of DataServerTest for each network type + List list = new ArrayList(); + for (final NetworkType type : NetworkType.values()) { + list.add(new Object[] { type }); + } + return list; + } + @Test public void readPartialTest1() throws InvalidPathException, FileAlreadyExistException, IOException { int fileId = TestUtils.createByteFile(mTFS, "/testFile", WriteType.MUST_CACHE, 10); - long blockId = mTFS.getBlockId(fileId, 0); - DataServerMessage sendMsg; - sendMsg = DataServerMessage.createBlockRequestMessage(blockId, 0, 6); - NetAddress firstBlock = mTFS.getFileBlocks(fileId).get(0).getLocations().get(0); - SocketChannel socketChannel = - SocketChannel.open(new InetSocketAddress(firstBlock.mHost, firstBlock.mSecondaryPort)); - while (!sendMsg.finishSending()) { - sendMsg.send(socketChannel); - } - DataServerMessage recvMsg = DataServerMessage.createBlockResponseMessage(false, blockId, 0, 6); - while (!recvMsg.isMessageReady()) { - int numRead = recvMsg.recv(socketChannel); - if (numRead == -1) { - break; - } - } - socketChannel.close(); - Assert.assertEquals(TestUtils.getIncreasingByteBuffer(6), recvMsg.getReadOnlyData()); + ClientBlockInfo block = mTFS.getFileBlocks(fileId).get(0); + final int offset = 0; + final int length = 6; + DataServerMessage recvMsg = request(block, offset, length); + assertValid(recvMsg, length, block.getBlockId(), offset, length); } @Test public void readPartialTest2() throws InvalidPathException, FileAlreadyExistException, IOException { int fileId = TestUtils.createByteFile(mTFS, "/testFile", WriteType.MUST_CACHE, 10); - long blockId = mTFS.getBlockId(fileId, 0); - DataServerMessage sendMsg; - sendMsg = DataServerMessage.createBlockRequestMessage(blockId, 2, 6); - SocketChannel socketChannel = - SocketChannel - .open(new InetSocketAddress( - mTFS.getFileBlocks(fileId).get(0).getLocations().get(0).mHost, mTFS - .getFileBlocks(fileId).get(0).getLocations().get(0).mSecondaryPort)); - while (!sendMsg.finishSending()) { - sendMsg.send(socketChannel); - } - DataServerMessage recvMsg = DataServerMessage.createBlockResponseMessage(false, blockId, 2, 6); - while (!recvMsg.isMessageReady()) { - int numRead = recvMsg.recv(socketChannel); - if (numRead == -1) { - break; - } - } - socketChannel.close(); - Assert.assertEquals(TestUtils.getIncreasingByteBuffer(2, 6), recvMsg.getReadOnlyData()); + ClientBlockInfo block = mTFS.getFileBlocks(fileId).get(0); + final int offset = 2; + final int length = 6; + DataServerMessage recvMsg = request(block, offset, length); + assertValid(recvMsg, TestUtils.getIncreasingByteBuffer(offset, length), block.getBlockId(), + offset, length); } @Test public void readTest() throws InvalidPathException, FileAlreadyExistException, IOException { - int fileId = TestUtils.createByteFile(mTFS, "/testFile", WriteType.MUST_CACHE, 10); - long blockId = mTFS.getBlockId(fileId, 0); - DataServerMessage sendMsg = DataServerMessage.createBlockRequestMessage(blockId); - SocketChannel socketChannel = - SocketChannel - .open(new InetSocketAddress( - mTFS.getFileBlocks(fileId).get(0).getLocations().get(0).mHost, mTFS - .getFileBlocks(fileId).get(0).getLocations().get(0).mSecondaryPort)); - while (!sendMsg.finishSending()) { - sendMsg.send(socketChannel); + final int length = 10; + int fileId = TestUtils.createByteFile(mTFS, "/testFile", WriteType.MUST_CACHE, length); + ClientBlockInfo block = mTFS.getFileBlocks(fileId).get(0); + DataServerMessage recvMsg = request(block); + assertValid(recvMsg, length, block.getBlockId(), 0, length); + } + + @Test + public void multiReadTest() throws IOException { + final int length = 20; + int fileId = TestUtils.createByteFile(mTFS, "/multiReadTest", WriteType.MUST_CACHE, length); + ClientBlockInfo block = mTFS.getFileBlocks(fileId).get(0); + for (int i = 0; i < 10; i ++) { + DataServerMessage recvMsg = request(block); + assertValid(recvMsg, length, block.getBlockId(), 0, length); } - DataServerMessage recvMsg = DataServerMessage.createBlockResponseMessage(false, blockId); - while (!recvMsg.isMessageReady()) { - int numRead = recvMsg.recv(socketChannel); - if (numRead == -1) { - break; + } + + @Test + public void readTooLarge() throws IOException { + final int length = 20; + int fileId = TestUtils.createByteFile(mTFS, "/readTooLarge", WriteType.MUST_CACHE, length); + ClientBlockInfo block = mTFS.getFileBlocks(fileId).get(0); + DataServerMessage recvMsg = request(block, 0, length * 2); + assertError(recvMsg, block.blockId); + } + + @Test + public void lengthTooSmall() throws IOException { + final int length = 20; + int fileId = TestUtils.createByteFile(mTFS, "/readTooLarge", WriteType.MUST_CACHE, length); + ClientBlockInfo block = mTFS.getFileBlocks(fileId).get(0); + DataServerMessage recvMsg = request(block, 0, length * -2); + assertError(recvMsg, block.blockId); + } + + @Test + public void tooLargeOffset() throws IOException { + final int length = 10; + int fileId = TestUtils.createByteFile(mTFS, "/readTooLarge", WriteType.MUST_CACHE, length); + ClientBlockInfo block = mTFS.getFileBlocks(fileId).get(0); + DataServerMessage recvMsg = request(block, length * 2, 1); + assertError(recvMsg, block.blockId); + } + + @Test + public void negativeOffset() throws IOException { + final int length = 10; + int fileId = TestUtils.createByteFile(mTFS, "/readTooLarge", WriteType.MUST_CACHE, length); + ClientBlockInfo block = mTFS.getFileBlocks(fileId).get(0); + DataServerMessage recvMsg = request(block, length * -2, 1); + assertError(recvMsg, block.blockId); + } + + /** + * Asserts that the message back matches the block response protocols for the error case. + */ + private void assertError(final DataServerMessage msg, final long blockId) { + assertValid(msg, 0, -1 * blockId, 0, 0); + } + + /** + * Asserts that the message back matches the block response protocols. + */ + private void assertValid(final DataServerMessage msg, final int expectedSize, + final long blockId, final long offset, final long length) { + assertValid(msg, TestUtils.getIncreasingByteBuffer(expectedSize), blockId, offset, length); + } + + /** + * Asserts that the message back matches the block response protocols. + */ + private void assertValid(final DataServerMessage msg, final ByteBuffer expectedData, + final long blockId, final long offset, final long length) { + Assert.assertEquals(expectedData, msg.getReadOnlyData()); + Assert.assertEquals(blockId, msg.getBlockId()); + Assert.assertEquals(offset, msg.getOffset()); + Assert.assertEquals(length, msg.getLength()); + } + + /** + * Requests a block from the server. This call will read the full block. + */ + private DataServerMessage request(final ClientBlockInfo block) throws IOException { + return request(block, 0, -1); + } + + /** + * Create a new socket to the data port and send a block request. The returned value is + * the response from the server. + */ + private DataServerMessage request(final ClientBlockInfo block, final long offset, + final long length) throws IOException { + DataServerMessage sendMsg = + DataServerMessage.createBlockRequestMessage(block.blockId, offset, length); + SocketChannel socketChannel = + SocketChannel.open(new InetSocketAddress(block.getLocations().get(0).mHost, block + .getLocations().get(0).mSecondaryPort)); + try { + while (!sendMsg.finishSending()) { + sendMsg.send(socketChannel); + } + DataServerMessage recvMsg = + DataServerMessage.createBlockResponseMessage(false, block.blockId, offset, length); + while (!recvMsg.isMessageReady()) { + int numRead = recvMsg.recv(socketChannel); + if (numRead == -1) { + break; + } } + return recvMsg; + } finally { + socketChannel.close(); } - socketChannel.close(); - Assert.assertEquals(TestUtils.getIncreasingByteBuffer(10), recvMsg.getReadOnlyData()); } } diff --git a/docs/Configuration-Settings.md b/docs/Configuration-Settings.md index b6119e6e2ad5..1d8df69c13c8 100644 --- a/docs/Configuration-Settings.md +++ b/docs/Configuration-Settings.md @@ -171,6 +171,47 @@ number. 128 MB Memory capacity of each worker node. + + tachyon.worker.network.type + NETTY + Selects networking stack to run the worker with. Valid options are NETTY and NIO. + + + tachyon.worker.network.netty.channel + EPOLL + Selects netty's channel implementation. On linux, epoll is used; valid options are NIO and EPOLL. + + + tachyon.worker.network.netty.file.transfer + MAPPED + When returning files to the user, select how the data is transferred; valid options are MAPPED (uses java MappedByteBuffer) and TRANSFER (uses Java FileChannel.transferTo). + + + + tachyon.worker.network.netty.watermark.high + 32768 + Determines how many bytes can be in the write queue before channels isWritable is set to false. + + + tachyon.worker.network.netty.watermark.low + 8192 + Once the high watermark limit is reached, the queue must be flushed down to the low watermark before switching back to writable. + + + tachyon.worker.network.netty.backlog + 128 on linux + How many requests can be queued up before new requests are rejected; this value is platform dependent. + + + tachyon.worker.network.netty.buffer.send + platform specific + Sets SO_SNDBUF for the socket; more details can be found in the socket man page. + + + tachyon.worker.network.netty.buffer.recieve + platform specific + Sets SO_RCVBUF for the socket; more details can be found in the socket man page. + # User Configuration @@ -204,4 +245,9 @@ The user configuration specifies values regarding file system access. 1 MB The size of the file buffer to read data from remote Tachyon worker. + + tachyon.worker.network.netty.process.threads + 16 + How many threads to use to process block requests. +