Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
<version>14.0.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
</dependency>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional dependencies might be needed here?
See http://netty.io/wiki/native-transports.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all should contain everything

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, all contains the native as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the netty-all includes everything including netty-transport-native-epoll (for linux)

<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jsp</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/tachyon/client/RemoteBlockInStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import tachyon.thrift.NetAddress;
import tachyon.util.CommonUtils;
import tachyon.util.NetworkUtils;
import tachyon.worker.DataServerMessage;
import tachyon.worker.nio.DataServerMessage;

/**
* BlockInStream for remote block.
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/tachyon/client/TachyonFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
Expand All @@ -23,7 +24,7 @@
import tachyon.thrift.NetAddress;
import tachyon.util.CommonUtils;
import tachyon.util.NetworkUtils;
import tachyon.worker.DataServerMessage;
import tachyon.worker.nio.DataServerMessage;

/**
* Tachyon File.
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/java/tachyon/conf/MasterConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ private MasterConf() {
getIntProperty("tachyon.master.heartbeat.interval.ms", Constants.SECOND_MS);
SELECTOR_THREADS = getIntProperty("tachyon.master.selector.threads", 3);
QUEUE_SIZE_PER_SELECTOR = getIntProperty("tachyon.master.queue.size.per.selector", 3000);
SERVER_THREADS =
getIntProperty("tachyon.master.server.threads", 2 * Runtime.getRuntime()
.availableProcessors());
SERVER_THREADS = getIntProperty("tachyon.master.server.threads",
2 * Runtime.getRuntime().availableProcessors());
WORKER_TIMEOUT_MS =
getIntProperty("tachyon.master.worker.timeout.ms", 10 * Constants.SECOND_MS);

Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/tachyon/conf/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ public static boolean getBooleanProperty(String property, boolean defaultValue)
return Boolean.valueOf(getProperty(property, defaultValue + ""));
}

public static <T extends Enum<T>> T getEnumProperty(String property, T defaultValue) {
final String val = getProperty(property, null);
return null == val ? defaultValue
: Enum.valueOf(defaultValue.getDeclaringClass(), val);
}

public static int getIntProperty(String property) {
return Integer.valueOf(getProperty(property));
}
Expand All @@ -33,6 +39,14 @@ public static int getIntProperty(String property, int defaultValue) {
return Integer.valueOf(getProperty(property, defaultValue + ""));
}

public static Integer getIntegerProperty(String property, Integer defaultValue) {
try {
return Integer.valueOf(getProperty(property, null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the second argument of getProperty is defaultValue

return Integer.valueOf(getProperty(property, defaultValue));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, need a null so can't use the primitive version, nor can I use
a string because my default is a integer already.
On Aug 28, 2014 12:02 AM, "Mubarak Seyed" notifications@github.com wrote:

In core/src/main/java/tachyon/conf/Utils.java:

@@ -47,6 +53,14 @@ public static int getIntProperty(String property, int defaultValue) {
return Integer.valueOf(getProperty(property, defaultValue + ""));
}

  • public static Integer getIntegerProperty(String property, Integer defaultValue) {
  • try {
  •  return Integer.valueOf(getProperty(property, null));
    

I think the second argument of getProperty is defaultValue

return Integer.valueOf(getProperty(property, defaultValue));


Reply to this email directly or view it on GitHub
https://github.com/amplab/tachyon/pull/333/files#r16823837.

} catch (NumberFormatException e) {
return defaultValue;
}
}

public static long getLongProperty(String property) {
return Long.valueOf(getProperty(property));
}
Expand Down
31 changes: 31 additions & 0 deletions core/src/main/java/tachyon/conf/WorkerConf.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package tachyon.conf;

import com.google.common.base.Optional;
import tachyon.Constants;
import tachyon.util.CommonUtils;
import tachyon.worker.NetworkType;
import tachyon.worker.netty.ChannelType;
import tachyon.worker.netty.FileTransferType;
import tachyon.util.NetworkUtils;

public class WorkerConf extends Utils {
Expand Down Expand Up @@ -41,6 +45,16 @@ public static synchronized WorkerConf get() {

public final int WORKER_PER_THREAD_CHECKPOINT_CAP_MB_SEC;

public final NetworkType NETWORK_TYPE;
public final ChannelType NETTY_CHANNEL_TYPE;
public final FileTransferType NETTY_FILE_TRANSFER_TYPE;

public final int NETTY_HIGH_WATER_MARK;
public final int NETTY_LOW_WATER_MARK;
public final Optional<Integer> NETTY_BACKLOG;
public final Optional<Integer> NETTY_SEND_BUFFER;
public final Optional<Integer> NETTY_RECIEVE_BUFFER;

private WorkerConf() {
MASTER_HOSTNAME = getProperty("tachyon.master.hostname", NetworkUtils.getLocalHostName());
MASTER_PORT = getIntProperty("tachyon.master.port", Constants.DEFAULT_MASTER_PORT);
Expand All @@ -66,5 +80,22 @@ private WorkerConf() {
WORKER_CHECKPOINT_THREADS = getIntProperty("tachyon.worker.checkpoint.threads", 1);
WORKER_PER_THREAD_CHECKPOINT_CAP_MB_SEC =
getIntProperty("tachyon.worker.per.thread.checkpoint.cap.mb.sec", Constants.SECOND_MS);

NETWORK_TYPE = getEnumProperty("tachyon.worker.network.type", NetworkType.NETTY);
NETTY_CHANNEL_TYPE =
getEnumProperty("tachyon.worker.network.netty.channel", ChannelType.defaultType());
NETTY_FILE_TRANSFER_TYPE =
getEnumProperty("tachyon.worker.network.netty.file.transfer", FileTransferType.MAPPED);

NETTY_HIGH_WATER_MARK =
getIntProperty("tachyon.worker.network.netty.watermark.high", 32 * 1024);
NETTY_LOW_WATER_MARK = getIntProperty("tachyon.worker.network.netty.watermark.low", 8 * 1024);
NETTY_BACKLOG =
Optional.fromNullable(getIntegerProperty("tachyon.worker.network.netty.backlog", null));
NETTY_SEND_BUFFER =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but just noticed that if this is not set then the default sets null?
Do you know what Netty sets the socket send buffer if we don't set manually?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, so I would suggest to just update our doc accordingly to make it clear.
Also not sure if you have seen this link http://fasterdata.es.net/host-tuning/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the link

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated config doc to be more descriptive.

Optional.fromNullable(getIntegerProperty("tachyon.worker.network.netty.buffer.send", null));
NETTY_RECIEVE_BUFFER =
Optional.fromNullable(getIntegerProperty("tachyon.worker.network.netty.buffer.recieve",
null));
}
}
244 changes: 6 additions & 238 deletions core/src/main/java/tachyon/worker/DataServer.java
Original file line number Diff line number Diff line change
@@ -1,244 +1,12 @@
package tachyon.worker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.log4j.Logger;

import com.google.common.base.Throwables;

import tachyon.Constants;
import tachyon.Users;
import tachyon.conf.CommonConf;
import java.io.Closeable;

/**
* The Server to serve data file read request from remote machines. The current implementation
* is based on non-blocking NIO.
* Defines how to interact with a server running the data protocol.
*/
public class DataServer implements Runnable {
private static final Logger LOG = Logger.getLogger(Constants.LOGGER_TYPE);

// The host:port combination to listen on
private InetSocketAddress mAddress;

// The channel on which we will accept connections
private ServerSocketChannel mServerChannel;

// The selector we will be monitoring.
private Selector mSelector;

private Map<SocketChannel, DataServerMessage> mSendingData = Collections
.synchronizedMap(new HashMap<SocketChannel, DataServerMessage>());
private Map<SocketChannel, DataServerMessage> mReceivingData = Collections
.synchronizedMap(new HashMap<SocketChannel, DataServerMessage>());

// The blocks locker manager.
private final BlocksLocker mBlocksLocker;

private volatile boolean mShutdown = false;
private volatile boolean mShutdowned = false;

/**
* Create a data server with direct access to worker storage.
*
* @param address
* The address of the data server.
* @param workerStorage
* The handler of the directly accessed worker storage.
*/
public DataServer(InetSocketAddress address, WorkerStorage workerStorage) {
LOG.info("Starting DataServer @ " + address);
CommonConf.assertValidPort(address);
mAddress = address;
mBlocksLocker = new BlocksLocker(workerStorage, Users.sDATASERVER_USER_ID);
try {
mSelector = initSelector();
} catch (IOException e) {
LOG.error(e.getMessage() + mAddress, e);
throw Throwables.propagate(e);
}
}

/**
* Gets the port listening on.
*/
int getPort() {
return mServerChannel.socket().getLocalPort();
}

private void accept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);

// Register the new SocketChannel with our Selector, indicating we'd like to be notified
// when there is data waiting to be read.
socketChannel.register(mSelector, SelectionKey.OP_READ);
}

/**
* Close the data server.
*
* @throws IOException
*/
public void close() throws IOException {
mShutdown = true;
mServerChannel.close();
mSelector.close();
}

private Selector initSelector() throws IOException {
// Create a new selector
Selector socketSelector = SelectorProvider.provider().openSelector();

// Create a new non-blocking server socket channel
mServerChannel = ServerSocketChannel.open();
mServerChannel.configureBlocking(false);

// Bind the server socket to the specified address and port
mServerChannel.socket().bind(mAddress);

// Register the server socket channel, indicating an interest in accepting new connections.
mServerChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

return socketSelector;
}

/**
* @return true if the server is closed, false otherwise
*/
public boolean isClosed() {
return mShutdowned;
}

private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();

DataServerMessage tMessage;
if (mReceivingData.containsKey(socketChannel)) {
tMessage = mReceivingData.get(socketChannel);
} else {
tMessage = DataServerMessage.createBlockRequestMessage();
mReceivingData.put(socketChannel, tMessage);
}

// Attempt to read off the channel
int numRead;
try {
numRead = tMessage.recv(socketChannel);
} catch (IOException e) {
// The remote forcibly closed the connection, cancel the selection key and close the channel.
key.cancel();
socketChannel.close();
mReceivingData.remove(socketChannel);
mSendingData.remove(socketChannel);
return;
}

if (numRead == -1) {
// Remote entity shut the socket down cleanly. Do the same from our end and cancel the
// channel.
key.channel().close();
key.cancel();
mReceivingData.remove(socketChannel);
mSendingData.remove(socketChannel);
return;
}

if (tMessage.isMessageReady()) {
if (tMessage.getBlockId() <= 0) {
LOG.error("Invalid block id " + tMessage.getBlockId());
return;
}

key.interestOps(SelectionKey.OP_WRITE);
LOG.info("Get request for " + tMessage.getBlockId());
int lockId = mBlocksLocker.lock(tMessage.getBlockId());
DataServerMessage tResponseMessage =
DataServerMessage.createBlockResponseMessage(true, tMessage.getBlockId(),
tMessage.getOffset(), tMessage.getLength());
tResponseMessage.setLockId(lockId);
mSendingData.put(socketChannel, tResponseMessage);
}
}

@Override
public void run() {
while (!mShutdown) {
try {
// Wait for an event one of the registered channels.
mSelector.select();
if (mShutdown) {
break;
}

// Iterate over the set of keys for which events are available
Iterator<SelectionKey> selectKeys = mSelector.selectedKeys().iterator();
while (selectKeys.hasNext()) {
SelectionKey key = selectKeys.next();
selectKeys.remove();

if (!key.isValid()) {
continue;
}

// Check what event is available and deal with it.
// TODO These should be multi-thread.
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
if (mShutdown) {
break;
}
throw new RuntimeException(e);
}
}
mShutdowned = true;
}

private void write(SelectionKey key) {
SocketChannel socketChannel = (SocketChannel) key.channel();

DataServerMessage sendMessage = mSendingData.get(socketChannel);

boolean closeChannel = false;
try {
sendMessage.send(socketChannel);
} catch (IOException e) {
closeChannel = true;
LOG.error(e.getMessage());
}
public interface DataServer extends Closeable {
int getPort();

if (sendMessage.finishSending() || closeChannel) {
try {
key.channel().close();
} catch (IOException e) {
LOG.error(e.getMessage());
}
key.cancel();
mReceivingData.remove(socketChannel);
mSendingData.remove(socketChannel);
sendMessage.close();
mBlocksLocker.unlock(Math.abs(sendMessage.getBlockId()), sendMessage.getLockId());
}
}
}
boolean isClosed();
}
Loading