From 9581c1b9f57e4958adec2a0bfa5fe1f1264162a1 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 12 Jun 2024 13:54:19 +0100 Subject: [PATCH 1/2] HADOOP-19203. WrappedIO BulkDelete API to raise IOEs as UncheckedIOExceptions -WrappedIO methods raise UncheckIOEs -new class org.apache.hadoop.util.functional.FunctionalIO with wrap/unwrap and the ability to generate a java.util.function.Supplier around a CallableRaisingIOE. -Tests Change-Id: Icad3bfa30bd5226a5fb6534227e9e56e4b37d536 --- .../apache/hadoop/io/wrappedio/WrappedIO.java | 37 ++++--- .../functional/CommonCallableSupplier.java | 5 +- .../hadoop/util/functional/FunctionalIO.java | 93 ++++++++++++++++++ .../hadoop/util/functional/FutureIO.java | 2 + .../util/functional/TestFunctionalIO.java | 97 +++++++++++++++++++ 5 files changed, 217 insertions(+), 17 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestFunctionalIO.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java index 286557c2c378c..d6fe311fba866 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java @@ -18,7 +18,7 @@ package org.apache.hadoop.io.wrappedio; -import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collection; import java.util.List; import java.util.Map; @@ -29,17 +29,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions; + /** * Reflection-friendly access to APIs which are not available in * some of the older Hadoop versions which libraries still * compile against. *

* The intent is to avoid the need for complex reflection operations - * including wrapping of parameter classes, direct instatiation of + * including wrapping of parameter classes, direct instantiation of * new classes etc. */ @InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceStability.Unstable public final class WrappedIO { private WrappedIO() { @@ -52,12 +54,15 @@ private WrappedIO() { * @return a number greater than or equal to zero. * @throws UnsupportedOperationException bulk delete under that path is not supported. * @throws IllegalArgumentException path not valid. - * @throws IOException problems resolving paths + * @throws UncheckedIOException if an IOE was raised. */ - public static int bulkDelete_pageSize(FileSystem fs, Path path) throws IOException { - try (BulkDelete bulk = fs.createBulkDelete(path)) { - return bulk.pageSize(); - } + public static int bulkDelete_pageSize(FileSystem fs, Path path) { + + return uncheckIOExceptions(() -> { + try (BulkDelete bulk = fs.createBulkDelete(path)) { + return bulk.pageSize(); + } + }); } /** @@ -79,15 +84,17 @@ public static int bulkDelete_pageSize(FileSystem fs, Path path) throws IOExcepti * @param paths list of paths which must be absolute and under the base path. * @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message. * @throws UnsupportedOperationException bulk delete under that path is not supported. - * @throws IOException IO problems including networking, authentication and more. + * @throws UncheckedIOException if an IOE was raised. * @throws IllegalArgumentException if a path argument is invalid. */ public static List> bulkDelete_delete(FileSystem fs, - Path base, - Collection paths) - throws IOException { - try (BulkDelete bulk = fs.createBulkDelete(base)) { - return bulk.bulkDelete(paths); - } + Path base, + Collection paths) { + + return uncheckIOExceptions(() -> { + try (BulkDelete bulk = fs.createBulkDelete(base)) { + return bulk.bulkDelete(paths); + } + }); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java index 67299ef96aec6..7a3193efbf0d7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java @@ -41,7 +41,7 @@ * raised by the callable and wrapping them as appropriate. * @param return type. */ -public final class CommonCallableSupplier implements Supplier { +public final class CommonCallableSupplier implements Supplier { private static final Logger LOG = LoggerFactory.getLogger(CommonCallableSupplier.class); @@ -57,7 +57,7 @@ public CommonCallableSupplier(final Callable call) { } @Override - public Object get() { + public T get() { try { return call.call(); } catch (RuntimeException e) { @@ -155,4 +155,5 @@ public static void maybeAwaitCompletion( waitForCompletion(future); } } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java new file mode 100644 index 0000000000000..9ff243ef66ae1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.functional; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.function.Supplier; + +/** + * Functional utilities for IO operations. + */ +public final class FunctionalIO { + + /** + * Invoke any operation, wrapping IOExceptions with + * {@code UncheckedIOException}. + * @param call callable + * @return result + * @param type of result + * @throws UncheckedIOException if an IOE was raised. + */ + public static T uncheckIOExceptions(CallableRaisingIOE call) { + try { + return call.apply(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Wrap a {@link CallableRaisingIOE} as a {@link Supplier}. + * This is similar to {@link CommonCallableSupplier}, except that + * only IOExceptions are caught and wrapped; all other exceptions are + * propagated unchanged. + * + * @param type of result + */ + private static final class UncheckedIOExceptionSupplier implements Supplier { + private final CallableRaisingIOE call; + + private UncheckedIOExceptionSupplier(CallableRaisingIOE call) { + this.call = call; + } + + @Override + public T get() { + return uncheckIOExceptions(call); + } + } + + /** + * Wrap a {@link CallableRaisingIOE} as a {@link Supplier}. + * @param call call to wrap + * @param type of result + * @return a supplier which invokes the call. + */ + public static Supplier toUncheckedIOExceptionSupplier(CallableRaisingIOE call) { + return new UncheckedIOExceptionSupplier<>(call); + } + + /** + * Invoke the supplier, catching any {@code UncheckedIOException} raised, + * extracting the inner IOException and rethrowing it. + * @param call call to invoke + * @return result + * @param type of result + * @throws IOException if the call raised an IOException wrapped by an UncheckedIOException. + */ + public static T extractIOExceptions(Supplier call) throws IOException { + try { + return call.get(); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java index 2f043b6499795..bd8f2dbd46d6e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java @@ -32,6 +32,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -354,4 +355,5 @@ public static CompletableFuture eval( } return result; } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestFunctionalIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestFunctionalIO.java new file mode 100644 index 0000000000000..25bdab8ea3203 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestFunctionalIO.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.functional; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.util.functional.FunctionalIO.extractIOExceptions; +import static org.apache.hadoop.util.functional.FunctionalIO.toUncheckedIOExceptionSupplier; +import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions; + +/** + * Test the functional IO class. + */ +public class TestFunctionalIO extends AbstractHadoopTestBase { + + /** + * Verify that IOEs are caught and wrapped. + */ + @Test + public void testUncheckIOExceptions() throws Throwable { + final IOException raised = new IOException("text"); + final UncheckedIOException ex = intercept(UncheckedIOException.class, "text", () -> + uncheckIOExceptions(() -> { + throw raised; + })); + Assertions.assertThat(ex.getCause()) + .describedAs("Cause of %s", ex) + .isSameAs(raised); + } + + /** + * Verify that UncheckedIOEs are not double wrapped. + */ + @Test + public void testUncheckIOExceptionsUnchecked() throws Throwable { + final UncheckedIOException raised = new UncheckedIOException( + new IOException("text")); + final UncheckedIOException ex = intercept(UncheckedIOException.class, "text", () -> + uncheckIOExceptions(() -> { + throw raised; + })); + Assertions.assertThat(ex) + .describedAs("Propagated Exception %s", ex) + .isSameAs(raised); + } + + /** + * Supplier will also wrap IOEs. + */ + @Test + public void testUncheckedSupplier() throws Throwable { + intercept(UncheckedIOException.class, "text", () -> + toUncheckedIOExceptionSupplier(() -> { + throw new IOException("text"); + }).get()); + } + + /** + * The wrap/unwrap code which will be used to invoke operations + * through reflection. + */ + @Test + public void testUncheckAndExtract() throws Throwable { + final IOException raised = new IOException("text"); + final IOException ex = intercept(IOException.class, "text", () -> + extractIOExceptions(toUncheckedIOExceptionSupplier(() -> { + throw raised; + }))); + Assertions.assertThat(ex) + .describedAs("Propagated Exception %s", ex) + .isSameAs(raised); + } + +} From e9c38f5879c3664b07e3bc80b37db296b772f21d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 13 Jun 2024 12:48:18 +0100 Subject: [PATCH 2/2] HADOOP-19203. Style checking Change-Id: Ica826512b0efc4da0edd45de031116508198a5b2 --- .../apache/hadoop/util/functional/FunctionalIO.java | 12 +++++++++--- .../org/apache/hadoop/util/functional/FutureIO.java | 2 -- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java index 9ff243ef66ae1..6bc4a7103022d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java @@ -22,17 +22,23 @@ import java.io.UncheckedIOException; import java.util.function.Supplier; +import org.apache.hadoop.classification.InterfaceAudience; + /** * Functional utilities for IO operations. */ +@InterfaceAudience.Private public final class FunctionalIO { + private FunctionalIO() { + } + /** * Invoke any operation, wrapping IOExceptions with * {@code UncheckedIOException}. * @param call callable - * @return result * @param type of result + * @return result * @throws UncheckedIOException if an IOE was raised. */ public static T uncheckIOExceptions(CallableRaisingIOE call) { @@ -48,10 +54,10 @@ public static T uncheckIOExceptions(CallableRaisingIOE call) { * This is similar to {@link CommonCallableSupplier}, except that * only IOExceptions are caught and wrapped; all other exceptions are * propagated unchanged. - * * @param type of result */ private static final class UncheckedIOExceptionSupplier implements Supplier { + private final CallableRaisingIOE call; private UncheckedIOExceptionSupplier(CallableRaisingIOE call) { @@ -78,8 +84,8 @@ public static Supplier toUncheckedIOExceptionSupplier(CallableRaisingIOE< * Invoke the supplier, catching any {@code UncheckedIOException} raised, * extracting the inner IOException and rethrowing it. * @param call call to invoke - * @return result * @param type of result + * @return result * @throws IOException if the call raised an IOException wrapped by an UncheckedIOException. */ public static T extractIOExceptions(Supplier call) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java index bd8f2dbd46d6e..2f043b6499795 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java @@ -32,7 +32,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -355,5 +354,4 @@ public static CompletableFuture eval( } return result; } - }