Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
Expand All @@ -51,6 +53,7 @@
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
Expand All @@ -68,6 +71,8 @@
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is the client-side translator to translate the requests made on
Expand All @@ -78,6 +83,12 @@
public final class ScmBlockLocationProtocolClientSideTranslatorPB
implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable {

private static final Logger LOG =
LoggerFactory.getLogger(ScmBlockLocationProtocolClientSideTranslatorPB.class);

private static final double RATIS_LIMIT_FACTOR = 0.9;
private int ratisByteLimit;

/**
* RpcController is not used and hence is set to null.
*/
Expand All @@ -93,12 +104,18 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
* failover proxy provider.
*/
public ScmBlockLocationProtocolClientSideTranslatorPB(
SCMBlockLocationFailoverProxyProvider proxyProvider) {
SCMBlockLocationFailoverProxyProvider proxyProvider, OzoneConfiguration conf) {
Preconditions.checkState(proxyProvider != null);
this.failoverProxyProvider = proxyProvider;
this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create(
ScmBlockLocationProtocolPB.class, failoverProxyProvider,
failoverProxyProvider.getRetryPolicy());
int limit = (int) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT,
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
// always go to 90% of max limit for request as other header will be added
this.ratisByteLimit = (int) (limit * RATIS_LIMIT_FACTOR);
Comment thread
aryangupta1998 marked this conversation as resolved.
}

/**
Expand Down Expand Up @@ -230,18 +247,40 @@ public List<AllocatedBlock> allocateBlock(
@Override
public List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException {
List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
.map(BlockGroup::getProto).collect(Collectors.toList());

List<DeleteBlockGroupResult> allResults = new ArrayList<>();
List<KeyBlocks> batch = new ArrayList<>();

int serializedSize = 0;
for (BlockGroup bg : keyBlocksInfoList) {
int currSize = bg.getProto().getSerializedSize();
if (currSize + serializedSize > ratisByteLimit) {
allResults.addAll(submitDeleteKeyBlocks(batch));
LOG.info("Sending batch of {} KeyBlocks (~{} bytes)", batch.size(), serializedSize);
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
serializedSize = 0;
batch.clear();
}
batch.add(bg.getProto());
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
serializedSize += currSize;
}

if (!batch.isEmpty()) {
allResults.addAll(submitDeleteKeyBlocks(batch));
}

return allResults;
}

private List<DeleteBlockGroupResult> submitDeleteKeyBlocks(List<KeyBlocks> batch)
throws IOException {
DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
.newBuilder()
.addAllKeyBlocks(keyBlocksProto)
.addAllKeyBlocks(batch)
.build();

SCMBlockLocationRequest wrapper = createSCMBlockRequest(
Type.DeleteScmKeyBlocks)
.setDeleteScmKeyBlocksRequest(request)
.build();

final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
final DeleteScmKeyBlocksResponseProto resp =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static ScmBlockLocationProtocol getScmBlockClient(
OzoneConfiguration conf) {
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
new SCMBlockLocationFailoverProxyProvider(conf));
new SCMBlockLocationFailoverProxyProvider(conf), conf);
return TracingUtil
.createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class,
conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testFailover() throws Exception {
failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
failoverProxyProvider);
failoverProxyProvider, conf);
GenericTestUtils
.setLogLevel(SCMBlockLocationFailoverProxyProvider.class, Level.DEBUG);
LogCapturer logCapture = LogCapturer.captureLogs(SCMBlockLocationFailoverProxyProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testGetClusterTreeInformation() throws IOException {
failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
failoverProxyProvider);
failoverProxyProvider, conf);

InnerNode expectedInnerNode = (InnerNode) scm.getClusterMap().getNode(ROOT);
InnerNode actualInnerNode = scmBlockLocationClient.getNetworkTopology();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,16 +775,6 @@ public PendingKeysDeletion getPendingDeletionKeys(
.addAllBlockIDs(blockIDS).build();
int keyBlockSerializedSize = keyBlocks.getProto().getSerializedSize();
serializedSize += keyBlockSerializedSize;
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
if (serializedSize > ratisByteLimit) {
maxReqSizeExceeded = true;
if (LOG.isDebugEnabled()) {
LOG.debug(
"Total size of cumulative keys and rename entries in the snapshotRenamedTable in a cycle " +
"crossed 90% ratis limit, serialized size of keys: {}",
serializedSize);
}
break;
}
blockGroupList.add(keyBlocks);
currentCount++;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Pair<Integer, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksList,
if (blockDeletionResults != null) {
long purgeStartTime = Time.monotonicNow();
purgeResult = submitPurgeKeysRequest(blockDeletionResults,
keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId);
keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId, ratisByteLimit);
int limit = getOzoneManager().getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK,
OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. Limit per task is {}.",
Expand All @@ -164,15 +164,23 @@ Pair<Integer, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksList,
* @param results DeleteBlockGroups returned by SCM.
* @param keysToModify Updated list of RepeatedOmKeyInfo
*/
private Pair<Integer, Boolean> submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntriesToBeDeleted,
String snapTableKey, UUID expectedPreviousSnapshotId) throws InterruptedException {
@SuppressWarnings("checkstyle:MethodLength")
private Pair<Integer, Boolean> submitPurgeKeysRequest(
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
List<DeleteBlockGroupResult> results,
Map<String, RepeatedOmKeyInfo> keysToModify,
List<String> renameEntriesToBeDeleted,
String snapTableKey,
UUID expectedPreviousSnapshotId,
int ratisLimit) throws InterruptedException {

List<String> purgeKeys = new ArrayList<>();

// Put all keys to be purged in a list
int deletedCount = 0;
Set<String> failedDeletedKeys = new HashSet<>();
boolean purgeSuccess = true;

// Step 1: Process DeleteBlockGroupResults
for (DeleteBlockGroupResult result : results) {
String deletedKey = result.getObjectKey();
if (result.isSuccess()) {
Expand All @@ -198,25 +206,7 @@ private Pair<Integer, Boolean> submitPurgeKeysRequest(List<DeleteBlockGroupResul
}
}

PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder();
if (snapTableKey != null) {
purgeKeysRequest.setSnapshotTableKey(snapTableKey);
}
NullableUUID.Builder expectedPreviousSnapshotNullableUUID = NullableUUID.newBuilder();
if (expectedPreviousSnapshotId != null) {
expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId));
}
purgeKeysRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
.setVolumeName("")
.setBucketName("")
.addAllKeys(purgeKeys)
.build();
purgeKeysRequest.addDeletedKeys(deletedKeys);
// Adding rename entries to be purged.
if (renameEntriesToBeDeleted != null) {
purgeKeysRequest.addAllRenamedKeys(renameEntriesToBeDeleted);
}
// Step 2: Prepare keysToUpdateList
List<OzoneManagerProtocolProtos.SnapshotMoveKeyInfos> keysToUpdateList = new ArrayList<>();
if (keysToModify != null) {
for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify :
Expand All @@ -234,27 +224,126 @@ private Pair<Integer, Boolean> submitPurgeKeysRequest(List<DeleteBlockGroupResul
keyToUpdate.addAllKeyInfos(keyInfos);
keysToUpdateList.add(keyToUpdate.build());
}
}

if (!keysToUpdateList.isEmpty()) {
purgeKeysRequest.addAllKeysToUpdate(keysToUpdateList);
int purgeKeyIndex = 0, updateIndex = 0, renameIndex = 0;
int currSize = 0;
boolean batchCapacityReached;

while (purgeKeyIndex < purgeKeys.size() ||
updateIndex < keysToUpdateList.size() ||
(renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size())) {

int remainingRatisLimit = ratisLimit;
batchCapacityReached = false;
PurgeKeysRequest.Builder requestBuilder = PurgeKeysRequest.newBuilder();
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated

if (snapTableKey != null) {
requestBuilder.setSnapshotTableKey(snapTableKey);
}
if (expectedPreviousSnapshotId != null) {
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
requestBuilder.setExpectedPreviousSnapshotID(
NullableUUID.newBuilder()
.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId))
.build());
}
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated

currSize = requestBuilder.build().getSerializedSize();
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
remainingRatisLimit -= currSize;

// 3.1 Add purgeKeys
List<String> batchPurgeKeys = new ArrayList<>();
int estimatedPurgeKeysSize = 0;

while (purgeKeyIndex < purgeKeys.size()) {
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
String nextKey = purgeKeys.get(purgeKeyIndex);

List<String> temp = new ArrayList<>(batchPurgeKeys);
temp.add(nextKey);

OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
.setVolumeName("").setBucketName("").addAllKeys(temp).build();

PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone();
tempBuilder.clearDeletedKeys().addDeletedKeys(deletedKeys);
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated

estimatedPurgeKeysSize = tempBuilder.build().getSerializedSize();
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
if (currSize + estimatedPurgeKeysSize > remainingRatisLimit) {
batchCapacityReached = true;
break;
}

batchPurgeKeys.add(nextKey);
purgeKeyIndex++;
}
}

OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
.setPurgeKeysRequest(purgeKeysRequest)
.setClientId(getClientId().toString())
.build();

// Submit PurgeKeys request to OM. Acquire bootstrap lock when processing deletes for snapshots.
try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) {
OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest);
if (omResponse != null) {
purgeSuccess = purgeSuccess && omResponse.getSuccess();
// Now actually add batchPurgeKeys
if (!batchPurgeKeys.isEmpty()) {
OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
.setVolumeName("").setBucketName("")
.addAllKeys(batchPurgeKeys).build();
requestBuilder.addDeletedKeys(deletedKeys);
currSize = requestBuilder.build().getSerializedSize();
}

remainingRatisLimit -= currSize;

// 3.2 Add keysToUpdate
while (!batchCapacityReached && updateIndex < keysToUpdateList.size()) {
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = keysToUpdateList.get(updateIndex);
Comment thread
aryangupta1998 marked this conversation as resolved.

PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone();
tempBuilder.addKeysToUpdate(nextUpdate);
int estimatedSize = tempBuilder.build().getSerializedSize();

if (currSize + estimatedSize > remainingRatisLimit) {
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
batchCapacityReached = true;
break;
}

requestBuilder.addKeysToUpdate(nextUpdate);
currSize += estimatedSize;
updateIndex++;
}

remainingRatisLimit -= currSize;

// 3.3 Add renamed keys
while (!batchCapacityReached && renameEntriesToBeDeleted != null &&
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
renameIndex < renameEntriesToBeDeleted.size()) {
String nextRename = renameEntriesToBeDeleted.get(renameIndex);
Comment thread
aryangupta1998 marked this conversation as resolved.

PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone();
tempBuilder.addRenamedKeys(nextRename);
int estimatedSize = tempBuilder.build().getSerializedSize();

if (currSize + estimatedSize > remainingRatisLimit) {
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
break;
}

Comment thread
aryangupta1998 marked this conversation as resolved.
requestBuilder.addRenamedKeys(nextRename);
currSize += estimatedSize;
renameIndex++;
}

// Finalize and send this batch
OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
Comment thread
aryangupta1998 marked this conversation as resolved.
Outdated
.setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
.setPurgeKeysRequest(requestBuilder.build())
.setClientId(getClientId().toString())
.build();

try (BootstrapStateHandler.Lock lock =
snapTableKey != null ? getBootstrapStateLock().lock() : null) {
OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest);
if (omResponse != null) {
purgeSuccess = purgeSuccess && omResponse.getSuccess();
Comment thread
sumitagrawl marked this conversation as resolved.
Outdated
}
} catch (ServiceException | InterruptedException e) {
LOG.error("PurgeKey request failed in batch. Will retry at next run.", e);
purgeSuccess = false;
// Continue to next batch instead of returning immediately
}
} catch (ServiceException e) {
LOG.error("PurgeKey request failed. Will retry at next run.", e);
return Pair.of(0, false);
}

return Pair.of(deletedCount, purgeSuccess);
Expand Down
Loading