Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,17 @@ public static ContainerCommandResponseProto getEchoResponse(
.build();
}

/**
 * Builds a success response for a GetContainerMerkleTree request.
 *
 * @param request the originating container command request
 * @param checksumByteString serialized container merkle tree bytes
 * @return a success ContainerCommandResponseProto carrying the merkle tree
 */
public static ContainerCommandResponseProto getGetContainerMerkleTreeResponse(
    ContainerCommandRequestProto request, ByteString checksumByteString) {
  ContainerProtos.GetContainerMerkleTreeResponseProto.Builder treeResponse =
      ContainerProtos.GetContainerMerkleTreeResponseProto.newBuilder();
  treeResponse.setContainerID(request.getContainerID());
  treeResponse.setContainerMerkleTree(checksumByteString);
  return getSuccessResponseBuilder(request)
      .setGetContainerMerkleTree(treeResponse)
      .build();
}

// Utility class with only static factory helpers; never instantiated.
private ContainerCommandResponseBuilders() {
throw new UnsupportedOperationException("no instances");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,40 @@ public static EchoResponseProto echo(XceiverClientSpi client, String encodedCont
return response.getEcho();
}

/**
 * Fetches the container merkle tree for a container from a datanode.
 * The request is sent to the closest node of the client's pipeline.
 *
 * @param client client that communicates with the container
 * @param containerID id of the container whose merkle tree is requested
 * @param encodedContainerID encoded container token if security is enabled,
 *                           otherwise {@code null}
 * @return the GetContainerMerkleTree response from the datanode
 * @throws IOException if the command fails validation or transport
 */
public static ContainerProtos.GetContainerMerkleTreeResponseProto getContainerMerkleTree(
    XceiverClientSpi client, long containerID, String encodedContainerID) throws IOException {
  String datanodeUuid = client.getPipeline().getClosestNode().getUuidString();
  ContainerProtos.GetContainerMerkleTreeRequestProto treeRequest =
      ContainerProtos.GetContainerMerkleTreeRequestProto
          .newBuilder()
          .setContainerID(containerID)
          .build();

  ContainerCommandRequestProto.Builder requestBuilder = ContainerCommandRequestProto
      .newBuilder()
      .setCmdType(Type.GetContainerMerkleTree)
      .setContainerID(containerID)
      .setDatanodeUuid(datanodeUuid)
      .setGetContainerMerkleTree(treeRequest);
  if (encodedContainerID != null) {
    requestBuilder.setEncodedToken(encodedContainerID);
  }
  // Propagate the current tracing span, if any, to the datanode.
  String traceId = TracingUtil.exportCurrentSpan();
  if (traceId != null) {
    requestBuilder.setTraceID(traceId);
  }

  ContainerCommandResponseProto response =
      client.sendCommand(requestBuilder.build(), getValidatorList());
  return response.getGetContainerMerkleTree();
}

/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public enum DNAction implements AuditAction {
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT,
ECHO;
ECHO,
GET_CONTAINER_MERKLE_TREE;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ec.reconstruction;
package org.apache.hadoop.ozone.container;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
Expand All @@ -40,16 +40,16 @@
/**
* Wraps block and container token managers for datanode.
*/
class TokenHelper {
public class TokenHelper {

private final OzoneBlockTokenSecretManager blockTokenMgr;
private final ContainerTokenSecretManager containerTokenMgr;
private final String user;
private static final Set<AccessModeProto> MODES =
EnumSet.of(READ, WRITE, DELETE);

TokenHelper(SecurityConfig securityConfig,
SecretKeySignerClient secretKeyClient) throws IOException {
public TokenHelper(SecurityConfig securityConfig,
SecretKeySignerClient secretKeyClient) throws IOException {

boolean blockTokenEnabled = securityConfig.isBlockTokenEnabled();
boolean containerTokenEnabled = securityConfig.isContainerTokenEnabled();
Expand Down Expand Up @@ -83,19 +83,19 @@ class TokenHelper {
}
}

Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
public Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
return blockTokenMgr != null
? blockTokenMgr.generateToken(user, blockID, MODES, length)
: null;
}

Token<ContainerTokenIdentifier> getContainerToken(ContainerID containerID) {
public Token<ContainerTokenIdentifier> getContainerToken(ContainerID containerID) {
return containerTokenMgr != null
? containerTokenMgr.generateToken(user, containerID)
: null;
}

static String encode(Token<?> token) throws IOException {
public static String encode(Token<?> token) throws IOException {
return token != null ? token.encodeToUrlString() : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.SortedSet;
Expand All @@ -31,6 +32,7 @@

import com.google.common.util.concurrent.Striped;
import org.apache.hadoop.hdds.utils.SimpleStriped;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -157,6 +159,28 @@ private void write(KeyValueContainerData data, ContainerProtos.ContainerChecksum
}
}

/**
 * Reads the raw bytes of the container checksum file for the given container.
 * The read is done under the container's striped read lock so it cannot race
 * with a concurrent checksum file write.
 *
 * @param data metadata of the container whose checksum file should be read
 * @return the checksum file contents, or {@code null} if the file does not
 *         exist yet or could not be read
 */
public ByteString readChecksumFileAsBytes(KeyValueContainerData data) {
  long containerID = data.getContainerID();
  Lock readLock = getReadLock(containerID);
  readLock.lock();
  try {
    File checksumFile = getContainerChecksumFile(data);

    try (FileInputStream inStream = new FileInputStream(checksumFile)) {
      return ByteString.readFrom(inStream);
    } catch (FileNotFoundException ex) {
      // Expected before the first checksum write for this container.
      LOG.info("No checksum file currently exists for container {} at the path {}. Returning null.",
          containerID, checksumFile, ex);
    } catch (IOException ex) {
      // Unexpected read failure; warn so it is not lost among info logs.
      // Fixed typo ("occured") and message now matches the actual null return.
      LOG.warn("Error occurred when reading checksum file for container {} at the path {}. " +
          "Returning null.", containerID, checksumFile, ex);
    }
    return null;
  } finally {
    readLock.unlock();
  }
}

/**
* This class represents the difference between our replica of a container and a peer's replica of a container.
* It summarizes the operations we need to do to reconcile our replica with the peer replica it was compared to.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.checksum;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import jakarta.annotation.Nonnull;
import org.apache.hadoop.ozone.container.TokenHelper;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static org.apache.hadoop.ozone.container.TokenHelper.encode;

/**
* This class wraps necessary container-level RPC calls for container reconciliation.
* - GetContainerMerkleTree
*/
public class DNContainerOperationClient implements AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class DNContainerOperationClient implements AutoCloseable {
public class DNContainerReconciliationOperationClient implements AutoCloseable {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to keep this generic so that we can merge this with ECContainerOperationClient

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can rename it then.


private static final Logger LOG =
LoggerFactory.getLogger(DNContainerOperationClient.class);
private final TokenHelper tokenHelper;
private final XceiverClientManager xceiverClientManager;

/**
 * Creates a client for datanode-to-datanode container RPCs.
 *
 * @param conf configuration source
 * @param certificateClient provides local CA certificates when security is enabled
 * @param secretKeyClient used by the token helper to sign container/block tokens
 * @throws IOException if the token helper or client manager cannot be created
 */
public DNContainerOperationClient(ConfigurationSource conf,
CertificateClient certificateClient,
SecretKeySignerClient secretKeyClient) throws IOException {
this.tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.xceiverClientManager = createClientManager(conf, certificateClient);
}

/**
 * Builds the XceiverClientManager backing this client. When security is
 * enabled, a trust manager is assembled from the local certificate client
 * and the remotely fetched CA list.
 */
@Nonnull
private static XceiverClientManager createClientManager(
    ConfigurationSource conf, CertificateClient certificateClient)
    throws IOException {
  ClientTrustManager trustManager = null;
  if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
    CACertificateProvider localCaCerts =
        () -> HAUtils.buildCAX509List(certificateClient, conf);
    CACertificateProvider remoteCaCerts =
        () -> HAUtils.buildCAX509List(null, conf);
    trustManager = new ClientTrustManager(remoteCaCerts, localCaCerts);
  }
  XceiverClientManager.XceiverClientManagerConfigBuilder configBuilder =
      new XceiverClientManager.XceiverClientManagerConfigBuilder()
          .setMaxCacheSize(100)
          .setStaleThresholdMs(10 * 1000);
  return new XceiverClientManager(conf, configBuilder.build(), trustManager);
}

/** @return the shared XceiverClientManager backing this client. */
public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
}

/**
 * Fetches the serialized container merkle tree for {@code containerId} from
 * the given datanode over a single-node standalone pipeline.
 *
 * @param containerId id of the container
 * @param dn datanode to query
 * @return serialized container merkle tree bytes
 * @throws IOException if the RPC fails
 */
public ByteString getContainerMerkleTree(long containerId, DatanodeDetails dn)
    throws IOException {
  Pipeline pipeline = createSingleNodePipeline(dn);
  XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
  try {
    String containerToken =
        encode(tokenHelper.getContainerToken(ContainerID.valueOf(containerId)));
    return ContainerProtocolCalls
        .getContainerMerkleTree(client, containerId, containerToken)
        .getContainerMerkleTree();
  } finally {
    // Release without invalidating the cached client.
    xceiverClientManager.releaseClient(client, false);
  }
}

/**
 * Builds a closed, standalone, factor-one pipeline containing only the given
 * datanode, suitable for direct datanode-to-datanode calls.
 */
private Pipeline createSingleNodePipeline(DatanodeDetails dn) {
  return Pipeline.newBuilder()
      .setId(PipelineID.valueOf(dn.getUuid()))
      .setState(Pipeline.PipelineState.CLOSED)
      .setReplicationConfig(StandaloneReplicationConfig.getInstance(
          HddsProtos.ReplicationFactor.ONE))
      .setNodes(ImmutableList.of(dn))
      .build();
}

// Releases all cached clients held by the client manager.
@Override
public void close() throws IOException {
if (xceiverClientManager != null) {
xceiverClientManager.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ private static DNAction getAuditAction(Type cmdType) {
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
case StreamInit : return DNAction.STREAM_INIT;
case Echo : return DNAction.ECHO;
case GetContainerMerkleTree : return DNAction.GET_CONTAINER_MERKLE_TREE;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
Expand Down Expand Up @@ -70,16 +71,18 @@ protected Handler(ConfigurationSource config, String datanodeId,
this.icrSender = icrSender;
}

@SuppressWarnings("checkstyle:ParameterNumber")
public static Handler getHandlerForContainerType(
final ContainerType containerType, final ConfigurationSource config,
final String datanodeId, final ContainerSet contSet,
final VolumeSet volumeSet, final ContainerMetrics metrics,
IncrementalReportSender<Container> icrSender) {
IncrementalReportSender<Container> icrSender,
DNContainerOperationClient dnClient) {
switch (containerType) {
case KeyValueContainer:
return new KeyValueHandler(config,
datanodeId, contSet, volumeSet, metrics,
icrSender);
icrSender, dnClient);
default:
throw new IllegalArgumentException("Handler for ContainerType: " +
containerType + "doesn't exist.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.container.TokenHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -76,7 +77,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hadoop.ozone.container.ec.reconstruction.TokenHelper.encode;
import static org.apache.hadoop.ozone.container.TokenHelper.encode;

/**
* The Coordinator implements the main flow of reconstructing
Expand Down
Loading