Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.util;

import com.google.protobuf.CodedOutputStream;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

Expand All @@ -37,4 +38,12 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) {
public static UUID fromProtobuf(HddsProtos.UUID proto) {
return new UUID(proto.getMostSigBits(), proto.getLeastSigBits());
}

/**
* Computes the serialized size of a string in a repeated string field.
* Wraps protobuf's computeStringSizeNoTag for safer use.
*/
public static int computeRepeatedStringSize(String value) {
return CodedOutputStream.computeStringSizeNoTag(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
Expand All @@ -51,6 +53,7 @@
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
Expand All @@ -68,6 +71,8 @@
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is the client-side translator to translate the requests made on
Expand All @@ -78,6 +83,12 @@
public final class ScmBlockLocationProtocolClientSideTranslatorPB
implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable {

private static final Logger LOG =
LoggerFactory.getLogger(ScmBlockLocationProtocolClientSideTranslatorPB.class);

private static final double RATIS_LIMIT_FACTOR = 0.9;
private int ratisByteLimit;

/**
* RpcController is not used and hence is set to null.
*/
Expand All @@ -93,12 +104,18 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
* failover proxy provider.
*/
public ScmBlockLocationProtocolClientSideTranslatorPB(
SCMBlockLocationFailoverProxyProvider proxyProvider) {
SCMBlockLocationFailoverProxyProvider proxyProvider, OzoneConfiguration conf) {
Preconditions.checkState(proxyProvider != null);
this.failoverProxyProvider = proxyProvider;
this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create(
ScmBlockLocationProtocolPB.class, failoverProxyProvider,
failoverProxyProvider.getRetryPolicy());
int limit = (int) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT,
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
// always go to 90% of max limit for request as other header will be added
this.ratisByteLimit = (int) (limit * RATIS_LIMIT_FACTOR);
}

/**
Expand Down Expand Up @@ -230,18 +247,43 @@ public List<AllocatedBlock> allocateBlock(
@Override
public List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException {
List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
.map(BlockGroup::getProto).collect(Collectors.toList());

List<DeleteBlockGroupResult> allResults = new ArrayList<>();
List<KeyBlocks> batch = new ArrayList<>();

int serializedSize = 0;
for (BlockGroup bg : keyBlocksInfoList) {
KeyBlocks bgProto = bg.getProto();
int currSize = bgProto.getSerializedSize();
if (currSize + serializedSize > ratisByteLimit) {
allResults.addAll(submitDeleteKeyBlocks(batch));
if (LOG.isDebugEnabled()) {
LOG.debug("Sending batch of {} KeyBlocks (~{} bytes)", batch.size(), serializedSize);
}
serializedSize = 0;
batch.clear();
}
batch.add(bgProto);
serializedSize += currSize;
}

if (!batch.isEmpty()) {
allResults.addAll(submitDeleteKeyBlocks(batch));
}

return allResults;
}

private List<DeleteBlockGroupResult> submitDeleteKeyBlocks(List<KeyBlocks> batch)
throws IOException {
DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
.newBuilder()
.addAllKeyBlocks(keyBlocksProto)
.addAllKeyBlocks(batch)
.build();

SCMBlockLocationRequest wrapper = createSCMBlockRequest(
Type.DeleteScmKeyBlocks)
.setDeleteScmKeyBlocksRequest(request)
.build();

final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
final DeleteScmKeyBlocksResponseProto resp =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static ScmBlockLocationProtocol getScmBlockClient(
OzoneConfiguration conf) {
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
new SCMBlockLocationFailoverProxyProvider(conf));
new SCMBlockLocationFailoverProxyProvider(conf), conf);
return TracingUtil
.createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class,
conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testFailover() throws Exception {
failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
failoverProxyProvider);
failoverProxyProvider, conf);
GenericTestUtils
.setLogLevel(SCMBlockLocationFailoverProxyProvider.class, Level.DEBUG);
LogCapturer logCapture = LogCapturer.captureLogs(SCMBlockLocationFailoverProxyProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testGetClusterTreeInformation() throws IOException {
failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
failoverProxyProvider);
failoverProxyProvider, conf);

InnerNode expectedInnerNode = (InnerNode) scm.getClusterMap().getNode(ROOT);
InnerNode actualInnerNode = scmBlockLocationClient.getNetworkTopology();
Expand Down
Loading