Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ public class ContainerChecksumTreeManager {
/**
 * Creates a checksum tree manager whose checksum-file access is guarded by a
 * striped read-write lock, sized from the datanode configuration.
 *
 * @param conf configuration source used to read {@link DatanodeConfiguration}
 *     for the number of lock stripes.
 */
public ContainerChecksumTreeManager(ConfigurationSource conf) {
  fileLock = SimpleStriped.readWriteLock(
      conf.getObject(DatanodeConfiguration.class).getContainerChecksumLockStripes(), true);
  // Metrics registered here are unregistered by stop().
  metrics = ContainerMerkleTreeMetrics.create();
}

/**
 * Shuts down this manager by unregistering the merkle tree metrics source
 * that was registered when this instance was constructed.
 */
public void stop() {
ContainerMerkleTreeMetrics.unregister();
}

/**
* Writes the specified container merkle tree to the specified container's checksum file.
* The data merkle tree within the file is replaced with the {@code tree} parameter, but all other content of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class ContainerMerkleTreeMetrics {

public static ContainerMerkleTreeMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
// TODO: Remove when checksum manager is moved from KeyValueHandler.
MetricsSource source = ms.getSource(METRICS_SOURCE_NAME);
if (source != null) {
ms.unregisterSource(METRICS_SOURCE_NAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.container.checksum;

import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

/**
 * Executes a container reconciliation task that was queued from the ReplicationSupervisor.
 * The task reconciles the local replica of a container with the replicas held by the
 * peer datanodes named in the triggering {@link ReconcileContainerCommand}.
 */
public class ReconcileContainerTask extends AbstractReplicationTask {
  private static final Logger LOG =
      LoggerFactory.getLogger(ReconcileContainerTask.class);

  private final ReconcileContainerCommand command;
  private final DNContainerOperationClient dnClient;
  private final ContainerController controller;

  /**
   * @param controller performs the reconciliation against the local container replica.
   * @param dnClient client used to reach the peer datanodes holding other replicas.
   * @param command SCM command that triggered this task; supplies the container ID,
   *     deadline, term, and the set of peer datanodes.
   */
  public ReconcileContainerTask(ContainerController controller,
      DNContainerOperationClient dnClient, ReconcileContainerCommand command) {
    super(command.getContainerID(), command.getDeadline(), command.getTerm());
    this.command = command;
    this.controller = controller;
    this.dnClient = dnClient;
  }

  @Override
  public void runTask() {
    long start = Time.monotonicNow();

    LOG.info("{}", this);

    try {
      controller.reconcileContainer(dnClient, command.getContainerID(), command.getPeerDatanodes());
      setStatus(Status.DONE);
      LOG.info("{} completed in {} ms", this, Time.monotonicNow() - start);
    } catch (Exception e) {
      // Record the failure in the task status instead of rethrowing so an error in one
      // reconciliation cannot kill the supervisor's executing thread.
      setStatus(Status.FAILED);
      LOG.warn("{} failed in {} ms", this, Time.monotonicNow() - start, e);
    }
  }

  @Override
  protected Object getCommandForDebug() {
    return command.toString();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ReconcileContainerTask that = (ReconcileContainerTask) o;
    return Objects.equals(command, that.command);
  }

  @Override
  public int hashCode() {
    // Hash the same field that equals() compares, so equal tasks always share a hash
    // and tasks for the same container with different peer sets do not needlessly collide.
    return Objects.hash(command);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand All @@ -30,6 +30,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
Expand Down Expand Up @@ -70,16 +72,17 @@ protected Handler(ConfigurationSource config, String datanodeId,
this.icrSender = icrSender;
}

@SuppressWarnings("checkstyle:ParameterNumber")
public static Handler getHandlerForContainerType(
final ContainerType containerType, final ConfigurationSource config,
final String datanodeId, final ContainerSet contSet,
final VolumeSet volumeSet, final ContainerMetrics metrics,
IncrementalReportSender<Container> icrSender) {
IncrementalReportSender<Container> icrSender, ContainerChecksumTreeManager checksumManager) {
switch (containerType) {
case KeyValueContainer:
return new KeyValueHandler(config,
datanodeId, contSet, volumeSet, metrics,
icrSender);
icrSender, checksumManager);
default:
throw new IllegalArgumentException("Handler for ContainerType: " +
containerType + "doesn't exist.");
Expand Down Expand Up @@ -199,7 +202,8 @@ public abstract void deleteContainer(Container container, boolean force)
* @param container container to be reconciled.
* @param peers The other datanodes with a copy of this container whose data should be checked.
*/
public abstract void reconcileContainer(Container<?> container, List<DatanodeDetails> peers) throws IOException;
public abstract void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
Set<DatanodeDetails> peers) throws IOException;

/**
* Deletes the given files associated with a block of the container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.NettyMetrics;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
Expand Down Expand Up @@ -225,6 +226,10 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
new ReconstructECContainersCommandHandler(conf, supervisor,
ecReconstructionCoordinator);

// TODO HDDS-11218 combine the clients used for reconstruction and reconciliation so they share the same cache of
// datanode clients.
DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, certClient, secretKeyClient);

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d")
.build();
Expand Down Expand Up @@ -253,7 +258,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
supervisor::nodeStateUpdated))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
.addHandler(new RefreshVolumeUsageCommandHandler())
.addHandler(new ReconcileContainerCommandHandler(threadNamePrefix))
.addHandler(new ReconcileContainerCommandHandler(supervisor, dnClient))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,66 +18,38 @@

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.checksum.ReconcileContainerTask;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Handles commands from SCM to reconcile a container replica on this datanode with the replicas on its peers.
*/
public class ReconcileContainerCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ReconcileContainerCommandHandler.class);

private final ReplicationSupervisor supervisor;
private final AtomicLong invocationCount;
private final AtomicInteger queuedCount;
private final ExecutorService executor;
private long totalTime;
private final DNContainerOperationClient dnClient;

public ReconcileContainerCommandHandler(String threadNamePrefix) {
invocationCount = new AtomicLong(0);
queuedCount = new AtomicInteger(0);
// TODO Allow configurable thread pool size with a default value when the implementation is ready.
executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d")
.build());
totalTime = 0;
public ReconcileContainerCommandHandler(ReplicationSupervisor supervisor, DNContainerOperationClient dnClient) {
this.supervisor = supervisor;
this.dnClient = dnClient;
this.invocationCount = new AtomicLong(0);
}

@Override
public void handle(SCMCommand command, OzoneContainer container, StateContext context,
SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
invocationCount.incrementAndGet();
long startTime = Time.monotonicNow();
ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command;
LOG.info("Processing reconcile container command for container {} with peers {}",
reconcileCommand.getContainerID(), reconcileCommand.getPeerDatanodes());
try {
container.getController().reconcileContainer(reconcileCommand.getContainerID(),
reconcileCommand.getPeerDatanodes());
} catch (IOException ex) {
LOG.error("Failed to reconcile container {}.", reconcileCommand.getContainerID(), ex);
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
}, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
invocationCount.incrementAndGet();
ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command;
supervisor.addTask(new ReconcileContainerTask(container.getController(), dnClient, reconcileCommand));
}

@Override
Expand All @@ -90,21 +62,20 @@ public int getInvocationCount() {
return (int)invocationCount.get();
}

// Uses ReplicationSupervisor for these metrics.

@Override
public long getAverageRunTime() {
if (invocationCount.get() > 0) {
return totalTime / invocationCount.get();
}
return 0;
}

@Override
public long getTotalRunTime() {
return totalTime;
return 0;
}

@Override
public int getQueuedCount() {
return queuedCount.get();
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;

Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
Expand Down Expand Up @@ -159,14 +161,15 @@ public KeyValueHandler(ConfigurationSource config,
ContainerSet contSet,
VolumeSet volSet,
ContainerMetrics metrics,
IncrementalReportSender<Container> icrSender) {
IncrementalReportSender<Container> icrSender,
ContainerChecksumTreeManager checksumManager) {
super(config, datanodeId, contSet, volSet, metrics, icrSender);
blockManager = new BlockManagerImpl(config);
validateChunkChecksumData = conf.getObject(
DatanodeConfiguration.class).isChunkDataValidationCheck();
chunkManager = ChunkManagerFactory.createChunkManager(config, blockManager,
volSet);
checksumManager = new ContainerChecksumTreeManager(config);
this.checksumManager = checksumManager;
try {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
} catch (Exception e) {
Expand Down Expand Up @@ -1303,7 +1306,8 @@ public void deleteContainer(Container container, boolean force)
}

@Override
public void reconcileContainer(Container<?> container, List<DatanodeDetails> peers) throws IOException {
public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
Set<DatanodeDetails> peers) throws IOException {
// TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
ContainerData data = container.getContainerData();
long id = data.getContainerID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
Expand All @@ -39,7 +40,6 @@
import java.io.OutputStream;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -215,12 +215,13 @@ public void deleteContainer(final long containerId, boolean force)
}
}

public void reconcileContainer(long containerID, List<DatanodeDetails> peers) throws IOException {
public void reconcileContainer(DNContainerOperationClient dnClient, long containerID, Set<DatanodeDetails> peers)
throws IOException {
Container<?> container = containerSet.getContainer(containerID);
if (container == null) {
LOG.warn("Container {} to reconcile not found on this datanode.", containerID);
} else {
getHandler(container).reconcileContainer(container, peers);
getHandler(container).reconcileContainer(dnClient, container, peers);
}
}

Expand Down
Loading