Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -326,15 +328,17 @@ CompletableFuture<MasterService.Interface> getMasterStub() {
}
}

});
}, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think this is the correct place to put these timeout configurations.

We should place them into the connection registry implementation.

HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT));
return future;
}, stub -> true, "master stub");
}, stub -> true, "master stub", conf);
}

String getClusterId() {
try {
return registry.getClusterId().get();
} catch (InterruptedException | ExecutionException e) {
return registry.getClusterId().get(conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Error fetching cluster ID: ", e);
}
return null;
Expand Down Expand Up @@ -447,7 +451,8 @@ public CompletableFuture<Hbck> getHbck() {
} else {
future.complete(getHbckInternal(sn));
}
});
}, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT));
return future;
}, "AsyncConnection.getHbck");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This `org.apache.hadoop.conf.Configuration` import looks unused in this file — can it be removed?

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -56,9 +57,9 @@ class AsyncMetaRegionLocator {
* replicas. If we do not check the location for the given replica, we will always return the
* cached region locations and cause an infinite loop.
*/
CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload, Configuration conf) {
return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location", conf);
}

private HRegionLocation getCacheLocation(HRegionLocation loc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
.setName("AsyncRegionLocator.getRegionLocations").setTableName(tableName);
return tracedLocationFuture(() -> {
CompletableFuture<RegionLocations> future = isMeta(tableName)
? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
: nonMetaRegionLocator.getRegionLocations(tableName, row,
? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload,
conn.getConfiguration()) : nonMetaRegionLocator.getRegionLocations(tableName, row,
RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
Expand All @@ -160,7 +160,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[]
// Change it later if the meta table can have more than one regions.
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
CompletableFuture<RegionLocations> locsFuture = isMeta(tableName)
? metaRegionLocator.getRegionLocations(replicaId, reload)
? metaRegionLocator.getRegionLocations(replicaId, reload, conn.getConfiguration())
: nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
addListener(locsFuture, (locs, error) -> {
if (error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
Expand Down Expand Up @@ -360,7 +361,8 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
registry.close();
future.completeExceptionally(e);
}
});
}, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT));
return future;
}, "ConnectionFactory.createAsyncConnection");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ static int getPriority(TableName tableName) {

static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
AtomicReference<CompletableFuture<T>> futureRef, boolean reload,
Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type) {
Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type, Configuration conf) {
for (;;) {
if (!reload) {
T value = cacheRef.get();
Expand Down Expand Up @@ -564,7 +564,8 @@ static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
cacheRef.set(value);
futureRef.set(null);
future.complete(value);
});
}, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT));
return future;
} else {
CompletableFuture<T> future = futureRef.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,8 @@ private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableN
} else {
future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
}
});
}, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT));
return future;
} else {
// For non-meta table, we fetch all locations by scanning hbase:meta table
Expand Down Expand Up @@ -1276,7 +1277,8 @@ private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily
future.complete(ret);
}
});
});
}, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT));
break;
case NORMAL:
addListener(getTableHRegionLocations(tableName), (locations, err) -> {
Expand Down Expand Up @@ -3254,7 +3256,8 @@ GetRegionInfoResponse> adminCall(controller, stub,
}
}
});
});
}, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT));
break;
case NORMAL:
addListener(getTableHRegionLocations(tableName), (locations, err) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@ public static void setUp() throws IOException {

@Test(expected = DoNotRetryIOException.class)
public void test() throws IOException {
FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false));
FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false, CONF));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public enum OperationStatusCode {
public static final String ZK_CONNECTION_REGISTRY_CLASS =
"org.apache.hadoop.hbase.client.ZKConnectionRegistry";

  /**
   * Configuration key for the timeout, in milliseconds, applied to connection registry API
   * calls (e.g. fetching the cluster id, master address or meta region locations).
   */
  public static final String CONNECTION_REGISTRY_API_TIMEOUT =
    "hbase.connection.registry.api.timeout";

  /** Default value for {@link #CONNECTION_REGISTRY_API_TIMEOUT}: 120000 ms (2 minutes). */
  public static final int DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT = 120000;

/** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ public static <T> void addListener(CompletableFuture<T> future,
});
}

public static <T> void addListener(CompletableFuture<T> future,
BiConsumer<? super T, ? super Throwable> action, long timeout) {
Throwable error = null;
T t = null;
try {
t = future.get(timeout, TimeUnit.MILLISECONDS);
} catch (Throwable throwable) {
error = throwable;
}
action.accept(t, unwrapCompletionException(error));
}

/**
* Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
* exception is that we will call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand All @@ -39,8 +41,10 @@ private ClusterConnectionFactory() {
}

private static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
ConnectionRegistry registry, SocketAddress localAddress, User user) throws IOException {
String clusterId = FutureUtils.get(registry.getClusterId());
ConnectionRegistry registry, SocketAddress localAddress, User user ) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used?

String clusterId = FutureUtils.get(registry.getClusterId(),
conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT,
HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT), TimeUnit.MILLISECONDS);
Class<? extends AsyncClusterConnection> clazz =
conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class,
AsyncClusterConnection.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) {
@Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId,
boolean reload) throws Exception {
return locator.getRegionLocations(replicaId, reload).get();
final Configuration conf = HBaseConfiguration.create();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this — creating a fresh `HBaseConfiguration` here ignores the connection's actual configuration; pass the existing configuration instead.

return locator.getRegionLocations(replicaId, reload, conf).get();
}
});
} catch (Exception e) {
Expand Down