From d2e146e4180311a52a94240922e3daf8f94ec8bd Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 3 Apr 2024 17:25:59 +0100 Subject: [PATCH] HADOOP-19140. [ABFS, S3A] Add IORateLimiter API Adds an API (pulled from #6596) to allow callers to request IO capacity for an named operation with optional source and dest paths. Change-Id: I02aff4d3c90ac299c80f388e88195d69e1049fe0 --- .../org/apache/hadoop/fs/IORateLimiter.java | 90 ++++++++ .../hadoop/fs/impl/IORateLimiterSupport.java | 73 ++++++ .../fs/statistics/StoreStatisticNames.java | 6 + .../org/apache/hadoop/util/RateLimiting.java | 9 +- .../hadoop/util/RateLimitingFactory.java | 8 + .../apache/hadoop/fs/TestIORateLimiter.java | 213 ++++++++++++++++++ 6 files changed, 397 insertions(+), 2 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORateLimiter.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/IORateLimiterSupport.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORateLimiter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORateLimiter.java new file mode 100644 index 0000000000000..f5174fbcd0c0f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORateLimiter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.time.Duration; +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An optional interface for classes that provide rate limiters. + * For a filesystem source, the operation name SHOULD be one of + * those listed in + * {@link org.apache.hadoop.fs.statistics.StoreStatisticNames} + * if the operation is listed there. + *

+ * This interfaces is intended to be exported by FileSystems so that + * applications wishing to perform bulk operations may request access + * to a rate limiter which is shared across all threads interacting + * with the store.. + * That is: the rate limiting is global to the specific instance of the + * object implementing this interface. + *

+ * It is not expected to be shared with other instances of the same + * class, or across processes. + *

+ * This means it is primarily of benefit when limiting bulk operations + * which can overload an (object) store from a small pool of threads. + * Examples of this can include: + *

+ * In cluster applications, it is more likely that rate limiting is + * useful during job commit operations, or processes with many threads. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface IORateLimiter { + + /** + * Acquire IO capacity. + *

+ * The implementation may assign different costs to the different + * operations. + *

+ * If there is not enough space, the permits will be acquired, + * but the subsequent call will block until the capacity has been + * refilled. + *

+ * The path parameter is used to support stores where there may be different throttling + * under different paths. + * @param operation operation being performed. Must not be null, may be "", + * should be from {@link org.apache.hadoop.fs.statistics.StoreStatisticNames} + * where there is a matching operation. + * @param source path for operations. + * Use "/" for root/store-wide operations. + * @param dest destination path for rename operations or any other operation which + * takes two paths. + * @param requestedCapacity capacity to acquire. + * Must be greater than or equal to 0. + * @return time spent waiting for output. + */ + Duration acquireIOCapacity( + String operation, + Path source, + @Nullable Path dest, + int requestedCapacity); + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/IORateLimiterSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/IORateLimiterSupport.java new file mode 100644 index 0000000000000..5b330b26f09bc --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/IORateLimiterSupport.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.IORateLimiter; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.RateLimitingFactory; + +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Implementation support for {@link IORateLimiter}. + */ +public final class IORateLimiterSupport { + + private static final IORateLimiter UNLIMITED = createIORateLimiter(0); + + private IORateLimiterSupport() { + } + + /** + * Get a rate limiter source which has no rate limiting. + * @return a rate limiter source which has no rate limiting. + */ + public static IORateLimiter unlimited() { + return UNLIMITED; + } + + /** + * Create a rate limiter with a fixed capacity. + * @param capacityPerSecond capacity per second. + * @return a rate limiter. + */ + public static IORateLimiter createIORateLimiter(int capacityPerSecond) { + final RateLimiting limiting = RateLimitingFactory.create(capacityPerSecond); + return (operation, source, dest, requestedCapacity) -> { + validateArgs(operation, source, dest, requestedCapacity); + return limiting.acquire(requestedCapacity); + }; + } + + /** + * Validate the arguments. + * @param operation + * @param source + * @param dest + * @param requestedCapacity + */ + private static void validateArgs(String operation, + Path source, + Path dest, + int requestedCapacity) { + checkArgument(operation != null, "null operation"); + checkArgument(source != null, "null source path"); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index 19ee9d1414ecf..d2096e95e6cb0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -63,6 +63,12 @@ public final class StoreStatisticNames { /** {@value}. */ public static final String OP_DELETE = "op_delete"; + /** {@value}. */ + public static final String OP_DELETE_BULK = "op_delete_bulk"; + + /** {@value}. */ + public static final String OP_DELETE_DIR = "op_delete_dir"; + /** {@value}. */ public static final String OP_EXISTS = "op_exists"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java index ae119c0e630f4..367e236dac8b7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java @@ -28,8 +28,10 @@ * Can be used to throttle use of object stores where excess load * will trigger cluster-wide throttling, backoff etc. and so collapse * performance. + *

* The time waited is returned as a Duration type. - * The google rate limiter implements this by allowing a caller to ask for + *

+ * The google rate limiter implements rate limiting by allowing a caller to ask for * more capacity than is available. This will be granted * but the subsequent request will be blocked if the bucket of * capacity hasn't let refilled to the point where there is @@ -44,8 +46,11 @@ public interface RateLimiting { * If there is not enough space, the permits will be acquired, * but the subsequent call will block until the capacity has been * refilled. + *

+ * If the capacity is zero, no delay will take place. * @param requestedCapacity capacity to acquire. - * @return time spent waiting for output. + * Must be greater than or equal to 0. + * @return time spent waiting to acquire the capacity.. */ Duration acquire(int requestedCapacity); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java index 621415456e125..fb5c45f0e0305 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter; +import static org.apache.hadoop.util.Preconditions.checkArgument; + /** * Factory for Rate Limiting. * This should be only place in the code where the guava RateLimiter is imported. @@ -50,6 +52,7 @@ private static class NoRateLimiting implements RateLimiting { @Override public Duration acquire(int requestedCapacity) { + checkArgument(requestedCapacity >= 0, "requestedCapacity must be >= 0"); return INSTANTLY; } } @@ -70,6 +73,11 @@ private RestrictedRateLimiting(int capacityPerSecond) { @Override public Duration acquire(int requestedCapacity) { + checkArgument(requestedCapacity >= 0, "requestedCapacity must be >= 0"); + if (requestedCapacity == 0) { + // google limiter does not do this. + return INSTANTLY; + } final double delayMillis = limiter.acquire(requestedCapacity); return delayMillis == 0 ? INSTANTLY diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java new file mode 100644 index 0000000000000..43d4e4271f0e3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.time.Duration; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.impl.IORateLimiterSupport; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.RateLimitingFactory; + +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE_BULK; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE_DIR; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test IO rate limiting in {@link RateLimiting} and {@link IORateLimiter}. + *

+ * This includes: illegal arguments, and what if more capacity + * is requested than is available. + */ +public class TestIORateLimiter extends AbstractHadoopTestBase { + + private static final Logger LOG = LoggerFactory.getLogger( + TestIORateLimiter.class); + + public static final Path ROOT = new Path("/"); + + @Test + public void testAcquireCapacity() { + final int size = 10; + final RateLimiting limiter = RateLimitingFactory.create(size); + // do a chain of requests + limiter.acquire(0); + limiter.acquire(1); + limiter.acquire(2); + + // now ask for more than is allowed. This MUST work. + final int excess = size * 2; + limiter.acquire(excess); + assertDelayed(limiter, excess); + } + + @Test + public void testNegativeCapacityRejected() throws Throwable { + final RateLimiting limiter = RateLimitingFactory.create(1); + intercept(IllegalArgumentException.class, () -> + limiter.acquire(-1)); + } + + @Test + public void testNegativeLimiterCapacityRejected() throws Throwable { + intercept(IllegalArgumentException.class, () -> + RateLimitingFactory.create(-1)); + } + + /** + * This is a key behavior: it is acceptable to ask for more capacity + * than the caller has, the initial request must be granted, + * but the followup request must be delayed until enough capacity + * has been restored. + */ + @Test + public void testAcquireExcessCapacity() { + + // create a small limiter + final int size = 10; + final RateLimiting limiter = RateLimitingFactory.create(size); + + // now ask for more than is allowed. This MUST work. + final int excess = size * 2; + // first attempt gets more capacity than arrives every second. + assertNotDelayed(limiter, excess); + // second attempt will block + assertDelayed(limiter, excess); + // third attempt will block + assertDelayed(limiter, size); + // as these are short-cut, no delays. + assertNotDelayed(limiter, 0); + } + + @Test + public void testIORateLimiterWithLimitedCapacity() { + final int size = 10; + final IORateLimiter limiter = IORateLimiterSupport.createIORateLimiter(size); + // this size will use more than can be allocated in a second. + final int excess = size * 2; + // first attempt gets more capacity than arrives every second. + assertNotDelayed(limiter, OP_DELETE_DIR, excess); + // second attempt will block + assertDelayed(limiter, OP_DELETE_BULK, excess); + // third attempt will block + assertDelayed(limiter, OP_DELETE, size); + // as zero capacity requests are short-cut, no delays, ever. + assertNotDelayed(limiter, "", 0); + } + + /** + * Verify the unlimited rate limiter really is unlimited. + */ + @Test + public void testIORateLimiterWithUnlimitedCapacity() { + final IORateLimiter limiter = IORateLimiterSupport.unlimited(); + // this size will use more than can be allocated in a second. + + assertNotDelayed(limiter, "1", 100_000); + assertNotDelayed(limiter, "2", 100_000); + } + + @Test + public void testUnlimitedRejectsNegativeCapacity() throws Exception { + intercept(IllegalArgumentException.class, () -> + IORateLimiterSupport.unlimited().acquireIOCapacity("", ROOT, ROOT, -1)); + } + + @Test + public void testUnlimitedRejectsNullOperation() throws Exception { + intercept(IllegalArgumentException.class, () -> + IORateLimiterSupport.unlimited().acquireIOCapacity(null, ROOT, null, 0)); + } + + @Test + public void testUnlimitedRejectsNullSource() throws Exception { + intercept(IllegalArgumentException.class, () -> + IORateLimiterSupport.unlimited().acquireIOCapacity("", null, null, 1)); + } + + /** + * Assert that a request for a given capacity is delayed. + * There's no assertion on the duration, only that it is greater than 0. + * @param limiter limiter + * @param capacity capacity + */ + private static void assertNotDelayed(final RateLimiting limiter, final int capacity) { + assertZeroDuration(capacity, limiter.acquire(capacity)); + } + + /** + * Assert that a request for a given capacity is delayed. + * There's no assertion on the duration, only that it is greater than 0. + * @param limiter limiter + * @param capacity capacity + */ + private static void assertDelayed(final RateLimiting limiter, final int capacity) { + assertNonZeroDuration(capacity, limiter.acquire(capacity)); + } + + /** + * Assert that a request for a given capacity is not delayed. + * @param limiter limiter + * @param op operation + * @param capacity capacity + */ + private static void assertNotDelayed(IORateLimiter limiter, String op, int capacity) { + assertZeroDuration(capacity, limiter.acquireIOCapacity(op, ROOT, null, capacity)); + } + + /** + * Assert that a request for a given capacity is delayed. + * There's no assertion on the duration, only that it is greater than 0. + * @param limiter limiter + * @param op operation + * @param capacity capacity + */ + private static void assertDelayed(IORateLimiter limiter, String op, int capacity) { + assertNonZeroDuration(capacity, limiter.acquireIOCapacity(op, ROOT, null, capacity)); + } + + /** + * Assert that duration was not zero. + * @param capacity capacity requested + * @param duration duration + */ + private static void assertNonZeroDuration(final int capacity, final Duration duration) { + LOG.info("Delay for {} capacity: {}", capacity, duration); + Assertions.assertThat(duration) + .describedAs("delay for %d capacity", capacity) + .isGreaterThan(Duration.ZERO); + } + + /** + * Assert that duration was zero. + * @param capacity capacity requested + * @param duration duration + */ + private static void assertZeroDuration(final int capacity, final Duration duration) { + Assertions.assertThat(duration) + .describedAs("delay for %d capacity", capacity) + .isEqualTo(Duration.ZERO); + } +} \ No newline at end of file