Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8339f96
WIP SCM changes for reconcile cli
errose28 Mar 1, 2024
573ec30
Changes for SCM WIP pt2
errose28 Mar 6, 2024
b259dae
Use SCMException only for error handling
errose28 Apr 3, 2024
4a89c8e
Add SCM event handler for reconcile events
errose28 Apr 3, 2024
f9d1bfd
Add datanode reconcile stub.
errose28 Apr 3, 2024
ddf3ce8
Updates after reviewing diff
errose28 Apr 4, 2024
47f6c06
Fix checkstyle
errose28 Apr 5, 2024
9154e3c
Basic reconcile scm <-> DN works
errose28 Apr 10, 2024
d036229
Improve error handling
errose28 Apr 10, 2024
9aecbcf
Add DN side unit tests
errose28 Apr 12, 2024
2775807
Test with two containers
errose28 Apr 16, 2024
600174f
Add container report handler tests
errose28 Apr 16, 2024
6ec2acb
Add ICR tests, improve FCR tests
errose28 Apr 17, 2024
4841c8f
Remove duplicate line from report handler test
errose28 Apr 18, 2024
8e7e8f7
Add (currently failing) test for scm event handler
errose28 Apr 18, 2024
74bb00a
Refactor container eligibility, SCM event handler tests pass
errose28 Apr 19, 2024
2effd78
Checkstyle
errose28 Apr 19, 2024
d055094
Add robot test that may not pass yet
errose28 Apr 20, 2024
3586cc3
Rat
errose28 Apr 20, 2024
ac6ff0e
Test repeat run with fixed acc test workflow
errose28 Apr 22, 2024
d84d3ef
Some acceptance test fixes
errose28 Apr 23, 2024
a4a0a1c
Update comment
errose28 Apr 29, 2024
6f2c7e4
findbugs
errose28 Apr 29, 2024
9c94f1c
Separate container handler test for metrics
errose28 Apr 29, 2024
e42324e
Almost finished separated metrics and report tests
errose28 Apr 30, 2024
3cb0416
TestReconcileContainerCommandHandler complete and passing
errose28 Apr 30, 2024
18f4f2a
Checkstyle
errose28 Apr 30, 2024
2163008
Fix TestSCMExceptionResultCodes
errose28 Apr 30, 2024
13a335e
Might have fixed acceptance test
errose28 May 1, 2024
b03c4fc
Use long as checksum representation
errose28 May 2, 2024
bc434ec
Apparently snakeyaml is coupled to Java variable names
errose28 May 3, 2024
290fbb7
checkstyle
errose28 May 4, 2024
afd6043
Rename two existing container file checksum methods for clarity
errose28 May 4, 2024
d8e86bd
Test that container data checksum is not written to .container file
errose28 May 4, 2024
990735f
Fix simple acceptance test issue
errose28 May 4, 2024
f4ace2d
Print data checksum as hex string in container info output
errose28 May 4, 2024
36f92ee
Add log of placeholder checksum generated
errose28 May 4, 2024
f26a402
checkstyle
errose28 May 4, 2024
c8bdffc
putLong increments position, need to rewind.
errose28 May 6, 2024
f862836
Undo acceptance workflow change from master
errose28 May 7, 2024
8a2427b
Undo accidental gh actions change
errose28 May 21, 2024
31e60a9
Fix comment typo
errose28 May 21, 2024
97a76d7
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10372-re…
errose28 May 28, 2024
28b8862
Address review comments, fix error after merge commit
errose28 May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -462,4 +462,12 @@ DecommissionScmResponseProto decommissionScm(
String scmId) throws IOException;

String getMetrics(String query) throws IOException;

/**
* Trigger a reconcile command to datanodes for the container with the given ID.
*
* @param containerID The ID of the container to reconcile.
* @throws IOException On error
*/
void reconcileContainer(long containerID) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
*/
package org.apache.hadoop.hdds.scm.container;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

import java.io.IOException;
import java.util.UUID;

/**
Expand All @@ -35,6 +40,8 @@ public final class ContainerReplicaInfo {
private long keyCount;
private long bytesUsed;
private int replicaIndex = -1;
@JsonSerialize(using = LongToHexJsonSerializer.class)
private long dataChecksum;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Hex for printing to make it more human friendly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. That seemed standard but we could use a different format if there's a better option.


public static ContainerReplicaInfo fromProto(
HddsProtos.SCMContainerReplicaProto proto) {
Expand All @@ -48,7 +55,8 @@ public static ContainerReplicaInfo fromProto(
.setKeyCount(proto.getKeyCount())
.setBytesUsed(proto.getBytesUsed())
.setReplicaIndex(
proto.hasReplicaIndex() ? (int)proto.getReplicaIndex() : -1);
proto.hasReplicaIndex() ? (int)proto.getReplicaIndex() : -1)
.setDataChecksum(proto.getDataChecksum());
return builder.build();
}

Expand Down Expand Up @@ -87,6 +95,17 @@ public int getReplicaIndex() {
return replicaIndex;
}

/**
* Returns the data checksum recorded for this replica.
*
* @return the value of the {@code dataChecksum} field as a raw {@code long}.
*/
public long getDataChecksum() {
return dataChecksum;
}

/**
 * JSON serializer that emits a {@code Long} value as a hexadecimal string
 * (as produced by {@link Long#toHexString(long)}) instead of a JSON number.
 */
private static class LongToHexJsonSerializer extends JsonSerializer<Long> {
  @Override
  public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
    // Render the value in base 16 first, then write it out as a JSON string.
    final String hexValue = Long.toHexString(value);
    gen.writeString(hexValue);
  }
}

/**
* Builder for ContainerReplicaInfo class.
*/
Expand Down Expand Up @@ -134,6 +153,11 @@ public Builder setReplicaIndex(int replicaIndex) {
return this;
}

/**
* Sets the data checksum on the instance being built.
*
* @param dataChecksum the data checksum value to store.
* @return this builder, to allow call chaining.
*/
public Builder setDataChecksum(long dataChecksum) {
subject.dataChecksum = dataChecksum;
return this;
}

/**
* Returns the instance that this builder's setters have been populating.
*
* @return the {@code subject} held by this builder.
*/
public ContainerReplicaInfo build() {
return subject;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public enum ResultCodes {
CA_ROTATION_IN_PROGRESS,
CA_ROTATION_IN_POST_PROGRESS,
CONTAINER_ALREADY_CLOSED,
CONTAINER_ALREADY_CLOSING
CONTAINER_ALREADY_CLOSING,
UNSUPPORTED_OPERATION
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -483,4 +483,12 @@ DecommissionScmResponseProto decommissionScm(
String scmId) throws IOException;

String getMetrics(String query) throws IOException;

/**
* Trigger a reconcile command to datanodes for the container with the given ID.
*
* @param containerID The ID of the container to reconcile.
* @throws IOException On error
*/
void reconcileContainer(long containerID) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,21 +194,21 @@ public static synchronized DatanodeDetails readDatanodeDetailsFrom(File path)
* Verify that the checksum stored in containerData is equal to the
* computed checksum.
*/
public static void verifyChecksum(ContainerData containerData,
public static void verifyContainerFileChecksum(ContainerData containerData,
ConfigurationSource conf) throws IOException {
boolean enabled = conf.getBoolean(
HddsConfigKeys.HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED,
HddsConfigKeys.
HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED_DEFAULT);
if (enabled) {
String storedChecksum = containerData.getChecksum();
String storedChecksum = containerData.getContainerFileChecksum();

Yaml yaml = ContainerDataYaml.getYamlForContainerType(
containerData.getContainerType(),
containerData instanceof KeyValueContainerData &&
((KeyValueContainerData)containerData).getReplicaIndex() > 0);
containerData.computeAndSetChecksum(yaml);
String computedChecksum = containerData.getChecksum();
containerData.computeAndSetContainerFileChecksum(yaml);
String computedChecksum = containerData.getContainerFileChecksum();

if (storedChecksum == null || !storedChecksum.equals(computedChecksum)) {
throw new StorageContainerException("Container checksum error for " +
Expand All @@ -225,7 +225,7 @@ public static void verifyChecksum(ContainerData containerData,
* @param containerDataYamlStr ContainerData as a Yaml String
* @return Checksum of the container data
*/
public static String getChecksum(String containerDataYamlStr)
public static String getContainerFileChecksum(String containerDataYamlStr)
throws StorageContainerException {
MessageDigest sha;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ public abstract class ContainerData {

private HddsVolume volume;

// Checksum of just the container file.
private String checksum;

// Checksum of the data within the container.
private long dataChecksum;

private boolean isEmpty;

private int replicaIndex;
Expand All @@ -112,7 +116,7 @@ public abstract class ContainerData {
private transient Optional<Instant> lastDataScanTime = Optional.empty();

public static final Charset CHARSET_ENCODING = StandardCharsets.UTF_8;
private static final String DUMMY_CHECKSUM = new String(new byte[64],
private static final String ZERO_CHECKSUM = new String(new byte[64],
CHARSET_ENCODING);

// Common Fields need to be stored in .container file.
Expand Down Expand Up @@ -159,7 +163,8 @@ protected ContainerData(ContainerType type, long containerId,
this.originPipelineId = originPipelineId;
this.originNodeId = originNodeId;
this.isEmpty = false;
setChecksumTo0ByteArray();
this.checksum = ZERO_CHECKSUM;
this.dataChecksum = 0;
}

protected ContainerData(ContainerData source) {
Expand Down Expand Up @@ -571,15 +576,11 @@ public void setBlockCount(long count) {
this.blockCount.set(count);
}

public void setChecksumTo0ByteArray() {
this.checksum = DUMMY_CHECKSUM;
}

public void setChecksum(String checkSum) {
public void setContainerFileChecksum(String checkSum) {
this.checksum = checkSum;
}

public String getChecksum() {
public String getContainerFileChecksum() {
return this.checksum;
}

Expand Down Expand Up @@ -630,21 +631,29 @@ public String getOriginNodeId() {
*
* Checksum of ContainerData is calculated by setting the
* {@link ContainerData#checksum} field to a 64-byte array with all 0's -
* {@link ContainerData#DUMMY_CHECKSUM}. After the checksum is calculated,
* {@link ContainerData#ZERO_CHECKSUM}. After the checksum is calculated,
* the checksum field is updated with this value.
*
* @param yaml Yaml for ContainerType to get the ContainerData as Yaml String
* @throws IOException
*/
public void computeAndSetChecksum(Yaml yaml) throws IOException {
public void computeAndSetContainerFileChecksum(Yaml yaml) throws IOException {
// Set checksum to dummy value - 0 byte array, to calculate the checksum
// of rest of the data.
setChecksumTo0ByteArray();
this.checksum = ZERO_CHECKSUM;

// Dump yaml data into a string to compute its checksum
String containerDataYamlStr = yaml.dump(this);

this.checksum = ContainerUtils.getChecksum(containerDataYamlStr);
this.checksum = ContainerUtils.getContainerFileChecksum(containerDataYamlStr);
}

/**
* Sets the checksum of the data within this container.
*
* @param dataChecksum the new data checksum value.
*/
public void setDataChecksum(long dataChecksum) {
this.dataChecksum = dataChecksum;
}

/**
* Returns the checksum of the data within this container.
*
* @return the current value of the {@code dataChecksum} field.
*/
public long getDataChecksum() {
return dataChecksum;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static void createContainerFile(ContainerType containerType,
// Create Yaml for given container type
Yaml yaml = getYamlForContainerType(containerType, withReplicaIndex);
// Compute Checksum and update ContainerData
containerData.computeAndSetChecksum(yaml);
containerData.computeAndSetContainerFileChecksum(yaml);

// Write the ContainerData with checksum to Yaml file.
out = new FileOutputStream(
Expand Down Expand Up @@ -312,7 +312,7 @@ public Object construct(Node node) {
kvData.setChunksPath((String) nodes.get(OzoneConsts.CHUNKS_PATH));
Map<String, String> meta = (Map) nodes.get(OzoneConsts.METADATA);
kvData.setMetadata(meta);
kvData.setChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
kvData.setContainerFileChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
Long timestamp = (Long) nodes.get(OzoneConsts.DATA_SCAN_TIMESTAMP);
kvData.setDataScanTimestamp(timestamp);
String state = (String) nodes.get(OzoneConsts.STATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
Expand Down Expand Up @@ -192,6 +194,13 @@ public abstract void closeContainer(Container container)
public abstract void deleteContainer(Container container, boolean force)
throws IOException;

/**
* Triggers reconciliation of this container replica's data with its peers.
* @param container container to be reconciled.
* @param peers The other datanodes with a copy of this container whose data should be checked.
* @throws IOException declared so implementations can report a failed reconciliation; the exact
*     failure conditions are implementation-specific.
*/
public abstract void reconcileContainer(Container<?> container, List<DatanodeDetails> peers) throws IOException;

/**
* Deletes the given files associated with a block of the container.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconcileContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
Expand Down Expand Up @@ -258,6 +259,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
supervisor::nodeStateUpdated))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
.addHandler(new RefreshVolumeUsageCommandHandler())
.addHandler(new ReconcileContainerCommandHandler(threadNamePrefix))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Handles commands from SCM to reconcile a container replica on this datanode with the replicas on its peers.
 *
 * <p>Each command is queued onto a dedicated single-threaded executor and processed asynchronously, so
 * {@link #handle} returns immediately. The metric getters may be called from threads other than the
 * executor thread; all counters are therefore kept in atomic variables.
 */
public class ReconcileContainerCommandHandler implements CommandHandler {
  private static final Logger LOG =
      LoggerFactory.getLogger(ReconcileContainerCommandHandler.class);

  // Number of commands whose processing has started.
  private final AtomicLong invocationCount;
  // Number of commands submitted to the executor that have not yet completed.
  private final AtomicInteger queuedCount;
  private final ExecutorService executor;
  // Cumulative processing time in milliseconds. Written on the executor thread and read by the
  // metric getters from other threads, so it must be atomic to guarantee visibility.
  private final AtomicLong totalTime;

  /**
   * Creates the handler and its backing single-threaded executor.
   *
   * @param threadNamePrefix prefix prepended to the name of the executor's thread.
   */
  public ReconcileContainerCommandHandler(String threadNamePrefix) {
    invocationCount = new AtomicLong(0);
    queuedCount = new AtomicInteger(0);
    // TODO Allow configurable thread pool size with a default value when the implementation is ready.
    executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
        .setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d")
        .build());
    totalTime = new AtomicLong(0);
  }

  /**
   * Queues the given reconcile command for asynchronous processing.
   *
   * @param command the command to process; cast to {@link ReconcileContainerCommand} when executed.
   * @param container the datanode's container service, used to reach the container controller.
   * @param context not used by this handler.
   * @param connectionManager not used by this handler.
   */
  @Override
  public void handle(SCMCommand command, OzoneContainer container, StateContext context,
      SCMConnectionManager connectionManager) {
    queuedCount.incrementAndGet();
    CompletableFuture.runAsync(() -> {
      invocationCount.incrementAndGet();
      long startTime = Time.monotonicNow();
      ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command;
      LOG.info("Processing reconcile container command for container {} with peers {}",
          reconcileCommand.getContainerID(), reconcileCommand.getPeerDatanodes());
      try {
        container.getController().reconcileContainer(reconcileCommand.getContainerID(),
            reconcileCommand.getPeerDatanodes());
      } catch (IOException ex) {
        LOG.error("Failed to reconcile container {}.", reconcileCommand.getContainerID(), ex);
      } finally {
        long endTime = Time.monotonicNow();
        totalTime.addAndGet(endTime - startTime);
      }
    }, executor).whenComplete((ignored, error) -> {
      queuedCount.decrementAndGet();
      // Unchecked exceptions thrown by the task are captured by the future; without this log
      // they would disappear silently because nothing ever joins on the future.
      if (error != null) {
        LOG.error("Unexpected failure while processing reconcile container command.", error);
      }
    });
  }

  @Override
  public SCMCommandProto.Type getCommandType() {
    return SCMCommandProto.Type.reconcileContainerCommand;
  }

  /**
   * @return the number of commands whose processing has started, truncated to an {@code int}.
   */
  @Override
  public int getInvocationCount() {
    return (int)invocationCount.get();
  }

  /**
   * @return the total run time divided by the invocation count, or 0 if nothing has been invoked yet.
   */
  @Override
  public long getAverageRunTime() {
    // Read the counter once so the check and the division use the same value.
    long invocations = invocationCount.get();
    if (invocations > 0) {
      return totalTime.get() / invocations;
    }
    return 0;
  }

  /**
   * @return the cumulative time spent processing commands, in milliseconds.
   */
  @Override
  public long getTotalRunTime() {
    return totalTime.get();
  }

  /**
   * @return the number of commands that have been submitted but have not yet completed.
   */
  @Override
  public int getQueuedCount() {
    return queuedCount.get();
  }
}
Loading