-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-17543. [ARR] AsyncUtil makes asynchronous code more concise and easier. #6868
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 8 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
65c5bc4
async util & ut
KeeProMise 518b109
async util
KeeProMise 225106c
warp IOException & add javadoc
KeeProMise 7909041
Revert "warp IOException & add javadoc"
KeeProMise b44a672
Revert "Revert "warp IOException & add javadoc""
KeeProMise 1086f74
result.completeExceptionally(e.getCause()));
KeeProMise 4855cfe
test asyncThrowException
KeeProMise b2dcf36
add javadoc
KeeProMise 0b5bf8e
add FunctionalInterface to FinallyFunction
KeeProMise b5bf0a6
AsyncBiFunction
KeeProMise c5525a4
fix javadoc
KeeProMise a817aa1
feat AsyncForEachRun stack
KeeProMise e445380
inp CatchFunction & AsyncCatchFunction stack
KeeProMise 817d0ec
fix checkstyle
KeeProMise c6324e1
warp ex
0882f73
log use AsyncClass.class
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
89 changes: 89 additions & 0 deletions
89
...bf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.hadoop.hdfs.server.federation.router.async; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionException; | ||
| import java.util.concurrent.Executor; | ||
|
|
||
| /** | ||
| * Represents a function that accepts a value of type T and produces a result of type R. | ||
| * This interface extends {@link Async} and provides methods to apply the function | ||
| * asynchronously using {@link CompletableFuture}. | ||
| * | ||
| * <p>ApplyFunction is used to implement the following semantics:</p> | ||
| * <pre> | ||
| * {@code | ||
| * T res = doAsync(input); | ||
| * // Can use ApplyFunction | ||
| * R result = thenApply(res); | ||
| * } | ||
| * </pre> | ||
| * | ||
| * @param <T> the type of the input to the function | ||
| * @param <R> the type of the result of the function | ||
| */ | ||
| @FunctionalInterface | ||
| public interface ApplyFunction<T, R> extends Async<R>{ | ||
|
|
||
| /** | ||
| * Applies this function to the given argument. | ||
| * | ||
| * @param t the function argument | ||
| * @return the function result | ||
| * @throws IOException if an I/O error occurs | ||
| */ | ||
| R apply(T t) throws IOException; | ||
|
|
||
| /** | ||
| * Applies this function asynchronously to the result of the given {@link CompletableFuture}. | ||
| * The function is executed on the same thread as the completion of the given future. | ||
| * | ||
| * @param in the input future | ||
| * @return a new future that holds the result of the function application | ||
| */ | ||
| default CompletableFuture<R> apply(CompletableFuture<T> in) { | ||
| return in.thenApply(t -> { | ||
| try { | ||
| return ApplyFunction.this.apply(t); | ||
| } catch (IOException e) { | ||
| throw new CompletionException(e); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Applies this function asynchronously to the result of the given {@link CompletableFuture}, | ||
| * using the specified executor for the asynchronous computation. | ||
| * | ||
| * @param in the input future | ||
| * @param executor the executor to use for the asynchronous computation | ||
| * @return a new future that holds the result of the function application | ||
| */ | ||
| default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) { | ||
| return in.thenApplyAsync(t -> { | ||
| try { | ||
| return ApplyFunction.this.apply(t); | ||
| } catch (IOException e) { | ||
| throw new CompletionException(e); | ||
| } | ||
| }, executor); | ||
| } | ||
|
|
||
| } | ||
85 changes: 85 additions & 0 deletions
85
...p-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.hadoop.hdfs.server.federation.router.async; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutionException; | ||
|
|
||
| /** | ||
| * An interface for asynchronous operations, providing utility methods | ||
| * and constants related to asynchronous computations. | ||
| * | ||
| * @param <R> The type of the result of the asynchronous operation | ||
| */ | ||
| public interface Async<R> { | ||
|
|
||
| /** | ||
| * A thread-local variable to store the {@link CompletableFuture} instance for the current thread. | ||
| * <p> | ||
| * <b>Note:</b> After executing an asynchronous method, the thread stores the CompletableFuture | ||
| * of the asynchronous method in the thread's local variable | ||
| */ | ||
| ThreadLocal<CompletableFuture<Object>> CUR_COMPLETABLE_FUTURE | ||
| = new ThreadLocal<>(); | ||
|
|
||
| /** | ||
| * Sets the {@link CompletableFuture} instance for the current thread. | ||
| * | ||
| * @param completableFuture The {@link CompletableFuture} instance to be set | ||
| * @param <T> The type of the result in the CompletableFuture | ||
| */ | ||
| default <T> void setCurCompletableFuture(CompletableFuture<T> completableFuture) { | ||
| CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture); | ||
| } | ||
|
|
||
| /** | ||
| * Gets the {@link CompletableFuture} instance for the current thread. | ||
| * | ||
| * @return The {@link CompletableFuture} instance for the current thread, | ||
| * or {@code null} if not set | ||
| */ | ||
| default CompletableFuture<R> getCurCompletableFuture() { | ||
| return (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get(); | ||
| } | ||
|
|
||
| /** | ||
| * Blocks and retrieves the result of the {@link CompletableFuture} instance | ||
| * for the current thread. | ||
| * | ||
| * @return The result of the CompletableFuture, or {@code null} if the thread was interrupted | ||
| * @throws IOException If the completion exception to the CompletableFuture | ||
| * is an IOException or a subclass of it | ||
| */ | ||
| default R result() throws IOException { | ||
| try { | ||
| CompletableFuture<R> completableFuture = | ||
| (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get(); | ||
| assert completableFuture != null; | ||
| return completableFuture.get(); | ||
| } catch (InterruptedException e) { | ||
| return null; | ||
| } catch (ExecutionException e) { | ||
| Throwable cause = e.getCause(); | ||
| if (cause instanceof IOException) { | ||
| throw (IOException)cause; | ||
| } | ||
| throw new IOException(e); | ||
| } | ||
| } | ||
| } |
147 changes: 147 additions & 0 deletions
147
...c/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.hadoop.hdfs.server.federation.router.async; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionException; | ||
| import java.util.concurrent.Executor; | ||
|
|
||
| /** | ||
| * The AsyncApplyFunction interface represents a function that | ||
| * asynchronously accepts a value of type T and produces a result | ||
| * of type R. This interface extends {@link ApplyFunction} and is | ||
| * designed to be used with asynchronous computation frameworks, | ||
| * such as Java's {@link java.util.concurrent.CompletableFuture}. | ||
| * | ||
| * <p>An implementation of this interface is expected to perform an | ||
| * asynchronous operation and return a result, which is typically | ||
| * represented as a {@code CompletableFuture<R>}. This allows for | ||
| * non-blocking execution of tasks and is particularly useful for | ||
| * I/O operations or any operation that may take a significant amount | ||
| * of time to complete.</p> | ||
| * | ||
| * <p>AsyncApplyFunction is used to implement the following semantics:</p> | ||
| * <pre> | ||
| * {@code | ||
| * T res = doAsync1(input); | ||
| * // Can use AsyncApplyFunction | ||
| * R result = doAsync2(res); | ||
| * } | ||
| * </pre> | ||
| * | ||
| * @param <T> the type of the input to the function | ||
| * @param <R> the type of the result of the function | ||
| * @see ApplyFunction | ||
| * @see java.util.concurrent.CompletableFuture | ||
| */ | ||
| @FunctionalInterface | ||
| public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> { | ||
|
|
||
| /** | ||
| * Asynchronously applies this function to the given argument. | ||
| * | ||
| * <p>This method is intended to initiate the function application | ||
| * without waiting for the result. It is typically used when the | ||
| * result of the operation is not required immediately or when the | ||
| * operation is part of a larger asynchronous workflow.</p> | ||
| * | ||
| * @param t the function argument | ||
| * @throws IOException if an I/O error occurs during the application | ||
| * of the function | ||
| */ | ||
| void applyAsync(T t) throws IOException; | ||
|
|
||
| /** | ||
| * Synchronously applies this function to the given argument and | ||
| * returns the result. | ||
| * | ||
| * <p>This method waits for the asynchronous operation to complete | ||
| * and returns its result. It is useful when the result is needed | ||
| * immediately and the calling code cannot proceed without it.</p> | ||
| * | ||
| * @param t the function argument | ||
| * @return the result of applying the function to the argument | ||
| * @throws IOException if an I/O error occurs during the application | ||
| * of the function | ||
| */ | ||
| @Override | ||
| default R apply(T t) throws IOException { | ||
| applyAsync(t); | ||
| return result(); | ||
| } | ||
|
|
||
|
|
||
| default CompletableFuture<R> async(T t) throws IOException { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest to add some javadoc here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| applyAsync(t); | ||
| CompletableFuture<R> completableFuture = getCurCompletableFuture(); | ||
| assert completableFuture != null; | ||
| return completableFuture; | ||
| } | ||
|
|
||
| /** | ||
| * Asynchronously applies this function to the result of the given | ||
| * CompletableFuture. | ||
| * | ||
| * <p>This method chains the function application to the completion | ||
| * of the input future. It returns a new CompletableFuture that | ||
| * completes with the function's result when the input future | ||
| * completes.</p> | ||
| * | ||
| * @param in the input future | ||
| * @return a new CompletableFuture that holds the result of the | ||
| * function application | ||
| */ | ||
| @Override | ||
| default CompletableFuture<R> apply(CompletableFuture<T> in) { | ||
| return in.thenCompose(t -> { | ||
| try { | ||
| return async(t); | ||
| } catch (IOException e) { | ||
| throw new CompletionException(e); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Asynchronously applies this function to the result of the given | ||
| * CompletableFuture, using the specified executor for the | ||
| * asynchronous computation. | ||
| * | ||
| * <p>This method allows for more control over the execution | ||
| * context of the asynchronous operation, such as running the | ||
| * operation in a separate thread or thread pool.</p> | ||
| * | ||
| * @param in the input future | ||
| * @param executor the executor to use for the asynchronous | ||
| * computation | ||
| * @return a new CompletableFuture that holds the result of the | ||
| * function application | ||
| */ | ||
| @Override | ||
| default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) { | ||
| return in.thenComposeAsync(t -> { | ||
| try { | ||
| return async(t); | ||
| } catch (IOException e) { | ||
| throw new CompletionException(e); | ||
| } | ||
| }, executor); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove the extra blank line.