Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* A RocksDB based implementation of SCM Metadata Store.
*
*/
public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
public class SCMMetadataStoreImpl implements SCMMetadataStore {

private Table<Long, DeletedBlocksTransaction> deletedBlocksTable;

Expand All @@ -61,7 +61,7 @@ public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
private Table<PipelineID, Pipeline> pipelineTable;

private static final Logger LOG =
LoggerFactory.getLogger(SCMMetadataStoreRDBImpl.class);
LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
private DBStore store;
private final OzoneConfiguration configuration;
private final AtomicLong txID;
Expand All @@ -72,7 +72,7 @@ public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
* @param config - Ozone Configuration.
* @throws IOException - on Failure.
*/
public SCMMetadataStoreRDBImpl(OzoneConfiguration config)
public SCMMetadataStoreImpl(OzoneConfiguration config)
throws IOException {
this.configuration = config;
start(this.configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
Expand Down Expand Up @@ -503,7 +503,7 @@ private void initalizeMetadataStore(OzoneConfiguration conf,
if(configurator.getMetadataStore() != null) {
scmMetadataStore = configurator.getMetadataStore();
} else {
scmMetadataStore = new SCMMetadataStoreRDBImpl(conf);
scmMetadataStore = new SCMMetadataStoreImpl(conf);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void setUp() throws Exception {
nodeManager = new MockNodeManager(true, 10);
eventQueue = new EventQueue();

scmMetadataStore = new SCMMetadataStoreRDBImpl(conf);
scmMetadataStore = new SCMMetadataStoreImpl(conf);
scmMetadataStore.start(conf);
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
Expand All @@ -61,7 +60,7 @@ public class TestCloseContainerEventHandler {
private static long size;
private static File testDir;
private static EventQueue eventQueue;
private static DBStore dbStore;
private static SCMMetadataStore scmMetadataStore;

@BeforeClass
public static void setUp() throws Exception {
Expand All @@ -75,22 +74,27 @@ public static void setUp() throws Exception {
configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 16);
nodeManager = new MockNodeManager(true, 10);
eventQueue = new EventQueue();
dbStore =
DBStoreBuilder.createDBStore(configuration, new SCMDBDefinition());
scmMetadataStore = new SCMMetadataStoreImpl(configuration);

pipelineManager =
new SCMPipelineManager(configuration, nodeManager,
SCMDBDefinition.PIPELINES.getTable(dbStore), eventQueue);
scmMetadataStore.getPipelineTable(), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), configuration, eventQueue);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
containerManager = new SCMContainerManager(configuration,
SCMDBDefinition.CONTAINERS.getTable(dbStore), dbStore, pipelineManager);
containerManager = new SCMContainerManager(
configuration,
scmMetadataStore.getContainerTable(),
scmMetadataStore.getStore(),
pipelineManager);
pipelineManager.triggerPipelineCreation();
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(pipelineManager, containerManager));
new CloseContainerEventHandler(
pipelineManager,
containerManager));
eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
// Move all pipelines created by background from ALLOCATED to OPEN state
Thread.sleep(2000);
Expand All @@ -105,8 +109,8 @@ public static void tearDown() throws Exception {
if (pipelineManager != null) {
pipelineManager.close();
}
if (dbStore != null) {
dbStore.close();
if (scmMetadataStore.getStore() != null) {
scmMetadataStore.getStore().close();
}
FileUtil.fullyDelete(testDir);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -92,14 +91,14 @@ public static void setUp() throws Exception {
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager(true, 10);
DBStore dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(conf);
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue());
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
containerManager = new SCMContainerManager(conf,
SCMDBDefinition.CONTAINERS.getTable(dbStore),
dbStore,
scmMetadataStore.getContainerTable(),
scmMetadataStore.getStore(),
pipelineManager);
xceiverClientManager = new XceiverClientManager(conf);
replicationFactor = SCMTestUtils.getReplicationFactor(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,18 @@
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.PathUtils;

import org.apache.commons.io.IOUtils;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import org.junit.Before;
Expand All @@ -67,17 +64,16 @@ public class TestContainerPlacement {

@Rule
public ExpectedException thrown = ExpectedException.none();
private DBStore dbStore;
private SCMMetadataStore scmMetadataStore;

@Before
public void createDbStore() throws IOException {
dbStore =
DBStoreBuilder.createDBStore(getConf(), new SCMDBDefinition());
scmMetadataStore = new SCMMetadataStoreImpl(getConf());
}

@After
public void destroyDBStore() throws Exception {
dbStore.close();
scmMetadataStore.getStore().close();
}
/**
* Returns a new copy of Configuration.
Expand Down Expand Up @@ -120,9 +116,9 @@ SCMContainerManager createContainerManager(ConfigurationSource config,

PipelineManager pipelineManager =
new SCMPipelineManager(config, scmNodeManager,
PIPELINES.getTable(dbStore), eventQueue);
return new SCMContainerManager(config, CONTAINERS.getTable(dbStore),
dbStore,
scmMetadataStore.getPipelineTable(), eventQueue);
return new SCMContainerManager(config, scmMetadataStore.getContainerTable(),
scmMetadataStore.getStore(),
pipelineManager);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;

Expand All @@ -66,7 +65,7 @@ public class TestSCMPipelineManager {
private static MockNodeManager nodeManager;
private static File testDir;
private static OzoneConfiguration conf;
private DBStore store;
private static SCMMetadataStore scmMetadataStore;

@Before
public void setUp() throws Exception {
Expand All @@ -82,13 +81,12 @@ public void setUp() throws Exception {
}
nodeManager = new MockNodeManager(true, 20);

store = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());

scmMetadataStore = new SCMMetadataStoreImpl(conf);
}

@After
public void cleanup() throws Exception {
store.close();
scmMetadataStore.getStore().close();
FileUtil.fullyDelete(testDir);
}

Expand All @@ -97,7 +95,7 @@ public void testPipelineReload() throws IOException {
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf,
nodeManager,
SCMDBDefinition.PIPELINES.getTable(store),
scmMetadataStore.getPipelineTable(),
new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
Expand All @@ -119,7 +117,7 @@ public void testPipelineReload() throws IOException {
// new pipeline manager should be able to load the pipelines from the db
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
Expand Down Expand Up @@ -151,7 +149,7 @@ public void testPipelineReload() throws IOException {
public void testRemovePipeline() throws IOException {
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
Expand All @@ -171,7 +169,7 @@ public void testRemovePipeline() throws IOException {
// new pipeline manager should not be able to load removed pipelines
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
scmMetadataStore.getPipelineTable(), new EventQueue());
try {
pipelineManager.getPipeline(pipeline.getId());
fail("Pipeline should not have been retrieved");
Expand All @@ -188,7 +186,7 @@ public void testPipelineReport() throws IOException {
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
scmMetadataStore.getPipelineTable(), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
Expand Down Expand Up @@ -255,7 +253,7 @@ public void testPipelineCreationFailedMetric() throws Exception {
20);
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManagerMock,
SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManagerMock,
Expand Down Expand Up @@ -315,7 +313,7 @@ public void testPipelineCreationFailedMetric() throws Exception {
public void testActivateDeactivatePipeline() throws IOException {
final SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
final PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
Expand Down Expand Up @@ -364,7 +362,7 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
scmMetadataStore.getPipelineTable(), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
Expand All @@ -381,7 +379,7 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
// new pipeline manager loads the pipelines from the db in ALLOCATED state
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
scmMetadataStore.getPipelineTable(), eventQueue);
mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
Expand Down Expand Up @@ -427,7 +425,7 @@ public void testScrubPipeline() throws IOException {
EventQueue eventQueue = new EventQueue();
final SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
scmMetadataStore.getPipelineTable(), eventQueue);
pipelineManager.allowPipelineCreation();
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
Expand Down Expand Up @@ -471,7 +469,7 @@ public void testPipelineNotCreatedUntilSafeModePrecheck()
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
scmMetadataStore.getPipelineTable(), eventQueue);
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
false);
Expand Down Expand Up @@ -517,7 +515,7 @@ public void testSafeModeUpdatedOnSafemodeExit()
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
scmMetadataStore.getPipelineTable(), eventQueue);
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
false);
Expand Down
Loading