Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -497,4 +497,14 @@ public String toString() {
public DomainSocketWatcher getDomainSocketWatcher() {
return domainSocketWatcher;
}

@VisibleForTesting
public int getShmNum() {
int segments = 0;
for (EndpointShmManager endpointShmManager : datanodes.values()) {
segments +=
endpointShmManager.notFull.size() + endpointShmManager.full.size();
}
return segments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketException;
import java.nio.MappedByteBuffer;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -181,25 +182,52 @@ private class SlotReleaser implements Runnable {

@Override
public void run() {
if (slot == null) {
return;
}
LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
final DfsClientShm shm = (DfsClientShm)slot.getShm();
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
final String path = shmSock.getPath();
DataOutputStream out = null;
boolean success = false;
try (DomainSocket sock = DomainSocket.connect(path);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(sock.getOutputStream()))) {
new Sender(out).releaseShortCircuitFds(slot.getSlotId());
DataInputStream in = new DataInputStream(sock.getInputStream());
ReleaseShortCircuitAccessResponseProto resp =
ReleaseShortCircuitAccessResponseProto.parseFrom(
PBHelperClient.vintPrefixed(in));
if (resp.getStatus() != Status.SUCCESS) {
String error = resp.hasError() ? resp.getError() : "(unknown)";
throw new IOException(resp.getStatus().toString() + ": " + error);
int retries = 2;
try {
while (retries > 0) {
try {
if (domainSocket == null || !domainSocket.isOpen()) {
// we are running in single thread mode, no protection needed for
// domainSocket
domainSocket = DomainSocket.connect(path);
}

out = new DataOutputStream(
new BufferedOutputStream(domainSocket.getOutputStream()));
new Sender(out).releaseShortCircuitFds(slot.getSlotId());
DataInputStream in =
new DataInputStream(domainSocket.getInputStream());
ReleaseShortCircuitAccessResponseProto resp =
ReleaseShortCircuitAccessResponseProto
.parseFrom(PBHelperClient.vintPrefixed(in));
if (resp.getStatus() != Status.SUCCESS) {
String error = resp.hasError() ? resp.getError() : "(unknown)";
throw new IOException(resp.getStatus().toString() + ": " + error);
}

LOG.trace("{}: released {}", this, slot);
success = true;
break;

} catch (SocketException se) {
// the domain socket on datanode may be timed out, we retry once
retries--;
domainSocket.close();
domainSocket = null;
if (retries == 0) {
throw new SocketException("Create domain socket failed");
}
}
}
LOG.trace("{}: released {}", this, slot);
success = true;
} catch (IOException e) {
LOG.warn(ShortCircuitCache.this + ": failed to release "
+ "short-circuit shared memory slot " + slot + " by sending "
Expand All @@ -211,6 +239,8 @@ public void run() {
shmManager.freeSlot(slot);
} else {
shm.getEndpointShmManager().shutdown(shm);
IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
domainSocket = null;
}
}
}
Expand Down Expand Up @@ -324,6 +354,8 @@ public interface ShortCircuitReplicaCreator {
*/
private final DfsClientShmManager shmManager;

private DomainSocket domainSocket = null;

public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
return new ShortCircuitCache(
conf.getShortCircuitStreamsCacheSize(),
Expand Down Expand Up @@ -997,6 +1029,9 @@ public void freeSlot(Slot slot) {
* @param slot The slot to release.
*/
public void scheduleSlotReleaser(Slot slot) {
if (slot == null) {
return;
}
Preconditions.checkState(shmManager != null);
releaserExecutor.execute(new SlotReleaser(slot));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,9 @@ boolean accept(HashMap<ShmId, RegisteredShm> segments,
public synchronized boolean visit(Visitor visitor) {
return visitor.accept(segments, slots);
}

@VisibleForTesting
public int getShmNum() {
return segments.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.equalTo;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -910,4 +912,94 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception {
}
}
}

@Test(timeout = 60000)
public void testDomainSocketClosedByDN() throws Exception {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf =
createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

try {
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
DomainPeer peer = getDomainPeerToDn(conf);
MutableBoolean usedPeer = new MutableBoolean(false);
ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
// Allocating the first shm slot requires using up a peer.
Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
"testReleaseSlotReuseDomainSocket_client");

cluster.getDataNodes().get(0).getShortCircuitRegistry()
.registerSlot(blockId, slot1.getSlotId(), false);

Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
"testReleaseSlotReuseDomainSocket_client");

cluster.getDataNodes().get(0).getShortCircuitRegistry()
.registerSlot(blockId, slot2.getSlotId(), false);

cache.scheduleSlotReleaser(slot1);

Thread.sleep(2000);
cache.scheduleSlotReleaser(slot2);
Thread.sleep(2000);
Assert.assertEquals(0,
cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
} finally {
cluster.shutdown();
}
}

@Test(timeout = 60000)
public void testDNRestart() throws Exception {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
DomainPeer peer = getDomainPeerToDn(conf);
MutableBoolean usedPeer = new MutableBoolean(false);
ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
// Allocating the first shm slot requires using up a peer.
Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
"testReleaseSlotReuseDomainSocket_client");

cluster.getDataNodes().get(0).getShortCircuitRegistry()
.registerSlot(blockId, slot1.getSlotId(), false);

// restart the datanode to invalidate the cache
cluster.restartDataNode(0);
Thread.sleep(1000);
// after the restart, new allocation and release should not be affect
cache.scheduleSlotReleaser(slot1);

Slot slot2 = null;
try {
slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
"testReleaseSlotReuseDomainSocket_client");
} catch (ClosedChannelException ce) {

}
cache.scheduleSlotReleaser(slot2);
Thread.sleep(2000);
Assert.assertEquals(0,
cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
} finally {
cluster.shutdown();
}
}
}