Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.net.InnerNode;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

import org.slf4j.Logger;
Expand All @@ -36,6 +39,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT;

Expand All @@ -48,15 +52,16 @@ public class ScmTopologyClient {
LoggerFactory.getLogger(ScmTopologyClient.class);

private final ScmBlockLocationProtocol scmBlockLocationProtocol;
private final AtomicReference<InnerNode> cache = new AtomicReference<>();
private final AtomicReference<NetworkTopology> cache =
new AtomicReference<>();
private ScheduledExecutorService executorService;

public ScmTopologyClient(
ScmBlockLocationProtocol scmBlockLocationProtocol) {
this.scmBlockLocationProtocol = scmBlockLocationProtocol;
}

public InnerNode getClusterTree() {
public NetworkTopology getClusterMap() {
return requireNonNull(cache.get(),
"ScmBlockLocationClient must have been initialized already.");
}
Expand All @@ -66,7 +71,10 @@ public void start(ConfigurationSource conf) throws IOException {
scmBlockLocationProtocol.getNetworkTopology();
LOG.info("Initial network topology fetched from SCM: {}.",
initialTopology);
cache.set(initialTopology);
cache.set(new NetworkTopologyImpl(conf.get(
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT),
initialTopology));
scheduleNetworkTopologyPoller(conf, Instant.now());
}

Expand Down Expand Up @@ -97,7 +105,7 @@ private void scheduleNetworkTopologyPoller(ConfigurationSource conf,

LOG.debug("Scheduling NetworkTopologyPoller with an initial delay of {}.",
initialDelay);
executorService.scheduleAtFixedRate(() -> checkAndRefresh(),
executorService.scheduleAtFixedRate(() -> checkAndRefresh(conf),
initialDelay.toMillis(), refreshDuration.toMillis(),
TimeUnit.MILLISECONDS);
}
Expand All @@ -110,18 +118,20 @@ public static Duration parseRefreshDuration(ConfigurationSource conf) {
return Duration.ofMillis(refreshDurationInMs);
}

private synchronized void checkAndRefresh() {
InnerNode current = cache.get();
private synchronized void checkAndRefresh(ConfigurationSource conf) {
InnerNode current = (InnerNode) cache.get().getNode(ROOT);
try {
InnerNode newTopology = scmBlockLocationProtocol.getNetworkTopology();
if (!newTopology.equals(current)) {
cache.set(newTopology);
LOG.info("Updated network topology cluster tree fetched from " +
"SCM: {}.", newTopology);
cache.set(new NetworkTopologyImpl(conf.get(
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT),
newTopology));
LOG.info("Updated network topology fetched from SCM: {}.", newTopology);
}
} catch (IOException e) {
throw new UncheckedIOException(
"Error fetching updated network topology cluster tree from SCM", e);
"Error fetching updated network topology from SCM", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.net.InnerNode;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.server.OzoneAdmins;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
Expand Down Expand Up @@ -1155,11 +1153,7 @@ public void setScmTopologyClient(
}

public NetworkTopology getClusterMap() {
InnerNode currentTree = scmTopologyClient.getClusterTree();
return new NetworkTopologyImpl(configuration.get(
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT),
currentTree);
return scmTopologyClient.getClusterMap();
}

/**
Expand Down