Skip to content
Closed
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Evolving support for functional programming/lambda-expressions:
 * a holder for functional interfaces whose single abstract method is
 * declared to throw {@link IOException}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class FunctionsRaisingIOE {

  /**
   * Private constructor: the class only acts as a namespace for the
   * nested interfaces.
   */
  private FunctionsRaisingIOE() {
  }

  /**
   * A one-argument function whose invocation may raise an IOException.
   * @param <T> type of the argument
   * @param <R> type of the return value
   */
  @FunctionalInterface
  public interface FunctionRaisingIOE<T, R> {

    /**
     * Apply the function.
     * @param t the argument
     * @return the result
     * @throws IOException on any IO failure
     */
    R apply(T t) throws IOException;
  }

  /**
   * A two-argument function whose invocation may raise an IOException.
   * @param <T> type of the first argument
   * @param <U> type of the second argument
   * @param <R> type of the return value
   */
  @FunctionalInterface
  public interface BiFunctionRaisingIOE<T, U, R> {

    /**
     * Apply the function.
     * @param t the first argument
     * @param u the second argument
     * @return the result
     * @throws IOException on any IO failure
     */
    R apply(T t, U u) throws IOException;
  }

  /**
   * A callable taking no arguments; the only checked exception it is
   * declared to raise is an IOException.
   * @param <R> return type
   */
  @FunctionalInterface
  public interface CallableRaisingIOE<R> {

    /**
     * Invoke the callable.
     * @return the result
     * @throws IOException on any IO failure
     */
    R apply() throws IOException;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -108,20 +109,55 @@ public static <T> T awaitFuture(final Future<T> future,
*/
public static <T> T raiseInnerCause(final ExecutionException e)
    throws IOException {
  // unwrapInnerException() either throws the inner RuntimeException
  // itself or hands back the IOException which is then raised here.
  final IOException failure = unwrapInnerException(e);
  throw failure;
}

/**
 * Take the cause of a {@link CompletionException} and raise it:
 * an inner RuntimeException is rethrown as is; anything else is raised
 * as an IOException. This method never returns normally.
 * @param e the completion failure to unwrap.
 * @param <T> nominal return type, so that the call can be used
 * in an expression or return statement.
 * @return nothing, ever.
 * @throws IOException either the inner IOException, or a wrapper around
 * any cause which is not a RuntimeException.
 * @throws RuntimeException if that is the inner cause.
 */
public static <T> T raiseInnerCause(final CompletionException e)
    throws IOException {
  // unwrapInnerException() either throws the inner RuntimeException
  // itself or hands back the IOException which is then raised here.
  final IOException failure = unwrapInnerException(e);
  throw failure;
}

/**
 * Examine the cause of an exception and convert it to an IOException
 * to be thrown by the caller.
 * The cause is examined in the following order:
 * <ol>
 *   <li>An IOException: returned as is.</li>
 *   <li>A WrappedIOException: its inner cause is returned.</li>
 *   <li>A CompletionException or ExecutionException: unwrapped
 *   recursively, in case something very complicated has happened.</li>
 *   <li>Any other RuntimeException: thrown immediately.</li>
 *   <li>Any other non-null cause: wrapped in a new IOException.</li>
 *   <li>No cause at all: the exception {@code e} itself is wrapped in
 *   a new IOException.</li>
 * </ol>
 * This method only ever <i>returns</i> an IOException; the sole
 * exception it throws itself is an inner RuntimeException.
 * @param e exception whose cause is to be examined.
 * @return an IOException extracted or built from the cause.
 * @throws RuntimeException if that is the inner cause.
 */
private static IOException unwrapInnerException(final Throwable e) {
  Throwable cause = e.getCause();
  if (cause instanceof IOException) {
    return (IOException) cause;
  } else if (cause instanceof WrappedIOException) {
    return ((WrappedIOException) cause).getCause();
  } else if (cause instanceof CompletionException) {
    // CompletionException is itself a RuntimeException, so it must be
    // probed before the generic RuntimeException branch below.
    return unwrapInnerException(cause);
  } else if (cause instanceof ExecutionException) {
    return unwrapInnerException(cause);
  } else if (cause instanceof RuntimeException) {
    throw (RuntimeException) cause;
  } else if (cause != null) {
    // other type: wrap with a new IOE
    return new IOException(cause);
  } else {
    // this only happens if there was no cause.
    return new IOException(e);
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,8 +1213,12 @@

<property>
<name>fs.s3a.connection.maximum</name>
<value>15</value>
<description>Controls the maximum number of simultaneous connections to S3.</description>
<value>48</value>
<description>Controls the maximum number of simultaneous connections to S3.
This must be bigger than the value of fs.s3a.threads.max so as to stop
threads being blocked waiting for new HTTPS connections.
Why not equal? The AWS SDK transfer manager also uses these connections.
</description>
</property>

<property>
Expand Down Expand Up @@ -1312,7 +1316,7 @@

<property>
<name>fs.s3a.threads.max</name>
<value>10</value>
<value>64</value>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.</description>
</property>
Expand All @@ -1326,8 +1330,25 @@

<property>
<name>fs.s3a.max.total.tasks</name>
<value>5</value>
<description>The number of operations which can be queued for execution</description>
<value>32</value>
<description>The number of operations which can be queued for execution.
This is in addition to the number of active threads in fs.s3a.threads.max.
</description>
</property>

<property>
<name>fs.s3a.executor.capacity</name>
<value>16</value>
<description>The maximum number of submitted tasks which a single
operation (e.g. rename(), delete()) may submit simultaneously for
execution, excluding the IO-heavy block uploads, whose capacity
is set in "fs.s3a.fast.upload.active.blocks".

All tasks are submitted to the shared thread pool whose size is
set in "fs.s3a.threads.max"; the value of capacity should be less than that
of the thread pool itself, as the goal is to stop a single operation
from overloading that thread pool.
</description>
</property>

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public void testRenamePopulatesFileAncestors() throws IOException {
* @param dst the destination root to move
* @param nestedPath the nested path to move
*/
private void validateAncestorsMoved(Path src, Path dst, String nestedPath)
protected void validateAncestorsMoved(Path src, Path dst, String nestedPath)
throws IOException {
assertIsDirectory(dst);
assertPathDoesNotExist("src path should not exist", path(src + nestedPath));
Expand Down
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@
<artifactId>aws-java-sdk-bundle</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,15 @@ private Constants() {
public static final String ASSUMED_ROLE_CREDENTIALS_DEFAULT =
SimpleAWSCredentialsProvider.NAME;


// the maximum number of tasks cached if all threads are already uploading
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove empty line

public static final int DEFAULT_MAX_TOTAL_TASKS = 32;

// number of simultaneous connections to s3
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 48;

// connect to s3 over ssl?
public static final String SECURE_CONNECTIONS =
Expand Down Expand Up @@ -194,10 +200,6 @@ private Constants() {
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
public static final int DEFAULT_KEEPALIVE_TIME = 60;

// the maximum number of tasks cached if all threads are already uploading
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
public static final int DEFAULT_MAX_TOTAL_TASKS = 5;

// size of each of our multipart pieces in bytes
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
Expand Down Expand Up @@ -283,6 +285,22 @@ private Constants() {
@InterfaceStability.Unstable
public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;

/**
 * Configuration key for the capacity of executor queues for operations
 * other than block upload, where {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is
 * used instead.
 * This should be less than {@link #MAX_THREADS} for fair
 * submission.
 * Value: {@value}.
 */
public static final String EXECUTOR_CAPACITY = "fs.s3a.executor.capacity";

/**
 * Default value of {@link #EXECUTOR_CAPACITY}: the capacity of executor
 * queues for operations other than block upload, where
 * {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is used instead.
 * Value: {@value}.
 */
public static final int DEFAULT_EXECUTOR_CAPACITY = 16;

// Private | PublicRead | PublicReadWrite | AuthenticatedRead |
// LogDeliveryWrite | BucketOwnerRead | BucketOwnerFullControl
public static final String CANNED_ACL = "fs.s3a.acl.default";
Expand Down
Loading