diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 14fb0a40cd00..1bfdfd34c46b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -462,4 +462,12 @@ DecommissionScmResponseProto decommissionScm( String scmId) throws IOException; String getMetrics(String query) throws IOException; + + /** + * Trigger a reconcile command to datanodes for a container ID. + * + * @param containerID The ID of the container to reconcile. + * @throws IOException On error + */ + void reconcileContainer(long containerID) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java index 5a81f6bb47a1..a239cbfdba96 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java @@ -17,9 +17,14 @@ */ package org.apache.hadoop.hdds.scm.container; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import java.io.IOException; import java.util.UUID; /** @@ -35,6 +40,8 @@ public final class ContainerReplicaInfo { private long keyCount; private long bytesUsed; private int replicaIndex = -1; + @JsonSerialize(using = LongToHexJsonSerializer.class) + private long dataChecksum; public static ContainerReplicaInfo fromProto( HddsProtos.SCMContainerReplicaProto proto) { @@ -48,7 +55,8 @@ public static ContainerReplicaInfo fromProto( .setKeyCount(proto.getKeyCount()) .setBytesUsed(proto.getBytesUsed()) .setReplicaIndex( - proto.hasReplicaIndex() ? (int)proto.getReplicaIndex() : -1); + proto.hasReplicaIndex() ? (int)proto.getReplicaIndex() : -1) + .setDataChecksum(proto.getDataChecksum()); return builder.build(); } @@ -87,6 +95,17 @@ public int getReplicaIndex() { return replicaIndex; } + public long getDataChecksum() { + return dataChecksum; + } + + private static class LongToHexJsonSerializer extends JsonSerializer<Long> { + @Override + public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeString(Long.toHexString(value)); + } + } + /** * Builder for ContainerReplicaInfo class.
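The serializer above is why the new dataChecksum field renders as a hex string rather than a decimal long wherever ContainerReplicaInfo is serialized to JSON. A minimal standalone sketch of the same behavior; the demo class and sample value are illustrative, not part of this patch:

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.io.IOException;

public class HexChecksumJsonDemo {
  // Same approach as LongToHexJsonSerializer above: write the long as hex text.
  static class HexSerializer extends JsonSerializer<Long> {
    @Override
    public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
      gen.writeString(Long.toHexString(value));
    }
  }

  static class Replica {
    @JsonSerialize(using = HexSerializer.class)
    public long dataChecksum = 0xCAFEBABEL;
  }

  public static void main(String[] args) throws IOException {
    // Prints {"dataChecksum":"cafebabe"} instead of {"dataChecksum":3405691582}.
    System.out.println(new ObjectMapper().writeValueAsString(new Replica()));
  }
}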
*/ @@ -134,6 +153,11 @@ public Builder setReplicaIndex(int replicaIndex) { return this; } + public Builder setDataChecksum(long dataChecksum) { + subject.dataChecksum = dataChecksum; + return this; + } + public ContainerReplicaInfo build() { return subject; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java index 1cebd3296e34..fad6feca0be7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java @@ -139,6 +139,7 @@ public enum ResultCodes { CA_ROTATION_IN_PROGRESS, CA_ROTATION_IN_POST_PROGRESS, CONTAINER_ALREADY_CLOSED, - CONTAINER_ALREADY_CLOSING + CONTAINER_ALREADY_CLOSING, + UNSUPPORTED_OPERATION } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index df8ed02cf7f0..e53bd73eb0fe 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -483,4 +483,12 @@ DecommissionScmResponseProto decommissionScm( String scmId) throws IOException; String getMetrics(String query) throws IOException; + + /** + * Trigger a reconcile command to datanodes for the given container ID. + * + * @param containerID The ID of the container to reconcile. + * @throws IOException On error + */ + void reconcileContainer(long containerID) throws IOException; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java index b89ecff48c90..22a7408642b5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -194,21 +194,21 @@ public static synchronized DatanodeDetails readDatanodeDetailsFrom(File path) * Verify that the checksum stored in containerData is equal to the * computed checksum. */ - public static void verifyChecksum(ContainerData containerData, + public static void verifyContainerFileChecksum(ContainerData containerData, ConfigurationSource conf) throws IOException { boolean enabled = conf.getBoolean( HddsConfigKeys.HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED, HddsConfigKeys.
HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED_DEFAULT); if (enabled) { - String storedChecksum = containerData.getChecksum(); + String storedChecksum = containerData.getContainerFileChecksum(); Yaml yaml = ContainerDataYaml.getYamlForContainerType( containerData.getContainerType(), containerData instanceof KeyValueContainerData && ((KeyValueContainerData)containerData).getReplicaIndex() > 0); - containerData.computeAndSetChecksum(yaml); - String computedChecksum = containerData.getChecksum(); + containerData.computeAndSetContainerFileChecksum(yaml); + String computedChecksum = containerData.getContainerFileChecksum(); if (storedChecksum == null || !storedChecksum.equals(computedChecksum)) { throw new StorageContainerException("Container checksum error for " + @@ -225,7 +225,7 @@ public static void verifyChecksum(ContainerData containerData, * @param containerDataYamlStr ContainerData as a Yaml String * @return Checksum of the container data */ - public static String getChecksum(String containerDataYamlStr) + public static String getContainerFileChecksum(String containerDataYamlStr) throws StorageContainerException { MessageDigest sha; try { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index 3c202ba60a8a..4e3f2a7d53be 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -99,8 +99,12 @@ public abstract class ContainerData { private HddsVolume volume; + // Checksum of just the container file. private String checksum; + // Checksum of the data within the container. + private long dataChecksum; + private boolean isEmpty; private int replicaIndex; @@ -112,7 +116,7 @@ public abstract class ContainerData { private transient Optional lastDataScanTime = Optional.empty(); public static final Charset CHARSET_ENCODING = StandardCharsets.UTF_8; - private static final String DUMMY_CHECKSUM = new String(new byte[64], + private static final String ZERO_CHECKSUM = new String(new byte[64], CHARSET_ENCODING); // Common Fields need to be stored in .container file. @@ -159,7 +163,8 @@ protected ContainerData(ContainerType type, long containerId, this.originPipelineId = originPipelineId; this.originNodeId = originNodeId; this.isEmpty = false; - setChecksumTo0ByteArray(); + this.checksum = ZERO_CHECKSUM; + this.dataChecksum = 0; } protected ContainerData(ContainerData source) { @@ -571,15 +576,11 @@ public void setBlockCount(long count) { this.blockCount.set(count); } - public void setChecksumTo0ByteArray() { - this.checksum = DUMMY_CHECKSUM; - } - - public void setChecksum(String checkSum) { + public void setContainerFileChecksum(String checkSum) { this.checksum = checkSum; } - public String getChecksum() { + public String getContainerFileChecksum() { return this.checksum; } @@ -630,21 +631,29 @@ public String getOriginNodeId() { * * Checksum of ContainerData is calculated by setting the * {@link ContainerData#checksum} field to a 64-byte array with all 0's - - * {@link ContainerData#DUMMY_CHECKSUM}. After the checksum is calculated, + * {@link ContainerData#ZERO_CHECKSUM}. After the checksum is calculated, * the checksum field is updated with this value. 
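The zero-then-hash scheme described above can be shown in isolation: the checksum field is set to a fixed zero value, the object is serialized, and the digest of that serialized form becomes the stored checksum, so the stored value never feeds into its own computation. A compact sketch, assuming a SHA-256 digest as in ContainerUtils; the helper class below is hypothetical:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ContainerFileChecksumSketch {
  // Stand-in for the zeroed checksum field: 64 NUL bytes, as in ZERO_CHECKSUM.
  private static final String ZERO_CHECKSUM = new String(new byte[64], StandardCharsets.UTF_8);

  // Digest the serialized container data while its checksum field holds the
  // zero value, so the stored checksum never influences its own computation.
  static String checksumOf(String serializedWithZeroedField) throws NoSuchAlgorithmException {
    MessageDigest sha = MessageDigest.getInstance("SHA-256");
    StringBuilder hex = new StringBuilder();
    for (byte b : sha.digest(serializedWithZeroedField.getBytes(StandardCharsets.UTF_8))) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    String yaml = "containerID: 1\nchecksum: " + ZERO_CHECKSUM + "\n";
    String stored = checksumOf(yaml);
    // Verification recomputes over the same zeroed form and compares.
    System.out.println(stored.equals(checksumOf(yaml)) ? "checksum OK" : "checksum mismatch");
  }
}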
* * @param yaml Yaml for ContainerType to get the ContainerData as Yaml String * @throws IOException */ - public void computeAndSetChecksum(Yaml yaml) throws IOException { + public void computeAndSetContainerFileChecksum(Yaml yaml) throws IOException { // Set checksum to dummy value - 0 byte array, to calculate the checksum // of rest of the data. - setChecksumTo0ByteArray(); + this.checksum = ZERO_CHECKSUM; // Dump yaml data into a string to compute its checksum String containerDataYamlStr = yaml.dump(this); - this.checksum = ContainerUtils.getChecksum(containerDataYamlStr); + this.checksum = ContainerUtils.getContainerFileChecksum(containerDataYamlStr); + } + + public void setDataChecksum(long dataChecksum) { + this.dataChecksum = dataChecksum; + } + + public long getDataChecksum() { + return dataChecksum; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java index a4750b5fae01..140a462676b0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java @@ -98,7 +98,7 @@ public static void createContainerFile(ContainerType containerType, // Create Yaml for given container type Yaml yaml = getYamlForContainerType(containerType, withReplicaIndex); // Compute Checksum and update ContainerData - containerData.computeAndSetChecksum(yaml); + containerData.computeAndSetContainerFileChecksum(yaml); // Write the ContainerData with checksum to Yaml file. out = new FileOutputStream( @@ -312,7 +312,7 @@ public Object construct(Node node) { kvData.setChunksPath((String) nodes.get(OzoneConsts.CHUNKS_PATH)); Map meta = (Map) nodes.get(OzoneConsts.METADATA); kvData.setMetadata(meta); - kvData.setChecksum((String) nodes.get(OzoneConsts.CHECKSUM)); + kvData.setContainerFileChecksum((String) nodes.get(OzoneConsts.CHECKSUM)); Long timestamp = (Long) nodes.get(OzoneConsts.DATA_SCAN_TIMESTAMP); kvData.setDataScanTimestamp(timestamp); String state = (String) nodes.get(OzoneConsts.STATE); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 2ffb9d30d1f4..179274f2c024 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -21,8 +21,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; @@ -192,6 +194,13 @@ public abstract void closeContainer(Container container) public abstract void deleteContainer(Container container, boolean force) throws IOException; + /** + * Triggers reconciliation of this container replica's data with its peers. 
+ * @param container container to be reconciled. + * @param peers The other datanodes with a copy of this container whose data should be checked. + */ + public abstract void reconcileContainer(Container container, List<DatanodeDetails> peers) throws IOException; + /** * Deletes the given files associated with a block of the container. * diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 967714405491..9292dba5fdd7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -56,6 +56,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconcileContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler; @@ -258,6 +259,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, supervisor::nodeStateUpdated)) .addHandler(new FinalizeNewLayoutVersionCommandHandler()) .addHandler(new RefreshVolumeUsageCommandHandler()) + .addHandler(new ReconcileContainerCommandHandler(threadNamePrefix)) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java new file mode 100644 index 000000000000..9a4110c7dfcb --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Handles commands from SCM to reconcile a container replica on this datanode with the replicas on its peers. + */ +public class ReconcileContainerCommandHandler implements CommandHandler { + private static final Logger LOG = + LoggerFactory.getLogger(ReconcileContainerCommandHandler.class); + + private final AtomicLong invocationCount; + private final AtomicInteger queuedCount; + private final ExecutorService executor; + private long totalTime; + + public ReconcileContainerCommandHandler(String threadNamePrefix) { + invocationCount = new AtomicLong(0); + queuedCount = new AtomicInteger(0); + // TODO Allow configurable thread pool size with a default value when the implementation is ready. 
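Regarding the TODO above: a hedged sketch of what a configurable pool could look like once reconciliation does real work, with the size supplied by the caller. The helper below is hypothetical; a size of 1 reproduces the current single-threaded behavior:

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ReconcileExecutors {
  private ReconcileExecutors() {
  }

  // Hypothetical follow-up to the TODO: callers supply a configured pool size;
  // size 1 matches today's single-threaded executor.
  static ExecutorService newReconcilePool(String threadNamePrefix, int poolSize) {
    return Executors.newFixedThreadPool(Math.max(1, poolSize),
        new ThreadFactoryBuilder()
            .setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d")
            .build());
  }
}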
+ executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d") + .build()); + totalTime = 0; + } + + @Override + public void handle(SCMCommand command, OzoneContainer container, StateContext context, + SCMConnectionManager connectionManager) { + queuedCount.incrementAndGet(); + CompletableFuture.runAsync(() -> { + invocationCount.incrementAndGet(); + long startTime = Time.monotonicNow(); + ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command; + LOG.info("Processing reconcile container command for container {} with peers {}", + reconcileCommand.getContainerID(), reconcileCommand.getPeerDatanodes()); + try { + container.getController().reconcileContainer(reconcileCommand.getContainerID(), + reconcileCommand.getPeerDatanodes()); + } catch (IOException ex) { + LOG.error("Failed to reconcile container {}.", reconcileCommand.getContainerID(), ex); + } finally { + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; + } + }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet()); + } + + @Override + public SCMCommandProto.Type getCommandType() { + return SCMCommandProto.Type.reconcileContainerCommand; + } + + @Override + public int getInvocationCount() { + return (int)invocationCount.get(); + } + + @Override + public long getAverageRunTime() { + if (invocationCount.get() > 0) { + return totalTime / invocationCount.get(); + } + return 0; + } + + @Override + public long getTotalRunTime() { + return totalTime; + } + + @Override + public int getQueuedCount() { + return queuedCount.get(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 44f0eae49ead..b6ab4748fe30 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -55,6 +55,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; @@ -416,6 +417,11 @@ private void processResponse(SCMHeartbeatResponseProto response, commandResponseProto.getRefreshVolumeUsageCommandProto()); processCommonCommand(commandResponseProto, refreshVolumeUsageCommand); break; + case reconcileContainerCommand: + ReconcileContainerCommand reconcileContainerCommand = + ReconcileContainerCommand.getFromProtobuf(commandResponseProto.getReconcileContainerCommandProto()); + processCommonCommand(commandResponseProto, reconcileContainerCommand); + break; default: throw new IllegalArgumentException("Unknown response : " + commandResponseProto.getCommandType().name()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java 
 b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 838818266757..de43a29d9f6d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -885,7 +885,8 @@ public ContainerReplicaProto getContainerReport() .setDeleteTransactionId(containerData.getDeleteTransactionId()) .setBlockCommitSequenceId(containerData.getBlockCommitSequenceId()) .setOriginNodeId(containerData.getOriginNodeId()) - .setIsEmpty(containerData.isEmpty()); + .setIsEmpty(containerData.isEmpty()) + .setDataChecksum(containerData.getDataChecksum()); return ciBuilder.build(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java index 70539111fb99..f462510bf747 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java @@ -177,7 +177,7 @@ private ScanResult checkContainerFile(File containerFile) { .checkState(onDiskContainerData != null, "Container File not loaded"); try { - ContainerUtils.verifyChecksum(onDiskContainerData, checkConfig); + ContainerUtils.verifyContainerFileChecksum(onDiskContainerData, checkConfig); } catch (IOException ex) { return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE, containerFile, ex); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index e575a93de270..2843fe3bcff4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; @@ -56,6 +57,8 @@ import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumByteBuffer; +import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.common.utils.BufferUtils; @@ -1160,6 +1163,23 @@ public void deleteContainer(Container container, boolean force) deleteInternal(container, force); } + @Override + public void reconcileContainer(Container container, List<DatanodeDetails> peers) throws IOException { + // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
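The placeholder that follows hashes only the container ID, so every replica of a given container reports the same checksum, which is enough to exercise the reporting path end to end. A self-contained equivalent, assuming java.util.zip.CRC32 matches the semantics of Ozone's ChecksumByteBufferFactory.crc32Impl():

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class PlaceholderDataChecksum {
  // CRC-32 over the eight big-endian bytes of the container ID, mirroring the
  // placeholder below.
  static long placeholderChecksum(long containerId) {
    CRC32 crc = new CRC32();
    crc.update(ByteBuffer.allocate(Long.BYTES).putLong(containerId).array());
    return crc.getValue();
  }

  public static void main(String[] args) {
    // The same ID always yields the same value on every replica.
    System.out.println(Long.toHexString(placeholderChecksum(123L)));
  }
}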
+ ContainerData data = container.getContainerData(); + long id = data.getContainerID(); + ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES) + .putLong(id) + .asReadOnlyBuffer(); + byteBuffer.rewind(); + ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl(); + checksumImpl.update(byteBuffer); + long dataChecksum = checksumImpl.getValue(); + LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum); + data.setDataChecksum(dataChecksum); + sendICR(container); + } + /** * Called by BlockDeletingService to delete all the chunks in a block * before proceeding to delete the block info from DB. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index 90ee356ab59d..53def2586090 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -210,7 +210,7 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData, long containerID = kvContainerData.getContainerID(); // Verify Checksum - ContainerUtils.verifyChecksum(kvContainerData, config); + ContainerUtils.verifyContainerFileChecksum(kvContainerData, config); if (kvContainerData.getSchemaVersion() == null) { // If this container has not specified a schema version, it is in the old diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index feb580538747..feb86f351975 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.container.ozoneimpl; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto .ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -38,6 +39,7 @@ import java.io.OutputStream; import java.time.Instant; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; @@ -188,6 +190,15 @@ public void deleteContainer(final long containerId, boolean force) } } + public void reconcileContainer(long containerID, List<DatanodeDetails> peers) throws IOException { + Container container = containerSet.getContainer(containerID); + if (container == null) { + LOG.warn("Container {} to reconcile not found on this datanode.", containerID); + } else { + getHandler(container).reconcileContainer(container, peers); + } + } + /** * Given a container, returns its handler instance.
* diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index f20094079c9e..9e5b5dbdabd5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -122,7 +122,7 @@ public void importContainer(long containerID, Path tarFilePath, packer.unpackContainerDescriptor(input); containerData = getKeyValueContainerData(containerDescriptorYaml); } - ContainerUtils.verifyChecksum(containerData, conf); + ContainerUtils.verifyContainerFileChecksum(containerData, conf); containerData.setVolume(targetVolume); try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java new file mode 100644 index 000000000000..cdd4522cc691 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto; + +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; + +/** + * Asks datanodes to reconcile the specified container with other container replicas. + */ +public class ReconcileContainerCommand extends SCMCommand<ReconcileContainerCommandProto> { + + private final List<DatanodeDetails> peerDatanodes; + + public ReconcileContainerCommand(long containerID, List<DatanodeDetails> peerDatanodes) { + // Container ID serves as command ID, since only one reconciliation should be in progress at a time. + super(containerID); + this.peerDatanodes = peerDatanodes; + } + + @Override + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.reconcileContainerCommand; + } + + @Override + public ReconcileContainerCommandProto getProto() { + ReconcileContainerCommandProto.Builder builder = ReconcileContainerCommandProto.newBuilder() + .setContainerID(getId()); + for (DatanodeDetails dd : peerDatanodes) { + builder.addPeers(dd.getProtoBufMessage()); + } + return builder.build(); + } + + public List<DatanodeDetails> getPeerDatanodes() { + return peerDatanodes; + } + + public long getContainerID() { + return getId(); + } + + public static ReconcileContainerCommand getFromProtobuf(ReconcileContainerCommandProto protoMessage) { + Preconditions.checkNotNull(protoMessage); + + List<HddsProtos.DatanodeDetailsProto> peers = protoMessage.getPeersList(); + List<DatanodeDetails> peerNodes = !peers.isEmpty() + ?
peers.stream() + .map(DatanodeDetails::getFromProtoBuf) + .collect(Collectors.toList()) + : emptyList(); + + return new ReconcileContainerCommand(protoMessage.getContainerID(), peerNodes); + } + + @Override + public String toString() { + return getType() + + ": containerId=" + getContainerID() + + ", peerNodes=" + peerDatanodes; + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java index b3b0f5b43771..4360243f0be8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java @@ -84,6 +84,7 @@ public void testKeyValueData(ContainerTestVersionInfo versionInfo) { assertEquals(val.get(), kvData.getBlockCount()); assertEquals(val.get(), kvData.getNumPendingDeletionBlocks()); assertEquals(MAXSIZE, kvData.getMaxSize()); + assertEquals(0, kvData.getDataChecksum()); kvData.setState(state); kvData.setContainerDBType(containerDBType); @@ -98,6 +99,8 @@ public void testKeyValueData(ContainerTestVersionInfo versionInfo) { kvData.incrPendingDeletionBlocks(1); kvData.setSchemaVersion( VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf)); + long expectedDataHash = 1234L; + kvData.setDataChecksum(expectedDataHash); assertEquals(state, kvData.getState()); assertEquals(containerDBType, kvData.getContainerDBType()); @@ -114,6 +117,7 @@ public void testKeyValueData(ContainerTestVersionInfo versionInfo) { assertEquals(datanodeId.toString(), kvData.getOriginNodeId()); assertEquals(VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf), kvData.getSchemaVersion()); + assertEquals(expectedDataHash, kvData.getDataChecksum()); KeyValueContainerData newKvData = new KeyValueContainerData(kvData); assertEquals(kvData.getReplicaIndex(), newKvData.getReplicaIndex()); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java index 2235b23ce882..ad5ca482189b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java @@ -614,7 +614,7 @@ private KeyValueContainerData newKvData() throws IOException { Yaml yaml = ContainerDataYaml.getYamlForContainerType( kvData.getContainerType(), kvData.getReplicaIndex() > 0); - kvData.computeAndSetChecksum(yaml); + kvData.computeAndSetContainerFileChecksum(yaml); KeyValueContainerUtil.parseKVContainerData(kvData, conf); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java index ec78398824e7..2057c4400a45 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java @@ -87,6 +87,7 @@ private File 
createContainerFile(long containerID, int replicaIndex) keyValueContainerData.setSchemaVersion( VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion()); keyValueContainerData.setReplicaIndex(replicaIndex); + keyValueContainerData.setDataChecksum(12345); File containerFile = new File(testRoot, containerPath); @@ -218,7 +219,7 @@ void testCheckBackWardCompatibilityOfContainerFile( File file = new File(classLoader.getResource(containerFile).getFile()); KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml .readContainerFile(file); - ContainerUtils.verifyChecksum(kvData, conf); + ContainerUtils.verifyContainerFileChecksum(kvData, conf); //Checking the Container file data is consistent or not assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, kvData @@ -236,7 +237,7 @@ void testCheckBackWardCompatibilityOfContainerFile( } /** - * Test to verify {@link ContainerUtils#verifyChecksum(ContainerData,ConfigurationSource)}. + * Test to verify {@link ContainerUtils#verifyContainerFileChecksum(ContainerData,ConfigurationSource)}. */ @ContainerLayoutTestInfo.ContainerTest public void testChecksumInContainerFile(ContainerLayoutVersion layout) throws IOException { @@ -247,13 +248,32 @@ public void testChecksumInContainerFile(ContainerLayoutVersion layout) throws IO // Read from .container file, and verify data. KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml.readContainerFile(containerFile); - ContainerUtils.verifyChecksum(kvData, conf); + ContainerUtils.verifyContainerFileChecksum(kvData, conf); cleanup(); } /** - * Test to verify {@link ContainerUtils#verifyChecksum(ContainerData,ConfigurationSource)}. + * The container's data checksum is stored in a separate file with its Merkle hash tree. It should not be persisted + * to the .container file. + */ + @ContainerLayoutTestInfo.ContainerTest + public void testDataChecksumNotInContainerFile(ContainerLayoutVersion layout) throws IOException { + setLayoutVersion(layout); + long containerID = testContainerID++; + + File containerFile = createContainerFile(containerID, 0); + + // Read from .container file. The kvData object should not have a data hash because it was not persisted in this + // file. + KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml.readContainerFile(containerFile); + assertEquals(0, kvData.getDataChecksum()); + + cleanup(); + } + + /** + * Test to verify {@link ContainerUtils#verifyContainerFileChecksum(ContainerData,ConfigurationSource)}. */ @ContainerLayoutTestInfo.ContainerTest public void testChecksumInContainerFileWithReplicaIndex( @@ -266,7 +286,7 @@ public void testChecksumInContainerFileWithReplicaIndex( // Read from .container file, and verify data. KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml .readContainerFile(containerFile); - ContainerUtils.verifyChecksum(kvData, conf); + ContainerUtils.verifyContainerFileChecksum(kvData, conf); cleanup(); } @@ -287,7 +307,7 @@ public void testIncorrectChecksum(ContainerLayoutVersion layout) { setLayoutVersion(layout); Exception ex = assertThrows(Exception.class, () -> { KeyValueContainerData kvData = getKeyValueContainerData(); - ContainerUtils.verifyChecksum(kvData, conf); + ContainerUtils.verifyContainerFileChecksum(kvData, conf); }); assertThat(ex).hasMessageStartingWith("Container checksum error for ContainerID:"); @@ -303,6 +323,6 @@ public void testDisabledChecksum(ContainerLayoutVersion layout) KeyValueContainerData kvData = getKeyValueContainerData(); conf.setBoolean(HddsConfigKeys. 
HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED, false); - ContainerUtils.verifyChecksum(kvData, conf); + ContainerUtils.verifyContainerFileChecksum(kvData, conf); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index 7f2cdcc6e532..d337b3a5f25a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -65,6 +65,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.ozone.test.LambdaTestUtils; @@ -707,6 +708,7 @@ public void testCommandQueueSummary() throws IOException { ctx.addCommand(ReplicateContainerCommand.forTest(3)); ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId())); ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId())); + ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptyList())); Map summary = ctx.getCommandQueueSummary(); assertEquals(3, @@ -715,6 +717,8 @@ public void testCommandQueueSummary() throws IOException { summary.get(SCMCommandProto.Type.closePipelineCommand).intValue()); assertEquals(1, summary.get(SCMCommandProto.Type.closeContainerCommand).intValue()); + assertEquals(1, + summary.get(SCMCommandProto.Type.reconcileContainerCommand).intValue()); } @Test diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java new file mode 100644 index 000000000000..d6be667f41bc --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.ozone.container.common.ContainerTestUtils; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; +import org.apache.ozone.test.GenericTestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.singletonMap; +import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; +import static org.apache.hadoop.ozone.OzoneConsts.GB; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests Datanode handling of reconcile container commands. 
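The tests below exercise two things: ICR emission per reconcile command, and handler run-time metrics. The metrics test gates the ICR sender on a CountDownLatch so queued counts can be asserted while tasks are still in flight, then releases all tasks at once. That gating pattern in isolation, with illustrative names:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchGatingDemo {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch gate = new CountDownLatch(1);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> {
      try {
        // Task blocks here; the test can assert "queued, not yet finished" state.
        gate.await(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    // ... assertions on queued/in-flight metrics would go here ...
    gate.countDown(); // release the task so completion metrics can be checked
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}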
+ */ +public class TestReconcileContainerCommandHandler { + public static final Logger LOG = LoggerFactory.getLogger(TestReconcileContainerCommandHandler.class); + + private static final int NUM_CONTAINERS = 3; + + private ContainerSet containerSet; + private OzoneContainer ozoneContainer; + private StateContext context; + private ReconcileContainerCommandHandler subject; + + public void init(ContainerLayoutVersion layout, IncrementalReportSender icrSender) + throws Exception { + + OzoneConfiguration conf = new OzoneConfiguration(); + DatanodeDetails dnDetails = randomDatanodeDetails(); + subject = new ReconcileContainerCommandHandler(""); + context = ContainerTestUtils.getMockContext(dnDetails, conf); + + containerSet = new ContainerSet(1000); + for (int id = 1; id <= NUM_CONTAINERS; id++) { + KeyValueContainerData data = new KeyValueContainerData(id, layout, GB, + PipelineID.randomId().toString(), randomDatanodeDetails().getUuidString()); + containerSet.addContainer(new KeyValueContainer(data, conf)); + } + + assertEquals(NUM_CONTAINERS, containerSet.containerCount()); + + Handler containerHandler = new KeyValueHandler(new OzoneConfiguration(), dnDetails.getUuidString(), containerSet, + mock(VolumeSet.class), mock(ContainerMetrics.class), icrSender); + ContainerController controller = new ContainerController(containerSet, + singletonMap(ContainerProtos.ContainerType.KeyValueContainer, containerHandler)); + ozoneContainer = mock(OzoneContainer.class); + when(ozoneContainer.getController()).thenReturn(controller); + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + } + + @ContainerLayoutTestInfo.ContainerTest + public void testReconcileContainerCommandReports(ContainerLayoutVersion layout) throws Exception { + Map containerReportsSent = new HashMap<>(); + IncrementalReportSender icrSender = c -> { + try { + ContainerID id = ContainerID.valueOf(c.getContainerData().getContainerID()); + containerReportsSent.put(id, c.getContainerReport()); + LOG.info("Added container report for container {}", id); + } catch (Exception ex) { + LOG.error("ICR sender failed", ex); + } + }; + init(layout, icrSender); + + for (int id = 1; id <= NUM_CONTAINERS; id++) { + ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptyList()); + subject.handle(cmd, ozoneContainer, context, null); + } + + // An unknown container should not trigger a container report being sent. + ReconcileContainerCommand unknownContainerCmd = new ReconcileContainerCommand(NUM_CONTAINERS + 1, + Collections.emptyList()); + subject.handle(unknownContainerCmd, ozoneContainer, context, null); + + waitForAllCommandsToFinish(); + verifyAllContainerReports(containerReportsSent); + } + + @ContainerLayoutTestInfo.ContainerTest + public void testReconcileContainerCommandMetrics(ContainerLayoutVersion layout) throws Exception { + // Used to block ICR sending so that queue metrics can be checked before the reconcile task completes. + CountDownLatch icrLatch = new CountDownLatch(1); + // Wait this long before completing the task. + // This provides a lower bound on execution time. + final long minExecTimeMillis = 500; + // This is the lower bound on execution time of all the commands combined. + final long expectedTotalMinExecTimeMillis = minExecTimeMillis * NUM_CONTAINERS; + + IncrementalReportSender icrSender = c -> { + try { + // Block the caller until the latch is counted down. + // Caller can check queue metrics in the meantime. 
+ LOG.info("ICR sender waiting for latch"); + assertTrue(icrLatch.await(30, TimeUnit.SECONDS)); + LOG.info("ICR sender proceeding after latch"); + + Thread.sleep(minExecTimeMillis); + } catch (Exception ex) { + LOG.error("ICR sender failed", ex); + } + }; + + init(layout, icrSender); + + // All commands submitted will be blocked until the latch is counted down. + for (int id = 1; id <= NUM_CONTAINERS; id++) { + ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptyList()); + subject.handle(cmd, ozoneContainer, context, null); + } + assertEquals(NUM_CONTAINERS, subject.getQueuedCount()); + assertEquals(0, subject.getTotalRunTime()); + assertEquals(0, subject.getAverageRunTime()); + + // This will resume handling of the tasks. + icrLatch.countDown(); + waitForAllCommandsToFinish(); + + assertEquals(NUM_CONTAINERS, subject.getInvocationCount()); + long totalRunTime = subject.getTotalRunTime(); + assertTrue(totalRunTime >= expectedTotalMinExecTimeMillis, + "Total run time " + totalRunTime + "ms was not larger than the minimum total exec time " + + expectedTotalMinExecTimeMillis + "ms"); + long avgRunTime = subject.getAverageRunTime(); + assertTrue(avgRunTime >= minExecTimeMillis, + "Average run time " + avgRunTime + "ms was not larger than the minimum per task exec time " + + minExecTimeMillis + "ms"); + } + + private void waitForAllCommandsToFinish() throws Exception { + // Queue count should be decremented only after the task completes, so the other metrics should be consistent when + // it reaches zero. + GenericTestUtils.waitFor(() -> { + int qCount = subject.getQueuedCount(); + LOG.info("Waiting for queued command count to reach 0. Currently at " + qCount); + return qCount == 0; + }, 500, 3000); + } + + private void verifyAllContainerReports(Map reportsSent) throws Exception { + assertEquals(NUM_CONTAINERS, reportsSent.size()); + + for (Map.Entry entry: reportsSent.entrySet()) { + ContainerID id = entry.getKey(); + assertNotNull(containerSet.getContainer(id.getId())); + + long sentDataChecksum = entry.getValue().getDataChecksum(); + // Current implementation is incomplete, and uses a mocked checksum. 
+ assertNotEquals(0, sentDataChecksum, "Report of container " + id + + " should have a non-zero checksum"); + } + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java index 7d8b94e57d35..6245489f13b9 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.states.endpoint; import static java.util.Collections.emptyList; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconcileContainerCommand; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand; import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -57,6 +58,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates; import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB; @@ -111,6 +113,42 @@ public void handlesReconstructContainerCommand() throws Exception { .get(reconstructECContainersCommand).intValue()); } + @Test + public void testHandlesReconcileContainerCommand() throws Exception { + StorageContainerDatanodeProtocolClientSideTranslatorPB scm = + mock(StorageContainerDatanodeProtocolClientSideTranslatorPB.class); + + List peerDNs = new ArrayList<>(); + peerDNs.add(MockDatanodeDetails.randomDatanodeDetails()); + peerDNs.add(MockDatanodeDetails.randomDatanodeDetails()); + ReconcileContainerCommand cmd = new ReconcileContainerCommand(1, peerDNs); + + when(scm.sendHeartbeat(any())) + .thenAnswer(invocation -> + SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID( + ((SCMHeartbeatRequestProto)invocation.getArgument(0)) + .getDatanodeDetails().getUuid()) + .addCommands(SCMCommandProto.newBuilder() + .setCommandType(reconcileContainerCommand) + .setReconcileContainerCommandProto(cmd.getProto()) + .build()) + .build()); + + OzoneConfiguration conf = new OzoneConfiguration(); + DatanodeStateMachine datanodeStateMachine = mock(DatanodeStateMachine.class); + StateContext context = new StateContext(conf, DatanodeStates.RUNNING, + datanodeStateMachine, ""); + + // WHEN + HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm); + task.call(); + + // THEN + assertEquals(1, context.getCommandQueueSummary() + .get(reconcileContainerCommand).intValue()); + } + @Test public void testheartbeatWithoutReports() throws Exception { final long termInSCM = 42; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index b9c8feae16ce..2ce6eabe3944 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -37,7 +37,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; @@ -56,7 +58,9 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY; import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; +import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; +import static org.apache.hadoop.ozone.OzoneConsts.GB; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -64,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.any; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -433,6 +438,38 @@ public void testDeleteContainer() throws IOException { } } + @ContainerLayoutTestInfo.ContainerTest + public void testReconcileContainer(ContainerLayoutVersion layoutVersion) throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + + KeyValueContainerData data = new KeyValueContainerData(123L, layoutVersion, GB, + PipelineID.randomId().toString(), randomDatanodeDetails().getUuidString()); + + Container container = new KeyValueContainer(data, conf); + ContainerSet containerSet = new ContainerSet(1000); + containerSet.addContainer(container); + + // Allows checking the invocation count of the lambda. + AtomicInteger icrCount = new AtomicInteger(0); + KeyValueHandler keyValueHandler = new KeyValueHandler(conf, randomDatanodeDetails().getUuidString(), containerSet, + mock(MutableVolumeSet.class), mock(ContainerMetrics.class), c -> { + // Check that the ICR contains expected info about the container. + ContainerReplicaProto report = c.getContainerReport(); + long reportedID = report.getContainerID(); + Assertions.assertEquals(container.getContainerData().getContainerID(), reportedID); + + long reportDataChecksum = report.getDataChecksum(); + Assertions.assertNotEquals(0, reportDataChecksum, + "Container report should have populated the checksum field with a non-zero value."); + icrCount.incrementAndGet(); + }); + + Assertions.assertEquals(0, icrCount.get()); + // This should trigger container report validation in the ICR handler above. 
+ keyValueHandler.reconcileContainer(container, Collections.emptyList()); + Assertions.assertEquals(1, icrCount.get()); + } + private static ContainerCommandRequestProto createContainerRequest( String datanodeId, long containerID) { return ContainerCommandRequestProto.newBuilder() diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java index 1b989e6bc7ff..6680a467b12b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java @@ -56,7 +56,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -152,8 +151,7 @@ public void testInconsistentChecksumContainerShouldThrowError() throws Exception KeyValueContainerData containerData = spy(new KeyValueContainerData(containerId, ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test")); // mock to return different checksum - when(containerData.getChecksum()).thenReturn("checksum1", "checksum2"); - doNothing().when(containerData).setChecksumTo0ByteArray(); + when(containerData.getContainerFileChecksum()).thenReturn("checksum1", "checksum2"); // create containerImporter object ContainerController controllerMock = mock(ContainerController.class); ContainerSet containerSet = new ContainerSet(0); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 3570257b5855..745790abc829 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -103,6 +103,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ResetDeletedBlockRetryCountRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type; import org.apache.hadoop.hdds.scm.DatanodeAdminError; import org.apache.hadoop.hdds.scm.ScmInfo; @@ -1198,4 +1199,13 @@ public String getMetrics(String query) throws IOException { String metricsJsonStr = response.getMetricsJson(); return metricsJsonStr; } + + @Override + public void reconcileContainer(long containerID) throws IOException { + ReconcileContainerRequestProto request = ReconcileContainerRequestProto.newBuilder() + .setContainerID(containerID) + .build(); + // TODO check error handling. 
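+    // The TODO above presumably covers verifying that a non-OK status from SCM is translated into an
+    // exception, as it is for the other calls routed through submitRequest. Since
+    // ReconcileContainerResponseProto carries no payload, success is signaled by status alone.
+    // A minimal (hypothetical) admin-side usage sketch of this new RPC:
+    //   ScmClient scm = ...;          // e.g. a ContainerOperationClient
+    //   scm.reconcileContainer(123L); // fire-and-forget; SCM fans out datanode commands asynchronously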
+ submitRequest(Type.ReconcileContainer, builder -> builder.setReconcileContainerRequest(request)); + } } diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index c190dc3f4517..ea63b82c8c63 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -84,6 +84,7 @@ message ScmContainerLocationRequest { optional SingleNodeQueryRequestProto singleNodeQueryRequest = 45; optional GetContainersOnDecomNodeRequestProto getContainersOnDecomNodeRequest = 46; optional GetMetricsRequestProto getMetricsRequest = 47; + optional ReconcileContainerRequestProto reconcileContainerRequest = 48; } message ScmContainerLocationResponse { @@ -139,6 +140,7 @@ message ScmContainerLocationResponse { optional SingleNodeQueryResponseProto singleNodeQueryResponse = 45; optional GetContainersOnDecomNodeResponseProto getContainersOnDecomNodeResponse = 46; optional GetMetricsResponseProto getMetricsResponse = 47; + optional ReconcileContainerResponseProto reconcileContainerResponse = 48; enum Status { OK = 1; @@ -193,6 +195,7 @@ enum Type { SingleNodeQuery = 41; GetContainersOnDecomNode = 42; GetMetrics = 43; + ReconcileContainer = 44; } /** @@ -637,6 +640,13 @@ message GetMetricsResponseProto { optional string metricsJson = 1; } +message ReconcileContainerRequestProto { + required int64 containerID = 1; +} + +message ReconcileContainerResponseProto { +} + /** * Protocol used from an HDFS node to StorageContainerManager. See the request * and response messages for details of the RPC calls. diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 405845312357..93cdbf046234 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -432,6 +432,7 @@ message SCMContainerReplicaProto { required int64 keyCount = 6; required int64 bytesUsed = 7; optional int64 replicaIndex = 8; + optional int64 dataChecksum = 9; } message KeyContainerIDList { diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 2994073c0240..d5600880e66f 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -230,12 +230,13 @@ message ContainerReplicaProto { optional int64 writeCount = 7; optional int64 readBytes = 8; optional int64 writeBytes = 9; - optional string finalhash = 10; + optional string finalhash = 10 [ deprecated = true ]; optional int64 deleteTransactionId = 11; optional uint64 blockCommitSequenceId = 12; optional string originNodeId = 13; optional int32 replicaIndex = 14; optional bool isEmpty = 15 [default = false]; + optional int64 dataChecksum = 16; } message CommandStatusReportsProto { @@ -328,6 +329,7 @@ message SCMCommandProto { finalizeNewLayoutVersionCommand = 9; refreshVolumeUsageInfo = 10; reconstructECContainersCommand = 11; + reconcileContainerCommand = 12; } // TODO: once we start using protoc 3.x, refactor this message using "oneof" required Type commandType = 1; @@ -343,6 +345,7 @@ message SCMCommandProto { finalizeNewLayoutVersionCommandProto = 10; optional RefreshVolumeUsageCommandProto refreshVolumeUsageCommandProto = 11; 
optional ReconstructECContainersCommandProto reconstructECContainersCommandProto = 12; + optional ReconcileContainerCommandProto reconcileContainerCommandProto = 13; // If running upon Ratis, holds term of underlying RaftServer iff current @@ -499,6 +502,15 @@ message FinalizeNewLayoutVersionCommandProto { required int64 cmdId = 3; } +/** +This command asks the datanode to reconcile its copy of a container with its peer datanodes that also have a copy of +the container. +*/ +message ReconcileContainerCommandProto { + required int64 containerID = 1; + repeated DatanodeDetailsProto peers = 2; +} + /** * Protocol used from a datanode to StorageContainerManager. * diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto index 3d281975f2b4..9739f06b4b02 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto @@ -139,6 +139,7 @@ enum Status { CA_ROTATION_IN_POST_PROGRESS = 44; CONTAINER_ALREADY_CLOSED = 45; CONTAINER_ALREADY_CLOSING = 46; + UNSUPPORTED_OPERATION = 47; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java index 7e163ac306f8..db00d6842d28 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -381,6 +381,7 @@ private void updateContainerReplica(final DatanodeDetails datanodeDetails, .setReplicaIndex(replicaProto.getReplicaIndex()) .setBytesUsed(replicaProto.getUsed()) .setEmpty(replicaProto.getIsEmpty()) + .setDataChecksum(replicaProto.getDataChecksum()) .build(); if (replica.getState().equals(State.DELETED)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java index 78ebfd311dd1..d008e24f1c14 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java @@ -43,6 +43,7 @@ public final class ContainerReplica implements Comparable { private final long keyCount; private final long bytesUsed; private final boolean isEmpty; + private final long dataChecksum; private ContainerReplica(ContainerReplicaBuilder b) { containerID = b.containerID; @@ -54,6 +55,7 @@ private ContainerReplica(ContainerReplicaBuilder b) { replicaIndex = b.replicaIndex; isEmpty = b.isEmpty; sequenceId = b.sequenceId; + dataChecksum = b.dataChecksum; } /** @@ -114,6 +116,10 @@ public boolean isEmpty() { return isEmpty; } + public long getDataChecksum() { + return dataChecksum; + } + @Override public int hashCode() { return new HashCodeBuilder(61, 71) @@ -201,6 +207,7 @@ public static class ContainerReplicaBuilder { private long keyCount; private int replicaIndex; private boolean isEmpty; + private long dataChecksum; /** * Set Container Id. 
@@ -275,6 +282,11 @@ public ContainerReplicaBuilder setEmpty(boolean empty) { return this; } + public ContainerReplicaBuilder setDataChecksum(long dataChecksum) { + this.dataChecksum = dataChecksum; + return this; + } + /** * Constructs new ContainerReplicaBuilder. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java new file mode 100644 index 000000000000..f13b37f3ee23 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+
+/**
+ * When a reconcile container event is fired, this class will check if the container is eligible for reconciliation,
+ * and if so, send the reconcile request to all datanodes with a replica of that container.
+ */
+public class ReconcileContainerEventHandler implements EventHandler<ContainerID> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReconcileContainerEventHandler.class);
+
+  private final ContainerManager containerManager;
+  private final SCMContext scmContext;
+
+  public ReconcileContainerEventHandler(ContainerManager containerManager, SCMContext scmContext) {
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+  }
+
+  @Override
+  public void onMessage(ContainerID containerID, EventPublisher publisher) {
+    if (!scmContext.isLeader()) {
+      LOG.info("Skip reconciling container {} since current SCM is not leader.", containerID);
+      return;
+    }
+
+    EligibilityResult result = ReconciliationEligibilityHandler.isEligibleForReconciliation(containerID,
+        containerManager);
+    if (!result.isOk()) {
+      LOG.error("{}", result);
+      return;
+    }
+
+    try {
+      // TODO HDDS-10714 restrict peer and target nodes based on node status.
+      Set<DatanodeDetails> allReplicaNodes = containerManager.getContainerReplicas(containerID)
+          .stream()
+          .map(ContainerReplica::getDatanodeDetails)
+          .collect(Collectors.toSet());
+
+      LOG.info("Reconcile container event triggered for container {} with peers {}", containerID, allReplicaNodes);
+
+      for (DatanodeDetails replica : allReplicaNodes) {
+        List<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
+            .filter(other -> !other.equals(replica))
+            .collect(Collectors.toList());
+        ReconcileContainerCommand command = new ReconcileContainerCommand(containerID.getId(), otherReplicas);
+        command.setTerm(scmContext.getTermOfLeader());
+        publisher.fireEvent(DATANODE_COMMAND, new CommandForDatanode<>(replica.getUuid(), command));
+      }
+    } catch (ContainerNotFoundException ex) {
+      LOG.error("Failed to start reconciliation for container {}. Container not found.", containerID);
+    } catch (NotLeaderException nle) {
+      LOG.info("Skip reconciling container {} since current SCM is not leader.", containerID);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java
new file mode 100644
index 000000000000..cdc08556d2ca
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Determines whether a container is eligible for reconciliation based on its state, replica states, replication
+ * type, and replication factor.
+ */
+public final class ReconciliationEligibilityHandler {
+  public static final Set<HddsProtos.LifeCycleState> ELIGIBLE_CONTAINER_STATES =
+      EnumSet.of(HddsProtos.LifeCycleState.CLOSED, HddsProtos.LifeCycleState.QUASI_CLOSED);
+  public static final Set<State> ELIGIBLE_REPLICA_STATES =
+      EnumSet.of(State.CLOSED, State.QUASI_CLOSED, State.UNHEALTHY);
+
+  /**
+   * Utility class only.
+   */
+  private ReconciliationEligibilityHandler() { }
+
+  public static EligibilityResult isEligibleForReconciliation(
+      ContainerID containerID, ContainerManager containerManager) {
+    ContainerInfo container;
+    Set<ContainerReplica> replicas;
+    try {
+      container = containerManager.getContainer(containerID);
+      replicas = containerManager.getContainerReplicas(containerID);
+    } catch (ContainerNotFoundException ex) {
+      return new EligibilityResult(Result.CONTAINER_NOT_FOUND,
+          String.format("Container %s not found for reconciliation.", containerID));
+    }
+
+    if (!ELIGIBLE_CONTAINER_STATES.contains(container.getState())) {
+      return new EligibilityResult(Result.INELIGIBLE_CONTAINER_STATE,
+          String.format("Cannot reconcile container %d in state %s.", container.getContainerID(),
+              container.getState()));
+    }
+
+    if (replicas.isEmpty()) {
+      return new EligibilityResult(Result.NO_REPLICAS_FOUND,
+          String.format("Cannot reconcile container %d because no replicas could be found.",
+              container.getContainerID()));
+    }
+
+    boolean replicasValid = replicas.stream()
+        .map(ContainerReplica::getState)
+        .allMatch(ELIGIBLE_REPLICA_STATES::contains);
+    if (!replicasValid) {
+      return new EligibilityResult(Result.INELIGIBLE_REPLICA_STATES,
+          String.format("Cannot reconcile container %s in state %s with replica states: %s", containerID,
+              container.getState(), replicas.stream()
+                  .map(r -> r.getState().toString())
+                  .collect(Collectors.joining(", "))));
+    }
+
+    // Reconciliation of EC containers is not yet implemented.
+    ReplicationConfig repConfig = container.getReplicationConfig();
+    if (repConfig.getReplicationType() != HddsProtos.ReplicationType.RATIS) {
+      return new EligibilityResult(Result.INELIGIBLE_REPLICATION_TYPE,
+          String.format("Cannot reconcile container %s with replication type %s. Reconciliation is currently only " +
+              "supported for Ratis containers.", containerID, repConfig.getReplicationType()));
+    }
+
+    // Reconciliation requires multiple replicas to reconcile.
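+    // For example, Ratis THREE gives requiredNodes == 3 and passes this check, while Ratis ONE gives 1
+    // and is rejected: a single-replica container has no peer to compare checksums against.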
+    int requiredNodes = repConfig.getRequiredNodes();
+    if (requiredNodes <= 1) {
+      return new EligibilityResult(Result.NOT_ENOUGH_REQUIRED_NODES,
+          String.format("Cannot reconcile container %s with %d required nodes. Reconciliation is only supported for " +
+              "containers with more than 1 required node.", containerID, requiredNodes));
+    }
+
+    return new EligibilityResult(Result.OK,
+        String.format("Container %s is eligible for reconciliation.", containerID));
+  }
+
+  /**
+   * Defines the reasons a container may not be eligible for reconciliation.
+   */
+  public enum Result {
+    OK,
+    CONTAINER_NOT_FOUND,
+    INELIGIBLE_CONTAINER_STATE,
+    INELIGIBLE_REPLICA_STATES,
+    INELIGIBLE_REPLICATION_TYPE,
+    NOT_ENOUGH_REQUIRED_NODES,
+    NO_REPLICAS_FOUND
+  }
+
+  /**
+   * Provides a status and message indicating whether a container is eligible for reconciliation.
+   */
+  public static final class EligibilityResult {
+    private final Result result;
+    private final String message;
+
+    private EligibilityResult(Result result, String message) {
+      this.result = result;
+      this.message = message;
+    }
+
+    public Result getResult() {
+      return result;
+    }
+
+    public boolean isOk() {
+      return result == Result.OK;
+    }
+
+    @Override
+    public String toString() {
+      return message;
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java
new file mode 100644
index 000000000000..fa1e355fd174
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+/**
+ * This package contains classes related to container reconciliation.
+ */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 0cc205b2ffce..3b2a84f4f335 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -217,6 +217,13 @@ public final class SCMEvents {
       new TypedEvent<>(CRLStatusReportFromDatanode.class, "Crl_Status_Report");
 
+  /**
+   * This event will be triggered whenever a datanode needs to reconcile its replica of a container with other
+   * replicas in the cluster.
+   */
+  public static final TypedEvent<ContainerID>
+      RECONCILE_CONTAINER = new TypedEvent<>(ContainerID.class, "Reconcile_Container");
+
   /**
    * Private Ctor. Never Constructed.
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 3d7cff358fe4..b88f2c2a7d39 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; @@ -722,6 +724,12 @@ public ScmContainerLocationResponse processRequest( .setStatus(Status.OK) .setGetMetricsResponse(getMetrics(request.getGetMetricsRequest())) .build(); + case ReconcileContainer: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest())) + .build(); default: throw new IllegalArgumentException( "Unknown command type: " + request.getCmdType()); @@ -1333,4 +1341,9 @@ public DecommissionScmResponseProto decommissionScm( public GetMetricsResponseProto getMetrics(GetMetricsRequestProto request) throws IOException { return GetMetricsResponseProto.newBuilder().setMetricsJson(impl.getMetrics(request.getQuery())).build(); } + + public ReconcileContainerResponseProto reconcileContainer(ReconcileContainerRequestProto request) throws IOException { + impl.reconcileContainer(request.getContainerID()); + return ReconcileContainerResponseProto.getDefaultInstance(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 47bc66d8331a..9cf44677ebc4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -55,6 +55,8 @@ import org.apache.hadoop.hdds.scm.container.balancer.IllegalContainerBalancerStateException; import org.apache.hadoop.hdds.scm.container.balancer.InvalidContainerBalancerConfigurationException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler; +import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult; import org.apache.hadoop.hdds.scm.events.SCMEvents; import 
org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; @@ -336,7 +338,9 @@ public List getContainerReplicas( .setPlaceOfBirth(r.getOriginDatanodeId().toString()) .setKeyCount(r.getKeyCount()) .setSequenceID(r.getSequenceId()) - .setReplicaIndex(r.getReplicaIndex()).build() + .setReplicaIndex(r.getReplicaIndex()) + .setDataChecksum(r.getDataChecksum()) + .build() ); } return results; @@ -1438,4 +1442,42 @@ public String getMetrics(String query) throws IOException { FetchMetrics fetchMetrics = new FetchMetrics(); return fetchMetrics.getMetrics(query); } + + @Override + public void reconcileContainer(long longContainerID) throws IOException { + ContainerID containerID = ContainerID.valueOf(longContainerID); + getScm().checkAdminAccess(getRemoteUser(), false); + final UserGroupInformation remoteUser = getRemoteUser(); + final Map auditMap = Maps.newHashMap(); + auditMap.put("containerID", containerID.toString()); + auditMap.put("remoteUser", remoteUser.getUserName()); + + try { + EligibilityResult result = ReconciliationEligibilityHandler.isEligibleForReconciliation(containerID, + getScm().getContainerManager()); + if (!result.isOk()) { + switch (result.getResult()) { + case OK: + break; + case CONTAINER_NOT_FOUND: + throw new ContainerNotFoundException(result.toString()); + case INELIGIBLE_CONTAINER_STATE: + throw new SCMException(result.toString(), ResultCodes.UNEXPECTED_CONTAINER_STATE); + case INELIGIBLE_REPLICA_STATES: + case INELIGIBLE_REPLICATION_TYPE: + case NOT_ENOUGH_REQUIRED_NODES: + case NO_REPLICAS_FOUND: + throw new SCMException(result.toString(), ResultCodes.UNSUPPORTED_OPERATION); + default: + throw new SCMException("Unknown reconciliation eligibility result " + result, ResultCodes.INTERNAL_ERROR); + } + } + + scm.getEventQueue().fireEvent(SCMEvents.RECONCILE_CONTAINER, containerID); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess(SCMAction.RECONCILE_CONTAINER, auditMap)); + } catch (SCMException ex) { + AUDIT.logWriteFailure(buildAuditMessageForFailure(SCMAction.RECONCILE_CONTAINER, auditMap, ex)); + throw ex; + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 3d864d4ea212..98a7aa22f3e6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -73,6 +73,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; @@ -94,6 +95,7 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand; import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconcileContainerCommand; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.refreshVolumeUsageInfo; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand; @@ -407,6 +409,12 @@ public static SCMCommandProto getCommandResponse(SCMCommand cmd, .setRefreshVolumeUsageCommandProto( ((RefreshVolumeUsageCommand)cmd).getProto()) .build(); + case reconcileContainerCommand: + return builder + .setCommandType(reconcileContainerCommand) + .setReconcileContainerCommandProto( + ((ReconcileContainerCommand)cmd).getProto()) + .build(); default: throw new IllegalArgumentException("Scm command " + diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index e86dab5fd721..33fbf5fb76de 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl; import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; +import org.apache.hadoop.hdds.scm.container.reconciliation.ReconcileContainerEventHandler; import org.apache.hadoop.hdds.scm.container.balancer.MoveManager; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler; @@ -506,6 +507,9 @@ private void initializeEventHandlers() { CRLStatusReportHandler crlStatusReportHandler = new CRLStatusReportHandler(certificateStore, configuration); + ReconcileContainerEventHandler reconcileContainerEventHandler = + new ReconcileContainerEventHandler(containerManager, scmContext); + eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); @@ -578,6 +582,7 @@ private void initializeEventHandlers() { eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler); + eventQueue.addHandler(SCMEvents.RECONCILE_CONTAINER, reconcileContainerEventHandler); scmNodeManager.registerSendCommandNotify( SCMCommandProto.Type.deleteBlocksCommand, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index 4e1fe234ff01..2c9df2afb404 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -52,7 +52,8 @@ public enum SCMAction implements 
AuditAction { GET_REPLICATION_MANAGER_REPORT, RESET_DELETED_BLOCK_RETRY_COUNT, TRANSFER_LEADERSHIP, - GET_FAILED_DELETED_BLOCKS_TRANSACTION; + GET_FAILED_DELETED_BLOCKS_TRANSACTION, + RECONCILE_CONTAINER; @Override public String getAction() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 695c88d11a3c..8b77e300137b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -60,12 +60,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; @@ -100,7 +102,6 @@ void setup() throws IOException, InvalidStateTransitionException { dbStore = DBStoreBuilder.createDBStore( conf, new SCMDBDefinition()); scmhaManager = SCMHAManagerStub.getInstance(true); - nodeManager = new MockNodeManager(true, 10); pipelineManager = new MockPipelineManager(dbStore, scmhaManager, nodeManager); containerStateManager = ContainerStateManagerImpl.newBuilder() @@ -979,6 +980,153 @@ public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException, containerOne.containerID()).size()); } + @Test + public void testWithNoContainerDataChecksum() throws Exception { + final ContainerReportHandler reportHandler = new ContainerReportHandler(nodeManager, containerManager); + + final int numNodes = 3; + List datanodes = nodeManager.getNodes(NodeStatus.inServiceHealthy()).stream() + .limit(numNodes) + .collect(Collectors.toList()); + + // Create a container and put one replica on each datanode. + final ContainerInfo container = getContainer(LifeCycleState.CLOSED); + ContainerID contID = container.containerID(); + final Set containerIDSet = Stream.of(contID).collect(Collectors.toSet()); + + for (DatanodeDetails dn: datanodes) { + nodeManager.setContainers(dn, containerIDSet); + } + + containerStateManager.addContainer(container.getProtobuf()); + + getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes) + .forEach(r -> containerStateManager.updateContainerReplica(contID, r)); + + // Container manager should now be aware of 3 replicas of each container. + assertEquals(numNodes, containerManager.getContainerReplicas(contID).size()); + + // All replicas should start with an empty data checksum in SCM. + boolean contOneDataChecksumsEmpty = containerManager.getContainerReplicas(contID).stream() + .allMatch(r -> r.getDataChecksum() == 0); + assertTrue(contOneDataChecksumsEmpty, "Replicas of container one should not yet have any data checksums."); + + // Send a report to SCM from one datanode that still does not have a data checksum. 
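+    // Each datanode in turn sends such a checksum-less report below; SCM should leave every replica's
+    // data checksum at its default of zero rather than inventing a value.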
+ int numReportsSent = 0; + for (DatanodeDetails dn: datanodes) { + final ContainerReportsProto dnReportProto = getContainerReportsProto( + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + final ContainerReportFromDatanode dnReport = new ContainerReportFromDatanode(dn, dnReportProto); + reportHandler.onMessage(dnReport, publisher); + numReportsSent++; + } + assertEquals(numNodes, numReportsSent); + + // Regardless of which datanode sent the report, none of them have checksums, so all replica's data checksums + // should remain empty. + boolean containerDataChecksumEmpty = containerManager.getContainerReplicas(contID).stream() + .allMatch(r -> r.getDataChecksum() == 0); + assertTrue(containerDataChecksumEmpty, "Replicas of the container should not have any data checksums."); + } + + @Test + public void testWithContainerDataChecksum() throws Exception { + final ContainerReportHandler reportHandler = new ContainerReportHandler(nodeManager, containerManager); + + final int numNodes = 3; + List datanodes = nodeManager.getNodes(NodeStatus.inServiceHealthy()).stream() + .limit(numNodes) + .collect(Collectors.toList()); + + // Create a container and put one replica on each datanode. + final ContainerInfo container = getContainer(LifeCycleState.CLOSED); + ContainerID contID = container.containerID(); + final Set containerIDSet = Stream.of(contID).collect(Collectors.toSet()); + + for (DatanodeDetails dn: datanodes) { + nodeManager.setContainers(dn, containerIDSet); + } + + containerStateManager.addContainer(container.getProtobuf()); + + getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes) + .forEach(r -> containerStateManager.updateContainerReplica(contID, r)); + + // Container manager should now be aware of 3 replicas of each container. + assertEquals(numNodes, containerManager.getContainerReplicas(contID).size()); + + // All replicas should start with an empty data checksum in SCM. + boolean dataChecksumsEmpty = containerManager.getContainerReplicas(contID).stream() + .allMatch(r -> r.getDataChecksum() == 0); + assertTrue(dataChecksumsEmpty, "Replicas of container one should not yet have any data checksums."); + + // For each datanode, send a container report with a mismatched checksum. + for (DatanodeDetails dn: datanodes) { + ContainerReportsProto dnReportProto = getContainerReportsProto( + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + ContainerReplicaProto replicaWithChecksum = dnReportProto.getReports(0).toBuilder() + .setDataChecksum(createUniqueDataChecksumForReplica(contID, dn.getUuidString())) + .build(); + ContainerReportsProto reportWithChecksum = dnReportProto.toBuilder() + .clearReports() + .addReports(replicaWithChecksum) + .build(); + final ContainerReportFromDatanode dnReport = new ContainerReportFromDatanode(dn, reportWithChecksum); + reportHandler.onMessage(dnReport, publisher); + } + + // All the replicas should have different checksums. + // Since the containers don't have any data in this test, different checksums are based on container ID and + // datanode ID. + int numReplicasChecked = 0; + for (ContainerReplica replica: containerManager.getContainerReplicas(contID)) { + long expectedChecksum = createUniqueDataChecksumForReplica(contID, replica.getDatanodeDetails().getUuidString()); + assertEquals(expectedChecksum, replica.getDataChecksum()); + numReplicasChecked++; + } + assertEquals(numNodes, numReplicasChecked); + + // For each datanode, send a container report with a matching checksum. 
+ // This simulates reconciliation running. + for (DatanodeDetails dn: datanodes) { + ContainerReportsProto dnReportProto = getContainerReportsProto( + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + ContainerReplicaProto replicaWithChecksum = dnReportProto.getReports(0).toBuilder() + .setDataChecksum(createMatchingDataChecksumForReplica(contID)) + .build(); + ContainerReportsProto reportWithChecksum = dnReportProto.toBuilder() + .clearReports() + .addReports(replicaWithChecksum) + .build(); + final ContainerReportFromDatanode dnReport = new ContainerReportFromDatanode(dn, reportWithChecksum); + reportHandler.onMessage(dnReport, publisher); + } + + // All the replicas should now have matching checksums. + // Since the containers don't have any data in this test, the matching checksums are based on container ID only. + numReplicasChecked = 0; + for (ContainerReplica replica: containerManager.getContainerReplicas(contID)) { + long expectedChecksum = createMatchingDataChecksumForReplica(contID); + assertEquals(expectedChecksum, replica.getDataChecksum()); + numReplicasChecked++; + } + assertEquals(numNodes, numReplicasChecked); + } + + /** + * Generates a placeholder data checksum for testing that is specific to a container replica. + */ + protected static long createUniqueDataChecksumForReplica(ContainerID containerID, String datanodeID) { + return (datanodeID + containerID).hashCode(); + } + + /** + * Generates a placeholder data checksum for testing that is the same for all container replicas. + */ + protected static long createMatchingDataChecksumForReplica(ContainerID containerID) { + return Objects.hashCode(containerID); + } + private ContainerReportFromDatanode getContainerReportFromDatanode( ContainerID containerId, ContainerReplicaProto.State state, DatanodeDetails dn, long bytesUsed, long keyCount) { @@ -1021,7 +1169,6 @@ protected static ContainerReportsProto getContainerReportsProto( .setContainerID(containerId.getId()) .setState(state) .setOriginNodeId(originNodeId) - .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") .setSize(5368709120L) .setUsed(usedBytes) .setKeyCount(keyCount) @@ -1035,5 +1182,4 @@ protected static ContainerReportsProto getContainerReportsProto( .build(); return crBuilder.addReports(replicaProto).build(); } - } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java index dbcccce598c9..9abbda819340 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java @@ -76,6 +76,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; @@ -83,6 +84,9 @@ import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer; import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer; import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas; +import static 
org.apache.hadoop.hdds.scm.container.TestContainerReportHandler.createMatchingDataChecksumForReplica; +import static org.apache.hadoop.hdds.scm.container.TestContainerReportHandler.createUniqueDataChecksumForReplica; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; @@ -576,6 +580,144 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException, } } + @Test + public void testWithNoContainerDataChecksum() throws Exception { + final IncrementalContainerReportHandler reportHandler = new IncrementalContainerReportHandler(nodeManager, + containerManager, scmContext); + + final int numNodes = 3; + + // Create a container which will have one replica on each datanode. + final ContainerInfo container = getContainer(LifeCycleState.CLOSED); + ContainerID contID = container.containerID(); + final Set containerIDSet = Stream.of(contID).collect(Collectors.toSet()); + + List datanodes = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + DatanodeDetails dn = randomDatanodeDetails(); + nodeManager.register(dn, null, null); + nodeManager.setContainers(dn, containerIDSet); + datanodes.add(dn); + } + + containerStateManager.addContainer(container.getProtobuf()); + + getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes) + .forEach(r -> containerStateManager.updateContainerReplica(contID, r)); + + // Container manager should now be aware of 3 replicas of each container. + assertEquals(numNodes, containerManager.getContainerReplicas(contID).size()); + + // All replicas should start with an empty data checksum in SCM. + boolean contOneDataChecksumsEmpty = containerManager.getContainerReplicas(contID).stream() + .allMatch(r -> r.getDataChecksum() == 0); + assertTrue(contOneDataChecksumsEmpty, "Replicas of container one should not yet have any data checksums."); + + // Send a report to SCM from one datanode that still does not have a data checksum. + for (DatanodeDetails dn: datanodes) { + final IncrementalContainerReportProto dnReportProto = getIncrementalContainerReportProto( + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + final IncrementalContainerReportFromDatanode dnReport = new IncrementalContainerReportFromDatanode(dn, + dnReportProto); + reportHandler.onMessage(dnReport, publisher); + } + + // Regardless of which datanode sent the report, none of them have checksums, so all replica's data checksums + // should remain empty. + boolean containerDataChecksumEmpty = containerManager.getContainerReplicas(contID).stream() + .allMatch(r -> r.getDataChecksum() == 0); + assertTrue(containerDataChecksumEmpty, "Replicas of the container should not have any data checksums."); + } + + @Test + public void testWithContainerDataChecksum() throws Exception { + final IncrementalContainerReportHandler reportHandler = new IncrementalContainerReportHandler(nodeManager, + containerManager, scmContext); + + final int numNodes = 3; + + // Create a container which will have one replica on each datanode. 
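+    // The incremental report handler needs the reporting datanodes to be known to the node manager,
+    // hence the explicit nodeManager.register calls below.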
+ final ContainerInfo container = getContainer(LifeCycleState.CLOSED); + ContainerID contID = container.containerID(); + final Set containerIDSet = Stream.of(contID).collect(Collectors.toSet()); + + List datanodes = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + DatanodeDetails dn = randomDatanodeDetails(); + nodeManager.register(dn, null, null); + nodeManager.setContainers(dn, containerIDSet); + datanodes.add(dn); + } + + containerStateManager.addContainer(container.getProtobuf()); + + getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes) + .forEach(r -> containerStateManager.updateContainerReplica(contID, r)); + + // Container manager should now be aware of 3 replicas of each container. + assertEquals(3, containerManager.getContainerReplicas(contID).size()); + + // All replicas should start with a zero data checksum in SCM. + boolean dataChecksumsEmpty = containerManager.getContainerReplicas(contID).stream() + .allMatch(r -> r.getDataChecksum() == 0); + assertTrue(dataChecksumsEmpty, "Replicas of container one should not yet have any data checksums."); + + // For each datanode, send a container report with a mismatched checksum. + for (DatanodeDetails dn: datanodes) { + IncrementalContainerReportProto dnReportProto = getIncrementalContainerReportProto( + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + ContainerReplicaProto replicaWithChecksum = dnReportProto.getReport(0).toBuilder() + .setDataChecksum(createUniqueDataChecksumForReplica(contID, dn.getUuidString())) + .build(); + IncrementalContainerReportProto reportWithChecksum = dnReportProto.toBuilder() + .clearReport() + .addReport(replicaWithChecksum) + .build(); + final IncrementalContainerReportFromDatanode dnReport = new IncrementalContainerReportFromDatanode(dn, + reportWithChecksum); + reportHandler.onMessage(dnReport, publisher); + } + + // All the replicas should have different checksums. + // Since the containers don't have any data in this test, different checksums are based on container ID and + // datanode ID. + int numReplicasChecked = 0; + for (ContainerReplica replica: containerManager.getContainerReplicas(contID)) { + long expectedChecksum = createUniqueDataChecksumForReplica( + contID, replica.getDatanodeDetails().getUuidString()); + assertEquals(expectedChecksum, replica.getDataChecksum()); + numReplicasChecked++; + } + assertEquals(numNodes, numReplicasChecked); + + // For each datanode, send a container report with a matching checksum. + // This simulates reconciliation running. + for (DatanodeDetails dn: datanodes) { + IncrementalContainerReportProto dnReportProto = getIncrementalContainerReportProto( + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + ContainerReplicaProto replicaWithChecksum = dnReportProto.getReport(0).toBuilder() + .setDataChecksum(createMatchingDataChecksumForReplica(contID)) + .build(); + IncrementalContainerReportProto reportWithChecksum = dnReportProto.toBuilder() + .clearReport() + .addReport(replicaWithChecksum) + .build(); + IncrementalContainerReportFromDatanode dnReport = new IncrementalContainerReportFromDatanode(dn, + reportWithChecksum); + reportHandler.onMessage(dnReport, publisher); + } + + // All the replicas should now have matching checksums. + // Since the containers don't have any data in this test, the matching checksums are based on container ID only. 
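+    // A real datanode would derive this value from its on-disk container contents; these tests only
+    // need a deterministic stand-in, so the helpers hash the container and datanode IDs instead.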
+ numReplicasChecked = 0; + for (ContainerReplica replica: containerManager.getContainerReplicas(contID)) { + long expectedChecksum = createMatchingDataChecksumForReplica(contID); + assertEquals(expectedChecksum, replica.getDataChecksum()); + numReplicasChecked++; + } + assertEquals(numNodes, numReplicasChecked); + } + private static IncrementalContainerReportProto getIncrementalContainerReportProto(ContainerReplicaProto replicaProto) { final IncrementalContainerReportProto.Builder crBuilder = @@ -595,7 +737,6 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException, .setContainerID(containerId.getId()) .setState(state) .setOriginNodeId(originNodeId) - .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") .setSize(5368709120L) .setUsed(2000000000L) .setKeyCount(100000000L) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java new file mode 100644 index 000000000000..ffc96217b441 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.reconciliation; + +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto; +import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult; +import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.Result; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.ArgumentCaptor; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests that the ReconcileContainerEventHandler properly accepts and rejects reconciliation events based on + * container state, and dispatches commands to datanodes accordingly. 
+ */ +public class TestReconcileContainerEventHandler { + private ContainerManager containerManager; + private EventPublisher eventPublisher; + private ReconcileContainerEventHandler eventHandler; + private SCMContext scmContext; + + private static final ContainerID CONTAINER_ID = ContainerID.valueOf(123L); + private static final long LEADER_TERM = 3L; + + private static final ReplicationConfig RATIS_THREE_REP = RatisReplicationConfig.getInstance(THREE); + private static final ReplicationConfig RATIS_ONE_REP = RatisReplicationConfig.getInstance(ONE); + private static final ReplicationConfig EC_REP = new ECReplicationConfig(3, 2); + + private ArgumentCaptor> commandCaptor; + + @BeforeEach + public void setup() throws Exception { + commandCaptor = ArgumentCaptor.forClass(CommandForDatanode.class); + containerManager = mock(ContainerManager.class); + scmContext = mock(SCMContext.class); + when(scmContext.isLeader()).thenReturn(true); + when(scmContext.getTermOfLeader()).thenReturn(LEADER_TERM); + eventPublisher = mock(EventPublisher.class); + eventHandler = new ReconcileContainerEventHandler(containerManager, scmContext); + } + + /** + * EC containers are not yet supported for reconciliation. + */ + @Test + public void testReconcileECContainer() throws Exception { + addContainer(EC_REP, LifeCycleState.CLOSED); + addReplicasToContainer(5); + + EligibilityResult result = + ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, containerManager); + assertFalse(result.isOk()); + assertEquals(Result.INELIGIBLE_REPLICATION_TYPE, result.getResult()); + + eventHandler.onMessage(CONTAINER_ID, eventPublisher); + verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any()); + } + + /** + * Ratis 1 containers are not currently supported for reconciliation. + */ + @Test + public void testReconcileRatisOneContainer() throws Exception { + addContainer(RATIS_ONE_REP, LifeCycleState.CLOSED); + addReplicasToContainer(1); + + EligibilityResult result = + ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, containerManager); + assertFalse(result.isOk()); + assertEquals(Result.NOT_ENOUGH_REQUIRED_NODES, result.getResult()); + + eventHandler.onMessage(CONTAINER_ID, eventPublisher); + verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any()); + } + + @Test + public void testReconcileWhenNotLeader() throws Exception { + addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED); + addReplicasToContainer(3); + when(scmContext.isLeader()).thenReturn(false); + + // Container is eligible for reconciliation, but the request will not go through because this SCM is not the leader. + EligibilityResult result = + ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, containerManager); + assertTrue(result.isOk()); + assertEquals(Result.OK, result.getResult()); + + eventHandler.onMessage(CONTAINER_ID, eventPublisher); + verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any()); + } + + @Test + public void testReconcileNonexistentContainer() throws Exception { + // The step of adding the container to the mocked ContainerManager is intentionally skipped to simulate a + // nonexistent container. + // No exceptions should be thrown out of this test method when this happens. If they are, they will be propagated + // and the test will fail. 
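+    // Any container lookup on the mocked manager throws, mirroring how ContainerManager signals an
+    // unknown container ID.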
+    when(containerManager.getContainer(any())).thenThrow(new ContainerNotFoundException());
+
+    EligibilityResult result =
+        ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, containerManager);
+    assertFalse(result.isOk());
+    assertEquals(Result.CONTAINER_NOT_FOUND, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+  }
+
+  @Test
+  public void testReconcileMissingContainer() throws Exception {
+    addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+    assertTrue(containerManager.getContainerReplicas(CONTAINER_ID).isEmpty(),
+        "Expected no replicas for this container");
+
+    EligibilityResult result =
+        ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, containerManager);
+    assertFalse(result.isOk());
+    assertEquals(Result.NO_REPLICAS_FOUND, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+  }
+
+  @ParameterizedTest
+  @EnumSource(LifeCycleState.class)
+  public void testReconcileWithContainerStates(LifeCycleState state) throws Exception {
+    addContainer(RATIS_THREE_REP, state);
+    addReplicasToContainer(3);
+    EligibilityResult result =
+        ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, containerManager);
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    switch (state) {
+    case OPEN:
+    case CLOSING:
+    case DELETING:
+    case DELETED:
+    case RECOVERING:
+      assertFalse(result.isOk());
+      assertEquals(Result.INELIGIBLE_CONTAINER_STATE, result.getResult());
+      verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
+      break;
+    default:
+      assertTrue(result.isOk());
+      assertEquals(Result.OK, result.getResult());
+      verify(eventPublisher, times(3)).fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
+      break;
+    }
+  }
+
+  // TODO HDDS-10714 will change which datanodes are eligible to participate in reconciliation.
+  @Test
+  public void testReconcileSentToAllPeers() throws Exception {
+    addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+    Set<ContainerReplica> replicas = addReplicasToContainer(3);
+    Set<UUID> allNodeIDs = replicas.stream()
+        .map(r -> r.getDatanodeDetails().getUuid())
+        .collect(Collectors.toSet());
+
+    EligibilityResult result =
+        ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, containerManager);
+    assertTrue(result.isOk());
+    assertEquals(Result.OK, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    assertEquals(3, replicas.size());
+    assertEquals(allNodeIDs.size(), replicas.size());
+    verify(eventPublisher, times(replicas.size())).fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
+
+    // Check each reconcile command sent for correctness.
+    Set<UUID> nodesReceivingCommands = new HashSet<>();
+    for (CommandForDatanode<ReconcileContainerCommandProto> dnCommand: commandCaptor.getAllValues()) {
+      SCMCommand<ReconcileContainerCommandProto> reconcileCommand = dnCommand.getCommand();
+      ReconcileContainerCommandProto reconcileProto = reconcileCommand.getProto();
+      // All commands should use the latest term of SCM so the datanode does not drop them.
+      assertEquals(LEADER_TERM, reconcileCommand.getTerm());
+      // All commands should have the same container ID.
+      assertEquals(CONTAINER_ID, ContainerID.valueOf(reconcileProto.getContainerID()));
+      // Container ID is also used as the command's identifier.
+      assertEquals(CONTAINER_ID, ContainerID.valueOf(reconcileCommand.getId()));
+
+      // Every node should receive exactly one reconcile command.
+      UUID targetNodeID = dnCommand.getDatanodeId();
+      assertTrue(nodesReceivingCommands.add(targetNodeID), "Duplicate reconcile command sent to datanode.");
+      // All commands should have correctly constructed peer lists that exclude the node receiving the command.
+      Set<UUID> expectedPeerIDs = allNodeIDs.stream()
+          .filter(id -> !id.equals(targetNodeID))
+          .collect(Collectors.toSet());
+      Set<UUID> actualPeerIDs = reconcileProto.getPeersList().stream()
+          .map(dn -> UUID.fromString(dn.getUuid()))
+          .collect(Collectors.toSet());
+      assertEquals(replicas.size() - 1, actualPeerIDs.size());
+      assertEquals(expectedPeerIDs, actualPeerIDs);
+    }
+
+    assertEquals(allNodeIDs, nodesReceivingCommands);
+  }
+
+  @ParameterizedTest
+  @EnumSource(State.class)
+  public void testReconcileFailsWithIneligibleReplicas(State replicaState) throws Exception {
+    // The overall container state is eligible for reconciliation, but if any replica is in an ineligible state,
+    // the container as a whole is not considered eligible.
+    addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+    // Only one replica is in a different state.
+    addReplicasToContainer(replicaState, State.CLOSED, State.CLOSED);
+
+    EligibilityResult result =
+        ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, containerManager);
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    switch (replicaState) {
+    case OPEN:
+    case INVALID:
+    case DELETED:
+    case CLOSING:
+      assertFalse(result.isOk());
+      assertEquals(Result.INELIGIBLE_REPLICA_STATES, result.getResult());
+      verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
+      break;
+    default:
+      assertTrue(result.isOk());
+      assertEquals(Result.OK, result.getResult());
+      verify(eventPublisher, times(3)).fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
+      break;
+    }
+  }
+
+  private ContainerInfo addContainer(ReplicationConfig repConfig, LifeCycleState state) throws Exception {
+    ContainerInfo container = new ContainerInfo.Builder()
+        .setContainerID(CONTAINER_ID.getId())
+        .setReplicationConfig(repConfig)
+        .setState(state)
+        .build();
+    when(containerManager.getContainer(CONTAINER_ID)).thenReturn(container);
+    return container;
+  }
+
+  private Set<ContainerReplica> addReplicasToContainer(int count) throws Exception {
+    State[] replicaStates = new State[count];
+    Arrays.fill(replicaStates, State.CLOSED);
+    return addReplicasToContainer(replicaStates);
+  }
+
+  private Set<ContainerReplica> addReplicasToContainer(State... replicaStates) throws Exception {
+    // Add one container replica for each replica state specified.
+    // If no states are specified, the replica list will be empty.
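+    // MockNodeManager is used only as a convenient source of registered datanodes; the replicas built on them are
+    // what the mocked ContainerManager hands back to the eligibility check and the event handler.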
+    Set<ContainerReplica> replicas = new HashSet<>();
+    try (MockNodeManager nodeManager = new MockNodeManager(true, replicaStates.length)) {
+      List<DatanodeDetails> nodes = nodeManager.getAllNodes();
+      for (int i = 0; i < replicaStates.length; i++) {
+        replicas.addAll(HddsTestUtils.getReplicas(CONTAINER_ID, replicaStates[i], nodes.get(i)));
+      }
+    }
+    when(containerManager.getContainerReplicas(CONTAINER_ID)).thenReturn(replicas);
+
+    return replicas;
+  }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 0dd52cd291ab..f46ee62fd8b3 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -575,4 +575,8 @@ public String getMetrics(String query) throws IOException {
     return storageContainerLocationClient.getMetrics(query);
   }
 
+  @Override
+  public void reconcileContainer(long id) throws IOException {
+    storageContainerLocationClient.reconcileContainer(id);
+  }
 }
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
index 54c69273f0bc..9f93c56f2db2 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
@@ -43,7 +43,8 @@
         CreateSubcommand.class,
         CloseSubcommand.class,
         ReportSubcommand.class,
-        UpgradeSubcommand.class
+        UpgradeSubcommand.class,
+        ReconcileSubcommand.class
     })
 @MetaInfServices(SubcommandWithParent.class)
 public class ContainerCommands implements Callable<Void>, SubcommandWithParent {
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java
new file mode 100644
index 000000000000..e747455a8823
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.container;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+/**
+ * This is the handler that processes the container reconcile command.
+ */
+@Command(
+    name = "reconcile",
+    description = "Reconcile container replicas",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ReconcileSubcommand extends ScmSubcommand {
+
+  @CommandLine.Parameters(description = "ID of the container to reconcile")
+  private long containerId;
+
+  @Override
+  public void execute(ScmClient scmClient) throws IOException {
+    scmClient.reconcileContainer(containerId);
+    System.out.println("Reconciliation has been triggered for container " + containerId);
+    // TODO a better option to check status may be added later.
+    System.out.println("Use \"ozone admin container info --json " + containerId + "\" to see the checksums of each " +
+        "container replica");
+  }
+}
diff --git a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
index c50daa724dad..fae08991781f 100644
--- a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
@@ -34,6 +34,12 @@ Container is closed
     ${output} =    Execute    ozone admin container info "${container}"
     Should contain    ${output}    CLOSED
 
+Reconciliation complete
+    [arguments]    ${container}
+    ${data_checksum} =    Execute    ozone admin container info "${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+    Should not be empty    ${data_checksum}
+    Should not be equal as strings    0    ${data_checksum}
+
 *** Test Cases ***
 Create container
     ${output} =    Execute    ozone admin container create
@@ -71,6 +77,17 @@ Verbose container info
     ${output} =    Execute    ozone admin --verbose container info "${CONTAINER}"
     Should contain    ${output}    Pipeline Info
 
+Incomplete command
+    ${output} =    Execute And Ignore Error    ozone admin container
+    Should contain    ${output}    Incomplete command
+    Should contain    ${output}    list
+    Should contain    ${output}    info
+    Should contain    ${output}    create
+    Should contain    ${output}    close
+    Should contain    ${output}    reconcile
+    Should contain    ${output}    report
+    Should contain    ${output}    upgrade
+
 List containers as JSON
     ${output} =    Execute    ozone admin container info "${CONTAINER}" --json | jq -r '.'
     Should contain    ${output}    containerInfo
@@ -84,23 +101,6 @@ Report containers as JSON
     Should contain    ${output}    stats
     Should contain    ${output}    samples
 
-Close container
-    ${container} =    Execute    ozone admin container list --state OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | .containerID' | head -1
-    Execute    ozone admin container close "${container}"
-    ${output} =    Execute    ozone admin container info "${container}"
-    Should contain    ${output}    CLOS
-    Wait until keyword succeeds    1min    10sec    Container is closed    ${container}
-
-Incomplete command
-    ${output} =    Execute And Ignore Error    ozone admin container
-    Should contain    ${output}    Incomplete command
-    Should contain    ${output}    list
-    Should contain    ${output}    info
-    Should contain    ${output}    create
-    Should contain    ${output}    close
-    Should contain    ${output}    report
-    Should contain    ${output}    upgrade
-
 #List containers on unknown host
 #    ${output} =    Execute And Ignore Error    ozone admin --verbose container list --scm unknown-host
 #    Should contain    ${output}    Invalid host name
@@ -111,5 +111,38 @@ Cannot close container without admin privilege
 
 Cannot create container without admin privilege
     Requires admin privilege    ozone admin container create
 
+Cannot reconcile container without admin privilege
+    Requires admin privilege    ozone admin container reconcile "${CONTAINER}"
+
 Reset user
     Run Keyword if    '${SECURITY_ENABLED}' == 'true'    Kinit test user    testuser    testuser.keytab
+
+Cannot reconcile open container
+    # At this point we should have an open Ratis Three container.
+    ${container} =    Execute    ozone admin container list --state OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | .containerID' | head -n1
+    Execute and check rc    ozone admin container reconcile "${container}"    255
+    # The container should not yet have any replica checksums.
+    # TODO When the scanner is computing checksums automatically, this test may need to be updated.
+    ${data_checksum} =    Execute    ozone admin container info "${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+    # 0 is the hex value of an empty checksum.
+    Should Be Equal As Strings    0    ${data_checksum}
+
+Close container
+    ${container} =    Execute    ozone admin container list --state OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | .containerID' | head -1
+    Execute    ozone admin container close "${container}"
+    # The container may either be in CLOSED or CLOSING state at this point. Once we have verified this, we will wait
+    # for it to progress to CLOSED.
+    ${output} =    Execute    ozone admin container info "${container}"
+    Should contain    ${output}    CLOS
+    Wait until keyword succeeds    1min    10sec    Container is closed    ${container}
+
+Reconcile closed container
+    # Check that info does not show replica checksums, since manual reconciliation has not yet been triggered.
+    # TODO When the scanner is computing checksums automatically, this test may need to be updated.
+    ${container} =    Execute    ozone admin container list --state CLOSED | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | .containerID' | head -1
+    ${data_checksum} =    Execute    ozone admin container info "${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+    # 0 is the hex value of an empty checksum.
+    Should Be Equal As Strings    0    ${data_checksum}
+    # When reconciliation finishes, replica checksums should be shown.
+    Execute    ozone admin container reconcile "${container}"
+    Wait until keyword succeeds    1min    5sec    Reconciliation complete    ${container}
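
For reference, the admin flow exercised by the smoketests above can also be driven programmatically through the new
client API. The following is a minimal illustrative sketch, not part of the patch: the class name ReconcileExample is
hypothetical, and it assumes an ozone-site.xml on the classpath pointing at a running SCM. It pairs the new
ScmClient#reconcileContainer call with the existing ScmClient#getContainerReplicas listing to read back the replica
checksums this change exposes.

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
    import org.apache.hadoop.hdds.scm.container.ContainerReplicaInfo;

    public final class ReconcileExample {
      public static void main(String[] args) throws Exception {
        long containerId = Long.parseLong(args[0]);
        // Assumes cluster configuration (ozone-site.xml) is available on the classpath.
        OzoneConfiguration conf = new OzoneConfiguration();
        try (ContainerOperationClient scmClient = new ContainerOperationClient(conf)) {
          // Ask SCM to queue reconcile commands for every datanode holding a replica.
          scmClient.reconcileContainer(containerId);
          // Read back the per-replica data checksums; 0 means a replica has not reported a checksum yet.
          for (ContainerReplicaInfo replica : scmClient.getContainerReplicas(containerId)) {
            System.out.println(replica.getDatanodeDetails() + " -> " + Long.toHexString(replica.getDataChecksum()));
          }
        }
      }
    }

Reconciliation is asynchronous, so as in the robot test above, callers should poll the replica listing rather than
expect checksums to be populated immediately after the trigger returns.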