Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.statistics;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Common statistic names for Filesystem-level statistics,
* including internals.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class FileSystemStatisticNames {

private FileSystemStatisticNames() {
}

/**
* How long did filesystem initialization take?
*/
public static final String FILESYSTEM_INITIALIZATION = "filesystem_initialization";

/**
* How long did filesystem close take?
*/
public static final String FILESYSTEM_CLOSE = "filesystem_close";

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ public final class StoreStatisticNames {
public static final String DELEGATION_TOKENS_ISSUED
= "delegation_tokens_issued";

/**
* How long did any store client creation take?
*/
public static final String STORE_CLIENT_CREATION = "store_client_creation";

/** Probe for store existing: {@value}. */
public static final String STORE_EXISTS_PROBE
= "store_exists_probe";
Expand All @@ -200,6 +205,7 @@ public final class StoreStatisticNames {
public static final String STORE_IO_RATE_LIMITED_DURATION
= "store_io_rate_limited_duration";


/**
* A store's equivalent of a paged LIST request was initiated: {@value}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,35 +49,14 @@ public static <T> T uncheckIOExceptions(CallableRaisingIOE<T> call) {
}
}

/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* This is similar to {@link CommonCallableSupplier}, except that
* only IOExceptions are caught and wrapped; all other exceptions are
* propagated unchanged.
* @param <T> type of result
*/
private static final class UncheckedIOExceptionSupplier<T> implements Supplier<T> {

private final CallableRaisingIOE<T> call;

private UncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
this.call = call;
}

@Override
public T get() {
return uncheckIOExceptions(call);
}
}

/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* @param call call to wrap
* @param <T> type of result
* @return a supplier which invokes the call.
*/
public static <T> Supplier<T> toUncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
return new UncheckedIOExceptionSupplier<>(call);
return () -> uncheckIOExceptions(call);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Future IO Helper methods.
* <p>
Expand All @@ -62,7 +59,6 @@
@InterfaceStability.Unstable
public final class FutureIO {

private static final Logger LOG = LoggerFactory.getLogger(FutureIO.class.getName());
private FutureIO() {
}

Expand Down Expand Up @@ -129,7 +125,6 @@ public static <T> T awaitFuture(final Future<T> future,
* If any future throws an exception during its execution, this method
* extracts and rethrows that exception.
* </p>
*
* @param collection collection of futures to be evaluated
* @param <T> type of the result.
* @return the list of future's result, if all went well.
Expand All @@ -140,19 +135,10 @@ public static <T> T awaitFuture(final Future<T> future,
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection)
throws InterruptedIOException, IOException, RuntimeException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get());
}
return results;
} catch (InterruptedException e) {
LOG.debug("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
LOG.debug("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
for (Future<T> future : collection) {
results.add(awaitFuture(future));
}
return results;
}

/**
Expand All @@ -163,7 +149,6 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
* the timeout expires, whichever happens first. If any future throws an
* exception during its execution, this method extracts and rethrows that exception.
* </p>
*
* @param collection collection of futures to be evaluated
* @param duration timeout duration
* @param <T> type of the result.
Expand All @@ -176,21 +161,12 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection,
final Duration duration)
throws InterruptedIOException, IOException, RuntimeException,
TimeoutException {
TimeoutException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get(duration.toMillis(), TimeUnit.MILLISECONDS));
}
return results;
} catch (InterruptedException e) {
LOG.debug("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
LOG.debug("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
for (Future<T> future : collection) {
results.add(awaitFuture(future, duration.toMillis(), TimeUnit.MILLISECONDS));
}
return results;
}

/**
Expand All @@ -199,7 +175,6 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
* This will always raise an exception, either the inner IOException,
* an inner RuntimeException, or a new IOException wrapping the raised
* exception.
*
* @param e exception.
* @param <T> type of return value.
* @return nothing, ever.
Expand Down Expand Up @@ -283,12 +258,11 @@ public static IOException unwrapInnerException(final Throwable e) {
* @param <U> type of builder
* @return the builder passed in.
*/
public static <T, U extends FSBuilder<T, U>>
FSBuilder<T, U> propagateOptions(
final FSBuilder<T, U> builder,
final Configuration conf,
final String optionalPrefix,
final String mandatoryPrefix) {
public static <T, U extends FSBuilder<T, U>> FSBuilder<T, U> propagateOptions(
final FSBuilder<T, U> builder,
final Configuration conf,
final String optionalPrefix,
final String mandatoryPrefix) {
propagateOptions(builder, conf,
optionalPrefix, false);
propagateOptions(builder, conf,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;

/**
* A lazily constructed reference, whose reference
* constructor is a {@link CallableRaisingIOE} so
* may raise IOExceptions.
* <p>
* This {@code constructor} is only invoked on demand
* when the reference is first needed,
* after which the same value is returned.
* @param <T> type of reference
*/
public class LazyAtomicReference<T> implements CallableRaisingIOE<T> {

/**
* Underlying reference.
*/
private final AtomicReference<T> reference = new AtomicReference<>();

/**
* Constructor for lazy creation.
*/
private final CallableRaisingIOE<? extends T> constructor;

/**
* Constructor for this instance.
* @param constructor method to invoke to actually construct the inner object.
*/
public LazyAtomicReference(final CallableRaisingIOE<? extends T> constructor) {
this.constructor = requireNonNull(constructor);
}

/**
* Getter for the constructor.
* @return the constructor class
*/
protected CallableRaisingIOE<? extends T> getConstructor() {
return constructor;
}

/**
* Get the reference.
* Subclasses working with this need to be careful working with this.
* @return the reference.
*/
protected AtomicReference<T> getReference() {
return reference;
}

/**
* Get the value, constructing it if needed.
* @return the value
* @throws IOException on any evaluation failure
*/
public final synchronized T get() throws IOException {
final T v = reference.get();
if (v != null) {
return v;
}
reference.set(constructor.apply());
return reference.get();
}

/**
* Invoke {@link #get()} and convert IOEs to
* UncheckedIOException.
* @return the value
* @throws UncheckedIOException if the constructor raised an IOException.
*/
public final T getUnchecked() throws UncheckedIOException {
return uncheckIOExceptions(this::get);
}

/**
* Is the reference set?
* @return true if the reference has been set.
*/
public final boolean isSet() {
return reference.get() != null;
}

/**
* Invoke {@link #get()}.
* @return the value
* @throws IOException on any evaluation failure
*/
@Override
public final T apply() throws IOException {
return get();
}

@Override
public String toString() {
return "LazyAtomicReference{" +
"reference=" + reference + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

/**
* A subclass of {@link LazyAtomicReference} which
* holds an {@code AutoCloseable} reference and calls {@code close()}
* when it itself is closed.
* @param <T> type of reference.
*/
public class LazyAutoCloseableReference<T extends AutoCloseable>
extends LazyAtomicReference<T> implements AutoCloseable {

/**
* Constructor for this instance.
* @param constructor method to invoke to actually construct the inner object.
*/
public LazyAutoCloseableReference(final CallableRaisingIOE<? extends T> constructor) {
super(constructor);
}

/**
* Close the reference value if it is non-null.
* Sets the reference to null afterwards, even on
* a failure.
* @throws Exception failure to close.
*/
@Override
public synchronized void close() throws Exception {
final T v = getReference().get();
if (v != null) {
try {
v.close();
} finally {
// set the reference to null, even on a failure.
getReference().set(null);
}
}
}
}
Loading