@@ -153,6 +153,11 @@ void setLeaderId(UUID leaderId) {
     this.leaderId = leaderId;
   }
 
+  /** @return the number of datanodes in this pipeline. */
+  public int size() {
+    return nodeStatus.size();
+  }
+
   /**
    * Returns the list of nodes which form this pipeline.
    *
@@ -216,18 +221,46 @@ public DatanodeDetails getLeaderNode() throws IOException {
   }
 
   public DatanodeDetails getFirstNode() throws IOException {
+    return getFirstNode(null);
+  }
+
+  public DatanodeDetails getFirstNode(Set<DatanodeDetails> excluded)
+      throws IOException {
+    if (excluded == null) {
+      excluded = Collections.emptySet();
+    }
     if (nodeStatus.isEmpty()) {
       throw new IOException(String.format("Pipeline=%s is empty", id));
     }
-    return nodeStatus.keySet().iterator().next();
+    for (DatanodeDetails d : nodeStatus.keySet()) {
+      if (!excluded.contains(d)) {
+        return d;
+      }
+    }
+    throw new IOException(String.format(
+        "All nodes are excluded: Pipeline=%s, excluded=%s", id, excluded));
   }
 
   public DatanodeDetails getClosestNode() throws IOException {
+    return getClosestNode(null);
+  }
+
+  public DatanodeDetails getClosestNode(Set<DatanodeDetails> excluded)
+      throws IOException {
+    if (excluded == null) {
+      excluded = Collections.emptySet();
+    }
     if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) {
       LOG.debug("Nodes in order is empty, delegate to getFirstNode");
-      return getFirstNode();
+      return getFirstNode(excluded);
     }
-    return nodesInOrder.get().get(0);
+    for (DatanodeDetails d : nodesInOrder.get()) {
+      if (!excluded.contains(d)) {
+        return d;
+      }
+    }
+    throw new IOException(String.format(
+        "All nodes are excluded: Pipeline=%s, excluded=%s", id, excluded));
   }
 
   public boolean isClosed() {
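Reviewer note: both new overloads implement the same selection rule. A standalone sketch of that rule (illustrative Java, not Ozone code; all names below are hypothetical):

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of getFirstNode(excluded)/getClosestNode(excluded): scan the
// candidates in order, return the first one not excluded, and fail only
// once the excluded set covers the whole pipeline.
final class FirstNotExcluded {
  static <T> T pick(List<T> ordered, Set<T> excluded) throws IOException {
    for (T candidate : ordered) {
      if (!excluded.contains(candidate)) {
        return candidate;
      }
    }
    throw new IOException("All nodes are excluded: " + excluded);
  }

  public static void main(String[] args) throws IOException {
    List<String> nodes = List.of("dn1", "dn2", "dn3");
    Set<String> excluded = new LinkedHashSet<>();
    System.out.println(pick(nodes, excluded)); // dn1
    excluded.add("dn1");
    System.out.println(pick(nodes, excluded)); // dn2, the first non-excluded
  }
}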
@@ -21,8 +21,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
@@ -47,6 +49,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
@@ -64,12 +67,16 @@
 
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of all container protocol calls performed by Container
  * clients.
  */
 public final class ContainerProtocolCalls {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerProtocolCalls.class);
 
   /**
    * There is no need to instantiate this class.
@@ -278,7 +285,32 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
         .setBlockID(blockID.getDatanodeBlockIDProtobuf())
         .setChunkData(chunk)
         .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
-    String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
+    final Pipeline pipeline = xceiverClient.getPipeline();
+    final Set<DatanodeDetails> excluded = new HashSet<>();
+    for (;;) {
+      final DatanodeDetails d = pipeline.getClosestNode(excluded);
+
+      try {
+        return readChunk(xceiverClient, chunk, blockID,
+            validators, token, readChunkRequest, d);
+      } catch (IOException e) {
+        excluded.add(d);
+        if (excluded.size() < pipeline.size()) {
+          LOG.warn(toErrorMessage(chunk, blockID, d), e);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  private static ContainerProtos.ReadChunkResponseProto readChunk(
+      XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
+      List<CheckedBiFunction> validators,
+      Token<? extends TokenIdentifier> token,
+      ReadChunkRequestProto.Builder readChunkRequest,
+      DatanodeDetails d) throws IOException {
+    final String id = d.getUuidString();
     ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
             .setContainerID(blockID.getContainerID())
@@ -289,7 +321,30 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto reply =
         xceiverClient.sendCommand(request, validators);
-    return reply.getReadChunk();
+    final ReadChunkResponseProto response = reply.getReadChunk();
+    final long readLen = getLen(response);
+    if (readLen != chunk.getLen()) {
+      throw new IOException(toErrorMessage(chunk, blockID, d)
+          + ": readLen=" + readLen);
+    }
+    return response;
   }
+
+  static String toErrorMessage(ChunkInfo chunk, BlockID blockId,
+      DatanodeDetails d) {
+    return String.format("Failed to read chunk %s (len=%s) %s from %s",
+        chunk.getChunkName(), chunk.getLen(), blockId, d);
+  }
+
+  static long getLen(ReadChunkResponseProto response) {
+    if (response.hasData()) {
+      return response.getData().size();
+    } else if (response.hasDataBuffers()) {
+      return response.getDataBuffers().getBuffersList().stream()
+          .mapToLong(ByteString::size).sum();
+    } else {
+      return -1;
+    }
+  }
 
   /**
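Reviewer note: the refactored readChunk now does two things on top of the old single-node call: it verifies the reply length against chunk.getLen(), so short or empty replies surface as IOException, and it retries on the next-closest replica, excluding each failed datanode, until either a replica succeeds or every node in the pipeline has been excluded. A standalone sketch of the failover loop (illustrative Java, not Ozone code):

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the failover loop in readChunk(): try the closest replica not
// yet excluded; on IOException, exclude it and retry; rethrow only when
// every replica has failed.
final class ReadWithFailover {
  interface Read<T> {
    T from(String node) throws IOException;
  }

  static <T> T read(List<String> replicas, Read<T> read) throws IOException {
    final Set<String> excluded = new HashSet<>();
    for (;;) {
      // stand-in for pipeline.getClosestNode(excluded)
      final String node = replicas.stream()
          .filter(n -> !excluded.contains(n))
          .findFirst()
          .orElseThrow(() -> new IOException("All nodes are excluded"));
      try {
        return read.from(node);
      } catch (IOException e) {
        excluded.add(node);
        if (excluded.size() >= replicas.size()) {
          throw e; // the last remaining replica failed too
        }
        // otherwise warn (LOG.warn in the real code) and try the next one
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // dn1 fails, dn2 serves the read
    System.out.println(read(List.of("dn1", "dn2", "dn3"), node -> {
      if (node.equals("dn1")) {
        throw new IOException("checksum mismatch on " + node);
      }
      return "chunk data from " + node;
    }));
  }
}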
@@ -452,8 +452,11 @@ public void hflush() throws IOException {
   @Override
   public void hsync() throws IOException {
     checkNotClosed();
+    final long hsyncPos = writeOffset;
     handleFlushOrClose(StreamAction.HSYNC);
-    blockOutputStreamEntryPool.hsyncKey(offset);
+    Preconditions.checkState(offset >= hsyncPos,
+        "offset = %s < hsyncPos = %s", offset, hsyncPos);
+    blockOutputStreamEntryPool.hsyncKey(hsyncPos);
   }
 
   /**
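Reviewer note: the fix captures writeOffset before the flush because other writers may keep advancing it while handleFlushOrClose runs; only bytes written before hsync() was invoked are guaranteed durable, so the captured position, not the still-moving offset, is what gets committed. A standalone sketch of the capture-then-check pattern (illustrative Java, not Ozone code; the field and method names are stand-ins):

// Capture-then-check: fix the position to commit before flushing, then
// assert the flush covered at least that much.
final class HsyncSketch {
  private volatile long writeOffset; // bytes accepted from writers so far
  private volatile long offset;      // bytes pushed through the flush path

  void hsync() {
    final long hsyncPos = writeOffset; // capture before flushing
    flush();                           // may run while writers keep writing
    if (offset < hsyncPos) {
      throw new IllegalStateException(
          "offset = " + offset + " < hsyncPos = " + hsyncPos);
    }
    commitKeyLength(hsyncPos); // commit the captured position only
  }

  private void flush() { offset = writeOffset; }
  private void commitKeyLength(long pos) { /* metadata update stub */ }
}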
@@ -46,6 +46,8 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
@@ -63,6 +65,8 @@
  */
 @Timeout(value = 300)
 public class TestHSync {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestHSync.class);
 
   private static MiniOzoneCluster cluster;
   private static OzoneBucket bucket;
@@ -112,10 +116,11 @@ public void testO3fsHSync() throws Exception {
         OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
     CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
-    final Path file = new Path("/file");
-
     try (FileSystem fs = FileSystem.get(CONF)) {
-      runTestHSync(fs, file);
+      for (int i = 0; i < 10; i++) {
+        final Path file = new Path("/file" + i);
+        runTestHSync(fs, file, 1 << i);
+      }
     }
   }
 
@@ -129,17 +134,20 @@ public void testOfsHSync() throws Exception {
 
     final String dir = OZONE_ROOT + bucket.getVolumeName()
         + OZONE_URI_DELIMITER + bucket.getName();
-    final Path file = new Path(dir, "file");
 
     try (FileSystem fs = FileSystem.get(CONF)) {
-      runTestHSync(fs, file);
+      for (int i = 0; i < 10; i++) {
+        final Path file = new Path(dir, "file" + i);
+        runTestHSync(fs, file, 1 << i);
+      }
     }
   }
 
-  static void runTestHSync(FileSystem fs, Path file) throws Exception {
+  static void runTestHSync(FileSystem fs, Path file, int initialDataSize)
+      throws Exception {
     try (StreamWithLength out = new StreamWithLength(
         fs.create(file, true))) {
-      runTestHSync(fs, file, out, 1);
+      runTestHSync(fs, file, out, initialDataSize);
       for (int i = 1; i < 5; i++) {
         for (int j = -1; j <= 1; j++) {
           int dataSize = (1 << (i * 5)) + j;
@@ -177,6 +185,8 @@ static void runTestHSync(FileSystem fs, Path file,
       StreamWithLength out, int dataSize)
       throws Exception {
     final long length = out.getLength();
+    LOG.info("runTestHSync {} with size {}, skipLength={}",
+        file, dataSize, length);
     final byte[] data = new byte[dataSize];
     ThreadLocalRandom.current().nextBytes(data);
     out.writeAndHsync(data);
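Reviewer note: with these changes each run now sweeps a broader range of sizes. The ten files start with initial writes of 1 << i bytes (1, 2, 4, ..., 512), and runTestHSync then appends (1 << (i * 5)) + j bytes for i in 1..4 and j in -1..1, deliberately straddling power-of-two boundaries. A quick enumeration of those sizes:

// Prints every data size the updated test exercises.
public class HsyncTestSizes {
  public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
      System.out.println("initial write: " + (1 << i)); // 1 .. 512
    }
    for (int i = 1; i < 5; i++) {
      for (int j = -1; j <= 1; j++) {
        // 31, 32, 33, 1023, 1024, 1025, 32767, 32768, 32769,
        // 1048575, 1048576, 1048577
        System.out.println("append: " + ((1 << (i * 5)) + j));
      }
    }
  }
}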
@@ -173,8 +173,8 @@ private void invokeXceiverClientReadChunk(XceiverClientSpi client)
             .setBytesPerChecksum(512)
             .setType(ContainerProtos.ChecksumType.CRC32)
             .build())
-        .setLen(100)
-        .setOffset(100)
+        .setLen(-1)
+        .setOffset(0)
         .build(),
         bid,
         null, null);
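Reviewer note: this fixture change follows from the new length check. Presumably the fake datanode reply here carries no chunk data, so getLen() returns -1; the old ChunkInfo with len=100 would now be rejected as a short read, while len=-1 (with offset 0) matches the empty reply. A sketch of that reasoning (assumes the mocked reply truly has neither data field set):

// Assumption: the mocked ReadChunk reply is effectively empty.
ReadChunkResponseProto empty = ReadChunkResponseProto.getDefaultInstance();
assert !empty.hasData() && !empty.hasDataBuffers();
// getLen(empty) == -1, so only a ChunkInfo built with setLen(-1) passes
// the new readLen == chunk.getLen() validation in readChunk().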