diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/SlidingWindow.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/SlidingWindow.java new file mode 100644 index 000000000000..c12b1b70216a --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/SlidingWindow.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.utils; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A time-based sliding window that tracks only failed test results within a specified time duration + * and determines failure against a configured tolerance threshold. + * + * The queue retains one failure more than the configured tolerance threshold, + * so that the window can be detected as failed.
+ */ +public class SlidingWindow { + private static final Logger LOG = LoggerFactory.getLogger(SlidingWindow.class); + + private final long windowDuration; + private final TimeUnit timeUnit; + private final int failureTolerance; + private final Deque<Long> failureTimestamps; + + /** + * @param failureTolerance the number of failures that can be tolerated before the window is considered failed + * @param windowDuration the duration of the sliding window + * @param timeUnit the time unit of the window duration + */ + public SlidingWindow(int failureTolerance, long windowDuration, TimeUnit timeUnit) { + this.windowDuration = windowDuration; + this.timeUnit = timeUnit; + this.failureTolerance = failureTolerance; + // Cap the initial capacity at 100 to control memory usage when the failure tolerance is high; + // the deque still grows on demand if more entries are ever needed + this.failureTimestamps = new ArrayDeque<>(Math.min(failureTolerance + 1, 100)); + } + + public synchronized void add(boolean result) { + LOG.debug("Received test result: {}", result); + if (!result) { + if (failureTolerance > 0 && failureTimestamps.size() > failureTolerance) { + failureTimestamps.remove(); + } + long currentTime = System.currentTimeMillis(); + failureTimestamps.addLast(currentTime); + } + + removeExpiredFailures(); + } + + public synchronized boolean isFailed() { + removeExpiredFailures(); + LOG.debug("Is failed: {} {}", failureTimestamps.size() > failureTolerance, failureTimestamps); + return failureTimestamps.size() > failureTolerance; + } + + private void removeExpiredFailures() { + long currentTime = System.currentTimeMillis(); + long expirationThreshold = currentTime - timeUnit.toMillis(windowDuration); + + while (!failureTimestamps.isEmpty() && failureTimestamps.peek() < expirationThreshold) { + LOG.debug("Removing expired failure timestamp: {}", failureTimestamps.peek()); + failureTimestamps.remove(); + } + + LOG.debug("Current failure count: {}", failureTimestamps.size()); + } + + public int getFailureTolerance() { + return failureTolerance; + } + + public long getWindowDuration() { + return windowDuration; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + public int getFailureCount() { + return failureTimestamps.size(); + } +}
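For illustration, a minimal usage sketch of the SlidingWindow API added above. The VolumeProbe class and its runOneCheck method are hypothetical stand-ins for a periodic health checker such as StorageVolume.check() below; only the SlidingWindow calls mirror the patch.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.ozone.container.common.utils.SlidingWindow;

    // Hypothetical caller: feeds one probe result per run into the window,
    // the way StorageVolume.check() drives its ioTestSlidingWindow in this patch.
    public class VolumeProbe {
      // Tolerate up to 3 failures within any one-hour span before declaring failure.
      private final SlidingWindow window = new SlidingWindow(3, 1, TimeUnit.HOURS);

      /** Returns false once the tolerated failure count is exceeded within the window. */
      public boolean runOneCheck(boolean ioCheckPassed) {
        window.add(ioCheckPassed);   // only failures are recorded; successes are dropped
        return !window.isFailed();   // isFailed() prunes expired timestamps first
      }
    }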
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index cc9be3892bed..62c741de646c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -25,20 +25,15 @@ import jakarta.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature; -import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions; -import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache; @@ -102,11 +97,6 @@ public class HddsVolume extends StorageVolume { private AtomicBoolean dbLoaded = new AtomicBoolean(false); private final AtomicBoolean dbLoadFailure = new AtomicBoolean(false); - private final int volumeTestCount; - private final int volumeTestFailureTolerance; - private AtomicInteger volumeTestFailureCount; - private Queue<Boolean> volumeTestResultQueue; - /** * Builder for HddsVolume. */ @@ -139,11 +129,6 @@ private HddsVolume(Builder b) throws IOException { this.volumeInfoMetrics = new VolumeInfoMetrics(b.getVolumeRootStr(), this); - this.volumeTestCount = getDatanodeConfig().getVolumeIOTestCount(); - this.volumeTestFailureTolerance = getDatanodeConfig().getVolumeIOFailureTolerance(); - this.volumeTestFailureCount = new AtomicInteger(0); - this.volumeTestResultQueue = new LinkedList<>(); - initialize(); } else { // Builder is called with failedVolume set, so create a failed volume @@ -151,8 +136,6 @@ private HddsVolume(Builder b) throws IOException { this.setState(VolumeState.FAILED); volumeIOStats = null; volumeInfoMetrics = new VolumeInfoMetrics(b.getVolumeRootStr(), this); - this.volumeTestCount = 0; - this.volumeTestFailureTolerance = 0; } LOG.info("HddsVolume: {}", getReport()); @@ -309,43 +292,6 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused) return checkDbHealth(dbFile); } - @VisibleForTesting - public VolumeCheckResult checkDbHealth(File dbFile) throws InterruptedException { - if (volumeTestCount == 0) { - return VolumeCheckResult.HEALTHY; - } - - final boolean isVolumeTestResultHealthy = true; - try (ManagedOptions managedOptions = new ManagedOptions(); - ManagedRocksDB readOnlyDb = ManagedRocksDB.openReadOnly(managedOptions, dbFile.toString())) { - volumeTestResultQueue.add(isVolumeTestResultHealthy); - } catch (Exception e) { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException("Check of database for volume " + this + " interrupted."); - } - LOG.warn("Could not open Volume DB located at {}", dbFile, e); - volumeTestResultQueue.add(!isVolumeTestResultHealthy); - volumeTestFailureCount.incrementAndGet(); - } - - if (volumeTestResultQueue.size() > volumeTestCount - && (Boolean.TRUE.equals(volumeTestResultQueue.poll()) != isVolumeTestResultHealthy)) { - volumeTestFailureCount.decrementAndGet(); - } - - if (volumeTestFailureCount.get() > volumeTestFailureTolerance) { - LOG.error("Failed to open the database at \"{}\" for HDDS volume {}: " + - "the last {} runs encountered {} out of {} tolerated failures.", - dbFile, this, volumeTestResultQueue.size(), volumeTestFailureCount.get(), volumeTestFailureTolerance); - return VolumeCheckResult.FAILED; - } - - LOG.debug("Successfully opened the database at \"{}\" for HDDS volume {}: " + - "the last {} runs encountered {} out of {} tolerated failures", - dbFile, this, volumeTestResultQueue.size(), volumeTestFailureTolerance, volumeTestFailureTolerance); - return VolumeCheckResult.HEALTHY; - } - /** * add "delta" bytes to committed space in the volume.
* diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java index a16980842ca6..61620d0e2215 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java @@ -17,7 +17,15 @@ package org.apache.hadoop.ozone.container.common.volume; +import static org.apache.hadoop.ozone.OzoneConsts.WITNESSED_CONTAINER_DB_NAME; + +import jakarta.annotation.Nullable; +import java.io.File; import java.io.IOException; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MetadataVolume represents a volume in datanode for metadata(ratis). @@ -25,7 +33,7 @@ * and volume info for it. */ public class MetadataVolume extends StorageVolume { - + private static final Logger LOG = LoggerFactory.getLogger(MetadataVolume.class); private final VolumeType type = VolumeType.META_VOLUME; protected MetadataVolume(Builder b) throws IOException { @@ -75,4 +83,23 @@ public MetadataVolume build() throws IOException { public String getStorageID() { return ""; } + + @Override + public synchronized VolumeCheckResult check(@Nullable Boolean unused) throws Exception { + VolumeCheckResult result = super.check(unused); + + if (result != VolumeCheckResult.HEALTHY) { + LOG.error("Volume failed health check."); + return result; + } + + // Check that per-volume RocksDB is present. + File dbFile = new File(ServerUtils.getOzoneMetaDirPath(getConf()), WITNESSED_CONTAINER_DB_NAME); + if (!dbFile.exists() || !dbFile.canRead()) { + LOG.warn("Volume {} failed health check. 
Could not access RocksDB at {}", getStorageDir(), dbFile); + return VolumeCheckResult.FAILED; + } + + return checkDbHealth(dbFile); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java index b75de69aea66..b86bb4944c92 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java @@ -26,19 +26,19 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.LinkedList; import java.util.Objects; import java.util.Optional; import java.util.Properties; -import java.util.Queue; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.SpaceUsageCheckParams; import org.apache.hadoop.hdds.fs.SpaceUsageSource; +import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.checker.Checkable; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; @@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.DiskCheckUtil; +import org.apache.hadoop.ozone.container.common.utils.SlidingWindow; import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -107,9 +108,7 @@ public abstract class StorageVolume implements Checkable<Boolean, VolumeCheckResult> { private final int ioTestCount; - private final int ioFailureTolerance; - private AtomicInteger currentIOFailureCount; - private Queue<Boolean> ioTestSlidingWindow; + private SlidingWindow ioTestSlidingWindow; private int healthCheckFileSize; /** @@ -158,9 +157,7 @@ protected StorageVolume(Builder b) throws IOException { this.conf = b.conf; this.dnConf = conf.getObject(DatanodeConfiguration.class); this.ioTestCount = dnConf.getVolumeIOTestCount(); - this.ioFailureTolerance = dnConf.getVolumeIOFailureTolerance(); - this.ioTestSlidingWindow = new LinkedList<>(); - this.currentIOFailureCount = new AtomicInteger(0); + this.ioTestSlidingWindow = new SlidingWindow(dnConf.getVolumeIOFailureTolerance(), 1, TimeUnit.HOURS); this.healthCheckFileSize = dnConf.getVolumeHealthCheckFileSize(); } else { storageDir = new File(b.volumeRootStr); @@ -169,7 +166,6 @@ this.storageID = UUID.randomUUID().toString(); this.state = VolumeState.FAILED; this.ioTestCount = 0; - this.ioFailureTolerance = 0; this.conf = null; this.dnConf = null; } @@ -517,6 +513,14 @@ public VolumeSet getVolumeSet() { return this.volumeSet; } + public int getIoTestCount() { + return ioTestCount; + } + + public SlidingWindow getIoTestSlidingWindow() { + return ioTestSlidingWindow; + } + public StorageType getStorageType() { return storageType; } @@ -614,7 +618,7 @@ private void cleanTmpDiskCheckDir() { * check consists of a directory check and an IO check.
* * If the directory check fails, the volume check fails immediately. - * The IO check is allows to fail up to {@code ioFailureTolerance} times + * The IO check is allowed to fail up to {@code ioFailureTolerance} times * out of the last {@code ioTestCount} IO checks before this volume check is * failed. Each call to this method runs one IO check. * @@ -638,6 +642,7 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused) throw new InterruptedException("Directory check of volume " + this + " interrupted."); } + LOG.error("Directory check of volume {} failed.", this); return VolumeCheckResult.FAILED; } @@ -674,35 +679,21 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused) return VolumeCheckResult.HEALTHY; } - // Move the sliding window of IO test results forward 1 by adding the - // latest entry and removing the oldest entry from the window. - // Update the failure counter for the new window. ioTestSlidingWindow.add(diskChecksPassed); - if (!diskChecksPassed) { - currentIOFailureCount.incrementAndGet(); - } - if (ioTestSlidingWindow.size() > ioTestCount && - Objects.equals(ioTestSlidingWindow.poll(), Boolean.FALSE)) { - currentIOFailureCount.decrementAndGet(); - } - // If the failure threshold has been crossed, fail the volume without - // further scans. + // If the failure threshold has been crossed, fail the volume without further scans. // Once the volume is failed, it will not be checked anymore. // The failure counts can be left as is. - if (currentIOFailureCount.get() > ioFailureTolerance) { - LOG.error("Failed IO test for volume {}: the last {} runs " + - "encountered {} out of {} tolerated failures.", this, - ioTestSlidingWindow.size(), currentIOFailureCount, - ioFailureTolerance); + if (ioTestSlidingWindow.isFailed()) { + LOG.error("Failed IO test for volume {}: encountered {} out of {} tolerated failures in the past {} {}.", + this, ioTestSlidingWindow.getFailureCount(), ioTestSlidingWindow.getFailureTolerance(), + ioTestSlidingWindow.getWindowDuration(), ioTestSlidingWindow.getTimeUnit()); return VolumeCheckResult.FAILED; - } else if (LOG.isDebugEnabled()) { - LOG.debug("IO test results for volume {}: the last {} runs encountered " + - "{} out of {} tolerated failures", this, - ioTestSlidingWindow.size(), - currentIOFailureCount, ioFailureTolerance); } + LOG.debug("IO test results for volume {}: encountered {} out of {} tolerated failures", + this, ioTestSlidingWindow.getFailureCount(), ioTestSlidingWindow.getFailureTolerance()); + return VolumeCheckResult.HEALTHY; } @@ -740,4 +731,35 @@ private static SpaceUsageCheckParams getSpaceUsageCheckParams(Builder b) throws return usageCheckFactory.paramsFor(root); } + + @VisibleForTesting + public VolumeCheckResult checkDbHealth(File dbFile) throws InterruptedException { + if (getIoTestCount() == 0) { + return VolumeCheckResult.HEALTHY; + } + + final boolean isVolumeTestResultHealthy = true; + try (ManagedOptions managedOptions = new ManagedOptions(); + ManagedRocksDB readOnlyDb = ManagedRocksDB.openReadOnly(managedOptions, dbFile.toString())) { + getIoTestSlidingWindow().add(isVolumeTestResultHealthy); + } catch (Exception e) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Check of database for volume " + this + " interrupted."); + } + LOG.warn("Could not open Volume DB located at {}", dbFile, e); + getIoTestSlidingWindow().add(!isVolumeTestResultHealthy); + } + + if (getIoTestSlidingWindow().isFailed()) { + LOG.error("Failed to open the database at \"{}\" for HDDS 
volume {}: " + + "encountered {} out of {} tolerated failures.", + dbFile, this, getIoTestSlidingWindow().getFailureCount(), getIoTestSlidingWindow().getFailureTolerance()); + return VolumeCheckResult.FAILED; + } + + LOG.debug("Successfully opened the database at \"{}\" for HDDS volume {}: " + + "encountered {} out of {} tolerated failures", + dbFile, this, getIoTestSlidingWindow().getFailureCount(), getIoTestSlidingWindow().getFailureTolerance()); + return VolumeCheckResult.HEALTHY; + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestSlidingWindow.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestSlidingWindow.java new file mode 100644 index 000000000000..973fc3b3625a --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestSlidingWindow.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.utils; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Tests for {@link SlidingWindow} class. 
+ */ +public class TestSlidingWindow { + + private SlidingWindow slidingWindow; + + @BeforeEach + public void setup() { + slidingWindow = new SlidingWindow(3, 5, TimeUnit.SECONDS); + } + + @Test + public void testAddSuccessfulResult() { + for (int i = 0; i < 10; i++) { + slidingWindow.add(true); + assertFalse(slidingWindow.isFailed()); + } + } + + @Test + public void testAddFailedResult() { + for (int i = 0; i < 3; i++) { + slidingWindow.add(false); + assertFalse(slidingWindow.isFailed()); + } + + // Adding one more failed result should mark as failed + slidingWindow.add(false); + assertTrue(slidingWindow.isFailed()); + } + + @Test + public void testMixedResults() { + slidingWindow.add(false); + slidingWindow.add(false); + slidingWindow.add(false); + assertFalse(slidingWindow.isFailed()); + + // Add successful result - should not affect failure count + slidingWindow.add(true); + assertFalse(slidingWindow.isFailed()); + + // Add one more failed result - should mark as failed + slidingWindow.add(false); + assertTrue(slidingWindow.isFailed()); + + // Add more successful results - should not affect failure status + slidingWindow.add(true); + slidingWindow.add(true); + assertTrue(slidingWindow.isFailed()); + } + + @Test + public void testFailureExpiration() throws InterruptedException { + slidingWindow = new SlidingWindow(2, 500, TimeUnit.MILLISECONDS); + + // Add failed results to reach failure threshold + slidingWindow.add(false); + slidingWindow.add(false); + slidingWindow.add(false); + assertTrue(slidingWindow.isFailed()); + + // Wait for failures to expire + Thread.sleep(600); + + assertFalse(slidingWindow.isFailed()); + + // Add one more failure - should not be enough to mark as failed + slidingWindow.add(false); + assertFalse(slidingWindow.isFailed()); + } + + @Test + public void testPartialExpiration() throws InterruptedException { + slidingWindow = new SlidingWindow(3, 1, TimeUnit.SECONDS); + + slidingWindow.add(false); + slidingWindow.add(false); + slidingWindow.add(false); + slidingWindow.add(false); + assertTrue(slidingWindow.isFailed()); + + Thread.sleep(600); + slidingWindow.add(false); // this will remove the oldest failure as the window is full + + // Wait for the oldest failures to expire + Thread.sleep(500); + assertFalse(slidingWindow.isFailed()); + } + + @Test + public void testZeroFailureTolerance() { + // Window with zero failure tolerance + SlidingWindow zeroToleranceWindow = new SlidingWindow(0, 5, TimeUnit.SECONDS); + + // Any failure should mark as failed + zeroToleranceWindow.add(false); + assertTrue(zeroToleranceWindow.isFailed()); + } + + @Test + public void testHighFailureTolerance() { + SlidingWindow highToleranceWindow = new SlidingWindow(10, 5, TimeUnit.SECONDS); + + // Add failures less than tolerance + for (int i = 0; i < 10; i++) { + highToleranceWindow.add(false); + assertFalse(highToleranceWindow.isFailed()); + } + + // Add one more to reach tolerance + highToleranceWindow.add(false); + assertTrue(highToleranceWindow.isFailed()); + } + + @Test + public void testFailureQueueManagement() { + SlidingWindow window = new SlidingWindow(3, 5, TimeUnit.SECONDS); + + // Add more failures than the tolerance + for (int i = 0; i < 10; i++) { + window.add(false); + } + + // Should be failed + assertTrue(window.isFailed()); + + // Add successful results - should not affect failure status + for (int i = 0; i < 5; i++) { + window.add(true); + } + + // Should still be failed + assertTrue(window.isFailed()); + } + + @Test + @Timeout(value = 10) + public void 
testConcurrentAccess() throws InterruptedException { + // Create a sliding window with tolerance of 5 + final SlidingWindow concurrentWindow = new SlidingWindow(5, 5, TimeUnit.SECONDS); + final int threadCount = 10; + final int operationsPerThread = 100; + final ExecutorService executor = Executors.newFixedThreadPool(threadCount); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch finishLatch = new CountDownLatch(threadCount); + final AtomicBoolean hasError = new AtomicBoolean(false); + + // Create and submit tasks + for (int i = 0; i < threadCount; i++) { + final int threadId = i; + executor.submit(() -> { + try { + startLatch.await(); // Wait for all threads to be ready + for (int j = 0; j < operationsPerThread; j++) { + // Alternate between adding success and failure based on thread ID and iteration + boolean result = (threadId + j) % 2 == 0; + concurrentWindow.add(result); + // Check failure status occasionally + if (j % 10 == 0) { + concurrentWindow.isFailed(); + } + } + } catch (Exception e) { + hasError.set(true); + e.printStackTrace(); + } finally { + finishLatch.countDown(); + } + }); + } + + // Start all threads + startLatch.countDown(); + + // Wait for all threads to finish + finishLatch.await(); + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); + + // Verify no exceptions occurred + assertFalse(hasError.get(), "Concurrent operations caused errors"); + } + + @Test + public void testEdgeCases() { + // Test with the minimum tolerance; keep the window wide enough that the + // two adds below cannot expire between calls and make the test flaky + SlidingWindow minWindow = new SlidingWindow(1, 1, TimeUnit.SECONDS); + minWindow.add(false); + assertFalse(minWindow.isFailed()); + minWindow.add(false); + assertTrue(minWindow.isFailed()); + + // Test with large values + SlidingWindow maxWindow = new SlidingWindow(Integer.MAX_VALUE - 1, + Long.MAX_VALUE / 1000, TimeUnit.SECONDS); + for (int i = 0; i < 100; i++) { + maxWindow.add(false); + assertFalse(maxWindow.isFailed()); + } + } +}
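For illustration, a compact timeline of the expiry behavior that testFailureExpiration and testPartialExpiration above exercise. This standalone SlidingWindowExpiryDemo class is hypothetical and assumes SlidingWindow is on the classpath; timings are approximate, as in the tests themselves.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.ozone.container.common.utils.SlidingWindow;

    public class SlidingWindowExpiryDemo {
      public static void main(String[] args) throws InterruptedException {
        // Tolerate 2 failures within a 500 ms window.
        SlidingWindow w = new SlidingWindow(2, 500, TimeUnit.MILLISECONDS);
        w.add(false); w.add(false); w.add(false); // three failures recorded at ~t=0
        System.out.println(w.isFailed());         // true: 3 > tolerance of 2
        Thread.sleep(600);                        // let every timestamp age past 500 ms
        System.out.println(w.isFailed());         // false: expired failures were pruned
        w.add(false);                             // one fresh failure
        System.out.println(w.isFailed());         // false: 1 <= tolerance of 2
      }
    }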
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/package-info.java new file mode 100644 index 000000000000..170e20abca8f --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Tests for Common container utils. */ +package org.apache.hadoop.ozone.container.common.utils; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java index 25c6f05585a1..316debf4ae5b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java @@ -21,9 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.io.File; +import java.lang.reflect.Field; import java.nio.file.Path; import java.nio.file.Paths; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -31,6 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.DiskCheckUtil; +import org.apache.hadoop.ozone.container.common.utils.SlidingWindow; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; @@ -309,8 +312,16 @@ private void testCheckIOUntilFailure(StorageVolume.Builder builder, StorageVolume volume = builder.build(); volume.format(CLUSTER_ID); volume.createTmpDirs(CLUSTER_ID); + // Shrink the sliding window's time unit and duration via reflection so the test can exercise expiry quickly + Field timeUnitField = SlidingWindow.class.getDeclaredField("timeUnit"); + timeUnitField.setAccessible(true); + timeUnitField.set(volume.getIoTestSlidingWindow(), TimeUnit.SECONDS); + Field timeWindowField = SlidingWindow.class.getDeclaredField("windowDuration"); + timeWindowField.setAccessible(true); + timeWindowField.set(volume.getIoTestSlidingWindow(), ioTestCount); for (int i = 0; i < checkResults.length; i++) { + Thread.sleep(1000); final boolean result = checkResults[i]; final DiskCheckUtil.DiskChecks ioResult = new DiskCheckUtil.DiskChecks() { @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java index f84328fa0e13..f4e972156567 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java @@ -160,6 +160,7 @@ public static void injectContainerMetaDirFailure(File... dirs) { for (File dir : dirs) { if (dir.exists()) { assertTrue(dir.setWritable(false, false)); + assertTrue(dir.setReadable(false, false)); } } } @@ -173,6 +174,7 @@ public static void restoreContainerMetaDirFromFailure(File... 
dirs) { for (File dir : dirs) { if (dir.exists()) { assertTrue(dir.setWritable(true, true)); + assertTrue(dir.setReadable(true, true)); } } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/package-info.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/package-info.java new file mode 100644 index 000000000000..9978ad16fb3e --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Tests for datanode. + */ +package org.apache.hadoop.ozone.dn; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java index 7c093abe8cc3..3340ec5705db 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java @@ -239,7 +239,7 @@ void corruptDbFile(boolean schemaV3) throws Exception { /** * {@link HddsVolume#check(Boolean)} will capture the failures injected by this test and not allow the - * test to reach the helper method {@link HddsVolume#checkDbHealth}. + * test to reach the helper method {@link StorageVolume#checkDbHealth}. * As a workaround, we test the helper method directly. * As we test the helper method directly, we cannot test for schemas older than V3. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeMetadataVolumeFailureDetection.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeMetadataVolumeFailureDetection.java new file mode 100644 index 000000000000..487103b9014b --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeMetadataVolumeFailureDetection.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.dn.volume; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION; +import static org.apache.hadoop.ozone.OzoneConsts.WITNESSED_CONTAINER_DB_NAME; + +import java.io.File; +import java.nio.file.Paths; +import java.time.Duration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.container.common.ContainerTestUtils; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.dn.DatanodeTestUtils; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +/** + * This class tests that a datanode can detect failed metadata volumes.
+ */ +class TestDatanodeMetadataVolumeFailureDetection { + + // witnessed db was introduced in schema V3 so we won't test for older schemas + @ParameterizedTest + @ValueSource(booleans = {true}) + void corruptDbFile(boolean schemaV3) throws Exception { + try (MiniOzoneCluster cluster = newCluster(schemaV3)) { + try (OzoneClient client = cluster.newClient()) { + OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client); + + HddsDatanodeService dn = cluster.getHddsDatanodes().get(0); + OzoneContainer oc = dn.getDatanodeStateMachine().getContainer(); + + File dbDir = null; + if (schemaV3) { + dbDir = Paths.get(ServerUtils.getOzoneMetaDirPath(cluster.getConf()).getAbsolutePath(), + "datanode-1", "meta", WITNESSED_CONTAINER_DB_NAME).toFile(); + } + + MutableVolumeSet metaVolumeSet = oc.getMetaVolumeSet(); + StorageVolume vol0 = metaVolumeSet.getVolumesList().get(0); + + try { + // simulate a problem by removing the read permission of the db dir + DatanodeTestUtils.injectContainerMetaDirFailure(dbDir); + if (schemaV3) { + // remove rocksDB from cache + DatanodeStoreCache.getInstance().removeDB(dbDir.getAbsolutePath()); + } + + metaVolumeSet.checkVolumeAsync(vol0); + DatanodeTestUtils.waitForCheckVolume(metaVolumeSet, 1L); + DatanodeTestUtils.waitForHandleFailedVolume(metaVolumeSet, 1); + } finally { + // restore all + DatanodeTestUtils.restoreContainerMetaDirFromFailure(dbDir); + } + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true}) + void corruptDbFileWithoutDbHandleCacheInvalidation(boolean schemaV3) throws Exception { + try (MiniOzoneCluster cluster = newCluster(schemaV3)) { + try (OzoneClient client = cluster.newClient()) { + OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client); + + HddsDatanodeService dn = cluster.getHddsDatanodes().get(0); + OzoneContainer oc = dn.getDatanodeStateMachine().getContainer(); + + File dbDir = null; + if (schemaV3) { + dbDir = Paths.get(ServerUtils.getOzoneMetaDirPath(cluster.getConf()).getAbsolutePath(), + "datanode-1", "meta", WITNESSED_CONTAINER_DB_NAME).toFile(); + } + + MutableVolumeSet metaVolumeSet = oc.getMetaVolumeSet(); + StorageVolume vol0 = metaVolumeSet.getVolumesList().get(0); + + try { + // simulate a problem by removing the read permission of the db dir + DatanodeTestUtils.injectContainerMetaDirFailure(dbDir); + + metaVolumeSet.checkVolumeAsync(vol0); + DatanodeTestUtils.waitForCheckVolume(metaVolumeSet, 1L); + DatanodeTestUtils.waitForHandleFailedVolume(metaVolumeSet, 1); + } finally { + // restore all + DatanodeTestUtils.restoreContainerMetaDirFromFailure(dbDir); + } + } + } + } + + private static MiniOzoneCluster newCluster(boolean schemaV3) + throws Exception { + OzoneConfiguration ozoneConfig = new OzoneConfiguration(); + ozoneConfig.set(OZONE_SCM_CONTAINER_SIZE, "1GB"); + ozoneConfig.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, + 0, StorageUnit.MB); + ozoneConfig.setInt(OZONE_REPLICATION, 1); + // keep the cache size = 1, so we could trigger io exception on + // reading on-disk db instance + ozoneConfig.setInt(OZONE_CONTAINER_CACHE_SIZE, 1); + if (!schemaV3) { + ContainerTestUtils.disableSchemaV3(ozoneConfig); + } + // set tolerated = 1 + // shorten the gap between successive checks to ease tests + DatanodeConfiguration dnConf = + ozoneConfig.getObject(DatanodeConfiguration.class); + dnConf.setFailedDataVolumesTolerated(1); + // We are corrupting the metadb volume in the tests. 
If the tolerance is set to the default value of 1, + // the datanode will shut down and the test will exit without completing. + // To avoid this, we increase the tolerated volume failures. + dnConf.setFailedMetadataVolumesTolerated(10); + dnConf.setDiskCheckMinGap(Duration.ofSeconds(2)); + ozoneConfig.setFromObject(dnConf); + MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(ozoneConfig) + .setNumDatanodes(1) + .build(); + cluster.waitForClusterToBeReady(); + cluster.waitForPipelineTobeReady(ReplicationFactor.ONE, 30000); + + return cluster; + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/package-info.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/package-info.java new file mode 100644 index 000000000000..d2bb170a5415 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Tests for volume related classes. + */ +package org.apache.hadoop.ozone.dn.volume;