@@ -31,6 +31,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
@@ -232,7 +233,6 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
.build();
}
acquireClient();
- List<ChunkInfo> chunks;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing BlockInputStream for get key to access {}",
@@ -249,14 +249,32 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
blkIDBuilder.setReplicaIndex(replicaIndex);
}
GetBlockResponseProto response = ContainerProtocolCalls
- .getBlock(xceiverClient, blkIDBuilder.build(), token);
.getBlock(xceiverClient, VALIDATORS, blkIDBuilder.build(), token);

- chunks = response.getBlockData().getChunksList();
return response.getBlockData().getChunksList();
} finally {
releaseClient();
}
}

private static final List<CheckedBiFunction> VALIDATORS
= ContainerProtocolCalls.toValidatorList(
(request, response) -> validate(response));

- return chunks;
static void validate(ContainerCommandResponseProto response)
throws IOException {
if (!response.hasGetBlock()) {
throw new IllegalArgumentException("Not GetBlock: response=" + response);
}
final GetBlockResponseProto b = response.getGetBlock();
final List<ChunkInfo> chunks = b.getBlockData().getChunksList();
for (int i = 0; i < chunks.size(); i++) {
final ChunkInfo c = chunks.get(i);
if (c.getLen() <= 0) {
throw new IOException("Failed to get chunkInfo["
+ i + "]: len == " + c.getLen());
}
}
}

protected void acquireClient() throws IOException {
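The validate() helper added above rejects malformed GetBlock replies before BlockInputStream touches the chunk list. A hypothetical JUnit 5 sketch of that behavior (the test class, its name, and the use of Result.SUCCESS are assumptions, not part of this change; it would have to sit in the same package as BlockInputStream because validate() is package-private):

import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.junit.jupiter.api.Test;

public class TestBlockInputStreamValidation {
  @Test
  public void rejectsResponseWithoutGetBlockPayload() {
    // A structurally valid response that never sets the GetBlock payload.
    ContainerCommandResponseProto response = ContainerCommandResponseProto
        .newBuilder()
        .setCmdType(Type.GetBlock)
        .setResult(Result.SUCCESS)
        .build();
    // validate() refuses it before any chunk metadata is read.
    assertThrows(IllegalArgumentException.class,
        () -> BlockInputStream.validate(response));
  }
}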
@@ -425,8 +425,7 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
ReadChunkResponseProto readChunkResponse;

List<CheckedBiFunction> validators =
- ContainerProtocolCalls.getValidatorList();
- validators.add(validator);
ContainerProtocolCalls.toValidatorList(validator);

readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, token);
@@ -20,12 +20,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.client.BlockID;
@@ -67,6 +69,7 @@

import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -128,36 +131,78 @@ public static ListBlockResponseProto listBlock(XceiverClientSpi xceiverClient,
return response.getListBlock();
}

static <T> T tryEachDatanode(Pipeline pipeline,
CheckedFunction<DatanodeDetails, T, IOException> op,
Function<DatanodeDetails, String> toErrorMessage)
throws IOException {
final Set<DatanodeDetails> excluded = new HashSet<>();
for (; ;) {
final DatanodeDetails d = pipeline.getClosestNode(excluded);

try {
return op.apply(d);
} catch (IOException e) {
excluded.add(d);
if (excluded.size() < pipeline.size()) {
LOG.warn(toErrorMessage.apply(d)
+ "; will try another datanode.", e);
} else {
throw e;
}
}
}
}
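The loop above asks the pipeline for its closest not-yet-excluded datanode, retries the operation on the next replica after each IOException, and only rethrows once every node has failed. A minimal sketch of those semantics with a simulated flaky read (the helper method, the counter, and the three-node pipeline are invented for illustration; the caller must sit in the same package because tryEachDatanode is package-private):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

static String readWithFailover(Pipeline pipeline) throws IOException {
  // Assume pipeline.size() == 3: the first two attempts fail, each failure
  // is logged as a warning, and the third attempt's result is returned.
  AtomicInteger attempts = new AtomicInteger();
  return ContainerProtocolCalls.tryEachDatanode(
      pipeline,
      dn -> {
        if (attempts.getAndIncrement() < 2) {
          throw new IOException("simulated failure on " + dn);
        }
        return "ok";
      },
      dn -> "read failed on " + dn);
}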

/**
* Calls the container protocol to get a container block.
*
* @param xceiverClient client to perform call
* @param validators functions to validate the response
* @param datanodeBlockID blockID to identify container
* @param token a token for this block (may be null)
* @return container protocol get block response
* @throws IOException if there is an I/O error while performing the call
*/
public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
List<CheckedBiFunction> validators,
DatanodeBlockID datanodeBlockID,
Token<? extends TokenIdentifier> token) throws IOException {
GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
.newBuilder()
.setBlockID(datanodeBlockID);
- String id = xceiverClient.getPipeline().getFirstNode().getUuidString();

ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetBlock)
.setContainerID(datanodeBlockID.getContainerID())
- .setDatanodeUuid(id)
.setGetBlock(readBlockRequest);
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}

- ContainerCommandRequestProto request = builder.build();
return tryEachDatanode(xceiverClient.getPipeline(),
d -> getBlock(xceiverClient, validators, builder, d),
d -> toErrorMessage(datanodeBlockID, d));
}

static String toErrorMessage(DatanodeBlockID blockId, DatanodeDetails d) {
return String.format("Failed to get block #%s in container #%s from %s",
blockId.getLocalID(), blockId.getContainerID(), d);
}

public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
DatanodeBlockID datanodeBlockID,
Token<? extends TokenIdentifier> token) throws IOException {
return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token);
}

private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
List<CheckedBiFunction> validators,
ContainerCommandRequestProto.Builder builder,
DatanodeDetails datanode) throws IOException {
final ContainerCommandRequestProto request = builder
.setDatanodeUuid(datanode.getUuidString()).build();
ContainerCommandResponseProto response =
- xceiverClient.sendCommand(request, getValidatorList());
xceiverClient.sendCommand(request, validators);
return response.getGetBlock();
}
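The javadoc above documents the new validators parameter; a minimal usage sketch (xceiverClient, datanodeBlockID and token are assumed to already be in scope, and the lambda body is illustrative rather than taken from the patch):

// Compose a call-specific check with the default container-response
// validator, then fetch the block; if the chosen datanode fails with an
// IOException, the request is retried on the remaining pipeline nodes.
List<CheckedBiFunction> validators = ContainerProtocolCalls.toValidatorList(
    (request, response) -> {
      if (!response.hasGetBlock()) {
        throw new IOException("Unexpected response type: "
            + response.getCmdType());
      }
    });
GetBlockResponseProto blockInfo = ContainerProtocolCalls.getBlock(
    xceiverClient, validators, datanodeBlockID, token);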

@@ -285,40 +330,27 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk)
.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
- final Pipeline pipeline = xceiverClient.getPipeline();
- final Set<DatanodeDetails> excluded = new HashSet<>();
- for (; ;) {
- final DatanodeDetails d = pipeline.getClosestNode(excluded);
-
- try {
- return readChunk(xceiverClient, chunk, blockID,
- validators, token, readChunkRequest, d);
- } catch (IOException e) {
- excluded.add(d);
- if (excluded.size() < pipeline.size()) {
- LOG.warn(toErrorMessage(chunk, blockID, d), e);
- } else {
- throw e;
- }
- }
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
.setContainerID(blockID.getContainerID())
.setReadChunk(readChunkRequest);
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}

return tryEachDatanode(xceiverClient.getPipeline(),
d -> readChunk(xceiverClient, chunk, blockID,
validators, builder, d),
d -> toErrorMessage(chunk, blockID, d));
}

private static ContainerProtos.ReadChunkResponseProto readChunk(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
List<CheckedBiFunction> validators,
- Token<? extends TokenIdentifier> token,
- ReadChunkRequestProto.Builder readChunkRequest,
ContainerCommandRequestProto.Builder builder,
DatanodeDetails d) throws IOException {
- final String id = d.getUuidString();
- ContainerCommandRequestProto.Builder builder =
- ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
- .setContainerID(blockID.getContainerID())
- .setDatanodeUuid(id).setReadChunk(readChunkRequest);
- if (token != null) {
- builder.setEncodedToken(token.encodeToUrlString());
- }
- ContainerCommandRequestProto request = builder.build();
final ContainerCommandRequestProto request = builder
.setDatanodeUuid(d.getUuidString()).build();
ContainerCommandResponseProto reply =
xceiverClient.sendCommand(request, validators);
final ReadChunkResponseProto response = reply.getReadChunk();
@@ -687,12 +719,28 @@ public static void validateContainerResponse(
}

public static List<CheckedBiFunction> getValidatorList() {
- List<CheckedBiFunction> validators = new ArrayList<>(1);
return VALIDATORS;
}

private static final List<CheckedBiFunction> VALIDATORS = createValidators();

private static List<CheckedBiFunction> createValidators() {
CheckedBiFunction<ContainerProtos.ContainerCommandRequestProto,
ContainerProtos.ContainerCommandResponseProto, IOException>
validator = (request, response) -> validateContainerResponse(response);
return Collections.singletonList(validator);
}

public static List<CheckedBiFunction> toValidatorList(
CheckedBiFunction<ContainerCommandRequestProto,
ContainerCommandResponseProto, IOException> validator) {
final List<CheckedBiFunction> defaults
= ContainerProtocolCalls.getValidatorList();
final List<CheckedBiFunction> validators
= new ArrayList<>(defaults.size() + 1);
validators.addAll(defaults);
validators.add(validator);
- return validators;
return Collections.unmodifiableList(validators);
}

public static HashMap<DatanodeDetails, GetBlockResponseProto>
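Because getValidatorList() now returns a shared singleton built with Collections.singletonList, the earlier pattern of appending to that list in place (as the old readChunk code above did) no longer works; toValidatorList is the replacement. A small before/after sketch, with myValidator standing in for any caller-supplied CheckedBiFunction:

// Before this change: every call allocated a fresh list, so mutation was safe.
List<CheckedBiFunction> validators = ContainerProtocolCalls.getValidatorList();
validators.add(myValidator);   // would now throw UnsupportedOperationException

// After this change: build a new, unmodifiable list that already contains
// the default container-response validator plus the caller's own check.
List<CheckedBiFunction> combined =
    ContainerProtocolCalls.toValidatorList(myValidator);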
@@ -117,7 +117,7 @@ public long getFlushCheckpoint() {
}

private ECKeyOutputStream(Builder builder) {
- super(builder.getClientMetrics());
super(builder.getReplicationConfig(), builder.getClientMetrics());
this.config = builder.getClientConfig();
this.bufferPool = builder.getByteBufferPool();
// For EC, cell/chunk size and buffer size can be same for now.
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -66,6 +67,7 @@
public class KeyOutputStream extends OutputStream implements Syncable {

private OzoneClientConfig config;
private final ReplicationConfig replication;

/**
* Defines stream action while calling handleFlushOrClose.
@@ -93,7 +95,9 @@ enum StreamAction {

private OzoneManagerProtocol omClient;

- public KeyOutputStream(ContainerClientMetrics clientMetrics) {
public KeyOutputStream(ReplicationConfig replicationConfig,
ContainerClientMetrics clientMetrics) {
this.replication = replicationConfig;
closed = false;
this.retryPolicyMap = HddsClientUtils.getExceptionList()
.stream()
@@ -141,6 +145,7 @@ public KeyOutputStream(
ContainerClientMetrics clientMetrics
) {
this.config = config;
this.replication = replicationConfig;
blockOutputStreamEntryPool =
new BlockOutputStreamEntryPool(
config,
@@ -451,6 +456,14 @@ public void hflush() throws IOException {

@Override
public void hsync() throws IOException {
if (replication.getReplicationType() != ReplicationType.RATIS) {
throw new UnsupportedOperationException(
"Replication type is not " + ReplicationType.RATIS);
}
if (replication.getRequiredNodes() <= 1) {
throw new UnsupportedOperationException("The replication factor = "
+ replication.getRequiredNodes() + " <= 1");
}
checkNotClosed();
final long hsyncPos = writeOffset;
handleFlushOrClose(StreamAction.HSYNC);
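The new guard means hsync() is only supported for RATIS-replicated keys that require more than one node; a hedged sketch of the caller-visible effect (the surrounding stream setup is assumed):

try {
  keyOutputStream.hsync();
} catch (UnsupportedOperationException e) {
  // Thrown up front for an EC key or a RATIS key with replication factor
  // ONE; the key must be written with e.g. RATIS/THREE for hsync() to work.
}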