Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.util;

import com.google.protobuf.CodedOutputStream;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

Expand All @@ -37,4 +38,12 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) {
public static UUID fromProtobuf(HddsProtos.UUID proto) {
return new UUID(proto.getMostSigBits(), proto.getLeastSigBits());
}

/**
* Computes the serialized size of a string in a repeated string field.
* Wraps protobuf's computeStringSizeNoTag for safer use.
*/
public static int computeRepeatedStringSize(String value) {
return CodedOutputStream.computeStringSizeNoTag(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
Expand All @@ -51,6 +53,7 @@
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
Expand All @@ -68,6 +71,8 @@
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is the client-side translator to translate the requests made on
Expand All @@ -78,6 +83,12 @@
public final class ScmBlockLocationProtocolClientSideTranslatorPB
implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable {

private static final Logger LOG =
LoggerFactory.getLogger(ScmBlockLocationProtocolClientSideTranslatorPB.class);

private static final double RATIS_LIMIT_FACTOR = 0.9;
private int ratisByteLimit;

/**
* RpcController is not used and hence is set to null.
*/
Expand All @@ -93,12 +104,18 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
* failover proxy provider.
*/
public ScmBlockLocationProtocolClientSideTranslatorPB(
SCMBlockLocationFailoverProxyProvider proxyProvider) {
SCMBlockLocationFailoverProxyProvider proxyProvider, OzoneConfiguration conf) {
Preconditions.checkState(proxyProvider != null);
this.failoverProxyProvider = proxyProvider;
this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create(
ScmBlockLocationProtocolPB.class, failoverProxyProvider,
failoverProxyProvider.getRetryPolicy());
int limit = (int) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT,
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
// always go to 90% of max limit for request as other header will be added
this.ratisByteLimit = (int) (limit * RATIS_LIMIT_FACTOR);
}

/**
Expand Down Expand Up @@ -230,18 +247,43 @@ public List<AllocatedBlock> allocateBlock(
@Override
public List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException {
List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
.map(BlockGroup::getProto).collect(Collectors.toList());

List<DeleteBlockGroupResult> allResults = new ArrayList<>();
List<KeyBlocks> batch = new ArrayList<>();

int serializedSize = 0;
for (BlockGroup bg : keyBlocksInfoList) {
KeyBlocks bgProto = bg.getProto();
int currSize = bgProto.getSerializedSize();
if (currSize + serializedSize > ratisByteLimit) {
allResults.addAll(submitDeleteKeyBlocks(batch));
if (LOG.isDebugEnabled()) {
LOG.debug("Sending batch of {} KeyBlocks (~{} bytes)", batch.size(), serializedSize);
}
serializedSize = 0;
batch.clear();
}
batch.add(bgProto);
serializedSize += currSize;
}

if (!batch.isEmpty()) {
allResults.addAll(submitDeleteKeyBlocks(batch));
}

return allResults;
}

private List<DeleteBlockGroupResult> submitDeleteKeyBlocks(List<KeyBlocks> batch)
throws IOException {
DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
.newBuilder()
.addAllKeyBlocks(keyBlocksProto)
.addAllKeyBlocks(batch)
.build();

SCMBlockLocationRequest wrapper = createSCMBlockRequest(
Type.DeleteScmKeyBlocks)
.setDeleteScmKeyBlocksRequest(request)
.build();

final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
final DeleteScmKeyBlocksResponseProto resp =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static ScmBlockLocationProtocol getScmBlockClient(
OzoneConfiguration conf) {
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
new SCMBlockLocationFailoverProxyProvider(conf));
new SCMBlockLocationFailoverProxyProvider(conf), conf);
return TracingUtil
.createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class,
conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testFailover() throws Exception {
failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
failoverProxyProvider);
failoverProxyProvider, conf);
GenericTestUtils
.setLogLevel(SCMBlockLocationFailoverProxyProvider.class, Level.DEBUG);
LogCapturer logCapture = LogCapturer.captureLogs(SCMBlockLocationFailoverProxyProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testGetClusterTreeInformation() throws IOException {
failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
failoverProxyProvider);
failoverProxyProvider, conf);

InnerNode expectedInnerNode = (InnerNode) scm.getClusterMap().getNode(ROOT);
InnerNode actualInnerNode = scmBlockLocationClient.getNetworkTopology();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
Expand All @@ -49,7 +50,6 @@
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
Expand All @@ -68,6 +68,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.NullableUUID;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
import org.apache.hadoop.ozone.util.ProtobufUtils;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
Expand All @@ -88,6 +89,9 @@ public class KeyDeletingService extends AbstractKeyDeletingService {
private final AtomicLong deletedKeyCount;
private final boolean deepCleanSnapshots;
private final SnapshotChainManager snapshotChainManager;
private int ratisByteLimit;
private static final double RATIS_LIMIT_FACTOR = 0.9;

// Track metrics for current task execution
private long latestRunTimestamp = 0L;
private final DeletionStats aosDeletionStats = new DeletionStats();
Expand All @@ -108,6 +112,13 @@ public KeyDeletingService(OzoneManager ozoneManager,
this.deepCleanSnapshots = deepCleanSnapshots;
this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
this.scmClient = scmClient;
int limit = (int) ozoneManager.getConfiguration().getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
// Use 90% of the actual Ratis limit to account for protobuf overhead and
// prevent accidentally exceeding the hard limit during request serialization.
this.ratisByteLimit = (int) Math.max(limit * RATIS_LIMIT_FACTOR, 1);
}

/**
Expand Down Expand Up @@ -143,8 +154,8 @@ Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksL
keyBlocksList.size(), Time.monotonicNow() - startTime);
if (blockDeletionResults != null) {
long purgeStartTime = Time.monotonicNow();
purgeResult = submitPurgeKeysRequest(blockDeletionResults,
keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId, keyBlockReplicatedSize);
purgeResult = submitPurgeKeysRequest(blockDeletionResults, keysToModify, renameEntries, snapTableKey,
expectedPreviousSnapshotId, ratisByteLimit, keyBlockReplicatedSize);
int limit = getOzoneManager().getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK,
OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. Limit per task is {}.",
Expand All @@ -160,17 +171,25 @@ Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksL
* @param results DeleteBlockGroups returned by SCM.
* @param keysToModify Updated list of RepeatedOmKeyInfo
*/
private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntriesToBeDeleted,
String snapTableKey, UUID expectedPreviousSnapshotId, Map<String, Long> keyBlockReplicatedSize)
throws InterruptedException {
@SuppressWarnings("checkstyle:MethodLength")
private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(
List<DeleteBlockGroupResult> results,
Map<String, RepeatedOmKeyInfo> keysToModify,
List<String> renameEntriesToBeDeleted,
String snapTableKey,
UUID expectedPreviousSnapshotId,
int ratisLimit,
Map<String, Long> keyBlockReplicatedSize) throws InterruptedException {

List<String> purgeKeys = new ArrayList<>();

// Put all keys to be purged in a list
int deletedCount = 0;
long deletedReplSize = 0;
Set<String> failedDeletedKeys = new HashSet<>();
boolean purgeSuccess = true;

// Step 1: Process DeleteBlockGroupResults
for (DeleteBlockGroupResult result : results) {
String deletedKey = result.getObjectKey();
if (result.isSuccess()) {
Expand Down Expand Up @@ -199,25 +218,7 @@ private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(List<DeleteBlo
}
}

PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder();
if (snapTableKey != null) {
purgeKeysRequest.setSnapshotTableKey(snapTableKey);
}
NullableUUID.Builder expectedPreviousSnapshotNullableUUID = NullableUUID.newBuilder();
if (expectedPreviousSnapshotId != null) {
expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId));
}
purgeKeysRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
.setVolumeName("")
.setBucketName("")
.addAllKeys(purgeKeys)
.build();
purgeKeysRequest.addDeletedKeys(deletedKeys);
// Adding rename entries to be purged.
if (renameEntriesToBeDeleted != null) {
purgeKeysRequest.addAllRenamedKeys(renameEntriesToBeDeleted);
}
// Step 2: Prepare keysToUpdateList
List<OzoneManagerProtocolProtos.SnapshotMoveKeyInfos> keysToUpdateList = new ArrayList<>();
if (keysToModify != null) {
for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify :
Expand All @@ -235,30 +236,107 @@ private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(List<DeleteBlo
keyToUpdate.addAllKeyInfos(keyInfos);
keysToUpdateList.add(keyToUpdate.build());
}
}

if (purgeKeys.isEmpty() && keysToUpdateList.isEmpty() &&
(renameEntriesToBeDeleted == null || renameEntriesToBeDeleted.isEmpty())) {
return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess);
}

int purgeKeyIndex = 0, updateIndex = 0, renameIndex = 0;
PurgeKeysRequest.Builder requestBuilder = getPurgeKeysRequest(snapTableKey, expectedPreviousSnapshotId);
int currSize = requestBuilder.build().getSerializedSize();
int baseSize = currSize;

while (purgeKeyIndex < purgeKeys.size() || updateIndex < keysToUpdateList.size() ||
(renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size())) {

// 3.1 Purge keys (one at a time)
if (purgeKeyIndex < purgeKeys.size()) {
String nextKey = purgeKeys.get(purgeKeyIndex);
int estimatedKeySize = ProtobufUtils.computeRepeatedStringSize(nextKey);

requestBuilder.addDeletedKeys(
OzoneManagerProtocolProtos.DeletedKeys.newBuilder().setVolumeName("").setBucketName("").addKeys(nextKey)
.build());
currSize += estimatedKeySize;
purgeKeyIndex++;

} else if (updateIndex < keysToUpdateList.size()) {
// 3.2 Add keysToUpdate
OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = keysToUpdateList.get(updateIndex);

int estimatedSize = nextUpdate.getSerializedSize();

requestBuilder.addKeysToUpdate(nextUpdate);
currSize += estimatedSize;
updateIndex++;

} else if (renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size()) {
// 3.3 Add renamed keys
String nextRename = renameEntriesToBeDeleted.get(renameIndex);

int estimatedSize = ProtobufUtils.computeRepeatedStringSize(nextRename);

requestBuilder.addRenamedKeys(nextRename);
currSize += estimatedSize;
renameIndex++;
}

if (!keysToUpdateList.isEmpty()) {
purgeKeysRequest.addAllKeysToUpdate(keysToUpdateList);
// Flush either when limit is hit, or at the very end if items remain
boolean allDone = purgeKeyIndex == purgeKeys.size() && updateIndex == keysToUpdateList.size() &&
(renameEntriesToBeDeleted == null || renameIndex == renameEntriesToBeDeleted.size());

if (currSize >= ratisLimit || (allDone && hasPendingItems(requestBuilder))) {
purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder);
requestBuilder = getPurgeKeysRequest(snapTableKey, expectedPreviousSnapshotId);
currSize = baseSize;
}
}

OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
.setPurgeKeysRequest(purgeKeysRequest)
.setClientId(getClientId().toString())
.build();
return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess);
}

private boolean hasPendingItems(PurgeKeysRequest.Builder builder) {
return builder.getDeletedKeysCount() > 0
|| builder.getKeysToUpdateCount() > 0
|| builder.getRenamedKeysCount() > 0;
}

private static PurgeKeysRequest.Builder getPurgeKeysRequest(String snapTableKey,
UUID expectedPreviousSnapshotId) {
PurgeKeysRequest.Builder requestBuilder = PurgeKeysRequest.newBuilder();

if (snapTableKey != null) {
requestBuilder.setSnapshotTableKey(snapTableKey);
}

NullableUUID.Builder expectedPreviousSnapshotNullableUUID = NullableUUID.newBuilder();
if (expectedPreviousSnapshotId != null) {
expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId));
}
requestBuilder.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
return requestBuilder;
}

private boolean submitPurgeRequest(String snapTableKey, boolean purgeSuccess,
PurgeKeysRequest.Builder requestBuilder) {

OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder().setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
.setPurgeKeysRequest(requestBuilder.build()).setClientId(getClientId().toString()).build();

// Submit PurgeKeys request to OM. Acquire bootstrap lock when processing deletes for snapshots.
try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) {
try (Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) {
OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest);
if (omResponse != null) {
purgeSuccess = purgeSuccess && omResponse.getSuccess();
}
} catch (ServiceException e) {
LOG.error("PurgeKey request failed. Will retry at next run.", e);
return Pair.of(Pair.of(0, 0L), false);
} catch (ServiceException | InterruptedException e) {
LOG.error("PurgeKey request failed in batch. Will retry at next run.", e);
purgeSuccess = false;
// Continue to next batch instead of returning immediately
}

return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess);
return purgeSuccess;
}

/**
Expand Down
Loading