-
Notifications
You must be signed in to change notification settings - Fork 587
HDDS-12852. Implement a sliding window counter utility #8498
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
2d6175a
HDDS-12852. Implement a sliding window based failure counter utility
ptlrs b5d51ff
HDDS-12852. Update tests
ptlrs 429052f
HDDS-12852. refactor from a failure sliding window to a generic slidi…
ptlrs b0d2f68
HDDS-12852. Remove unused methods from SlidingWindow class
ptlrs 94665ba
Merge remote-tracking branch 'upstream/master' into HDDS-12852-slidin…
ptlrs 863b364
HDDS-12852. Remove unused expiryDuration field from SlidingWindow class
ptlrs 0944203
HDDS-12852. Add expiryDuration field initialization in SlidingWindow …
ptlrs c9efc73
HDDS-12852. Remove unused expiryDuration field from SlidingWindow class
ptlrs 36a32ed
HDDS-12852. Add support for custom Clock in SlidingWindow and refacto…
ptlrs 08a522c
HDDS-12852. Refactor SlidingWindow class and relocate to common utils…
ptlrs 6fc39bd
HDDS-12852. Add getNumEvents method to SlidingWindow and improve test…
ptlrs f05613a
HDDS-12852. Rename isFull to isExceeded in SlidingWindow and update t…
ptlrs 394a482
HDDS-12852. Add @VisibleForTesting annotation and refactor test initi…
ptlrs 823aa5f
HDDS-12852. Add getNumEventsInWindow method to SlidingWindow
ptlrs File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
75 changes: 75 additions & 0 deletions
75
...r-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/SlidingWindow.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.ozone.container.common.utils; | ||
|
|
||
| import java.util.ArrayDeque; | ||
| import java.util.Deque; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| /** | ||
| * A time-based sliding window implementation that tracks only failed test results within a specified time duration. | ||
| * It determines failure based on a configured tolerance threshold. | ||
| * | ||
| * The queue saves one failure more than the configured tolerance threshold, | ||
| * so that the window can be considered failed. | ||
| */ | ||
| public class SlidingWindow { | ||
| private final long windowDuration; | ||
| private final TimeUnit timeUnit; | ||
| private final int failureTolerance; | ||
| private final Deque<Long> failureTimestamps; | ||
|
|
||
| /** | ||
| * @param failureTolerance the number of failures that can be tolerated before the window is considered failed | ||
| * @param windowDuration the duration of the sliding window | ||
| * @param timeUnit the time unit of the window duration | ||
| */ | ||
| public SlidingWindow(int failureTolerance, long windowDuration, TimeUnit timeUnit) { | ||
ptlrs marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| this.windowDuration = windowDuration; | ||
| this.timeUnit = timeUnit; | ||
| this.failureTolerance = failureTolerance; | ||
| // If the failure tolerance is high, we limit the queue size to 100 as we want to control the memory usage | ||
| this.failureTimestamps = new ArrayDeque<>(Math.min(failureTolerance + 1, 100)); | ||
ptlrs marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| public synchronized void add(boolean result) { | ||
| if (!result) { | ||
| if (failureTolerance > 0 && failureTimestamps.size() > failureTolerance) { | ||
| failureTimestamps.remove(); | ||
| } | ||
| long currentTime = System.currentTimeMillis(); | ||
ptlrs marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| failureTimestamps.addLast(currentTime); | ||
| } | ||
|
|
||
| removeExpiredFailures(); | ||
| } | ||
|
|
||
| public synchronized boolean isFailed() { | ||
ptlrs marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| removeExpiredFailures(); | ||
| return failureTimestamps.size() > failureTolerance; | ||
| } | ||
|
|
||
| private void removeExpiredFailures() { | ||
| long currentTime = System.currentTimeMillis(); | ||
| long expirationThreshold = currentTime - timeUnit.toMillis(windowDuration); | ||
ptlrs marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| while (!failureTimestamps.isEmpty() && failureTimestamps.peek() < expirationThreshold) { | ||
| failureTimestamps.remove(); | ||
| } | ||
| } | ||
| } | ||
234 changes: 234 additions & 0 deletions
234
...rvice/src/test/java/org/apache/hadoop/ozone/container/common/utils/TestSlidingWindow.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,234 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.ozone.container.common.utils; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertFalse; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
|
||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.Timeout; | ||
|
|
||
| /** | ||
| * Tests for {@link SlidingWindow} class. | ||
| */ | ||
| public class TestSlidingWindow { | ||
|
|
||
| private SlidingWindow slidingWindow; | ||
|
|
||
| @BeforeEach | ||
| public void setup() { | ||
| slidingWindow = new SlidingWindow(3, 5, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAddSuccessfulResult() { | ||
| for (int i = 0; i < 10; i++) { | ||
| slidingWindow.add(true); | ||
| assertFalse(slidingWindow.isFailed()); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testAddFailedResult() { | ||
| for (int i = 0; i < 3; i++) { | ||
| slidingWindow.add(false); | ||
| assertFalse(slidingWindow.isFailed()); | ||
| } | ||
|
|
||
| // Adding one more failed result should mark as failed | ||
| slidingWindow.add(false); | ||
| assertTrue(slidingWindow.isFailed()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testMixedResults() { | ||
| slidingWindow.add(false); | ||
| slidingWindow.add(false); | ||
| slidingWindow.add(false); | ||
| assertFalse(slidingWindow.isFailed()); | ||
|
|
||
| // Add successful result - should not affect failure count | ||
| slidingWindow.add(true); | ||
| assertFalse(slidingWindow.isFailed()); | ||
|
|
||
| // Add one more failed result - should mark as failed | ||
| slidingWindow.add(false); | ||
| assertTrue(slidingWindow.isFailed()); | ||
|
|
||
| // Add more successful results - should not affect failure status | ||
| slidingWindow.add(true); | ||
| slidingWindow.add(true); | ||
| assertTrue(slidingWindow.isFailed()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testFailureExpiration() throws InterruptedException { | ||
| slidingWindow = new SlidingWindow(2, 500, TimeUnit.MILLISECONDS); | ||
|
|
||
| // Add failed results to reach failure threshold | ||
| slidingWindow.add(false); | ||
| slidingWindow.add(false); | ||
| slidingWindow.add(false); | ||
| assertTrue(slidingWindow.isFailed()); | ||
|
|
||
| // Wait for failures to expire | ||
| Thread.sleep(600); | ||
ptlrs marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| assertFalse(slidingWindow.isFailed()); | ||
|
|
||
| // Add one more failure - should not be enough to mark as failed | ||
| slidingWindow.add(false); | ||
| assertFalse(slidingWindow.isFailed()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testPartialExpiration() throws InterruptedException { | ||
| slidingWindow = new SlidingWindow(3, 1, TimeUnit.SECONDS); | ||
|
|
||
| slidingWindow.add(false); | ||
| slidingWindow.add(false); | ||
| slidingWindow.add(false); | ||
| slidingWindow.add(false); | ||
| assertTrue(slidingWindow.isFailed()); | ||
|
|
||
| Thread.sleep(600); | ||
| slidingWindow.add(false); // this will remove the oldest failure as the window is full | ||
|
|
||
| // Wait for the oldest failures to expire | ||
| Thread.sleep(500); | ||
| assertFalse(slidingWindow.isFailed()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testZeroFailureTolerance() { | ||
| // Window with zero failure tolerance | ||
| SlidingWindow zeroToleranceWindow = new SlidingWindow(0, 5, TimeUnit.SECONDS); | ||
|
|
||
| // Any failure should mark as failed | ||
| zeroToleranceWindow.add(false); | ||
| assertTrue(zeroToleranceWindow.isFailed()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testHighFailureTolerance() { | ||
| SlidingWindow highToleranceWindow = new SlidingWindow(10, 5, TimeUnit.SECONDS); | ||
|
|
||
| // Add failures less than tolerance | ||
| for (int i = 0; i < 10; i++) { | ||
| highToleranceWindow.add(false); | ||
| assertFalse(highToleranceWindow.isFailed()); | ||
| } | ||
|
|
||
| // Add one more to reach tolerance | ||
| highToleranceWindow.add(false); | ||
| assertTrue(highToleranceWindow.isFailed()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testFailureQueueManagement() { | ||
| SlidingWindow window = new SlidingWindow(3, 5, TimeUnit.SECONDS); | ||
|
|
||
| // Add more failures than the tolerance | ||
| for (int i = 0; i < 10; i++) { | ||
| window.add(false); | ||
| } | ||
|
|
||
| // Should be failed | ||
| assertTrue(window.isFailed()); | ||
|
|
||
| // Add successful results - should not affect failure status | ||
| for (int i = 0; i < 5; i++) { | ||
| window.add(true); | ||
| } | ||
|
|
||
| // Should still be failed | ||
| assertTrue(window.isFailed()); | ||
| } | ||
|
|
||
| @Test | ||
| @Timeout(value = 10) | ||
| public void testConcurrentAccess() throws InterruptedException { | ||
ptlrs marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // Create a sliding window with tolerance of 5 | ||
| final SlidingWindow concurrentWindow = new SlidingWindow(5, 5, TimeUnit.SECONDS); | ||
| final int threadCount = 10; | ||
| final int operationsPerThread = 100; | ||
| final ExecutorService executor = Executors.newFixedThreadPool(threadCount); | ||
| final CountDownLatch startLatch = new CountDownLatch(1); | ||
| final CountDownLatch finishLatch = new CountDownLatch(threadCount); | ||
| final AtomicBoolean hasError = new AtomicBoolean(false); | ||
|
|
||
| // Create and submit tasks | ||
| for (int i = 0; i < threadCount; i++) { | ||
| final int threadId = i; | ||
| executor.submit(() -> { | ||
| try { | ||
| startLatch.await(); // Wait for all threads to be ready | ||
| for (int j = 0; j < operationsPerThread; j++) { | ||
| // Alternate between adding success and failure based on thread ID and iteration | ||
| boolean result = (threadId + j) % 2 == 0; | ||
| concurrentWindow.add(result); | ||
| // Check failure status occasionally | ||
| if (j % 10 == 0) { | ||
| concurrentWindow.isFailed(); | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
| hasError.set(true); | ||
| e.printStackTrace(); | ||
| } finally { | ||
| finishLatch.countDown(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| // Start all threads | ||
| startLatch.countDown(); | ||
|
|
||
| // Wait for all threads to finish | ||
| finishLatch.await(); | ||
| executor.shutdown(); | ||
| executor.awaitTermination(5, TimeUnit.SECONDS); | ||
|
|
||
| // Verify no exceptions occurred | ||
| assertFalse(hasError.get(), "Concurrent operations caused errors"); | ||
| } | ||
|
|
||
| @Test | ||
| public void testEdgeCases() { | ||
| // Test with minimum values | ||
| SlidingWindow minWindow = new SlidingWindow(1, 1, TimeUnit.MILLISECONDS); | ||
| minWindow.add(false); | ||
| assertFalse(minWindow.isFailed()); | ||
| minWindow.add(false); | ||
| assertTrue(minWindow.isFailed()); | ||
|
|
||
| // Test with large values | ||
| SlidingWindow maxWindow = new SlidingWindow(Integer.MAX_VALUE - 1, | ||
| Long.MAX_VALUE / 1000, TimeUnit.SECONDS); | ||
| for (int i = 0; i < 100; i++) { | ||
| maxWindow.add(false); | ||
| assertFalse(maxWindow.isFailed()); | ||
| } | ||
| } | ||
| } | ||
19 changes: 19 additions & 0 deletions
19
...er-service/src/test/java/org/apache/hadoop/ozone/container/common/utils/package-info.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| /** Tests for Common container utils. */ | ||
| package org.apache.hadoop.ozone.container.common.utils; |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.