Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -110,15 +111,14 @@ public ContainerManagerImpl(
.setPipelineManager(pipelineManager)
.setRatisServer(scmHaManager.getRatisServer())
.setContainerStore(containerStore)
.setSCMDBTransactionBuffer(scmHaManager.getDBTransactionBuffer())
.build();

this.numContainerPerVolume = conf
.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);

this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();


}

@Override
Expand Down Expand Up @@ -397,4 +397,9 @@ public void close() throws IOException {
protected ContainerStateManagerV2 getContainerStateManager() {
return containerStateManager;
}

@VisibleForTesting
public SCMHAManager getSCMHAManager() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an annotation of @VisibleForTesting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return haManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.ha.CheckedConsumer;
import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.ExecutionUtil;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
Expand Down Expand Up @@ -99,7 +100,9 @@ public final class ContainerStateManagerImpl
/**
* Persistent store for Container States.
*/
private final Table<ContainerID, ContainerInfo> containerStore;
private Table<ContainerID, ContainerInfo> containerStore;

private final DBTransactionBuffer transactionBuffer;

/**
* PipelineManager instance.
Expand Down Expand Up @@ -135,7 +138,8 @@ public final class ContainerStateManagerImpl
*/
private ContainerStateManagerImpl(final Configuration conf,
final PipelineManager pipelineManager,
final Table<ContainerID, ContainerInfo> containerStore)
final Table<ContainerID, ContainerInfo> containerStore,
final DBTransactionBuffer buffer)
throws IOException {
this.pipelineManager = pipelineManager;
this.containerStore = containerStore;
Expand All @@ -144,6 +148,7 @@ private ContainerStateManagerImpl(final Configuration conf,
this.containers = new ContainerStateMap();
this.lastUsedMap = new ConcurrentHashMap<>();
this.containerStateChangeActions = getContainerStateChangeActions();
this.transactionBuffer = buffer;

initialize();
}
Expand Down Expand Up @@ -298,12 +303,16 @@ public void addContainer(final ContainerInfoProto containerInfo)
try {
if (!containers.contains(containerID)) {
ExecutionUtil.create(() -> {
containerStore.put(containerID, container);
containerStore.putWithBatch(
transactionBuffer.getCurrentBatchOperation(),
containerID, container);
containers.addContainer(container);
pipelineManager.addContainerToPipeline(pipelineID, containerID);
}).onException(() -> {
containers.removeContainer(containerID);
containerStore.delete(containerID);
containerStore.deleteWithBatch(
transactionBuffer.getCurrentBatchOperation(),
containerID);
}).execute();
}
} finally {
Expand Down Expand Up @@ -339,13 +348,17 @@ public void updateContainerState(final HddsProtos.ContainerID containerID,
if (newState.getNumber() > oldState.getNumber()) {
ExecutionUtil.create(() -> {
containers.updateState(id, oldState, newState);
containerStore.put(id, containers.getContainerInfo(id));
containerStore.putWithBatch(
transactionBuffer.getCurrentBatchOperation(),
id, containers.getContainerInfo(id));
}).onException(() -> {
containerStore.put(id, oldInfo);
containerStore.putWithBatch(
transactionBuffer.getCurrentBatchOperation(),
id, oldInfo);
containers.updateState(id, newState, oldState);
}).execute();
containerStateChangeActions.getOrDefault(event, info -> {})
.execute(oldInfo);
containerStateChangeActions.getOrDefault(event, info -> {
}).execute(oldInfo);
}
}
} finally {
Expand Down Expand Up @@ -475,7 +488,9 @@ public void removeContainer(final HddsProtos.ContainerID id)
final ContainerID cid = ContainerID.getFromProtobuf(id);
final ContainerInfo containerInfo = containers.getContainerInfo(cid);
ExecutionUtil.create(() -> {
containerStore.delete(cid);
containerStore.deleteWithBatch(
transactionBuffer.getCurrentBatchOperation(),
cid);
containers.removeContainer(cid);
}).onException(() -> containerStore.put(cid, containerInfo)).execute();
} finally {
Expand Down Expand Up @@ -504,7 +519,12 @@ public static class Builder {
private PipelineManager pipelineMgr;
private SCMRatisServer scmRatisServer;
private Table<ContainerID, ContainerInfo> table;
private DBTransactionBuffer transactionBuffer;

public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
this.transactionBuffer = buffer;
return this;
}
public Builder setConfiguration(final Configuration config) {
conf = config;
return this;
Expand Down Expand Up @@ -533,7 +553,7 @@ public ContainerStateManagerV2 build() throws IOException {
Preconditions.checkNotNull(table);

final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
conf, pipelineMgr, table);
conf, pipelineMgr, table, transactionBuffer);

final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>http://www.apache.org/licenses/LICENSE-2.0
* <p>
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.ratis.statemachine.SnapshotInfo;

import java.io.Closeable;
import java.io.IOException;

/**
* DB transaction that buffers SCM DB transactions. Call the flush method
* to flush a batch into SCM DB. This buffer also maintains a latest transaction
* info to indicate the information of the latest transaction in the buffer.
*/
public interface DBTransactionBuffer extends Closeable {

BatchOperation getCurrentBatchOperation();

void updateLatestTrxInfo(SCMTransactionInfo info);

SCMTransactionInfo getLatestTrxInfo();

SnapshotInfo getLatestSnapshot();

void setLatestSnapshot(SnapshotInfo latestSnapshot);

void flush() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.ratis.statemachine.SnapshotInfo;

import java.io.IOException;

public class MockDBTransactionBuffer implements DBTransactionBuffer {
private DBStore dbStore;
private BatchOperation currentBatchOperation;

public MockDBTransactionBuffer() {
}

public MockDBTransactionBuffer(DBStore store) {
this.dbStore = store;
}

@Override
public BatchOperation getCurrentBatchOperation() {
if (currentBatchOperation == null) {
if (dbStore != null) {
currentBatchOperation = dbStore.initBatchOperation();
} else {
currentBatchOperation = new RDBBatchOperation();
}
}
return currentBatchOperation;
}

@Override
public void updateLatestTrxInfo(SCMTransactionInfo info) {

}

@Override
public SCMTransactionInfo getLatestTrxInfo() {
return null;
}

@Override
public SnapshotInfo getLatestSnapshot() {
return null;
}

@Override
public void setLatestSnapshot(SnapshotInfo latestSnapshot) {

}

@Override
public void flush() throws IOException {
if (dbStore != null) {
dbStore.commitBatchOperation(currentBatchOperation);
currentBatchOperation.close();
currentBatchOperation = null;
}
}

@Override
public void close() throws IOException {
flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/

package org.apache.hadoop.hdds.scm.ha;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
Expand Down Expand Up @@ -45,17 +44,28 @@ public final class MockSCMHAManager implements SCMHAManager {

private final SCMRatisServer ratisServer;
private boolean isLeader;
private DBTransactionBuffer transactionBuffer;

public static SCMHAManager getInstance(boolean isLeader) {
return new MockSCMHAManager(isLeader);
}

public static SCMHAManager getInstance(boolean isLeader,
DBTransactionBuffer buffer) {
return new MockSCMHAManager(isLeader, buffer);
}

/**
* Creates MockSCMHAManager instance.
*/
private MockSCMHAManager(boolean isLeader) {
this(isLeader, new MockDBTransactionBuffer());
}

private MockSCMHAManager(boolean isLeader, DBTransactionBuffer buffer) {
this.ratisServer = new MockRatisServer();
this.isLeader = isLeader;
this.transactionBuffer = buffer;
}

@Override
Expand All @@ -82,6 +92,11 @@ public SCMRatisServer getRatisServer() {
return ratisServer;
}

@Override
public DBTransactionBuffer getDBTransactionBuffer() {
return transactionBuffer;
}

/**
* {@inheritDoc}
*/
Expand Down
Loading