diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 91684cea455c..c0ace46b4aac 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -110,6 +111,7 @@ public ContainerManagerImpl( .setPipelineManager(pipelineManager) .setRatisServer(scmHaManager.getRatisServer()) .setContainerStore(containerStore) + .setSCMDBTransactionBuffer(scmHaManager.getDBTransactionBuffer()) .build(); this.numContainerPerVolume = conf @@ -117,8 +119,6 @@ public ContainerManagerImpl( ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT); this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); - - } @Override @@ -397,4 +397,9 @@ public void close() throws IOException { protected ContainerStateManagerV2 getContainerStateManager() { return containerStateManager; } + + @VisibleForTesting + public SCMHAManager getSCMHAManager() { + return haManager; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java index 68e4d357e454..c26afef6ed41 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.scm.container.states.ContainerState; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.ha.CheckedConsumer; +import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer; import org.apache.hadoop.hdds.scm.ha.ExecutionUtil; import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; @@ -99,7 +100,9 @@ public final class ContainerStateManagerImpl /** * Persistent store for Container States. */ - private final Table containerStore; + private Table containerStore; + + private final DBTransactionBuffer transactionBuffer; /** * PipelineManager instance. 
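
Review note: the ContainerManagerImpl/ContainerStateManagerImpl changes in this patch route RocksDB writes through the new HA transaction buffer instead of committing them immediately. A minimal sketch of that pattern, assuming only the Table/BatchOperation APIs used elsewhere in this diff (the method name stageContainerWrite is illustrative, not part of the patch):

```java
// Illustrative only: Table#putWithBatch stages the write in the buffer's
// current BatchOperation; nothing reaches RocksDB until the buffer's
// flush() commits the whole batch atomically.
void stageContainerWrite(Table<ContainerID, ContainerInfo> store,
    DBTransactionBuffer buffer, ContainerInfo container) throws IOException {
  store.putWithBatch(buffer.getCurrentBatchOperation(),
      container.containerID(), container);
  // In-memory state (ContainerStateMap) is still updated eagerly, so reads
  // stay consistent while the DB write waits in the batch.
}
```
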
@@ -135,7 +138,8 @@ public final class ContainerStateManagerImpl
*/
private ContainerStateManagerImpl(final Configuration conf,
final PipelineManager pipelineManager,
- final Table<ContainerID, ContainerInfo> containerStore)
+ final Table<ContainerID, ContainerInfo> containerStore,
+ final DBTransactionBuffer buffer)
throws IOException {
this.pipelineManager = pipelineManager;
this.containerStore = containerStore;
@@ -144,6 +148,7 @@ private ContainerStateManagerImpl(final Configuration conf,
this.containers = new ContainerStateMap();
this.lastUsedMap = new ConcurrentHashMap<>();
this.containerStateChangeActions = getContainerStateChangeActions();
+ this.transactionBuffer = buffer;
initialize();
}
@@ -298,12 +303,16 @@ public void addContainer(final ContainerInfoProto containerInfo)
try {
if (!containers.contains(containerID)) {
ExecutionUtil.create(() -> {
- containerStore.put(containerID, container);
+ containerStore.putWithBatch(
+     transactionBuffer.getCurrentBatchOperation(),
+     containerID, container);
containers.addContainer(container);
pipelineManager.addContainerToPipeline(pipelineID, containerID);
}).onException(() -> {
containers.removeContainer(containerID);
- containerStore.delete(containerID);
+ containerStore.deleteWithBatch(
+     transactionBuffer.getCurrentBatchOperation(),
+     containerID);
}).execute();
}
} finally {
@@ -339,13 +348,17 @@ public void updateContainerState(final HddsProtos.ContainerID containerID,
if (newState.getNumber() > oldState.getNumber()) {
ExecutionUtil.create(() -> {
containers.updateState(id, oldState, newState);
- containerStore.put(id, containers.getContainerInfo(id));
+ containerStore.putWithBatch(
+     transactionBuffer.getCurrentBatchOperation(),
+     id, containers.getContainerInfo(id));
}).onException(() -> {
- containerStore.put(id, oldInfo);
+ containerStore.putWithBatch(
+     transactionBuffer.getCurrentBatchOperation(),
+     id, oldInfo);
containers.updateState(id, newState, oldState);
}).execute();
- containerStateChangeActions.getOrDefault(event, info -> {})
-     .execute(oldInfo);
+ containerStateChangeActions.getOrDefault(event, info -> {
+ }).execute(oldInfo);
}
}
} finally {
@@ -475,7 +488,9 @@ public void removeContainer(final HddsProtos.ContainerID id)
final ContainerID cid = ContainerID.getFromProtobuf(id);
final ContainerInfo containerInfo = containers.getContainerInfo(cid);
ExecutionUtil.create(() -> {
- containerStore.delete(cid);
+ containerStore.deleteWithBatch(
+     transactionBuffer.getCurrentBatchOperation(),
+     cid);
containers.removeContainer(cid);
- }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+ }).onException(() -> containerStore.putWithBatch(
+     transactionBuffer.getCurrentBatchOperation(),
+     cid, containerInfo)).execute();
} finally {
@@ -504,7 +519,12 @@ public static class Builder {
private PipelineManager pipelineMgr;
private SCMRatisServer scmRatisServer;
private Table<ContainerID, ContainerInfo> table;
+ private DBTransactionBuffer transactionBuffer;
+ public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
+   this.transactionBuffer = buffer;
+   return this;
+ }
public Builder setConfiguration(final Configuration config) {
conf = config;
return this;
}
@@ -533,7 +553,7 @@ public ContainerStateManagerV2 build() throws IOException {
Preconditions.checkNotNull(table);
final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
- conf, pipelineMgr, table);
+ conf, pipelineMgr, table, transactionBuffer);
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java
new file
mode 100644 index 000000000000..ae74939e1157 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A transaction buffer that accumulates SCM DB operations. Call the flush
+ * method to commit the buffered batch into the SCM DB. The buffer also
+ * maintains the transaction info of the latest transaction it holds.
+ */
+public interface DBTransactionBuffer extends Closeable {
+
+  BatchOperation getCurrentBatchOperation();
+
+  void updateLatestTrxInfo(SCMTransactionInfo info);
+
+  SCMTransactionInfo getLatestTrxInfo();
+
+  SnapshotInfo getLatestSnapshot();
+
+  void setLatestSnapshot(SnapshotInfo latestSnapshot);
+
+  void flush() throws IOException;
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java
new file mode 100644
index 000000000000..2a1d615c5d7a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.IOException;
+
+public class MockDBTransactionBuffer implements DBTransactionBuffer {
+  private DBStore dbStore;
+  private BatchOperation currentBatchOperation;
+
+  public MockDBTransactionBuffer() {
+  }
+
+  public MockDBTransactionBuffer(DBStore store) {
+    this.dbStore = store;
+  }
+
+  @Override
+  public BatchOperation getCurrentBatchOperation() {
+    if (currentBatchOperation == null) {
+      if (dbStore != null) {
+        currentBatchOperation = dbStore.initBatchOperation();
+      } else {
+        currentBatchOperation = new RDBBatchOperation();
+      }
+    }
+    return currentBatchOperation;
+  }
+
+  @Override
+  public void updateLatestTrxInfo(SCMTransactionInfo info) {
+
+  }
+
+  @Override
+  public SCMTransactionInfo getLatestTrxInfo() {
+    return null;
+  }
+
+  @Override
+  public SnapshotInfo getLatestSnapshot() {
+    return null;
+  }
+
+  @Override
+  public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
+
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // guard against a double flush, e.g. flush() followed by close()
+    if (dbStore != null && currentBatchOperation != null) {
+      dbStore.commitBatchOperation(currentBatchOperation);
+      currentBatchOperation.close();
+      currentBatchOperation = null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index bf25ad53601e..116994ef49e9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdds.scm.ha;
-
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@@ -45,17 +44,28 @@ public final class MockSCMHAManager implements SCMHAManager {
private final SCMRatisServer ratisServer;
private boolean isLeader;
+ private DBTransactionBuffer transactionBuffer;
public static SCMHAManager getInstance(boolean isLeader) {
return new MockSCMHAManager(isLeader);
}
+ public static SCMHAManager getInstance(boolean isLeader,
+     DBTransactionBuffer buffer) {
+   return new MockSCMHAManager(isLeader, buffer);
+ }
+
/**
* Creates MockSCMHAManager instance.
*/ private MockSCMHAManager(boolean isLeader) { + this(isLeader, new MockDBTransactionBuffer()); + } + + private MockSCMHAManager(boolean isLeader, DBTransactionBuffer buffer) { this.ratisServer = new MockRatisServer(); this.isLeader = isLeader; + this.transactionBuffer = buffer; } @Override @@ -82,6 +92,11 @@ public SCMRatisServer getRatisServer() { return ratisServer; } + @Override + public DBTransactionBuffer getDBTransactionBuffer() { + return transactionBuffer; + } + /** * {@inheritDoc} */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java new file mode 100644 index 000000000000..e5af07666da2 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
+/**
+ * This is a transaction buffer that buffers SCM DB operations for Pipeline and
+ * Container. When this buffer is flushed to the DB, a transaction info record
+ * is also written into the DB in the same batch, to indicate the term and
+ * transaction index of the latest operation in the DB.
+ */
+public class SCMDBTransactionBuffer implements DBTransactionBuffer {
+  private final SCMMetadataStore metadataStore;
+  private BatchOperation currentBatchOperation;
+  private SCMTransactionInfo latestTrxInfo;
+  private SnapshotInfo latestSnapshot;
+
+  public SCMDBTransactionBuffer(SCMMetadataStore store) throws IOException {
+    this.metadataStore = store;
+
+    // initialize a batch operation during construction time
+    currentBatchOperation = this.metadataStore.getStore().initBatchOperation();
+    latestTrxInfo = store.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    if (latestTrxInfo == null) {
+      // transaction table is empty
+      latestTrxInfo =
+          SCMTransactionInfo
+              .builder()
+              .setTransactionIndex(-1)
+              .setCurrentTerm(0)
+              .build();
+    }
+    latestSnapshot = latestTrxInfo.toSnapshotInfo();
+  }
+
+  @Override
+  public BatchOperation getCurrentBatchOperation() {
+    return currentBatchOperation;
+  }
+
+  @Override
+  public void updateLatestTrxInfo(SCMTransactionInfo info) {
+    if (info.compareTo(this.latestTrxInfo) <= 0) {
+      throw new IllegalArgumentException(
+          "Updating DB buffer transaction info by an older transaction info, "
+          + "current: " + this.latestTrxInfo + ", updating to: " + info);
+    }
+    this.latestTrxInfo = info;
+  }
+
+  @Override
+  public SCMTransactionInfo getLatestTrxInfo() {
+    return this.latestTrxInfo;
+  }
+
+  @Override
+  public SnapshotInfo getLatestSnapshot() {
+    return latestSnapshot;
+  }
+
+  @Override
+  public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
+    this.latestSnapshot = latestSnapshot;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // write latest trx info into trx table in the same batch
+    Table<String, SCMTransactionInfo> transactionInfoTable
+        = metadataStore.getTransactionInfoTable();
+    transactionInfoTable.putWithBatch(currentBatchOperation,
+        TRANSACTION_INFO_KEY, latestTrxInfo);
+
+    metadataStore.getStore().commitBatchOperation(currentBatchOperation);
+    currentBatchOperation.close();
+    this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
+    // reset batch operation
+    currentBatchOperation = metadataStore.getStore().initBatchOperation();
+  }
+
+  @Override
+  public String toString() {
+    return latestTrxInfo.toString();
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 8fe6d7fced00..c8b3ff796b46 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java @@ -34,6 +34,11 @@ public interface SCMHAManager { */ SCMRatisServer getRatisServer(); + /** + * Returns DB transaction buffer. + */ + DBTransactionBuffer getDBTransactionBuffer(); + /** * Stops the HA service. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index db5e9373aa9e..b79538044291 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -39,16 +39,18 @@ public class SCMHAManagerImpl implements SCMHAManager { private final SCMRatisServer ratisServer; private final ConfigurationSource conf; + private final SCMDBTransactionBuffer transactionBuffer; /** * Creates SCMHAManager instance. */ public SCMHAManagerImpl(final ConfigurationSource conf, - final StorageContainerManager scm) - throws IOException { + final StorageContainerManager scm) throws IOException { this.conf = conf; + this.transactionBuffer = + new SCMDBTransactionBuffer(scm.getScmMetadataStore()); this.ratisServer = new SCMRatisServerImpl( - conf.getObject(SCMHAConfiguration.class), conf, scm); + conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer); } /** @@ -63,6 +65,11 @@ public SCMRatisServer getRatisServer() { return ratisServer; } + @Override + public DBTransactionBuffer getDBTransactionBuffer() { + return transactionBuffer; + } + /** * {@inheritDoc} */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java index 3a453e31c9d2..15b2fe1e65a1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java @@ -64,9 +64,8 @@ public class SCMRatisServerImpl implements SCMRatisServer { // TODO: Refactor and remove ConfigurationSource and use only // SCMHAConfiguration. SCMRatisServerImpl(final SCMHAConfiguration haConf, - final ConfigurationSource conf, - final StorageContainerManager scm) - throws IOException { + final ConfigurationSource conf, final StorageContainerManager scm, + final DBTransactionBuffer buffer) throws IOException { this.scm = scm; this.address = haConf.getRatisBindAddress(); @@ -79,7 +78,7 @@ public class SCMRatisServerImpl implements SCMRatisServer { .setServerId(haGrpBuilder.getPeerId()) .setGroup(haGrpBuilder.getRaftGroup()) .setProperties(serverProperties) - .setStateMachine(new SCMStateMachine(scm, this)) + .setStateMachine(new SCMStateMachine(scm, this, buffer)) .build(); this.division = server.getDivision(haGrpBuilder.getRaftGroupId()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java new file mode 100644 index 000000000000..b2f2ed40e9d9 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
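
Review note: with the wiring above (SCMHAManagerImpl creates the SCMDBTransactionBuffer and hands it through SCMRatisServerImpl to SCMStateMachine), the key property is that buffered table writes and the latest SCMTransactionInfo are committed in a single RocksDB batch, so the persisted TRANSACTION_INFO_KEY always matches the data on disk. A hedged sketch of one flush cycle (the helper name writeThenCheckpoint and the variable wiring are illustrative, not part of the patch):

```java
// Illustrative only: one flush cycle of SCMDBTransactionBuffer.
void writeThenCheckpoint(SCMDBTransactionBuffer buffer,
    Table<PipelineID, Pipeline> pipelines, Pipeline pipeline,
    long term, long index) throws IOException {
  pipelines.putWithBatch(
      buffer.getCurrentBatchOperation(), pipeline.getId(), pipeline);
  buffer.updateLatestTrxInfo(SCMTransactionInfo.builder()
      .setCurrentTerm(term).setTransactionIndex(index).build());
  // flush() appends the trx info to the same batch, commits it atomically,
  // and then initializes a fresh batch for subsequent transactions.
  buffer.flush();
}
```
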
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.util.List;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_SPLIT_KEY;
+
+/**
+ * This class captures the snapshotIndex and term of the latest snapshot in
+ * the SCM. The Ratis server loads the snapshotInfo during startup and updates
+ * the lastApplied index to this snapshotIndex. SCM SnapshotInfo does not
+ * contain any files; it is used only to store/update the last applied index
+ * and term.
+ */
+public class SCMRatisSnapshotInfo implements SnapshotInfo {
+  private final long term;
+  private final long snapshotIndex;
+
+  public SCMRatisSnapshotInfo(long term, long index) {
+    this.term = term;
+    this.snapshotIndex = index;
+  }
+
+  @Override
+  public TermIndex getTermIndex() {
+    return TermIndex.valueOf(term, snapshotIndex);
+  }
+
+  @Override
+  public long getTerm() {
+    return term;
+  }
+
+  @Override
+  public long getIndex() {
+    return snapshotIndex;
+  }
+
+  @Override
+  public List<FileInfo> getFiles() {
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(term);
+    stringBuilder.append(TRANSACTION_INFO_SPLIT_KEY);
+    stringBuilder.append(snapshotIndex);
+    return stringBuilder.toString();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 052bf4b86587..aa366e1c96a9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -31,6 +31,10 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -38,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.SCM_NOT_INITIALIZED;
+
/**
* TODO.
*/
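
Review note: the takeSnapshot() override added to SCMStateMachine below reconciles two views of progress, Ratis's lastAppliedTermIndex and the buffer's latest transaction info, before flushing. A condensed sketch of that decision (the method name reconcileAndFlush is illustrative, not part of the patch):

```java
// Illustrative only: mirrors the takeSnapshot() logic added below.
long reconcileAndFlush(DBTransactionBuffer buffer, TermIndex lastApplied)
    throws IOException {
  SCMTransactionInfo applied = SCMTransactionInfo.fromTermIndex(lastApplied);
  if (buffer.getLatestTrxInfo().compareTo(applied) < 0) {
    // The buffer lags the state machine: catch it up before snapshotting.
    buffer.updateLatestTrxInfo(applied);
    buffer.setLatestSnapshot(applied.toSnapshotInfo());
    buffer.flush();
    return lastApplied.getIndex();
  }
  // The buffer is at or ahead of lastApplied: snapshot at its own index.
  long index = buffer.getLatestTrxInfo().getTransactionIndex();
  buffer.flush();
  return index;
}
```
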
@@ -48,19 +54,38 @@ public class SCMStateMachine extends BaseStateMachine {
private final StorageContainerManager scm;
private final SCMRatisServer ratisServer;
private final Map<RequestType, Object> handlers;
-
+ private final DBTransactionBuffer transactionBuffer;
public SCMStateMachine(final StorageContainerManager scm,
- final SCMRatisServer ratisServer) {
+ final SCMRatisServer ratisServer, DBTransactionBuffer buffer)
+     throws SCMException {
this.scm = scm;
this.ratisServer = ratisServer;
this.handlers = new EnumMap<>(RequestType.class);
+ this.transactionBuffer = buffer;
+ SCMTransactionInfo latestTrxInfo =
+     this.transactionBuffer.getLatestTrxInfo();
+ if (!latestTrxInfo.isDefault()) {
+   if (!updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
+       latestTrxInfo.getTransactionIndex())) {
+     throw new SCMException(
+         String.format("Failed to update LastAppliedTermIndex "
+             + "in StateMachine to term:%d index:%d",
+             latestTrxInfo.getTerm(), latestTrxInfo.getTransactionIndex()
+         ), SCM_NOT_INITIALIZED);
+   }
+ }
}
public void registerHandler(RequestType type, Object handler) {
handlers.put(type, handler);
}
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+   return transactionBuffer.getLatestSnapshot();
+ }
+
@Override
public CompletableFuture<Message> applyTransaction(
final TransactionContext trx) {
@@ -70,14 +95,17 @@ public CompletableFuture<Message> applyTransaction(
final SCMRatisRequest request = SCMRatisRequest.decode(
Message.valueOf(trx.getStateMachineLogEntry().getLogData()));
applyTransactionFuture.complete(process(request));
+ transactionBuffer.updateLatestTrxInfo(SCMTransactionInfo.builder()
+     .setCurrentTerm(trx.getLogEntry().getTerm())
+     .setTransactionIndex(trx.getLogEntry().getIndex())
+     .build());
} catch (Exception ex) {
applyTransactionFuture.completeExceptionally(ex);
}
return applyTransactionFuture;
}
- private Message process(final SCMRatisRequest request)
-     throws Exception {
+ private Message process(final SCMRatisRequest request) throws Exception {
try {
final Object handler = handlers.get(request.getType());
@@ -93,7 +121,6 @@ private Message process(final SCMRatisRequest request)
final Object result = handler.getClass().getMethod(
request.getOperation(), argumentTypes.toArray(new Class[0]))
.invoke(handler, request.getArguments());
-
return SCMRatisResponse.encode(result);
} catch (NoSuchMethodException | SecurityException ex) {
throw new InvalidProtocolBufferException(ex.getMessage());
@@ -126,4 +153,31 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
LOG.info("current SCM becomes leader of term {}.", term);
scm.getScmContext().updateIsLeaderAndTerm(true, term);
}
+
+ @Override
+ public long takeSnapshot() throws IOException {
+   long startTime = Time.monotonicNow();
+   TermIndex lastTermIndex = getLastAppliedTermIndex();
+   long lastAppliedIndex = lastTermIndex.getIndex();
+   SCMTransactionInfo lastAppliedTrxInfo =
+       SCMTransactionInfo.fromTermIndex(lastTermIndex);
+   if (transactionBuffer.getLatestTrxInfo()
+       .compareTo(lastAppliedTrxInfo) < 0) {
+     transactionBuffer.updateLatestTrxInfo(
+         SCMTransactionInfo.builder()
+             .setCurrentTerm(lastTermIndex.getTerm())
+             .setTransactionIndex(lastTermIndex.getIndex())
+             .build());
+     transactionBuffer.setLatestSnapshot(
+         transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
+   } else {
+     lastAppliedIndex =
+         transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+   }
+
+   transactionBuffer.flush();
+   LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+       lastAppliedIndex, Time.monotonicNow() - startTime);
+   return lastAppliedIndex;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java
new file mode 100644
index 000000000000..0140839e768d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.util.Objects;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_SPLIT_KEY;
+
+/**
+ * SCMTransactionInfo saves two fields for a transaction:
+ * 1. term, to which the transaction belongs
+ * 2. transactionIndex, which is a monotonically increasing index
+ *    (e.g. Raft log index)
+ */
+public final class SCMTransactionInfo {
+  private long term;
+  private long transactionIndex;
+
+  private SCMTransactionInfo(String transactionInfo) {
+    String[] tInfo =
+        transactionInfo.split(TRANSACTION_INFO_SPLIT_KEY);
+    Preconditions.checkState(tInfo.length == 2,
+        "Incorrect TransactionInfo value");
+
+    term = Long.parseLong(tInfo[0]);
+    transactionIndex = Long.parseLong(tInfo[1]);
+  }
+
+  private SCMTransactionInfo(long currentTerm, long transactionIndex) {
+    this.term = currentTerm;
+    this.transactionIndex = transactionIndex;
+  }
+
+  public boolean isDefault() {
+    // the placeholder value used before any transaction has been applied
+    return transactionIndex == -1 && term == 0;
+  }
+
+  public int compareTo(SCMTransactionInfo info) {
+    if (info.getTerm() == this.getTerm()) {
+      // Long.compare returns 0 for equal transaction indexes
+      return Long.compare(
+          this.getTransactionIndex(), info.getTransactionIndex());
+    }
+    return Long.compare(this.getTerm(), info.getTerm());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SCMTransactionInfo that = (SCMTransactionInfo) o;
+    return term == that.term &&
+        transactionIndex == that.transactionIndex;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(term, transactionIndex);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + term + ":" + transactionIndex + "]";
+  }
+
+  public long getTerm() {
+    return term;
+  }
+
+  public long getTransactionIndex() {
+    return transactionIndex;
+  }
+
+  public static SCMTransactionInfo fromTermIndex(TermIndex termIndex) {
+    return builder().setCurrentTerm(termIndex.getTerm())
+        .setTransactionIndex(termIndex.getIndex()).build();
+  }
+
+  public SnapshotInfo toSnapshotInfo() {
+    return new SCMRatisSnapshotInfo(term, transactionIndex);
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private long currentTerm = 0;
+    private long transactionIndex = -1;
+
+    public Builder setCurrentTerm(long term) {
+      this.currentTerm = term;
+      return this;
+    }
+
+    public Builder setTransactionIndex(long tIndex) {
+      this.transactionIndex = tIndex;
+      return this;
+    }
+
+    public SCMTransactionInfo build() {
+      return new SCMTransactionInfo(currentTerm, transactionIndex);
+    }
+  }
+
+  public static SCMTransactionInfo getFromByteArray(byte[] bytes) {
+    String tInfo = StringUtils.bytes2String(bytes);
+    return new SCMTransactionInfo(tInfo);
+  }
+
+  public byte[] convertToByteArray() {
+    return StringUtils.string2Bytes(generateTransactionInfo());
+  }
+
+  private String generateTransactionInfo() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(term);
+    stringBuilder.append(TRANSACTION_INFO_SPLIT_KEY);
+    stringBuilder.append(transactionIndex);
+
+    return stringBuilder.toString();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index fcddcdd274d8..790b6db4a87b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.hdds.utils.db.DBDefinition;
import org.apache.hadoop.hdds.utils.db.LongCodec;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
/**
* Class defines the structure and types of the scm.db.
@@ -80,6 +82,15 @@ public class SCMDBDefinition implements DBDefinition {
ContainerInfo.class,
new ContainerInfoCodec());
+ public static final DBColumnFamilyDefinition<String, SCMTransactionInfo>
+     TRANSACTIONINFO =
+     new DBColumnFamilyDefinition<>(
+         "scmTransactionInfos",
+         String.class,
+         new StringCodec(),
+         SCMTransactionInfo.class,
+         new SCMTransactionInfoCodec());
+
@Override
public String getName() {
return "scm.db";
}
@@ -93,6 +104,6 @@ public String getLocationConfigKey() {
@Override
public DBColumnFamilyDefinition[] getColumnFamilies() {
return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
- REVOKED_CERTS, PIPELINES, CONTAINERS};
+ REVOKED_CERTS, PIPELINES, CONTAINERS, TRANSACTIONINFO};
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 0452c05ae2a6..7ddd11d949e6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
@@ -110,6 +111,11 @@ public interface SCMMetadataStore {
*/
Table<PipelineID, Pipeline> getPipelineTable();
+ /**
+  * A Table that keeps the latest transaction info of the DB state.
+  */
+ Table<String, SCMTransactionInfo> getTransactionInfoTable();
+
/**
* Helper to create and write batch transactions.
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java index 0a609c7a0f4e..0ac3a6062f2f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore; @@ -40,6 +41,7 @@ import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS; import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES; import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS; +import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.TRANSACTIONINFO; import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_CERTS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +62,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore { private Table pipelineTable; + private Table transactionInfoTable; + private static final Logger LOG = LoggerFactory.getLogger(SCMMetadataStoreImpl.class); private DBStore store; @@ -107,6 +111,10 @@ public void start(OzoneConfiguration config) containerTable = CONTAINERS.getTable(store); checkTableStatus(containerTable, CONTAINERS.getName()); + + transactionInfoTable = TRANSACTIONINFO.getTable(store); + + checkTableStatus(transactionInfoTable, TRANSACTIONINFO.getName()); } } @@ -161,6 +169,11 @@ public Table getPipelineTable() { return pipelineTable; } + @Override + public Table getTransactionInfoTable() { + return transactionInfoTable; + } + @Override public BatchOperationHandler getBatchHandler() { return this.store; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java new file mode 100644 index 000000000000..6c0e6e1333ec --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
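
Review note ahead of SCMTransactionInfoCodec: the codec simply delegates to SCMTransactionInfo's string form, term followed by TRANSACTION_INFO_SPLIT_KEY followed by index (the constant's value lives in OzoneConsts and is not shown in this diff). A round-trip sketch:

```java
// Illustrative only: the persisted form round-trips through the codec.
void roundTrip() throws IOException {
  SCMTransactionInfoCodec codec = new SCMTransactionInfoCodec();
  SCMTransactionInfo info = SCMTransactionInfo.builder()
      .setCurrentTerm(7).setTransactionIndex(42).build();
  byte[] persisted = codec.toPersistedFormat(info);
  SCMTransactionInfo back = codec.fromPersistedFormat(persisted);
  assert info.equals(back);
}
```
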
+ */ +package org.apache.hadoop.hdds.scm.metadata; + +import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo; +import org.apache.hadoop.hdds.utils.db.Codec; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class SCMTransactionInfoCodec implements Codec { + + @Override + public byte[] toPersistedFormat(SCMTransactionInfo object) + throws IOException { + checkNotNull(object, "Null object can't be converted to byte array."); + return object.convertToByteArray(); + } + + @Override + public SCMTransactionInfo fromPersistedFormat(byte[] rawData) + throws IOException { + checkNotNull(rawData, "Null byte array can't be converted to " + + "real object."); + return SCMTransactionInfo.getFromByteArray(rawData); + } + + @Override + public SCMTransactionInfo copyObject(SCMTransactionInfo object) { + return object; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java index 286bdc9ddcc8..3c881742764e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java @@ -124,6 +124,7 @@ public static PipelineManagerV2Impl newPipelineManager( .newBuilder().setPipelineStore(pipelineStore) .setRatisServer(scmhaManager.getRatisServer()) .setNodeManager(nodeManager) + .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer()) .build(); // Create PipelineFactory diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java index 06bae4cb9d2c..7f88a427ff83 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer; import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -52,17 +53,19 @@ public class PipelineStateManagerV2Impl implements StateManager { private final PipelineStateMap pipelineStateMap; private final NodeManager nodeManager; private Table pipelineStore; + private final DBTransactionBuffer transactionBuffer; // Protect potential contentions between RaftServer and PipelineManager. 
// See https://issues.apache.org/jira/browse/HDDS-4560 private final ReadWriteLock lock = new ReentrantReadWriteLock(); public PipelineStateManagerV2Impl( - Table pipelineStore, NodeManager nodeManager) - throws IOException { + Table pipelineStore, NodeManager nodeManager, + DBTransactionBuffer buffer) throws IOException { this.pipelineStateMap = new PipelineStateMap(); this.nodeManager = nodeManager; this.pipelineStore = pipelineStore; + this.transactionBuffer = buffer; initialize(); } @@ -90,7 +93,8 @@ public void addPipeline(HddsProtos.Pipeline pipelineProto) try { Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto); if (pipelineStore != null) { - pipelineStore.put(pipeline.getId(), pipeline); + pipelineStore.putWithBatch(transactionBuffer.getCurrentBatchOperation(), + pipeline.getId(), pipeline); pipelineStateMap.addPipeline(pipeline); nodeManager.addPipeline(pipeline); LOG.info("Created pipeline {}.", pipeline); @@ -219,7 +223,8 @@ public void removePipeline(HddsProtos.PipelineID pipelineIDProto) try { PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto); if (pipelineStore != null) { - pipelineStore.delete(pipelineID); + pipelineStore.deleteWithBatch( + transactionBuffer.getCurrentBatchOperation(), pipelineID); } Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID); nodeManager.removePipeline(pipeline); @@ -256,7 +261,8 @@ public void updatePipelineState( if (pipelineStore != null) { pipelineStateMap.updatePipelineState(pipelineID, Pipeline.PipelineState.fromProtobuf(newState)); - pipelineStore.put(pipelineID, getPipeline(pipelineID)); + pipelineStore.putWithBatch(transactionBuffer.getCurrentBatchOperation(), + pipelineID, getPipeline(pipelineID)); } } catch (IOException ex) { LOG.warn("Pipeline {} state update failed", pipelineID); @@ -333,6 +339,12 @@ public static class Builder { private Table pipelineStore; private NodeManager nodeManager; private SCMRatisServer scmRatisServer; + private DBTransactionBuffer transactionBuffer; + + public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) { + this.transactionBuffer = buffer; + return this; + } public Builder setRatisServer(final SCMRatisServer ratisServer) { scmRatisServer = ratisServer; @@ -354,7 +366,8 @@ public StateManager build() throws IOException { Preconditions.checkNotNull(pipelineStore); final StateManager pipelineStateManager = - new PipelineStateManagerV2Impl(pipelineStore, nodeManager); + new PipelineStateManagerV2Impl( + pipelineStore, nodeManager, transactionBuffer); final SCMHAInvocationHandler invocationHandler = new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index b0145bb4431d..6782f5275bae 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -271,7 +271,6 @@ private StorageContainerManager(OzoneConfiguration conf, // Creates the SCM DBs or opens them if it exists. // A valid pointer to the store is required by all the other services below. initalizeMetadataStore(conf, configurator); - // Authenticate SCM if security is enabled, this initialization can only // be done after the metadata store is initialized. 
if (OzoneSecurityUtil.isSecurityEnabled(conf)) { @@ -868,7 +867,6 @@ public void start() throws IOException { * Stop service. */ public void stop() { - try { LOG.info("Stopping Replication Manager Service."); replicationManager.stop(); @@ -961,6 +959,12 @@ public void stop() { IOUtils.cleanupWithLogger(LOG, containerManager); IOUtils.cleanupWithLogger(LOG, pipelineManager); + try { + scmHAManager.shutdown(); + } catch (Exception ex) { + LOG.error("SCM HA Manager stop failed", ex); + } + try { scmMetadataStore.stop(); } catch (Exception ex) { @@ -971,12 +975,6 @@ public void stop() { ms.stop(); } - try { - scmHAManager.shutdown(); - } catch (Exception ex) { - LOG.error("SCM HA Manager stop failed", ex); - } - scmSafeModeManager.stop(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java new file mode 100644 index 000000000000..f0939abeaa7e --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.metadata; + + +import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.fail; + +public class TestSCMTransactionInfoCodec { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private SCMTransactionInfoCodec codec; + + @Before + public void setUp() { + codec = new SCMTransactionInfoCodec(); + } + + @Test + public void toAndFromPersistedFormat() throws IOException { + SCMTransactionInfo scmTransactionInfo = + new SCMTransactionInfo.Builder().setTransactionIndex(100) + .setCurrentTerm(11).build(); + + SCMTransactionInfo convertedTransactionInfo = + codec.fromPersistedFormat(codec.toPersistedFormat(scmTransactionInfo)); + + Assert.assertEquals(scmTransactionInfo, convertedTransactionInfo); + } + + @Test + public void testCodecWithNullDataFromTable() throws Exception { + thrown.expect(NullPointerException.class); + codec.fromPersistedFormat(null); + } + + @Test + public void testCodecWithNullDataFromUser() throws Exception { + thrown.expect(NullPointerException.class); + codec.toPersistedFormat(null); + } + + @Test + public void testCodecWithIncorrectValues() throws Exception { + try { + codec.fromPersistedFormat("random".getBytes(StandardCharsets.UTF_8)); + fail("testCodecWithIncorrectValues failed"); + } catch (IllegalStateException ex) { + GenericTestUtils.assertExceptionContains("Incorrect TransactionInfo " + + "value", ex); + } + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index c2dc9950bed6..654fd9c9714b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer; +import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; @@ -104,9 +106,21 @@ private PipelineManagerV2Impl createPipelineManager(boolean isLeader) SCMContext.emptyContext()); } + private PipelineManagerV2Impl createPipelineManager( + boolean isLeader, DBTransactionBuffer buffer) throws IOException { + return PipelineManagerV2Impl.newPipelineManager(conf, + MockSCMHAManager.getInstance(isLeader, buffer), + new MockNodeManager(true, 20), + SCMDBDefinition.PIPELINES.getTable(dbStore), + new EventQueue(), + SCMContext.emptyContext()); + } + @Test public void testCreatePipeline() throws Exception { - 
PipelineManagerV2Impl pipelineManager = createPipelineManager(true); + DBTransactionBuffer buffer1 = new MockDBTransactionBuffer(dbStore); + PipelineManagerV2Impl pipelineManager = + createPipelineManager(true, buffer1); Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); pipelineManager.allowPipelineCreation(); Pipeline pipeline1 = pipelineManager.createPipeline( @@ -118,15 +132,19 @@ public void testCreatePipeline() throws Exception { HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE); Assert.assertEquals(2, pipelineManager.getPipelines().size()); Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId())); + buffer1.close(); pipelineManager.close(); - PipelineManagerV2Impl pipelineManager2 = createPipelineManager(true); + DBTransactionBuffer buffer2 = new MockDBTransactionBuffer(dbStore); + PipelineManagerV2Impl pipelineManager2 = + createPipelineManager(true, buffer2); // Should be able to load previous pipelines. Assert.assertFalse(pipelineManager2.getPipelines().isEmpty()); Assert.assertEquals(2, pipelineManager.getPipelines().size()); pipelineManager2.allowPipelineCreation(); Pipeline pipeline3 = pipelineManager2.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); + buffer2.close(); Assert.assertEquals(3, pipelineManager2.getPipelines().size()); Assert.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId())); @@ -151,7 +169,9 @@ public void testCreatePipelineShouldFailOnFollower() throws Exception { @Test public void testUpdatePipelineStates() throws Exception { - PipelineManagerV2Impl pipelineManager = createPipelineManager(true); + DBTransactionBuffer buffer = new MockDBTransactionBuffer(dbStore); + PipelineManagerV2Impl pipelineManager = + createPipelineManager(true, buffer); Table pipelineStore = SCMDBDefinition.PIPELINES.getTable(dbStore); pipelineManager.allowPipelineCreation(); @@ -160,6 +180,7 @@ public void testUpdatePipelineStates() throws Exception { Assert.assertEquals(1, pipelineManager.getPipelines().size()); Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); + buffer.flush(); Assert.assertEquals(ALLOCATED, pipelineStore.get(pipeline.getId()).getPipelineState()); PipelineID pipelineID = pipeline.getId(); @@ -170,11 +191,13 @@ public void testUpdatePipelineStates() throws Exception { .getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN).contains(pipeline)); + buffer.flush(); Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen()); pipelineManager.deactivatePipeline(pipeline.getId()); Assert.assertEquals(Pipeline.PipelineState.DORMANT, pipelineManager.getPipeline(pipelineID).getPipelineState()); + buffer.flush(); Assert.assertEquals(Pipeline.PipelineState.DORMANT, pipelineStore.get(pipeline.getId()).getPipelineState()); Assert.assertFalse(pipelineManager @@ -187,6 +210,7 @@ public void testUpdatePipelineStates() throws Exception { .getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN).contains(pipeline)); + buffer.flush(); Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen()); pipelineManager.close(); } @@ -430,7 +454,9 @@ public void testPipelineCreationFailedMetric() throws Exception { @Test public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { - PipelineManagerV2Impl pipelineManager = createPipelineManager(true); + DBTransactionBuffer buffer1 = new 
MockDBTransactionBuffer(dbStore); + PipelineManagerV2Impl pipelineManager = + createPipelineManager(true, buffer1); pipelineManager.allowPipelineCreation(); pipelineManager.onMessage( @@ -439,6 +465,7 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); // close manager + buffer1.close(); pipelineManager.close(); // new pipeline manager loads the pipelines from the db in ALLOCATED state pipelineManager = createPipelineManager(true); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java index a04ecea75041..ef579d12fd62 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo; import org.apache.hadoop.hdds.scm.metadata.PipelineCodec; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore; @@ -116,6 +117,11 @@ public Table getPipelineTable() { return pipelineTable; } + @Override + public Table getTransactionInfoTable() { + return null; + } + @Override public BatchOperationHandler getBatchHandler() { return null; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java new file mode 100644 index 000000000000..e1d43a556810 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; +import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration; +import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS; +import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; + +public class TestSCMSnapshot { + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + + @BeforeClass + public static void setup() throws Exception { + conf = new OzoneConfiguration(); + SCMHAConfiguration scmhaConfiguration = conf.getObject( + SCMHAConfiguration.class); + scmhaConfiguration.setRatisSnapshotThreshold(1L); + conf.setFromObject(scmhaConfiguration); + cluster = MiniOzoneCluster + .newBuilder(conf) + .setNumDatanodes(3) + .build(); + cluster.waitForClusterToBeReady(); + } + + @Test + public void testSnapshot() throws Exception { + StorageContainerManager scm = cluster.getStorageContainerManager(); + long snapshotInfo1 = scm.getScmHAManager().getDBTransactionBuffer() + .getLatestTrxInfo().getTransactionIndex(); + ContainerManagerV2 containerManager = scm.getContainerManager(); + PipelineManager pipelineManager = scm.getPipelineManager(); + Pipeline ratisPipeline1 = pipelineManager.getPipeline( + containerManager.allocateContainer( + RATIS, THREE, "Owner1").getPipelineID()); + pipelineManager.openPipeline(ratisPipeline1.getId()); + Pipeline ratisPipeline2 = pipelineManager.getPipeline( + containerManager.allocateContainer( + RATIS, ONE, "Owner2").getPipelineID()); + pipelineManager.openPipeline(ratisPipeline2.getId()); + long snapshotInfo2 = scm.getScmHAManager().getDBTransactionBuffer() + .getLatestTrxInfo().getTransactionIndex(); + + Assert.assertTrue( + String.format("Snapshot index 2 {} should greater than Snapshot " + + "index 1 {}", snapshotInfo2, snapshotInfo1), + snapshotInfo2 > snapshotInfo1); + + Table trxInfo = + scm.getScmMetadataStore().getTransactionInfoTable(); + SCMTransactionInfo scmTransactionInfo = trxInfo.get(TRANSACTION_INFO_KEY); + + Assert.assertTrue( + "DB trx info:" + scmTransactionInfo.getTransactionIndex() + + ", latestSnapshotInfo:" + snapshotInfo2, + scmTransactionInfo.getTransactionIndex() >= snapshotInfo2); + + cluster.restartStorageContainerManager(false); + SCMTransactionInfo trxInfoAfterRestart = + scm.getScmHAManager().getDBTransactionBuffer().getLatestTrxInfo(); + Assert.assertTrue( + trxInfoAfterRestart.getTransactionIndex() >= snapshotInfo2); + try { + pipelineManager.getPipeline(ratisPipeline1.getId()); + pipelineManager.getPipeline(ratisPipeline2.getId()); + } catch (PipelineNotFoundException e) { + Assert.fail("Should not see a PipelineNotFoundException"); + } + } + + @AfterClass + public static void shutdown() throws Exception { + if (cluster != 
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 58e459725bc2..d0cd642296f8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -300,6 +300,7 @@ public OzoneClient getRpcClient() throws IOException {
   public void restartStorageContainerManager(boolean waitForDatanode)
       throws TimeoutException, InterruptedException, IOException,
       AuthenticationException {
+    LOG.info("Restarting SCM in cluster {}", this.getClass());
     scm.stop();
     scm.join();
     scm = TestUtils.getScmSimple(conf);
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 4b71f440120f..73e260ff8eed 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
@@ -66,12 +68,15 @@ public class ReconContainerManager extends ContainerManagerImpl {
    */
   public ReconContainerManager(
       Configuration conf,
+      DBStore store,
       Table containerStore,
       PipelineManager pipelineManager,
       StorageContainerServiceProvider scm,
       ContainerSchemaManager containerSchemaManager) throws IOException {
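+    // Recon runs outside the SCM Ratis ring, so a mock HA manager whose
+    // transaction buffer is bound to Recon's own DB store stands in here.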
-    super(conf, MockSCMHAManager.getInstance(true),
-        pipelineManager, containerStore);
+    super(conf, MockSCMHAManager.getInstance(true,
+        new MockDBTransactionBuffer(store)), pipelineManager, containerStore);
     this.scmClient = scm;
     this.pipelineManager = pipelineManager;
     this.containerSchemaManager = containerSchemaManager;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index a406779187cc..80648cf61631 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -123,6 +123,7 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
         ReconSCMDBDefinition.PIPELINES.getTable(dbStore),
         eventQueue);
     this.containerManager = new ReconContainerManager(conf,
+        dbStore,
         ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
         pipelineManager,
         scmServiceProvider,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index f3592bcb8119..93cf256c743a 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -83,6 +83,7 @@ public void setUp() throws Exception {
         ReconSCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     containerManager = new ReconContainerManager(
         conf,
+        store,
         ReconSCMDBDefinition.CONTAINERS.getTable(store),
         pipelineManager,
         getScmServiceProvider(),
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
index c493e2c8ec77..fec673a54971 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.junit.Test;

 /**
@@ -72,6 +73,10 @@ public void testAddNewOpenContainer() throws IOException {
     assertEquals(containerID, containersInPipeline.first());

     // Verify container DB.
+    // Close the transaction buffer to flush the batched writes into the
+    // container table before checking it.
+    SCMHAManager scmhaManager = containerManager.getSCMHAManager();
+    scmhaManager.getDBTransactionBuffer().close();
     assertTrue(getContainerTable().isExist(containerID));
   }

@@ -95,6 +100,10 @@ public void testAddNewClosedContainer() throws IOException {
     assertEquals(1, containers.size());
     assertEquals(containerInfo, containers.get(0));
     // Verify container DB.
+    // Close the transaction buffer to flush the batched writes into the
+    // container table before checking it.
+    SCMHAManager scmhaManager = containerManager.getSCMHAManager();
+    scmhaManager.getDBTransactionBuffer().close();
     assertTrue(getContainerTable().isExist(containerID));
   }