Closed
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

import java.time.Duration;
import javax.annotation.Nullable;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* An optional interface for classes that provide rate limiters.
* For a filesystem source, the operation name SHOULD be one of
* those listed in
* {@link org.apache.hadoop.fs.statistics.StoreStatisticNames}
* if the operation is listed there.
* <p>
* This interface is intended to be exported by FileSystems so that
* applications wishing to perform bulk operations may request access
* to a rate limiter <i>which is shared across all threads interacting
* with the store</i>.
* That is: the rate limiting is global to the specific instance of the
* object implementing this interface.
* <p>
* It is not expected to be shared with other instances of the same
* class, or across processes.
* <p>
* This means it is primarily of benefit when limiting bulk operations
* which can overload an (object) store from a small pool of threads.
* Examples of this can include:
* <ul>
* <li>Bulk delete operations</li>
* <li>Bulk rename operations</li>
* <li>Completing many in-progress uploads</li>
* <li>Deep and wide recursive treewalks</li>
* <li>Reading/prefetching many blocks within a file</li>
* </ul>
* In cluster applications, it is more likely that rate limiting is
* useful during job commit operations, or processes with many threads.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface IORateLimiter {

/**
* Acquire IO capacity.
* <p>
* The implementation may assign different costs to the different
* operations.
* <p>
* If there is not enough space, the permits will be acquired,
* but the subsequent call will block until the capacity has been
* refilled.
* <p>
* The path parameter is used to support stores where there may be different throttling
* under different paths.
* @param operation operation being performed. Must not be null, may be "",
* should be from {@link org.apache.hadoop.fs.statistics.StoreStatisticNames}
* where there is a matching operation.
* @param source path for operations.
* Use "/" for root/store-wide operations.
* @param dest destination path for rename operations or any other operation which
* takes two paths; may be null for single-path operations.
* @param requestedCapacity capacity to acquire.
* Must be greater than or equal to 0.
* @return time spent waiting to acquire the capacity.
*/
Duration acquireIOCapacity(
String operation,
Path source,
Contributor

A multi-delete operation takes a list of paths. Although we have the concept of a base path, I don't think the S3 client cares whether every path is under that base path.

Contributor Author

S3 throttling does, as it is per prefix.

Contributor

Just to understand this better:
if we have a list of paths for a bulk operation and their only common prefix is the root itself,
should we acquire IO capacity for each individual path or for the root path?

Contributor Author

Really good question; will comment below.

@Nullable Path dest,
int requestedCapacity);

}
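
A minimal sketch of how a caller might use such a limiter around a paged bulk delete. It is not part of the PR: `SketchRateLimiter` is a hypothetical stand-in for `IORateLimiter` that uses plain strings for paths so the example has no Hadoop dependency, and `BulkDeleteSketch`, its page size, and the choice of one unit of capacity per path are assumptions made for illustration. It charges every page against a single base path; whether per-path acquisition is preferable is the open question in the review thread above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for IORateLimiter; paths are plain strings here. */
interface SketchRateLimiter {
  Duration acquireIOCapacity(String operation, String source, String dest,
      int requestedCapacity);
}

/** Sketch of a bulk delete which acquires capacity before issuing each page. */
final class BulkDeleteSketch {

  private BulkDeleteSketch() {
  }

  /**
   * Split the paths into pages and acquire capacity for each page.
   * Assumption: one unit of capacity per path in the page.
   * @param limiter limiter shared by all threads using the store
   * @param basePath path to charge the IO against; "/" for store-wide
   * @param paths paths to delete
   * @param pageSize maximum number of paths per page; must be positive
   * @param issued receives each page in the order it would be issued
   * @return total time spent waiting for capacity
   */
  static Duration deleteInPages(SketchRateLimiter limiter, String basePath,
      List<String> paths, int pageSize, List<List<String>> issued) {
    if (pageSize <= 0) {
      throw new IllegalArgumentException("pageSize must be > 0");
    }
    Duration waited = Duration.ZERO;
    for (int start = 0; start < paths.size(); start += pageSize) {
      int end = Math.min(start + pageSize, paths.size());
      List<String> page = new ArrayList<>(paths.subList(start, end));
      // no destination path for a delete, so dest is null
      waited = waited.plus(
          limiter.acquireIOCapacity("op_delete_bulk", basePath, null, page.size()));
      // a real caller would issue the store's delete request here
      issued.add(page);
    }
    return waited;
  }
}
```

The returned duration lets the caller report how much of the operation's time went to throttling rather than IO.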
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import org.apache.hadoop.fs.IORateLimiter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.RateLimitingFactory;

import static org.apache.hadoop.util.Preconditions.checkArgument;

/**
* Implementation support for {@link IORateLimiter}.
*/
public final class IORateLimiterSupport {
Contributor

This is just a wrapper on top of RestrictedRateLimiting with extra operation name validation right?
I think this can be extended to limit per operation.

Contributor Author

With the operation name and path you can be clever:

  • limit by path
  • use the operation name and apply a "multiplier" to the actual IO, to cover the extra operations made (rename: list, copy, delete). For S3, separate read and write IO capacities would need to be requested.
  • consider some operations free and give them a cost of 0


private static final IORateLimiter UNLIMITED = createIORateLimiter(0);

private IORateLimiterSupport() {
}

/**
* Get a rate limiter source which has no rate limiting.
* @return a rate limiter source which has no rate limiting.
*/
public static IORateLimiter unlimited() {
return UNLIMITED;
}

/**
* Create a rate limiter with a fixed capacity.
* @param capacityPerSecond capacity per second.
* @return a rate limiter.
*/
public static IORateLimiter createIORateLimiter(int capacityPerSecond) {
final RateLimiting limiting = RateLimitingFactory.create(capacityPerSecond);
return (operation, source, dest, requestedCapacity) -> {
validateArgs(operation, source, dest, requestedCapacity);
return limiting.acquire(requestedCapacity);
};
}

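/**
* Validate the arguments.
* @param operation operation being performed; must not be null.
* @param source source path; must not be null.
* @param dest destination path; not validated, so may be null.
* @param requestedCapacity capacity requested; not checked here, as
* {@link RateLimiting#acquire(int)} rejects negative values.
*/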
private static void validateArgs(String operation,
Path source,
Path dest,
int requestedCapacity) {
checkArgument(operation != null, "null operation");
checkArgument(source != null, "null source path");
}
}
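
The review reply above suggests using the operation name to apply a multiplier and treating some operations as free. A minimal sketch of that idea, assuming nothing about how the PR would implement it: `CapacitySource`, `OperationCostLimiter`, and the multiplier values are hypothetical, and paths are plain strings so the example has no Hadoop dependency.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the underlying RateLimiting instance. */
interface CapacitySource {
  Duration acquire(int requestedCapacity);
}

/** Sketch: scale the requested capacity by a per-operation multiplier. */
final class OperationCostLimiter {

  private final CapacitySource delegate;
  private final Map<String, Integer> multipliers;
  private final int defaultMultiplier;

  OperationCostLimiter(CapacitySource delegate,
      Map<String, Integer> multipliers, int defaultMultiplier) {
    this.delegate = delegate;
    this.multipliers = new HashMap<>(multipliers);
    this.defaultMultiplier = defaultMultiplier;
  }

  Duration acquireIOCapacity(String operation, String source, String dest,
      int requestedCapacity) {
    if (operation == null) {
      throw new IllegalArgumentException("null operation");
    }
    if (source == null) {
      throw new IllegalArgumentException("null source path");
    }
    if (requestedCapacity < 0) {
      throw new IllegalArgumentException("requestedCapacity must be >= 0");
    }
    // operations without an entry fall back to the default multiplier
    int multiplier = multipliers.getOrDefault(operation, defaultMultiplier);
    long cost = (long) multiplier * requestedCapacity;
    if (cost == 0) {
      // free operation, or nothing requested: never touch the delegate
      return Duration.ZERO;
    }
    // clamp so that a large multiplier cannot overflow the int argument
    return delegate.acquire((int) Math.min(cost, Integer.MAX_VALUE));
  }
}
```

For example, a map of `"op_rename"` to 3 (list, copy, delete) and `"op_exists"` to 0 would charge a rename three units per requested unit and let existence checks through without delay; both values are illustrative only.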
@@ -63,6 +63,12 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_DELETE = "op_delete";

/** {@value}. */
public static final String OP_DELETE_BULK = "op_delete_bulk";

/** {@value}. */
public static final String OP_DELETE_DIR = "op_delete_dir";

/** {@value}. */
public static final String OP_EXISTS = "op_exists";

@@ -28,8 +28,10 @@
* Can be used to throttle use of object stores where excess load
* will trigger cluster-wide throttling, backoff etc. and so collapse
* performance.
+ * <p>
* The time waited is returned as a Duration type.
- * The google rate limiter implements this by allowing a caller to ask for
+ * <p>
+ * The google rate limiter implements rate limiting by allowing a caller to ask for
* more capacity than is available. This will be granted
* but the subsequent request will be blocked if the bucket of
* capacity hasn't yet refilled to the point where there is
@@ -44,8 +46,11 @@ public interface RateLimiting {
* If there is not enough space, the permits will be acquired,
* but the subsequent call will block until the capacity has been
* refilled.
+ * <p>
+ * If the capacity is zero, no delay will take place.
* @param requestedCapacity capacity to acquire.
- * @return time spent waiting for output.
+ * Must be greater than or equal to 0.
+ * @return time spent waiting to acquire the capacity.
*/
Duration acquire(int requestedCapacity);
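
The "grant now, make the next caller wait" behaviour described in the Javadoc above can be sketched without any library. This is a simplified illustration, not the Guava implementation: `OverdraftBucket` is a hypothetical class, it does not model stored (burst) permits, and instead of sleeping it returns the wait the caller would incur, with the current time passed in explicitly so the behaviour is deterministic.

```java
import java.time.Duration;

/** Sketch: a request is granted at once; the following request pays for it. */
final class OverdraftBucket {

  private static final long NANOS_PER_SECOND = 1_000_000_000L;

  private final double permitsPerSecond;

  /** Earliest time at which the next request can proceed without waiting. */
  private long nextFreeNanos;

  OverdraftBucket(int permitsPerSecond, long nowNanos) {
    if (permitsPerSecond <= 0) {
      throw new IllegalArgumentException("permitsPerSecond must be > 0");
    }
    this.permitsPerSecond = permitsPerSecond;
    this.nextFreeNanos = nowNanos;
  }

  /**
   * Acquire capacity.
   * @param requestedCapacity capacity to acquire; must be >= 0
   * @param nowNanos current time, supplied by the caller
   * @return how long the caller must wait before proceeding
   */
  synchronized Duration acquire(int requestedCapacity, long nowNanos) {
    if (requestedCapacity < 0) {
      throw new IllegalArgumentException("requestedCapacity must be >= 0");
    }
    if (requestedCapacity == 0) {
      // nothing requested: no delay and no change to the bucket
      return Duration.ZERO;
    }
    // the wait depends only on debt left by earlier requests
    long waitNanos = Math.max(0L, nextFreeNanos - nowNanos);
    // the cost of this request is pushed onto whoever asks next
    long costNanos = (long) (requestedCapacity / permitsPerSecond * NANOS_PER_SECOND);
    nextFreeNanos = Math.max(nextFreeNanos, nowNanos) + costNanos;
    return Duration.ofNanos(waitNanos);
  }
}
```

With a rate of 10 per second, a first request for 20 units returns immediately, and a second request made at the same instant is told to wait two seconds, whatever its size.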

@@ -24,6 +24,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;

import static org.apache.hadoop.util.Preconditions.checkArgument;

/**
* Factory for Rate Limiting.
* This should be the only place in the code where the guava RateLimiter is imported.
@@ -50,6 +52,7 @@ private static class NoRateLimiting implements RateLimiting {

@Override
public Duration acquire(int requestedCapacity) {
checkArgument(requestedCapacity >= 0, "requestedCapacity must be >= 0");
return INSTANTLY;
}
}
@@ -70,6 +73,11 @@ private RestrictedRateLimiting(int capacityPerSecond) {

@Override
public Duration acquire(int requestedCapacity) {
checkArgument(requestedCapacity >= 0, "requestedCapacity must be >= 0");
if (requestedCapacity == 0) {
// the google limiter rejects a request for zero permits, so return early.
return INSTANTLY;
}
final double delayMillis = limiter.acquire(requestedCapacity);
return delayMillis == 0
? INSTANTLY