Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public void initialize(
storage.init(raftStorage);
ratisServer.notifyGroupAdd(gid);

LOG.info("{}: initialize {}", server.getId(), id);
loadSnapshot(storage.getLatestSnapshot());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -41,6 +43,8 @@
* operation in DB.
*/
public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {

public static final Logger LOG = LoggerFactory.getLogger(SCMHADBTransactionBufferImpl.class);
private final StorageContainerManager scm;
private SCMMetadataStore metadataStore;
private BatchOperation currentBatchOperation;
Expand Down Expand Up @@ -107,6 +111,8 @@ public SnapshotInfo getLatestSnapshot() {

/**
 * Logs the new snapshot together with the id of this server's Ratis
 * division, then records it as the latest snapshot.
 *
 * @param latestSnapshot the snapshot info to record
 */
@Override
public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
  // Resolve the division id up front so the log statement stays readable.
  final Object divisionId =
      scm.getScmHAManager().getRatisServer().getDivision().getId();
  LOG.info("{}: Set latest Snapshot to {}", divisionId, latestSnapshot);
  this.latestSnapshot.set(latestSnapshot);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void initialize(RaftServer server, RaftGroupId id,
getLifeCycle().startAndTransition(() -> {
super.initialize(server, id, raftStorage);
storage.init(raftStorage);
LOG.info("{}: initialize {}", server.getId(), id);
});
}

Expand All @@ -149,6 +150,9 @@ public CompletableFuture<Message> applyTransaction(
final SCMRatisRequest request = SCMRatisRequest.decode(
Message.valueOf(trx.getStateMachineLogEntry().getLogData()));

if (LOG.isDebugEnabled()) {
LOG.debug("{}: applyTransaction {}", getId(), TermIndex.valueOf(trx.getLogEntry()));
}
try {
applyTransactionFuture.complete(process(request));
} catch (SCMException ex) {
Expand Down Expand Up @@ -389,6 +393,7 @@ public void notifyConfigurationChanged(long term, long index,
@Override
public void pause() {
final LifeCycle lc = getLifeCycle();
LOG.info("{}: Try to pause from current LifeCycle state {}", getId(), lc);
if (lc.getCurrentState() != LifeCycle.State.NEW) {
lc.transition(LifeCycle.State.PAUSING);
lc.transition(LifeCycle.State.PAUSED);
Expand All @@ -414,6 +419,8 @@ public void reinitialize() throws IOException {
throw new IOException(e);
}

LOG.info("{}: SCMStateMachine is reinitializing. newTermIndex = {}", getId(), termIndex);

// re-initialize the DBTransactionBuffer and update the lastAppliedIndex.
try {
transactionBuffer.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
import org.apache.hadoop.ozone.protocolPB.RequestHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
Expand Down Expand Up @@ -88,7 +89,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
new SimpleStateMachineStorage();
private final OzoneManager ozoneManager;
private RequestHandler handler;
private RaftGroupId raftGroupId;
private volatile OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final ExecutorService executorService;
private final ExecutorService installSnapshotExecutor;
Expand Down Expand Up @@ -134,17 +134,18 @@ public void initialize(RaftServer server, RaftGroupId id,
RaftStorage raftStorage) throws IOException {
getLifeCycle().startAndTransition(() -> {
super.initialize(server, id, raftStorage);
this.raftGroupId = id;
storage.init(raftStorage);
LOG.info("{}: initialize {} with {}", getId(), id, getLastAppliedTermIndex());
});
}

/**
 * Reloads the snapshot info from the DB and, when this state machine is in
 * the PAUSED life-cycle state, un-pauses it at the last applied term-index.
 *
 * @throws IOException declared by this override; presumably propagated from
 *     loading the snapshot info - confirm against loadSnapshotInfoFromDB
 */
@Override
public synchronized void reinitialize() throws IOException {
  loadSnapshotInfoFromDB();
  if (getLifeCycleState() == LifeCycle.State.PAUSED) {
    // Read the term-index once so that the index and the term passed to
    // unpause (and the value logged below) come from the same snapshot.
    final TermIndex lastApplied = getLastAppliedTermIndex();
    // unpause decrements the pause counter on every call, so it must be
    // invoked exactly once per reinitialize.
    unpause(lastApplied.getIndex(), lastApplied.getTerm());
    LOG.info("{}: reinitialize {} with {}", getId(), getGroupId(), lastApplied);
  }
}

Expand All @@ -160,6 +161,7 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
RaftPeerId newLeaderId) {
// Initialize OMHAMetrics
ozoneManager.omHAMetricsInit(newLeaderId.toString());
LOG.info("{}: leader changed to {}", groupMemberId, newLeaderId);
}

/** Notified by Ratis for non-StateMachine term-index update. */
Expand Down Expand Up @@ -263,7 +265,7 @@ public TransactionContext startTransaction(
messageContent);

Preconditions.checkArgument(raftClientRequest.getRaftGroupId().equals(
raftGroupId));
getGroupId()));
try {
handler.validateRequest(omRequest);
} catch (IOException ioe) {
Expand Down Expand Up @@ -293,6 +295,10 @@ public TransactionContext preAppendTransaction(TransactionContext trx)

OzoneManagerPrepareState prepareState = ozoneManager.getPrepareState();

if (LOG.isDebugEnabled()) {
LOG.debug("{}: preAppendTransaction {}", getId(), TermIndex.valueOf(trx.getLogEntry()));
}

if (cmdType == OzoneManagerProtocolProtos.Type.Prepare) {
// Must authenticate prepare requests here, since we must determine
// whether or not to apply the prepare gate before proceeding with the
Expand All @@ -303,8 +309,7 @@ public TransactionContext preAppendTransaction(TransactionContext trx)
if (ozoneManager.getAclsEnabled()
&& !ozoneManager.isAdmin(userGroupInformation)) {
String message = "Access denied for user " + userGroupInformation
+ ". "
+ "Superuser privilege is required to prepare ozone managers.";
+ ". Superuser privilege is required to prepare upgrade/downgrade.";
OMException cause =
new OMException(message, OMException.ResultCodes.ACCESS_DENIED);
// Leader should not step down because of this failure.
Expand Down Expand Up @@ -341,6 +346,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
: OMRatisHelper.convertByteStringToOMRequest(
trx.getStateMachineLogEntry().getLogData());
final TermIndex termIndex = TermIndex.valueOf(trx.getLogEntry());
LOG.debug("{}: applyTransaction {}", getId(), termIndex);
// In the current approach we have a single global executor with a single
// thread. Right now this is being done for correctness, as
// applyTransaction will be run on multiple OM's we want to execute the
Expand Down Expand Up @@ -427,12 +433,14 @@ public synchronized void pause() {
*/
public synchronized void unpause(long newLastAppliedSnaphsotIndex,
    long newLastAppliedSnapShotTermIndex) {
  LOG.info("OzoneManagerStateMachine is un-pausing");

  // Every call decrements the pause counter; only the call that brings it
  // back to zero actually resumes the state machine.
  final boolean stillPaused = statePausedCount.decrementAndGet() != 0;
  if (stillPaused) {
    return;
  }

  getLifeCycle().startAndTransition(() -> {
    // Build a fresh double buffer before publishing the new term-index.
    this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
    final TermIndex resumedAt = TermIndex.valueOf(
        newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex);
    this.setLastAppliedTermIndex(resumedAt);
    LOG.info("{}: OzoneManagerStateMachine un-pause completed. " +
        "newLastAppliedSnaphsotIndex: {}, newLastAppliedSnapShotTermIndex: {}",
        getId(), newLastAppliedSnaphsotIndex, newLastAppliedSnapShotTermIndex);
  });
}
Expand Down Expand Up @@ -482,15 +490,15 @@ private synchronized long takeSnapshotImpl() throws IOException {
final TermIndex applied = getLastAppliedTermIndex();
final TermIndex notified = getLastNotifiedTermIndex();
final TermIndex snapshot = applied.compareTo(notified) > 0 ? applied : notified;
LOG.info(" applied = {}", applied);
LOG.info(" skipped = {}", lastSkippedIndex);
LOG.info("notified = {}", notified);
LOG.info("snapshot = {}", snapshot);

long startTime = Time.monotonicNow();
final TransactionInfo transactionInfo = TransactionInfo.valueOf(snapshot);
ozoneManager.setTransactionInfo(transactionInfo);
ozoneManager.getMetadataManager().getTransactionInfoTable().put(TRANSACTION_INFO_KEY, transactionInfo);
ozoneManager.getMetadataManager().getStore().flushDB();
LOG.info("{}: taking snapshot. applied = {}, skipped = {}, " +
"notified = {}, current snapshot index = {}, took {} ms",
getId(), applied, lastSkippedIndex, notified, snapshot, Time.monotonicNow() - startTime);
return snapshot.getIndex();
}

Expand Down