Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.hadoop.io.wrappedio;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -29,17 +29,19 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;

/**
* Reflection-friendly access to APIs which are not available in
* some of the older Hadoop versions which libraries still
* compile against.
* <p>
* The intent is to avoid the need for complex reflection operations
* including wrapping of parameter classes, direct instatiation of
* including wrapping of parameter classes, direct instantiation of
* new classes etc.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceStability.Unstable
public final class WrappedIO {

private WrappedIO() {
Expand All @@ -52,12 +54,15 @@ private WrappedIO() {
* @return a number greater than or equal to zero.
* @throws UnsupportedOperationException bulk delete under that path is not supported.
* @throws IllegalArgumentException path not valid.
* @throws IOException problems resolving paths
* @throws UncheckedIOException if an IOE was raised.
*/
public static int bulkDelete_pageSize(FileSystem fs, Path path) throws IOException {
try (BulkDelete bulk = fs.createBulkDelete(path)) {
return bulk.pageSize();
}
public static int bulkDelete_pageSize(FileSystem fs, Path path) {

return uncheckIOExceptions(() -> {
try (BulkDelete bulk = fs.createBulkDelete(path)) {
return bulk.pageSize();
}
});
}

/**
Expand All @@ -79,15 +84,17 @@ public static int bulkDelete_pageSize(FileSystem fs, Path path) throws IOExcepti
* @param paths list of paths which must be absolute and under the base path.
* @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message.
* @throws UnsupportedOperationException bulk delete under that path is not supported.
* @throws IOException IO problems including networking, authentication and more.
* @throws UncheckedIOException if an IOE was raised.
* @throws IllegalArgumentException if a path argument is invalid.
*/
public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
Path base,
Collection<Path> paths)
throws IOException {
try (BulkDelete bulk = fs.createBulkDelete(base)) {
return bulk.bulkDelete(paths);
}
Path base,
Collection<Path> paths) {

return uncheckIOExceptions(() -> {
try (BulkDelete bulk = fs.createBulkDelete(base)) {
return bulk.bulkDelete(paths);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* raised by the callable and wrapping them as appropriate.
* @param <T> return type.
*/
public final class CommonCallableSupplier<T> implements Supplier {
public final class CommonCallableSupplier<T> implements Supplier<T> {

private static final Logger LOG =
LoggerFactory.getLogger(CommonCallableSupplier.class);
Expand All @@ -57,7 +57,7 @@ public CommonCallableSupplier(final Callable<T> call) {
}

@Override
public Object get() {
public T get() {
try {
return call.call();
} catch (RuntimeException e) {
Expand Down Expand Up @@ -155,4 +155,5 @@ public static void maybeAwaitCompletion(
waitForCompletion(future);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Functional utilities for IO operations.
*/
@InterfaceAudience.Private
public final class FunctionalIO {

private FunctionalIO() {
}

/**
* Invoke any operation, wrapping IOExceptions with
* {@code UncheckedIOException}.
* @param call callable
* @param <T> type of result
* @return result
* @throws UncheckedIOException if an IOE was raised.
*/
public static <T> T uncheckIOExceptions(CallableRaisingIOE<T> call) {
try {
return call.apply();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* This is similar to {@link CommonCallableSupplier}, except that
* only IOExceptions are caught and wrapped; all other exceptions are
* propagated unchanged.
* @param <T> type of result
*/
private static final class UncheckedIOExceptionSupplier<T> implements Supplier<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any use of this so for though. Assuming it will be used later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rest of wrapped IO. and actually just done something identical in #6892


private final CallableRaisingIOE<T> call;

private UncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
this.call = call;
}

@Override
public T get() {
return uncheckIOExceptions(call);
}
}

/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* @param call call to wrap
* @param <T> type of result
* @return a supplier which invokes the call.
*/
public static <T> Supplier<T> toUncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
return new UncheckedIOExceptionSupplier<>(call);
}

/**
* Invoke the supplier, catching any {@code UncheckedIOException} raised,
* extracting the inner IOException and rethrowing it.
* @param call call to invoke
* @param <T> type of result
* @return result
* @throws IOException if the call raised an IOException wrapped by an UncheckedIOException.
*/
public static <T> T extractIOExceptions(Supplier<T> call) throws IOException {
try {
return call.get();
} catch (UncheckedIOException e) {
throw e.getCause();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.io.UncheckedIOException;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.FunctionalIO.extractIOExceptions;
import static org.apache.hadoop.util.functional.FunctionalIO.toUncheckedIOExceptionSupplier;
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;

/**
* Test the functional IO class.
*/
public class TestFunctionalIO extends AbstractHadoopTestBase {

/**
* Verify that IOEs are caught and wrapped.
*/
@Test
public void testUncheckIOExceptions() throws Throwable {
final IOException raised = new IOException("text");
final UncheckedIOException ex = intercept(UncheckedIOException.class, "text", () ->
uncheckIOExceptions(() -> {
throw raised;
}));
Assertions.assertThat(ex.getCause())
.describedAs("Cause of %s", ex)
.isSameAs(raised);
}

/**
* Verify that UncheckedIOEs are not double wrapped.
*/
@Test
public void testUncheckIOExceptionsUnchecked() throws Throwable {
final UncheckedIOException raised = new UncheckedIOException(
new IOException("text"));
final UncheckedIOException ex = intercept(UncheckedIOException.class, "text", () ->
uncheckIOExceptions(() -> {
throw raised;
}));
Assertions.assertThat(ex)
.describedAs("Propagated Exception %s", ex)
.isSameAs(raised);
}

/**
* Supplier will also wrap IOEs.
*/
@Test
public void testUncheckedSupplier() throws Throwable {
intercept(UncheckedIOException.class, "text", () ->
toUncheckedIOExceptionSupplier(() -> {
throw new IOException("text");
}).get());
}

/**
* The wrap/unwrap code which will be used to invoke operations
* through reflection.
*/
@Test
public void testUncheckAndExtract() throws Throwable {
final IOException raised = new IOException("text");
final IOException ex = intercept(IOException.class, "text", () ->
extractIOExceptions(toUncheckedIOExceptionSupplier(() -> {
throw raised;
})));
Assertions.assertThat(ex)
.describedAs("Propagated Exception %s", ex)
.isSameAs(raised);
}

}