From 0d29ca1b8be855286685698dbb0213582cc7ea37 Mon Sep 17 00:00:00 2001
From: Alexandr Juncevich
Date: Fri, 2 Aug 2024 15:26:44 +0300
Subject: [PATCH] HDDS-11261. Fix stale block info cache when EC reconstruction
 fails.

---
 .../client/io/ECBlockInputStreamProxy.java    | 47 ++++++++++++++++---
 1 file changed, 40 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
index 68a0337cef1d..0ecf057ad7c1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -52,11 +52,12 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
   private final ECReplicationConfig repConfig;
   private final XceiverClientFactory xceiverClientFactory;
   private final Function<BlockID, BlockLocationInfo> refreshFunction;
-  private final BlockLocationInfo blockInfo;
+  private BlockLocationInfo blockInfo;
   private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
 
   private BlockExtendedInputStream blockReader;
   private boolean reconstructionReader = false;
+  private boolean refreshedCache = false;
   private List<DatanodeDetails> failedLocations = new ArrayList<>();
   private boolean closed = false;
   private OzoneClientConfig config;
@@ -130,6 +131,17 @@ private void createBlockReader() {
         xceiverClientFactory, refreshFunction, config);
   }
 
+  private void reCreateBlockReaderWithFreshBlockInfo() {
+    XceiverClientManager.getXceiverClientMetrics()
+        .incECReconstructionTotal();
+    blockInfo = refreshFunction.apply(getBlockID());
+    refreshedCache = true;
+
+    blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
+        failedLocations, repConfig, blockInfo,
+        xceiverClientFactory, refreshFunction, config);
+  }
+
   @Override
   public synchronized BlockID getBlockID() {
     return blockInfo.getBlockID();
@@ -167,12 +179,21 @@ public synchronized int read(ByteBuffer buf) throws IOException {
       }
     } catch (IOException e) {
       if (reconstructionReader) {
-        // If we get an error from the reconstruction reader, there
-        // is nothing left to try. It will re-try until it has insufficient
-        // locations internally, so if an error comes here, just re-throw it.
-        XceiverClientManager.getXceiverClientMetrics()
-            .incECReconstructionFailsTotal();
-        throw e;
+        if (!refreshedCache && e instanceof InsufficientLocationsException) {
+          // If the key was opened while too few datanodes were available, OM
+          // cached an insufficient set of block locations. Once enough nodes
+          // are back, every read of this key still fails with an
+          // InsufficientLocationsException because of that stale cache, so
+          // refresh the cached block info and retry the read once.
+          return reReadWithRecreateBlockReader(buf, lastPosition, totalRead);
+        } else {
+          // If we get an error from the reconstruction reader with up-to-date
+          // block info, there is nothing left to try. It re-tries until it has
+          // insufficient locations internally, so just re-throw the error.
+          XceiverClientManager.getXceiverClientMetrics()
+              .incECReconstructionFailsTotal();
+          throw e;
+        }
       }
       if (e instanceof BadDataLocationException) {
         String message = "Failing over to reconstruction read due" +
@@ -195,6 +216,18 @@ public synchronized int read(ByteBuffer buf) throws IOException {
     return totalRead;
   }
 
+  private int reReadWithRecreateBlockReader(ByteBuffer buf, long lastPosition, int totalRead) throws IOException {
+    blockReader.close();
+    reCreateBlockReaderWithFreshBlockInfo();
+
+    if (lastPosition != 0) {
+      blockReader.seek(lastPosition);
+    }
+    buf.reset();
+    totalRead += read(buf);
+    return totalRead;
+  }
+
   private synchronized void failoverToReconstructionRead(
       List<DatanodeDetails> badLocations, long lastPosition) throws IOException {
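
Note (not part of the patch): the catch block above retries a failed reconstruction read exactly once after refreshing the block locations cached by OM. Below is a minimal, self-contained sketch of that retry-once pattern for illustration only; every name in it (StaleLocationRetrySketch, BlockReader, InsufficientLocations, readWithOneRefresh) is hypothetical and is not an Ozone API.

// Not part of the patch: a minimal sketch of the retry-once-on-stale-cache
// idea implemented above. All names here are hypothetical, not Ozone APIs.
import java.io.IOException;
import java.util.function.Supplier;

final class StaleLocationRetrySketch {

  /** Stand-in for an "insufficient locations" failure. */
  static final class InsufficientLocations extends IOException {
    InsufficientLocations(String msg) {
      super(msg);
    }
  }

  /** Stand-in for a block reader built from a particular set of locations. */
  interface BlockReader {
    int read() throws IOException;
  }

  /**
   * The first attempt uses the reader built from the cached locations; if it
   * fails because the cache is stale, build a reader from refreshed locations
   * and retry exactly once. Any other failure propagates to the caller.
   */
  static int readWithOneRefresh(Supplier<BlockReader> cachedReader,
      Supplier<BlockReader> refreshedReader) throws IOException {
    try {
      return cachedReader.get().read();
    } catch (InsufficientLocations stale) {
      // The cached locations may predate node recovery; try fresh ones once.
      return refreshedReader.get().read();
    }
  }

  public static void main(String[] args) throws IOException {
    BlockReader staleReader = () -> {
      throw new InsufficientLocations("only 2 of 3 required locations cached");
    };
    BlockReader freshReader = () -> 42; // succeeds once the cache is refreshed
    System.out.println(readWithOneRefresh(() -> staleReader, () -> freshReader));
  }
}

Running main prints 42: the reader built from the stale cache fails, and the single retry with refreshed locations succeeds, mirroring the reReadWithRecreateBlockReader path in the patch.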