diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java index 2e42df957346..5e33eefde6c5 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java @@ -19,7 +19,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.net.InnerNode; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.slf4j.Logger; @@ -36,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT; @@ -48,7 +52,8 @@ public class ScmTopologyClient { LoggerFactory.getLogger(ScmTopologyClient.class); private final ScmBlockLocationProtocol scmBlockLocationProtocol; - private final AtomicReference cache = new AtomicReference<>(); + private final AtomicReference cache = + new AtomicReference<>(); private ScheduledExecutorService executorService; public ScmTopologyClient( @@ -56,7 +61,7 @@ public ScmTopologyClient( this.scmBlockLocationProtocol = scmBlockLocationProtocol; } - public InnerNode getClusterTree() { + public NetworkTopology getClusterMap() { return requireNonNull(cache.get(), "ScmBlockLocationClient must have been initialized already."); } @@ -66,7 +71,10 @@ public void start(ConfigurationSource conf) throws IOException { scmBlockLocationProtocol.getNetworkTopology(); LOG.info("Initial network topology fetched from SCM: {}.", initialTopology); - cache.set(initialTopology); + cache.set(new NetworkTopologyImpl(conf.get( + ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, + ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT), + initialTopology)); scheduleNetworkTopologyPoller(conf, Instant.now()); } @@ -97,7 +105,7 @@ private void scheduleNetworkTopologyPoller(ConfigurationSource conf, LOG.debug("Scheduling NetworkTopologyPoller with an initial delay of {}.", initialDelay); - executorService.scheduleAtFixedRate(() -> checkAndRefresh(), + executorService.scheduleAtFixedRate(() -> checkAndRefresh(conf), initialDelay.toMillis(), refreshDuration.toMillis(), TimeUnit.MILLISECONDS); } @@ -110,18 +118,20 @@ public static Duration parseRefreshDuration(ConfigurationSource conf) { return Duration.ofMillis(refreshDurationInMs); } - private synchronized void checkAndRefresh() { - InnerNode current = cache.get(); + private synchronized void checkAndRefresh(ConfigurationSource conf) { + InnerNode current = (InnerNode) cache.get().getNode(ROOT); try { InnerNode newTopology = scmBlockLocationProtocol.getNetworkTopology(); if (!newTopology.equals(current)) { - cache.set(newTopology); - LOG.info("Updated network topology cluster tree fetched from " + - "SCM: {}.", newTopology); + cache.set(new NetworkTopologyImpl(conf.get( + ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, + ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT), + newTopology)); + LOG.info("Updated network topology fetched from SCM: {}.", newTopology); } } catch (IOException e) { throw new UncheckedIOException( - "Error fetching updated network topology cluster tree from SCM", e); + "Error fetching updated network topology from SCM", e); } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 52299c820ab9..680ebe063156 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -84,9 +84,7 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.client.ScmTopologyClient; import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; -import org.apache.hadoop.hdds.scm.net.InnerNode; import org.apache.hadoop.hdds.scm.net.NetworkTopology; -import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.server.OzoneAdmins; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.Table.KeyValue; @@ -1155,11 +1153,7 @@ public void setScmTopologyClient( } public NetworkTopology getClusterMap() { - InnerNode currentTree = scmTopologyClient.getClusterTree(); - return new NetworkTopologyImpl(configuration.get( - ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, - ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT), - currentTree); + return scmTopologyClient.getClusterMap(); } /**