@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Objects;

import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.StringCodec;
@@ -162,7 +163,15 @@ public String toString() {
*/
public static TransactionInfo readTransactionInfo(
DBStoreHAManager metadataManager) throws IOException {
- return metadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+ return metadataManager.getTransactionInfoTable().getSkipCache(TRANSACTION_INFO_KEY);
Contributor: Why do we need the getSkipCache() version for TransactionInfo?

Contributor Author: We are always supposed to read the value from disk, never from the cache. There should be no cache for the transactionInfo table; I have added a fail-safe check to skip the cache even if one exists.

Contributor: I think we agreed that there is no need to skip the cache, because we don't update the cache directly for TransactionInfoTable. Once the transaction gets flushed, it updates the cache as well.

Contributor Author: Why even check the cache if the cache data structure is never going to contain the key? getSkipCache() goes directly to RocksDB, whereas get() makes a round trip through the cache and only then falls back to RocksDB on a miss.
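
To make the trade-off concrete, the sketch below models the two lookup paths. It is a simplified assumption of how a cache-backed table behaves, not the actual TypedTable implementation; all names and bodies here are illustrative.

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of a table backed by RocksDB with an in-memory cache. */
class CachedTable<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();

  /** get(): round trip through the cache, then fall back to the DB on a miss. */
  V get(K key) throws IOException {
    V cached = cache.get(key);
    return cached != null ? cached : readFromDb(key);
  }

  /** getSkipCache(): bypass the cache and read the persisted value directly. */
  V getSkipCache(K key) throws IOException {
    return readFromDb(key);
  }

  private V readFromDb(K key) throws IOException {
    return null; // placeholder for the RocksDB lookup
  }
}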

}

public ByteString toByteString() throws IOException {
return ByteString.copyFrom(getCodec().toPersistedFormat(this));
}

public static TransactionInfo fromByteString(ByteString byteString) throws IOException {
return byteString == null ? null : getCodec().fromPersistedFormat(byteString.toByteArray());
}

public SnapshotInfo toSnapshotInfo() {
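For context before the next file: the two helpers above are what later hunks use to embed a TransactionInfo in SnapshotInfo. A minimal round-trip sketch, assuming an Ozone/Ratis classpath; TransactionInfo.valueOf(termIndex) is the factory used elsewhere in this diff, and the term/index values are illustrative.

import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.ratis.server.protocol.TermIndex;

public class TransactionInfoRoundTrip {
  public static void main(String[] args) throws Exception {
    TermIndex termIndex = TermIndex.valueOf(1, 100); // illustrative term/index

    TransactionInfo original = TransactionInfo.valueOf(termIndex);
    ByteString persisted = original.toByteString();          // codec-encoded bytes
    TransactionInfo decoded = TransactionInfo.fromByteString(persisted);

    // fromByteString(null) returns null; later hunks treat a null
    // lastTransactionInfo as a pre-upgrade snapshot.
    assert decoded.compareTo(original) == 0;                 // lossless round trip
  }
}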
@@ -19,6 +19,7 @@
*/

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.CopyObject;
@@ -124,6 +125,7 @@ public static SnapshotStatus valueOf(SnapshotStatusProto status) {
private long exclusiveSize;
private long exclusiveReplicatedSize;
private boolean deepCleanedDeletedDir;
private ByteString lastTransactionInfo;

private SnapshotInfo(Builder b) {
this.snapshotId = b.snapshotId;
@@ -145,6 +147,7 @@ private SnapshotInfo(Builder b) {
this.exclusiveSize = b.exclusiveSize;
this.exclusiveReplicatedSize = b.exclusiveReplicatedSize;
this.deepCleanedDeletedDir = b.deepCleanedDeletedDir;
this.lastTransactionInfo = b.lastTransactionInfo;
}

public void setName(String name) {
@@ -261,13 +264,15 @@ public SnapshotInfo.Builder toBuilder() {
.setGlobalPreviousSnapshotId(globalPreviousSnapshotId)
.setSnapshotPath(snapshotPath)
.setCheckpointDir(checkpointDir)
.setDbTxSequenceNumber(dbTxSequenceNumber)
.setDeepClean(deepClean)
.setSstFiltered(sstFiltered)
.setReferencedSize(referencedSize)
.setReferencedReplicatedSize(referencedReplicatedSize)
.setExclusiveSize(exclusiveSize)
.setExclusiveReplicatedSize(exclusiveReplicatedSize)
- .setDeepCleanedDeletedDir(deepCleanedDeletedDir);
+ .setDeepCleanedDeletedDir(deepCleanedDeletedDir)
+ .setLastTransactionInfo(lastTransactionInfo);
Contributor: What about existing snapshots when the system is upgraded? They would not have this field set.

Contributor Author: lastTransactionInfo would have a null value for those snapshots.

}

/**
@@ -293,6 +298,7 @@ public static class Builder {
private long exclusiveSize;
private long exclusiveReplicatedSize;
private boolean deepCleanedDeletedDir;
private ByteString lastTransactionInfo;

public Builder() {
// default values
@@ -411,6 +417,11 @@ public Builder setDeepCleanedDeletedDir(boolean deepCleanedDeletedDir) {
return this;
}

public Builder setLastTransactionInfo(ByteString lastTransactionInfo) {
this.lastTransactionInfo = lastTransactionInfo;
return this;
}

public SnapshotInfo build() {
Preconditions.checkNotNull(name);
return new SnapshotInfo(this);
@@ -445,6 +456,10 @@ public OzoneManagerProtocolProtos.SnapshotInfo getProtobuf() {
sib.setGlobalPreviousSnapshotID(toProtobuf(globalPreviousSnapshotId));
}

if (lastTransactionInfo != null) {
sib.setLastTransactionInfo(lastTransactionInfo);
}

sib.setSnapshotPath(snapshotPath)
.setCheckpointDir(checkpointDir)
.setDbTxSequenceNumber(dbTxSequenceNumber)
@@ -513,6 +528,10 @@ public static SnapshotInfo getFromProtobuf(
snapshotInfoProto.getDeepCleanedDeletedDir());
}

if (snapshotInfoProto.hasLastTransactionInfo()) {
osib.setLastTransactionInfo(snapshotInfoProto.getLastTransactionInfo());
}

osib.setSnapshotPath(snapshotInfoProto.getSnapshotPath())
.setCheckpointDir(snapshotInfoProto.getCheckpointDir())
.setDbTxSequenceNumber(snapshotInfoProto.getDbTxSequenceNumber());
@@ -605,6 +624,14 @@ public void setDeepCleanedDeletedDir(boolean deepCleanedDeletedDir) {
this.deepCleanedDeletedDir = deepCleanedDeletedDir;
}

public ByteString getLastTransactionInfo() {
return lastTransactionInfo;
}

public void setLastTransactionInfo(ByteString lastTransactionInfo) {
this.lastTransactionInfo = lastTransactionInfo;
}

/**
* Generate default name of snapshot, (used if user doesn't provide one).
*/
@@ -673,7 +700,8 @@ public boolean equals(Object o) {
referencedReplicatedSize == that.referencedReplicatedSize &&
exclusiveSize == that.exclusiveSize &&
exclusiveReplicatedSize == that.exclusiveReplicatedSize &&
- deepCleanedDeletedDir == that.deepCleanedDeletedDir;
+ deepCleanedDeletedDir == that.deepCleanedDeletedDir &&
+ Objects.equals(lastTransactionInfo, that.lastTransactionInfo);
Contributor: Pre-upgrade snapshots would not have this field set, which can cause equality checks to fail for such snapshots. Can this equality check cause a regression in existing code paths?

Contributor Author: We always read the SnapshotInfo from disk, so this value would be null anyway. We don't use SnapshotInfo.equals() anywhere in the production code flow; it is only used in test cases.
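
For reference on the upgrade concern: Objects.equals is null-safe, so two pre-upgrade snapshots (both with a null lastTransactionInfo) still compare equal; a mismatch only arises when comparing a pre-upgrade value against a post-upgrade one. A tiny sketch:

import com.google.protobuf.ByteString;
import java.util.Objects;

public class NullSafeEquals {
  public static void main(String[] args) {
    ByteString preUpgradeA = null;                          // written before the upgrade
    ByteString preUpgradeB = null;
    ByteString postUpgrade = ByteString.copyFromUtf8("tx"); // illustrative bytes

    System.out.println(Objects.equals(preUpgradeA, preUpgradeB)); // true
    System.out.println(Objects.equals(preUpgradeA, postUpgrade)); // false
  }
}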

}

@Override
@@ -684,35 +712,15 @@ public int hashCode() {
globalPreviousSnapshotId, snapshotPath, checkpointDir,
deepClean, sstFiltered,
referencedSize, referencedReplicatedSize,
- exclusiveSize, exclusiveReplicatedSize, deepCleanedDeletedDir);
+ exclusiveSize, exclusiveReplicatedSize, deepCleanedDeletedDir, lastTransactionInfo);
}

/**
* Return a new copy of the object.
*/
@Override
public SnapshotInfo copyObject() {
- return new Builder()
- .setSnapshotId(snapshotId)
- .setName(name)
- .setVolumeName(volumeName)
- .setBucketName(bucketName)
- .setSnapshotStatus(snapshotStatus)
- .setCreationTime(creationTime)
- .setDeletionTime(deletionTime)
- .setPathPreviousSnapshotId(pathPreviousSnapshotId)
- .setGlobalPreviousSnapshotId(globalPreviousSnapshotId)
- .setSnapshotPath(snapshotPath)
- .setCheckpointDir(checkpointDir)
- .setDbTxSequenceNumber(dbTxSequenceNumber)
- .setDeepClean(deepClean)
- .setSstFiltered(sstFiltered)
- .setReferencedSize(referencedSize)
- .setReferencedReplicatedSize(referencedReplicatedSize)
- .setExclusiveSize(exclusiveSize)
- .setExclusiveReplicatedSize(exclusiveReplicatedSize)
- .setDeepCleanedDeletedDir(deepCleanedDeletedDir)
- .build();
+ return this.toBuilder().build();
}

@Override
@@ -737,6 +745,7 @@ public String toString() {
", exclusiveSize: '" + exclusiveSize + '\'' +
", exclusiveReplicatedSize: '" + exclusiveReplicatedSize + '\'' +
", deepCleanedDeletedDir: '" + deepCleanedDeletedDir + '\'' +
", lastTransactionInfo: '" + lastTransactionInfo + '\'' +
'}';
}
}
@@ -886,6 +886,7 @@ message SnapshotInfo {
optional uint64 exclusiveReplicatedSize = 18;
// note: shared sizes can be calculated from: referenced - exclusive
optional bool deepCleanedDeletedDir = 19;
optional bytes lastTransactionInfo = 20;
}

message SnapshotDiffJobProto {
@@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -674,6 +675,41 @@ private ReferenceCounted<OmSnapshot> getSnapshot(String snapshotTableKey, boolea
return snapshotCache.get(snapshotInfo.getSnapshotId());
}

/**
* Checks if the last transaction performed on the snapshot has been flushed to disk.
* @param metadataManager MetadataManager of the active OM.
* @param snapshotTableKey table key corresponding to the snapshot in the snapshotInfoTable.
* @return true if the changes have been flushed to the DB, otherwise false.
* @throws IOException
*/
public static boolean areSnapshotChangesFlushedToDB(OMMetadataManager metadataManager, String snapshotTableKey)
throws IOException {
// Need this info from the cache, since the snapshot could have been updated only in the cache and not yet on disk.
SnapshotInfo snapshotInfo = metadataManager.getSnapshotInfoTable().get(snapshotTableKey);
Contributor: nit: we should use SnapshotUtils#getSnapshotInfo.

return areSnapshotChangesFlushedToDB(metadataManager, snapshotInfo);
}

/**
* Checks if the last transaction performed on the snapshot has been flushed to disk.
* @param metadataManager MetadataManager of the active OM.
* @param snapshotInfo SnapshotInfo value.
* @return true if the changes have been flushed to the DB, otherwise false. Returns true if the provided
* snapshot is null, meaning the snapshot doesn't exist.
* @throws IOException
*/
public static boolean areSnapshotChangesFlushedToDB(OMMetadataManager metadataManager, SnapshotInfo snapshotInfo)
Contributor: Looking at the usage of this function in PR#7200, these static functions should be moved to SnapshotUtils.

throws IOException {
if (snapshotInfo != null) {
TransactionInfo snapshotTransactionInfo = snapshotInfo.getLastTransactionInfo() != null ?
TransactionInfo.fromByteString(snapshotInfo.getLastTransactionInfo()) : null;
TransactionInfo omTransactionInfo = TransactionInfo.readTransactionInfo(metadataManager);
// If the lastTransactionInfo field is null, return true to keep things backward compatible.
return snapshotTransactionInfo == null || omTransactionInfo.compareTo(snapshotTransactionInfo) >= 0;
}
return true;
}
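
A hypothetical caller sketch (the wrapper class and names are illustrative, not from this PR) showing how a background service could consult this check before resubmitting work for a snapshot:

import java.io.IOException;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmSnapshotManager;

final class SnapshotFlushGuard {
  /** Returns true when it is safe to submit more purge work for this snapshot. */
  static boolean safeToSubmit(OMMetadataManager metadataManager, String snapshotTableKey)
      throws IOException {
    // If the snapshot's last transaction is not yet flushed, a previous purge/move
    // is still in the Raft log or table cache; submitting another request now
    // would duplicate it.
    return OmSnapshotManager.areSnapshotChangesFlushedToDB(metadataManager, snapshotTableKey);
  }
}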


/**
* Returns OmSnapshot object and skips active check.
* This should only be used for API calls initiated by background service e.g. purgeKeys, purgeSnapshot,
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -149,6 +150,11 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
}
}
}
if (fromSnapshotInfo != null) {
fromSnapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(fromSnapshotInfo.getTableKey()),
CacheValue.get(termIndex.getIndex(), fromSnapshotInfo));
}
} catch (IOException ex) {
// Case of IOException for fromProtobuf will not happen
// as this is created and send within OM
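The stamp-and-cache pattern above recurs in the request classes that follow (key purge, snapshot create, move-deleted-keys, snapshot purge). A hypothetical helper capturing it, built only from calls that appear in this diff; the class and method names are illustrative:

import java.io.IOException;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.ratis.server.protocol.TermIndex;

final class SnapshotTxnTracker {
  /** Stamp the snapshot with the current term/index and publish it to the table cache. */
  static void recordLastTransaction(Table<String, SnapshotInfo> snapshotInfoTable,
      SnapshotInfo snapshotInfo, TermIndex termIndex) throws IOException {
    snapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
    snapshotInfoTable.addCacheEntry(new CacheKey<>(snapshotInfo.getTableKey()),
        CacheValue.get(termIndex.getIndex(), snapshotInfo));
  }
}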
@@ -21,6 +21,10 @@
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -61,6 +65,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
String fromSnapshot = purgeKeysRequest.hasSnapshotTableKey() ?
purgeKeysRequest.getSnapshotTableKey() : null;
List<String> keysToBePurgedList = new ArrayList<>();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) ozoneManager.getMetadataManager();

OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
getOmRequest());
Expand All @@ -71,17 +76,27 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
keysToBePurgedList.add(deletedKey);
}
}
final SnapshotInfo fromSnapshotInfo;

try {
- SnapshotInfo fromSnapshotInfo = null;
- if (fromSnapshot != null) {
- fromSnapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot);
- }
- omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
- keysToBePurgedList, fromSnapshotInfo, keysToUpdateList);
+ fromSnapshotInfo = fromSnapshot == null ? null : SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot);
} catch (IOException ex) {
- omClientResponse = new OMKeyPurgeResponse(createErrorOMResponse(omResponse, ex));
+ return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, ex));
}

// Set the transaction info on the snapshot. This prevents duplicate purge requests reaching the OM
// from background services.
try {
if (fromSnapshotInfo != null) {
fromSnapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(fromSnapshotInfo.getTableKey()),
CacheValue.get(termIndex.getIndex(), fromSnapshotInfo));
}
} catch (IOException e) {
return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, e));
}
omClientResponse = new OMKeyPurgeResponse(omResponse.build(), keysToBePurgedList, fromSnapshotInfo,
keysToUpdateList);

return omClientResponse;
}
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
@@ -166,7 +167,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
((RDBStore) omMetadataManager.getStore()).getDb()
.getLatestSequenceNumber();
snapshotInfo.setDbTxSequenceNumber(dbLatestSequenceNumber);

snapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
// Snapshot referenced size should be bucket's used bytes
OmBucketInfo omBucketInfo =
getBucketInfo(omMetadataManager, volumeName, bucketName);
@@ -20,6 +20,9 @@
package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -82,15 +85,20 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
nextSnapshot = SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, ozoneManager);

// Get next non-deleted snapshot.
- List<SnapshotMoveKeyInfos> nextDBKeysList =
- moveDeletedKeysRequest.getNextDBKeysList();
- List<SnapshotMoveKeyInfos> reclaimKeysList =
- moveDeletedKeysRequest.getReclaimKeysList();
- List<HddsProtos.KeyValue> renamedKeysList =
- moveDeletedKeysRequest.getRenamedKeysList();
- List<String> movedDirs =
- moveDeletedKeysRequest.getDeletedDirsToMoveList();
+ List<SnapshotMoveKeyInfos> nextDBKeysList = moveDeletedKeysRequest.getNextDBKeysList();
+ List<SnapshotMoveKeyInfos> reclaimKeysList = moveDeletedKeysRequest.getReclaimKeysList();
+ List<HddsProtos.KeyValue> renamedKeysList = moveDeletedKeysRequest.getRenamedKeysList();
+ List<String> movedDirs = moveDeletedKeysRequest.getDeletedDirsToMoveList();

// Update lastTransactionInfo for fromSnapshot and the nextSnapshot.
fromSnapshot.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()),
CacheValue.get(termIndex.getIndex(), fromSnapshot));
if (nextSnapshot != null) {
nextSnapshot.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(nextSnapshot.getTableKey()),
CacheValue.get(termIndex.getIndex(), nextSnapshot));
}
omClientResponse = new OMSnapshotMoveDeletedKeysResponse(
omResponse.build(), fromSnapshot, nextSnapshot,
nextDBKeysList, reclaimKeysList, renamedKeysList, movedDirs);
@@ -19,6 +19,7 @@

package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.ratis.server.protocol.TermIndex;
@@ -110,9 +111,16 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex);
// Step 2: Update the snapshot chain.
updateSnapshotChainAndCache(omMetadataManager, fromSnapshot, trxnLogIndex);
- // Step 3: Purge the snapshot from SnapshotInfoTable cache.
+ // Step 3: Purge the snapshot from the SnapshotInfoTable cache and also remove it from the map.
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));
updatedSnapshotInfos.remove(fromSnapshot.getTableKey());
}

for (SnapshotInfo snapshotInfo : updatedSnapshotInfos.values()) {
snapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(snapshotInfo.getTableKey()),
CacheValue.get(termIndex.getIndex(), snapshotInfo));
}

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), snapshotDbKeys, updatedSnapshotInfos);