Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
private final ECReplicationConfig repConfig;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
private BlockLocationInfo blockInfo;
private final ECBlockInputStreamFactory ecBlockInputStreamFactory;

private BlockExtendedInputStream blockReader;
private boolean reconstructionReader = false;
private boolean refreshedCache = false;
private List<DatanodeDetails> failedLocations = new ArrayList<>();
private boolean closed = false;
private OzoneClientConfig config;
Expand Down Expand Up @@ -130,6 +131,17 @@ private void createBlockReader() {
xceiverClientFactory, refreshFunction, config);
}

private void reCreateBlockReaderWithFreshBlockInfo() {
XceiverClientManager.getXceiverClientMetrics()
.incECReconstructionTotal();
blockInfo = refreshFunction.apply(getBlockID());
refreshedCache = true;

blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
failedLocations, repConfig, blockInfo,
xceiverClientFactory, refreshFunction, config);
}

@Override
public synchronized BlockID getBlockID() {
return blockInfo.getBlockID();
Expand Down Expand Up @@ -167,12 +179,21 @@ public synchronized int read(ByteBuffer buf) throws IOException {
}
} catch (IOException e) {
if (reconstructionReader) {
// If we get an error from the reconstruction reader, there
// is nothing left to try. It will re-try until it has insufficient
// locations internally, so if an error comes here, just re-throw it.
XceiverClientManager.getXceiverClientMetrics()
.incECReconstructionFailsTotal();
throw e;
if (!refreshedCache && e instanceof InsufficientLocationsException) {
// When get key with insufficient nodes amount OM cache filled insufficient amount of nodes.
// When nodes amount become enough OM have old cache.
// In that case every request for this key will be finished with "insufficient nodes error" because of
// old cache.
// Need to refresh cache to get actual information about nodes.
return reReadWithRecreateBlockReader(buf, lastPosition, totalRead);
} else {
// If we get an error from the reconstruction reader with actual cache, there
// is nothing left to try. It will re-try until it has insufficient
// locations internally, so if an error comes here, just re-throw it.
XceiverClientManager.getXceiverClientMetrics()
.incECReconstructionFailsTotal();
throw e;
}
}
if (e instanceof BadDataLocationException) {
String message = "Failing over to reconstruction read due" +
Expand All @@ -195,6 +216,18 @@ public synchronized int read(ByteBuffer buf) throws IOException {
return totalRead;
}

private int reReadWithRecreateBlockReader(ByteBuffer buf, long lastPosition, int totalRead) throws IOException {
blockReader.close();
reCreateBlockReaderWithFreshBlockInfo();

if (lastPosition != 0) {
blockReader.seek(lastPosition);
}
buf.reset();
totalRead += read(buf);
return totalRead;
}

private synchronized void failoverToReconstructionRead(
List<DatanodeDetails> badLocations, long lastPosition)
throws IOException {
Expand Down