getRatisRoles() throws IOException {
peer.getAddress().concat(isLocal ?
":".concat(RaftProtos.RaftPeerRole.LEADER.toString()) :
":".concat(RaftProtos.RaftPeerRole.FOLLOWER.toString()))
- .concat(":".concat(peer.getId().toString()))));
+ .concat(":".concat(peer.getId().toString()))
+ .concat(":".concat(peerInetAddress.getHostAddress()))));
}
return ratisRoles;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 3a471fbb0b06..38be88a9de1c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetExistContainerWithPipelinesInBatchRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetExistContainerWithPipelinesInBatchResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerCountResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetSafeModeRuleStatusesRequestProto;
@@ -397,6 +398,13 @@ public ScmContainerLocationResponse processRequest(
.setDatanodeUsageInfoResponse(getDatanodeUsageInfo(
request.getDatanodeUsageInfoRequest()))
.build();
+ case GetContainerCount:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+ .setGetContainerCountResponse(getContainerCount(
+ request.getGetContainerCountRequest()))
+ .build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
@@ -831,4 +839,12 @@ public DatanodeUsageInfoResponseProto getDatanodeUsageInfo(
.build();
}
+ public GetContainerCountResponseProto getContainerCount(
+ StorageContainerLocationProtocolProtos.GetContainerCountRequestProto
+ request) throws IOException {
+
+ return GetContainerCountResponseProto.newBuilder()
+ .setContainerCount(impl.getContainerCount())
+ .build();
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 2c6bdd50b820..a763d33ddf57 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -945,6 +945,11 @@ public Token<?> getContainerToken(ContainerID containerID)
.generateToken(remoteUser.getUserName(), containerID);
}
+ @Override
+ public long getContainerCount() throws IOException {
+ return scm.getContainerManager().getContainers().size();
+ }
+
/**
* Queries a list of Node that match a set of statuses.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java
index f7a07616ac6d..ae955ee016fb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.server.http.BaseHttpServer;
import org.apache.hadoop.ozone.OzoneConsts;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
/**
* HttpServer2 wrapper for the Ozone Storage Container Manager.
@@ -34,7 +34,7 @@ public StorageContainerManagerHttpServer(MutableConfigurationSource conf,
StorageContainerManager scm)
throws IOException {
super(conf, "scm");
- addServlet("dbCheckpoint", OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT,
+ addServlet("dbCheckpoint", OZONE_DB_CHECKPOINT_HTTP_ENDPOINT,
SCMDBCheckpointServlet.class);
getWebAppContext().setAttribute(OzoneConsts.SCM_CONTEXT_ATTRIBUTE, scm);
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
index 71c51d8b0b74..818b23c18cdd 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
@@ -30,7 +30,7 @@
import java.net.InetSocketAddress;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
@@ -153,13 +153,13 @@ public String getOMDBCheckpointEnpointUrl(boolean isHttpPolicy) {
if (isHttpPolicy) {
if (StringUtils.isNotEmpty(getHttpAddress())) {
return "http://" + getHttpAddress() +
- OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT +
+ OZONE_DB_CHECKPOINT_HTTP_ENDPOINT +
"?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
}
} else {
if (StringUtils.isNotEmpty(getHttpsAddress())) {
return "https://" + getHttpsAddress() +
- OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT +
+ OZONE_DB_CHECKPOINT_HTTP_ENDPOINT +
"?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
}
}
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-csi/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-csi/docker-config
index e3fbb6a16ee8..31050a2355b7 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-csi/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-csi/docker-config
@@ -21,6 +21,7 @@ OZONE-SITE.XML_ozone.csi.socket=/tmp/csi.sock
OZONE-SITE.XML_ozone.om.address=om
OZONE-SITE.XML_ozone.om.http-address=om:9874
+OZONE-SITE.XML_ozone.scm.http-address=scm:9876
OZONE-SITE.XML_ozone.scm.container.size=1GB
OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
index 5b2632d6ba84..fa38aad00dd1 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
@@ -38,6 +38,10 @@ OZONE-SITE.XML_ozone.datanode.pipeline.limit=1
OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
OZONE-SITE.XML_ozone.scm.primordial.node.id=scm1
OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
+OZONE-SITE.XML_ozone.recon.address=recon:9891
+OZONE-SITE.XML_ozone.recon.http-address=0.0.0.0:9888
+OZONE-SITE.XML_ozone.recon.https-address=0.0.0.0:9889
OZONE_CONF_DIR=/etc/hadoop
OZONE_LOG_DIR=/var/log/hadoop
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index 41c8964baede..4b59e0ecf320 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -19,6 +19,7 @@ CORE-SITE.XML_fs.trash.interval=1
OZONE-SITE.XML_ozone.om.address=om
OZONE-SITE.XML_ozone.om.http-address=om:9874
+OZONE-SITE.XML_ozone.scm.http-address=scm:9876
OZONE-SITE.XML_ozone.scm.container.size=1GB
OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config
index 942b45b7adae..9d739a9b5a96 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config
@@ -16,6 +16,7 @@
OZONE-SITE.XML_ozone.om.address=om
OZONE-SITE.XML_ozone.om.http-address=om:9874
+OZONE-SITE.XML_ozone.scm.http-address=scm:9876
OZONE-SITE.XML_ozone.scm.container.size=1GB
OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index f34dd0a91446..8dc3ceb4283f 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -19,6 +19,7 @@ CORE-SITE.XML_fs.trash.interval=1
OZONE-SITE.XML_ozone.om.address=om
OZONE-SITE.XML_ozone.om.http-address=om:9874
+OZONE-SITE.XML_ozone.scm.http-address=scm:9876
OZONE-SITE.XML_ozone.scm.container.size=1GB
OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconScmHASnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconScmHASnapshot.java
new file mode 100644
index 000000000000..2067268da5dd
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconScmHASnapshot.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.recon;
+
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Recon SCM HA Snapshot Download implementation.
+ */
+public class TestReconScmHASnapshot {
+
+ /**
+ * Set a timeout for each test.
+ */
+ @Rule
+ public Timeout timeout = Timeout.seconds(100);
+ private static OzoneConfiguration conf;
+ private static MiniOzoneCluster cluster;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.setBoolean(OZONE_SCM_HA_ENABLE_KEY, true);
+ conf.setBoolean(
+ ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_ENABLED, true);
+ conf.setInt(ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD, 0);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 5);
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(4)
+ .includeRecon(true)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ @Test
+ public void testScmHASnapshot() throws Exception {
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(LoggerFactory.getLogger(
+ ReconStorageContainerManagerFacade.class));
+
+ List<ContainerInfo> reconContainers = cluster.getReconServer()
+ .getReconStorageContainerManager().getContainerManager()
+ .getContainers();
+ assertEquals(0, reconContainers.size());
+
+ ReconNodeManager nodeManager;
+ nodeManager = (ReconNodeManager) cluster.getReconServer()
+ .getReconStorageContainerManager().getScmNodeManager();
+ long keyCountBefore = nodeManager.getNodeDBKeyCount();
+
+ //Stopping Recon to add Containers in SCM
+ cluster.stopRecon();
+
+ ContainerManager containerManager;
+ containerManager = cluster.getStorageContainerManager()
+ .getContainerManager();
+
+ for (int i = 0; i < 10; i++) {
+ containerManager.allocateContainer(new RatisReplicationConfig(
+ HddsProtos.ReplicationFactor.ONE), "testOwner");
+ }
+
+ cluster.startRecon();
+
+ //ContainerCount after Recon DB is updated with SCM DB
+ containerManager = cluster.getStorageContainerManager()
+ .getContainerManager();
+ ContainerManager reconContainerManager = cluster.getReconServer()
+ .getReconStorageContainerManager().getContainerManager();
+ assertTrue(logCapturer.getOutput()
+ .contains("Downloaded SCM Snapshot from Leader SCM"));
+ assertTrue(logCapturer.getOutput()
+ .contains("Recon Container Count: " + reconContainers.size() +
+ ", SCM Container Count: " + containerManager.getContainers().size()));
+ assertEquals(containerManager.getContainers().size(),
+ reconContainerManager.getContainers().size());
+
+ //PipelineCount after Recon DB is updated with SCM DB
+ PipelineManager scmPipelineManager = cluster.getStorageContainerManager()
+ .getPipelineManager();
+ PipelineManager reconPipelineManager = cluster.getReconServer()
+ .getReconStorageContainerManager().getPipelineManager();
+ assertEquals(scmPipelineManager.getPipelines().size(),
+ reconPipelineManager.getPipelines().size());
+
+ //NodeCount after Recon DB updated with SCM DB
+ nodeManager = (ReconNodeManager) cluster.getReconServer()
+ .getReconStorageContainerManager().getScmNodeManager();
+ long keyCountAfter = nodeManager.getNodeDBKeyCount();
+ assertEquals(keyCountAfter, keyCountBefore);
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconScmSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconScmSnapshot.java
new file mode 100644
index 000000000000..d1e02a73711d
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconScmSnapshot.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.recon;
+
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Recon SCM Snapshot Download implementation.
+ */
+public class TestReconScmSnapshot {
+ /**
+ * Set a timeout for each test.
+ */
+ @Rule
+ public Timeout timeout = Timeout.seconds(100);
+ private static OzoneConfiguration conf;
+ private static MiniOzoneCluster cluster;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.setBoolean(
+ ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_ENABLED, true);
+ conf.setInt(ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD, 0);
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(4)
+ .includeRecon(true)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ @Test
+ public void testScmSnapshot() throws Exception {
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(LoggerFactory.getLogger(
+ ReconStorageContainerManagerFacade.class));
+
+ List<ContainerInfo> reconContainers = cluster.getReconServer()
+ .getReconStorageContainerManager().getContainerManager()
+ .getContainers();
+ assertEquals(0, reconContainers.size());
+
+ ReconNodeManager nodeManager;
+ nodeManager = (ReconNodeManager) cluster.getReconServer()
+ .getReconStorageContainerManager().getScmNodeManager();
+ long keyCountBefore = nodeManager.getNodeDBKeyCount();
+
+ //Stopping Recon to add Containers in SCM
+ cluster.stopRecon();
+
+ ContainerManager containerManager;
+ containerManager = cluster.getStorageContainerManager()
+ .getContainerManager();
+
+ for (int i = 0; i < 10; i++) {
+ containerManager.allocateContainer(new RatisReplicationConfig(
+ HddsProtos.ReplicationFactor.ONE), "testOwner");
+ }
+
+ cluster.startRecon();
+
+ //ContainerCount after Recon DB is updated with SCM DB
+ containerManager = cluster.getStorageContainerManager()
+ .getContainerManager();
+ ContainerManager reconContainerManager = cluster.getReconServer()
+ .getReconStorageContainerManager().getContainerManager();
+ assertTrue(logCapturer.getOutput()
+ .contains("Recon Container Count: " + reconContainers.size() +
+ ", SCM Container Count: " + containerManager.getContainers().size()));
+ assertEquals(containerManager.getContainers().size(),
+ reconContainerManager.getContainers().size());
+
+ //PipelineCount after Recon DB is updated with SCM DB
+ PipelineManager scmPipelineManager = cluster.getStorageContainerManager()
+ .getPipelineManager();
+ PipelineManager reconPipelineManager = cluster.getReconServer()
+ .getReconStorageContainerManager().getPipelineManager();
+ assertEquals(scmPipelineManager.getPipelines().size(),
+ reconPipelineManager.getPipelines().size());
+
+ //NodeCount after Recon DB updated with SCM DB
+ nodeManager = (ReconNodeManager) cluster.getReconServer()
+ .getReconStorageContainerManager().getScmNodeManager();
+ long keyCountAfter = nodeManager.getNodeDBKeyCount();
+ assertEquals(keyCountAfter, keyCountBefore);
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java
index 14ec9ff0cd19..1d3400077fd9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.recon;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
import java.util.HashMap;
import java.util.UUID;
@@ -114,7 +114,7 @@ public void testReconGetsSnapshotFromLeader() throws Exception {
String expectedUrl = "http://" +
(hostname.equals("0.0.0.0") ? "localhost" : hostname) + ":" +
ozoneManager.get().getHttpServer().getHttpAddress().getPort() +
- OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+ OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
String snapshotUrl = impl.getOzoneManagerSnapshotUrl();
Assert.assertEquals("OM Snapshot should be requested from the leader.",
expectedUrl, snapshotUrl);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
index f24c00d3d04a..4250e0f8fbdf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.hdds.server.http.BaseHttpServer;
import org.apache.hadoop.ozone.OzoneConsts;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT;
/**
@@ -36,7 +36,7 @@ public OzoneManagerHttpServer(MutableConfigurationSource conf,
super(conf, "ozoneManager");
addServlet("serviceList", OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT,
ServiceListJSONServlet.class);
- addServlet("dbCheckpoint", OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT,
+ addServlet("dbCheckpoint", OZONE_DB_CHECKPOINT_HTTP_ENDPOINT,
OMDBCheckpointServlet.class);
getWebAppContext().setAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE, om);
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 2eeb3235af32..5a013dc9c935 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -41,6 +41,8 @@ private ReconConstants() {
public static final String CONTAINER_KEY_COUNT_TABLE =
"containerKeyCountTable";
+ public static final String RECON_SCM_SNAPSHOT_DB = "scm.snapshot.db";
+
// By default, limit the number of results returned
public static final String DEFAULT_FETCH_COUNT = "1000";
public static final String DEFAULT_BATCH_NUMBER = "1";
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index 66e7a65dbb90..4924cf9dab13 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -124,6 +124,22 @@ public final class ReconServerConfigKeys {
public static final String
OZONE_RECON_METRICS_HTTP_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "10s";
+ public static final String OZONE_RECON_SCM_CONTAINER_THRESHOLD =
+ "ozone.recon.scm.container.threshold";
+ public static final int OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT = 100;
+
+ public static final String OZONE_RECON_SCM_SNAPSHOT_ENABLED =
+ "ozone.recon.scm.snapshot.enabled";
+ public static final boolean OZONE_RECON_SCM_SNAPSHOT_ENABLED_DEFAULT = false;
+
+ public static final String OZONE_RECON_SCM_CONNECTION_TIMEOUT =
+ "ozone.recon.scm.connection.timeout";
+ public static final String OZONE_RECON_SCM_CONNECTION_TIMEOUT_DEFAULT = "5s";
+
+ public static final String OZONE_RECON_SCM_CONNECTION_REQUEST_TIMEOUT =
+ "ozone.recon.scm.connection.request.timeout";
+ public static final String
+ OZONE_RECON_SCM_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "5s";
/**
* Private constructor for utility class.
*/
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index b238278f5091..67c635516f1c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -81,6 +81,7 @@ public ContainerHealthTask(
public synchronized void run() {
try {
while (canRun()) {
+ wait(interval);
long start = Time.monotonicNow();
long currentTime = System.currentTimeMillis();
long existingCount = processExistingDBRecords(currentTime);
@@ -97,7 +98,6 @@ public synchronized void run() {
" processing {} containers.", Time.monotonicNow() - start,
containers.size());
processedContainers.clear();
- wait(interval);
}
} catch (Throwable t) {
LOG.error("Exception in Missing Container task Thread.", t);
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index 0f6d79570f06..41cdc7aa8e0a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.UUID;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
@@ -294,4 +295,14 @@ private boolean needUpdate(DatanodeDetails datanodeDetails,
return currentTime - getLastHeartbeat(datanodeDetails) >=
reconDatanodeOutdatedTime;
}
+
+ public void reinitialize(Table<UUID, DatanodeDetails> nodeTable) {
+ this.nodeDB = nodeTable;
+ loadExistingNodes();
+ }
+
+ @VisibleForTesting
+ public long getNodeDBKeyCount() throws IOException {
+ return nodeDB.getEstimatedKeyCount();
+ }
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index d0b0d81efdd9..d41c2d766165 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -18,14 +18,18 @@
package org.apache.hadoop.ozone.recon.scm;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@@ -40,6 +44,7 @@
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
@@ -59,9 +64,15 @@
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
@@ -70,6 +81,8 @@
import com.google.inject.Inject;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,11 +103,11 @@ public class ReconStorageContainerManagerFacade
private final EventQueue eventQueue;
private final SCMContext scmContext;
private final SCMStorageConfig scmStorageConfig;
- private final DBStore dbStore;
private final SCMNodeDetails reconNodeDetails;
private final SCMHAManager scmhaManager;
private final SequenceIdGenerator sequenceIdGen;
+ private DBStore dbStore;
private ReconNodeManager nodeManager;
private ReconPipelineManager pipelineManager;
private ReconContainerManager containerManager;
@@ -249,7 +262,15 @@ public void start() {
"Recon ScmDatanodeProtocol RPC server",
getDatanodeProtocolServer().getDatanodeRpcAddress()));
}
- initializePipelinesFromScm();
+ boolean isSCMSnapshotEnabled = ozoneConfiguration.getBoolean(
+ ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_ENABLED,
+ ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_ENABLED_DEFAULT);
+ if(isSCMSnapshotEnabled) {
+ initializeSCMDB();
+ LOG.info("SCM DB initialized");
+ } else {
+ initializePipelinesFromScm();
+ }
getDatanodeProtocolServer().start();
this.reconScmTasks.forEach(ReconScmTask::start);
}
@@ -307,6 +328,117 @@ private void initializePipelinesFromScm() {
}
}
+ private void initializeSCMDB() {
+ try {
+ long scmContainersCount = scmServiceProvider.getContainerCount();
+ long reconContainerCount = containerManager.getContainers().size();
+ long threshold = ozoneConfiguration.getInt(
+ ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD,
+ ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT);
+
+ if(Math.abs(scmContainersCount - reconContainerCount) > threshold) {
+ LOG.info("Recon Container Count: {}, SCM Container Count: {}",
+ reconContainerCount, scmContainersCount);
+ updateReconSCMDBWithNewSnapshot();
+ LOG.info("Updated Recon DB with SCM DB");
+ } else {
+ initializePipelinesFromScm();
+ }
+ } catch (IOException e) {
+ LOG.error("Exception encountered while getting SCM DB.");
+ }
+ }
+
+ public void updateReconSCMDBWithNewSnapshot() throws IOException {
+ DBCheckpoint dbSnapshot;
+ if(!SCMHAUtils.isSCMHAEnabled(ozoneConfiguration)) {
+ dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+ } else {
+ dbSnapshot = scmServiceProvider.getSCMDBSnapshotHA();
+ LOG.info("Downloaded SCM Snapshot from Leader SCM");
+ }
+ if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+ LOG.info("Got new checkpoint from SCM : " +
+ dbSnapshot.getCheckpointLocation());
+ try {
+ initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ } catch (IOException e) {
+ LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+ }
+ } else {
+ LOG.error("Null snapshot location got from SCM.");
+ }
+ }
+
+ private void deleteOldSCMDB() throws IOException {
+ if (dbStore != null) {
+ File oldDBLocation = dbStore.getDbLocation();
+ if (oldDBLocation.exists()) {
+ LOG.info("Cleaning up old SCM snapshot db at {}.",
+ oldDBLocation.getAbsolutePath());
+ FileUtils.deleteDirectory(oldDBLocation);
+ }
+ }
+ }
+
+ private void initializeNewRdbStore(File dbFile) throws IOException {
+ try {
+ DBStore newStore = createDBAndAddSCMTablesAndCodecs(
+ dbFile, new ReconSCMDBDefinition());
+ Table nodeTable =
+ ReconSCMDBDefinition.NODES.getTable(dbStore);
+ Table newNodeTable =
+ ReconSCMDBDefinition.NODES.getTable(newStore);
+ TableIterator> iterator = nodeTable.iterator();
+ while (iterator.hasNext()) {
+ KeyValue keyValue = iterator.next();
+ newNodeTable.put(keyValue.getKey(), keyValue.getValue());
+ }
+ sequenceIdGen.reinitialize(
+ ReconSCMDBDefinition.SEQUENCE_ID.getTable(newStore));
+ pipelineManager.reinitialize(
+ ReconSCMDBDefinition.PIPELINES.getTable(newStore));
+ containerManager.reinitialize(
+ ReconSCMDBDefinition.CONTAINERS.getTable(newStore));
+ nodeManager.reinitialize(
+ ReconSCMDBDefinition.NODES.getTable(newStore));
+ deleteOldSCMDB();
+ setDbStore(newStore);
+ File newDb = new File(dbFile.getParent() +
+ OZONE_URI_DELIMITER + ReconSCMDBDefinition.RECON_SCM_DB_NAME);
+ boolean success = dbFile.renameTo(newDb);
+ if (success) {
+ LOG.info("SCM snapshot linked to Recon DB.");
+ }
+ LOG.info("Created SCM DB handle from snapshot at {}.",
+ dbFile.getAbsolutePath());
+ } catch (IOException ioEx) {
+ LOG.error("Unable to initialize Recon SCM DB snapshot store.", ioEx);
+ }
+ }
+
+ private DBStore createDBAndAddSCMTablesAndCodecs(File dbFile,
+ ReconSCMDBDefinition definition) throws IOException {
+ DBStoreBuilder dbStoreBuilder =
+ DBStoreBuilder.newBuilder(ozoneConfiguration)
+ .setName(dbFile.getName())
+ .setPath(dbFile.toPath().getParent());
+ for (DBColumnFamilyDefinition columnFamily :
+ definition.getColumnFamilies()) {
+ dbStoreBuilder.addTable(columnFamily.getName());
+ dbStoreBuilder.addCodec(columnFamily.getKeyType(),
+ columnFamily.getKeyCodec());
+ dbStoreBuilder.addCodec(columnFamily.getValueType(),
+ columnFamily.getValueCodec());
+ }
+ return dbStoreBuilder.build();
+ }
+
  /**
   * Replaces the handle to Recon's SCM metadata store. Called when a freshly
   * downloaded SCM snapshot is promoted to be the active store.
   */
  public void setDbStore(DBStore dbStore) {
    this.dbStore = dbStore;
  }
+
@Override
public NodeManager getScmNodeManager() {
return nodeManager;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
index be2c7cb6e7b0..a5cb4f29e2ad 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
/**
* Interface to access SCM endpoints.
@@ -66,4 +67,21 @@ List getExistContainerWithPipelinesInBatch(
*/
List getNodes() throws IOException;
+ /**
+ * Requests SCM for container count.
+ * @return Total number of containers in SCM.
+ */
+ long getContainerCount() throws IOException;
+
+ /**
+ * Requests SCM for DB Snapshot.
+ * @return DBCheckpoint from SCM.
+ */
+ DBCheckpoint getSCMDBSnapshot();
+
+ /**
+ * Requests Leader SCM for DB Snapshot.
+ * @return DBCheckpoint from Leader SCM.
+ */
+ DBCheckpoint getSCMDBSnapshotHA() throws IOException;
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 96263e0882b4..021d0147a018 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -58,7 +58,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_SNAPSHOT_DB;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
@@ -151,11 +151,11 @@ public OzoneManagerServiceProviderImpl(
HttpConfig.Policy policy = HttpConfig.getHttpPolicy(configuration);
omDBSnapshotUrl = "http://" + ozoneManagerHttpAddress +
- OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+ OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
if (policy.isHttpsEnabled()) {
omDBSnapshotUrl = "https://" + ozoneManagerHttpsAddress +
- OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+ OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
}
boolean flushParam = configuration.getBoolean(
@@ -271,7 +271,7 @@ public String getOzoneManagerSnapshotUrl() throws IOException {
omLeaderUrl = (policy.isHttpsEnabled() ?
"https://" + info.getServiceAddress(Type.HTTPS) :
"http://" + info.getServiceAddress(Type.HTTP)) +
- OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+ OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
}
}
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
index 1e609e80720f..398c9819f368 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
@@ -18,18 +18,47 @@
package org.apache.hadoop.ozone.recon.spi.impl;
+import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_AUTH_TYPE;
import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_SCM_SNAPSHOT_DB;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONNECTION_REQUEST_TIMEOUT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONNECTION_TIMEOUT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONNECTION_TIMEOUT_DEFAULT;
+import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.InterSCMGrpcClient;
+import org.apache.hadoop.hdds.scm.ha.SCMSnapshotDownloader;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
+import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.ratis.proto.RaftProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation for StorageContainerServiceProvider that talks with actual
@@ -38,12 +67,53 @@
public class StorageContainerServiceProviderImpl
implements StorageContainerServiceProvider {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StorageContainerServiceProviderImpl.class);
private StorageContainerLocationProtocol scmClient;
+ private final OzoneConfiguration configuration;
+ private String scmDBSnapshotUrl;
+ private File scmSnapshotDBParentDir;
+ private URLConnectionFactory connectionFactory;
+ private ReconUtils reconUtils;
@Inject
public StorageContainerServiceProviderImpl(
- StorageContainerLocationProtocol scmClient) {
+ StorageContainerLocationProtocol scmClient,
+ ReconUtils reconUtils,
+ OzoneConfiguration configuration) {
+
+ int connectionTimeout = (int) configuration.getTimeDuration(
+ OZONE_RECON_SCM_CONNECTION_TIMEOUT,
+ OZONE_RECON_SCM_CONNECTION_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+ int connectionRequestTimeout = (int) configuration.getTimeDuration(
+ OZONE_RECON_SCM_CONNECTION_REQUEST_TIMEOUT,
+ OZONE_RECON_SCM_CONNECTION_REQUEST_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ connectionFactory =
+ URLConnectionFactory.newDefaultURLConnectionFactory(connectionTimeout,
+ connectionRequestTimeout, configuration);
+
+ String scmHttpAddress = configuration.get(ScmConfigKeys
+ .OZONE_SCM_HTTP_ADDRESS_KEY);
+
+ String scmHttpsAddress = configuration.get(ScmConfigKeys
+ .OZONE_SCM_HTTPS_ADDRESS_KEY);
+
+ HttpConfig.Policy policy = HttpConfig.getHttpPolicy(configuration);
+
+ scmSnapshotDBParentDir = ReconUtils.getReconScmDbDir(configuration);
+
+ scmDBSnapshotUrl = "http://" + scmHttpAddress +
+ OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
+
+ if (policy.isHttpsEnabled()) {
+ scmDBSnapshotUrl = "https://" + scmHttpsAddress +
+ OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
+ }
+
+ this.reconUtils = reconUtils;
this.scmClient = scmClient;
+ this.configuration = configuration;
}
@Override
@@ -74,4 +144,79 @@ public List getNodes() throws IOException {
return scmClient.queryNode(null, null, HddsProtos.QueryScope.CLUSTER,
"", CURRENT_VERSION);
}
+
  /**
   * Requests the total container count from SCM over RPC.
   */
  @Override
  public long getContainerCount() throws IOException {
    return scmClient.getContainerCount();
  }
+
  /**
   * Returns the HTTP(S) URL of the SCM DB checkpoint endpoint, as derived
   * from the configured SCM address and HTTP policy in the constructor.
   */
  public String getScmDBSnapshotUrl() {
    return scmDBSnapshotUrl;
  }
+
  /**
   * Returns true if SPNEGO (kerberos) authentication is configured for the
   * SCM HTTP server.
   *
   * NOTE(review): the name says "Om" but this reads the SCM HTTP auth key
   * ({@code HDDS_SCM_HTTP_AUTH_TYPE}); consider renaming to
   * isScmSpnegoEnabled.
   */
  private boolean isOmSpnegoEnabled() {
    return configuration.get(HDDS_SCM_HTTP_AUTH_TYPE, "simple")
        .equals("kerberos");
  }
+
+ public DBCheckpoint getSCMDBSnapshot() {
+ String snapshotFileName = RECON_SCM_SNAPSHOT_DB + "_" +
+ System.currentTimeMillis();
+ File targetFile = new File(scmSnapshotDBParentDir, snapshotFileName +
+ ".tar.gz");
+
+ try {
+ SecurityUtil.doAsLoginUser(() -> {
+ try (InputStream inputStream = reconUtils.makeHttpCall(
+ connectionFactory, getScmDBSnapshotUrl(),
+ isOmSpnegoEnabled()).getInputStream()) {
+ FileUtils.copyInputStreamToFile(inputStream, targetFile);
+ }
+ return null;
+ });
+
+ Path untarredDbDir = Paths.get(scmSnapshotDBParentDir.getAbsolutePath(),
+ snapshotFileName);
+ reconUtils.untarCheckpointFile(targetFile, untarredDbDir);
+ FileUtils.deleteQuietly(targetFile);
+ return new RocksDBCheckpoint(untarredDbDir);
+ } catch (IOException e) {
+ LOG.error("Unable to obtain SCM DB Snapshot. ", e);
+ }
+ return null;
+ }
+
+ public DBCheckpoint getSCMDBSnapshotHA() throws IOException {
+ String snapshotFileName = RECON_SCM_SNAPSHOT_DB + "_" +
+ System.currentTimeMillis();
+ File targetFile = new File(scmSnapshotDBParentDir, snapshotFileName +
+ ".tar.gz");
+
+ List ratisRoles = scmClient.getScmInfo().getRatisPeerRoles();
+ for (String ratisRole: ratisRoles) {
+ String[] role = ratisRole.split(":");
+ if(role[2].equals(RaftProtos.RaftPeerRole.LEADER.toString())) {
+ String hostAddress = role[4].trim();
+ int grpcPort = configuration.getInt(
+ ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
+ ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
+
+ try (SCMSnapshotDownloader downloadClient =
+ new InterSCMGrpcClient(hostAddress, grpcPort,
+ configuration, new SCMCertificateClient(
+ new SecurityConfig(configuration)))) {
+ downloadClient.download(targetFile.toPath()).get();
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Rocks DB checkpoint downloading failed", e);
+ throw new IOException(e);
+ }
+ break;
+ }
+ }
+ Path untarredDbDir = Paths.get(scmSnapshotDBParentDir.getAbsolutePath(),
+ snapshotFileName);
+ reconUtils.untarCheckpointFile(targetFile, untarredDbDir);
+ FileUtils.deleteQuietly(targetFile);
+ return new RocksDBCheckpoint(untarredDbDir);
+ }
}
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestStorageContainerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestStorageContainerServiceProviderImpl.java
index 71c9bc4f0216..f9d3fc41fc22 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestStorageContainerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestStorageContainerServiceProviderImpl.java
@@ -24,13 +24,18 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.File;
import java.io.IOException;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -55,6 +60,10 @@ protected void configure() {
try {
StorageContainerLocationProtocol mockScmClient = mock(
StorageContainerLocationProtocol.class);
+ ReconUtils reconUtils = new ReconUtils();
+ File testDir = GenericTestUtils.getRandomizedTestDir();
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
pipelineID = PipelineID.randomId().getProtobuf();
when(mockScmClient.getPipeline(pipelineID))
.thenReturn(mock(Pipeline.class));
@@ -62,6 +71,9 @@ protected void configure() {
.toInstance(mockScmClient);
bind(StorageContainerServiceProvider.class)
.to(StorageContainerServiceProviderImpl.class);
+ bind(OzoneConfiguration.class).
+ toInstance(conf);
+ bind(ReconUtils.class).toInstance(reconUtils);
} catch (Exception e) {
Assert.fail();
}