Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.time.Duration;
import java.util.Objects;
Expand Down Expand Up @@ -144,23 +143,20 @@ protected String featureValueOf(SpanData item) {
};
}

public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
final Matcher<StatusCode> matcher = is(equalTo(statusCode));
return new TypeSafeMatcher<SpanData>() {
@Override
protected boolean matchesSafely(SpanData item) {
final StatusData statusData = item.getStatus();
return statusData != null && statusData.getStatusCode() != null
&& matcher.matches(statusData.getStatusCode());
}

public static Matcher<SpanData> hasStatusWithCode(Matcher<StatusCode> matcher) {
return new FeatureMatcher<SpanData, StatusCode>(matcher, "SpanData with StatusCode that",
"statusWithCode") {
@Override
public void describeTo(Description description) {
description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher);
protected StatusCode featureValueOf(SpanData actual) {
return actual.getStatus().getStatusCode();
}
};
}

public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
return hasStatusWithCode(is(equalTo(statusCode)));
}

public static Matcher<SpanData> hasTraceId(String traceId) {
return hasTraceId(is(equalTo(traceId)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -163,8 +164,9 @@ public boolean scheduleChore(ScheduledChore chore) {
chore.getChoreService().cancelChore(chore);
}
chore.setChoreService(this);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
chore.getPeriod(), chore.getTimeUnit());
ScheduledFuture<?> future =
scheduler.scheduleAtFixedRate(TraceUtil.tracedRunnable(chore, chore.getName()),
chore.getInitialDelay(), chore.getPeriod(), chore.getTimeUnit());
scheduledChores.put(chore, future);
return true;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,31 @@ private static void endSpan(CompletableFuture<?> future, Span span) {
});
}

/**
* Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
*/
public static Runnable tracedRunnable(final Runnable runnable, final String spanName) {
return tracedRunnable(runnable, () -> createSpan(spanName));
}

/**
* Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
*/
public static Runnable tracedRunnable(final Runnable runnable,
final Supplier<Span> spanSupplier) {
// N.B. This method name follows the convention of this class, i.e., tracedFuture, rather than
// the convention of the OpenTelemetry classes, i.e., Context#wrap.
return () -> {
final Span span = spanSupplier.get();
try (final Scope ignored = span.makeCurrent()) {
runnable.run();
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
};
}

/**
* A {@link Runnable} that may also throw.
* @param <T> the type of {@link Throwable} that can be produced.
Expand All @@ -144,11 +169,17 @@ public interface ThrowingRunnable<T extends Throwable> {
void run() throws T;
}

/**
* Trace the execution of {@code runnable}.
*/
public static <T extends Throwable> void trace(final ThrowingRunnable<T> runnable,
final String spanName) throws T {
trace(runnable, () -> createSpan(spanName));
}

/**
* Trace the execution of {@code runnable}.
*/
public static <T extends Throwable> void trace(final ThrowingRunnable<T> runnable,
final Supplier<Span> spanSupplier) throws T {
Span span = spanSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;

import com.google.errorprone.annotations.RestrictedApi;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.management.MemoryType;
import java.net.BindException;
Expand Down Expand Up @@ -57,6 +60,7 @@
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.CommonFSUtils;
Expand Down Expand Up @@ -231,60 +235,68 @@ protected final void initializeFileSystem() throws IOException {
!canUpdateTableDescriptor(), cacheTableDescriptor());
}

public HBaseServerBase(Configuration conf, String name)
throws ZooKeeperConnectionException, IOException {
public HBaseServerBase(Configuration conf, String name) throws IOException {
super(name); // thread name
this.conf = conf;
this.eventLoopGroupConfig =
NettyEventLoopGroupConfig.setup(conf, getClass().getSimpleName() + "-EventLoopGroup");
this.startcode = EnvironmentEdgeManager.currentTime();
this.userProvider = UserProvider.instantiate(conf);
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
this.sleeper = new Sleeper(this.msgInterval, this);
this.namedQueueRecorder = createNamedQueueRecord();
this.rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
InetSocketAddress addr = rpcServices.getSocketAddress();
String hostName = StringUtils.isBlank(useThisHostnameInstead)
? addr.getHostName()
: this.useThisHostnameInstead;
serverName = ServerName.valueOf(hostName, addr.getPort(), this.startcode);
// login the zookeeper client principal (if using security)
ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
// login the server principal (if using secure Hadoop)
login(userProvider, hostName);
// init superusers and add the server principal (if using security)
// or process owner as default super user.
Superusers.initialize(conf);
zooKeeper =
new ZKWatcher(conf, getProcessName() + ":" + addr.getPort(), this, canCreateBaseZNode());

this.configurationManager = new ConfigurationManager();
setupWindows(conf, configurationManager);

initializeFileSystem();

this.choreService = new ChoreService(getName(), true);
this.executorService = new ExecutorService(getName());

this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);

if (clusterMode()) {
if (
conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
) {
csm = new ZkCoordinatedStateManager(this);
final Span span = TraceUtil.createSpan("HBaseServerBase.cxtor");
try (Scope ignored = span.makeCurrent()) {
this.conf = conf;
this.eventLoopGroupConfig =
NettyEventLoopGroupConfig.setup(conf, getClass().getSimpleName() + "-EventLoopGroup");
this.startcode = EnvironmentEdgeManager.currentTime();
this.userProvider = UserProvider.instantiate(conf);
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
this.sleeper = new Sleeper(this.msgInterval, this);
this.namedQueueRecorder = createNamedQueueRecord();
this.rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
InetSocketAddress addr = rpcServices.getSocketAddress();
String hostName = StringUtils.isBlank(useThisHostnameInstead)
? addr.getHostName()
: this.useThisHostnameInstead;
serverName = ServerName.valueOf(hostName, addr.getPort(), this.startcode);
// login the zookeeper client principal (if using security)
ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
// login the server principal (if using secure Hadoop)
login(userProvider, hostName);
// init superusers and add the server principal (if using security)
// or process owner as default super user.
Superusers.initialize(conf);
zooKeeper =
new ZKWatcher(conf, getProcessName() + ":" + addr.getPort(), this, canCreateBaseZNode());

this.configurationManager = new ConfigurationManager();
setupWindows(conf, configurationManager);

initializeFileSystem();

this.choreService = new ChoreService(getName(), true);
this.executorService = new ExecutorService(getName());

this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);

if (clusterMode()) {
if (
conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
) {
csm = new ZkCoordinatedStateManager(this);
} else {
csm = null;
}
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
clusterStatusTracker.start();
} else {
csm = null;
clusterStatusTracker = null;
}
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
clusterStatusTracker.start();
} else {
csm = null;
clusterStatusTracker = null;
putUpWebUI();
span.setStatus(StatusCode.OK);
} catch (Throwable t) {
TraceUtil.setError(span, t);
throw t;
} finally {
span.end();
}
putUpWebUI();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
Expand Down Expand Up @@ -101,41 +102,43 @@ public MetaRegionLocationCache(ZKWatcher zkWatcher) {
* @param retryCounter controls the number of retries and sleep between retries.
*/
private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
List<String> znodes = null;
while (retryCounter.shouldRetry()) {
try {
znodes = watcher.getMetaReplicaNodesAndWatchChildren();
break;
} catch (KeeperException ke) {
LOG.debug("Error populating initial meta locations", ke);
if (!retryCounter.shouldRetry()) {
// Retries exhausted and watchers not set. This is not a desirable state since the cache
// could remain stale forever. Propagate the exception.
watcher.abort("Error populating meta locations", ke);
return;
}
TraceUtil.trace(() -> {
List<String> znodes = null;
while (retryCounter.shouldRetry()) {
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
LOG.error("Interrupted while loading meta locations from ZK", ie);
Thread.currentThread().interrupt();
return;
znodes = watcher.getMetaReplicaNodesAndWatchChildren();
break;
} catch (KeeperException ke) {
LOG.debug("Error populating initial meta locations", ke);
if (!retryCounter.shouldRetry()) {
// Retries exhausted and watchers not set. This is not a desirable state since the cache
// could remain stale forever. Propagate the exception.
watcher.abort("Error populating meta locations", ke);
return;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
LOG.error("Interrupted while loading meta locations from ZK", ie);
Thread.currentThread().interrupt();
return;
}
}
}
}
if (znodes == null || znodes.isEmpty()) {
// No meta znodes exist at this point but we registered a watcher on the base znode to listen
// for updates. They will be handled via nodeChildrenChanged().
return;
}
if (znodes.size() == cachedMetaLocations.size()) {
// No new meta znodes got added.
return;
}
for (String znode : znodes) {
String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
updateMetaLocation(path, opType);
}
if (znodes == null || znodes.isEmpty()) {
// No meta znodes exist at this point but we registered a watcher on the base znode to
// listen for updates. They will be handled via nodeChildrenChanged().
return;
}
if (znodes.size() == cachedMetaLocations.size()) {
// No new meta znodes got added.
return;
}
for (String znode : znodes) {
String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
updateMetaLocation(path, opType);
}
}, "MetaRegionLocationCache.loadMetaLocationsFromZk");
}

/**
Expand Down
Loading