Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex;

import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.IntStream;

import static java.util.Collections.unmodifiableSortedMap;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;

/**
* Keeps the information required for an EC container reconstruction task.
*/
public class ECReconstructionCommandInfo {
private long containerID;
private ECReplicationConfig ecReplicationConfig;
private byte[] missingContainerIndexes;
private List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
sources;
private List<DatanodeDetails> targetDatanodes;
private long deadlineMsSinceEpoch = 0;
private final SortedMap<Integer, DatanodeDetails> sourceNodeMap;
private final SortedMap<Integer, DatanodeDetails> targetNodeMap;
private final long containerID;
private final ECReplicationConfig ecReplicationConfig;
private final byte[] missingContainerIndexes;
private final long deadlineMsSinceEpoch;
private final long term;

public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) {
Expand All @@ -44,10 +49,20 @@ public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) {
this.missingContainerIndexes =
Arrays.copyOf(cmd.getMissingContainerIndexes(),
cmd.getMissingContainerIndexes().length);
this.sources = cmd.getSources();
this.targetDatanodes = cmd.getTargetDatanodes();
this.deadlineMsSinceEpoch = cmd.getDeadline();
this.term = cmd.getTerm();

sourceNodeMap = cmd.getSources().stream()
.collect(toMap(
DatanodeDetailsAndReplicaIndex::getReplicaIndex,
DatanodeDetailsAndReplicaIndex::getDnDetails,
(v1, v2) -> v1, TreeMap::new));
targetNodeMap = IntStream.range(0, cmd.getTargetDatanodes().size())
.boxed()
.collect(toMap(
i -> (int) missingContainerIndexes[i],
i -> cmd.getTargetDatanodes().get(i),
(v1, v2) -> v1, TreeMap::new));
}

public long getDeadline() {
Expand All @@ -58,35 +73,36 @@ public long getContainerID() {
return containerID;
}

/**
 * Returns the replica indexes of the container group that are missing and
 * need to be reconstructed. A defensive copy is returned so callers cannot
 * mutate the internal array.
 */
public byte[] getMissingContainerIndexes() {
  return missingContainerIndexes.clone();
}

/**
 * Returns the EC replication config of the container group being
 * reconstructed.
 */
public ECReplicationConfig getEcReplicationConfig() {
return ecReplicationConfig;
}

public List<DatanodeDetailsAndReplicaIndex> getSources() {
return sources;
SortedMap<Integer, DatanodeDetails> getSourceNodeMap() {
return unmodifiableSortedMap(sourceNodeMap);
}

public List<DatanodeDetails> getTargetDatanodes() {
return targetDatanodes;
SortedMap<Integer, DatanodeDetails> getTargetNodeMap() {
return unmodifiableSortedMap(targetNodeMap);
}

@Override
public String toString() {
return "ECReconstructionCommandInfo{"
return "ECReconstructionCommand{"
+ "containerID=" + containerID
+ ", ecReplicationConfig=" + ecReplicationConfig
+ ", missingContainerIndexes=" + Arrays
.toString(missingContainerIndexes)
+ ", sources=" + sources
+ ", targetDatanodes=" + targetDatanodes + '}';
+ ", replication=" + ecReplicationConfig.getReplication()
+ ", missingIndexes=" + Arrays.toString(missingContainerIndexes)
+ ", sources={" + toString(sourceNodeMap) + "}"
+ ", targets={" + toString(targetNodeMap) + "}}";
}

/**
 * Renders a replicaIndex -&gt; datanode map as "index:host,index:host,...".
 * Produces an empty string for an empty map; entries appear in the map's
 * sorted key order.
 */
private String toString(SortedMap<Integer, DatanodeDetails> nodeMap) {
  StringBuilder buffer = new StringBuilder();
  for (Integer replicaIndex : nodeMap.keySet()) {
    if (buffer.length() > 0) {
      buffer.append(',');
    }
    buffer.append(replicaIndex)
        .append(':')
        .append(nodeMap.get(replicaIndex).getHostNameAndIP());
  }
  return buffer.toString();
}

/**
 * Returns the term carried by the originating reconstruction command;
 * used to drop the task when the SCM leader has since moved to a newer term.
 */
public long getTerm() {
return term;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,20 @@
*/
package org.apache.hadoop.ozone.container.ec.reconstruction;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Clock;
import java.util.OptionalLong;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* This is the actual EC reconstruction coordination task.
*/
public class ECReconstructionCoordinatorTask implements Runnable {
static final Logger LOG =
private static final Logger LOG =
LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
private final ConcurrentHashMap.KeySetView<Object, Boolean> inprogressCounter;
private final ECReconstructionCoordinator reconstructionCoordinator;
Expand Down Expand Up @@ -69,6 +64,7 @@ public void run() {
// respective container. HDDS-6582
// 5. Close/finalize the recovered containers.
long containerID = this.reconstructionCommandInfo.getContainerID();
long start = Time.monotonicNow();
if (LOG.isDebugEnabled()) {
LOG.debug("Starting the EC reconstruction of the container {}",
containerID);
Expand All @@ -88,40 +84,27 @@ public void run() {
final long taskTerm = reconstructionCommandInfo.getTerm();
if (currentTerm.isPresent() && taskTerm < currentTerm.getAsLong()) {
LOG.info("Ignoring {} since SCM leader has new term ({} < {})",
this, taskTerm, currentTerm.getAsLong());
reconstructionCommandInfo, taskTerm, currentTerm.getAsLong());
return;
}

SortedMap<Integer, DatanodeDetails> sourceNodeMap =
reconstructionCommandInfo.getSources().stream().collect(Collectors
.toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex,
DatanodeDetailsAndReplicaIndex::getDnDetails, (v1, v2) -> v1,
TreeMap::new));
SortedMap<Integer, DatanodeDetails> targetNodeMap = IntStream
.range(0, reconstructionCommandInfo.getTargetDatanodes().size())
.boxed().collect(Collectors.toMap(i -> (int) reconstructionCommandInfo
.getMissingContainerIndexes()[i],
i -> reconstructionCommandInfo.getTargetDatanodes().get(i),
(v1, v2) -> v1, TreeMap::new));

reconstructionCoordinator.reconstructECContainerGroup(
reconstructionCommandInfo.getContainerID(),
reconstructionCommandInfo.getEcReplicationConfig(), sourceNodeMap,
targetNodeMap);
LOG.info("Completed the EC reconstruction of the container {}",
reconstructionCommandInfo.getContainerID());
reconstructionCommandInfo.getEcReplicationConfig(),
reconstructionCommandInfo.getSourceNodeMap(),
reconstructionCommandInfo.getTargetNodeMap());
long elapsed = Time.monotonicNow() - start;
LOG.info("Completed {} in {} ms", reconstructionCommandInfo, elapsed);
} catch (IOException e) {
LOG.warn(
"Failed to complete the reconstruction task for the container: "
+ reconstructionCommandInfo.getContainerID(), e);
long elapsed = Time.monotonicNow() - start;
LOG.warn("Failed {} after {} ms", reconstructionCommandInfo, elapsed, e);
} finally {
this.inprogressCounter.remove(containerID);
}
}

@Override
public String toString() {
return "ECReconstructionCoordinatorTask{" + "reconstructionCommandInfo="
+ reconstructionCommandInfo + '}';
return "ECReconstructionTask{info=" + reconstructionCommandInfo + '}';
}
}