@@ -27,6 +27,10 @@
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
+import java.util.Objects;
+import java.util.LinkedHashMap;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -45,6 +49,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -53,10 +58,12 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -112,7 +119,7 @@ public class StateContext {
private final Map<InetSocketAddress, List<Message>>
incrementalReportsQueue;
private final Map<InetSocketAddress, Queue<ContainerAction>> containerActions;
Contributor:
@jianghuazhu Will this issue also occur with container reporting? cc: @szetszwo

Contributor:
Probably yes? The code looks similar.

Contributor (Author):
For this incident, we did not find any exceptions related to containerActions, so I did not change that code. Do you think it is necessary to improve it here as well? @szetszwo

Contributor:
Agree. If there is a need for fixing containerActions, let's do it separately.

Contributor @slfan1989, Aug 21, 2024:
Thank you for your contributions! We have updated the code, and its readability has improved, which is a great thing. The code for container reporting and pipeline reporting is similar, but since we have not encountered issues with the container code, does this imply that there are no problems with this segment of the code, considering that the number of containers is much larger than the number of pipelines?

Upon careful consideration of the differences between container and pipeline reporting, I personally suspect that the issue might be related to the Ratis state management in the pipeline. We have identified some details and will be submitting an issue. I hope to continue discussing this with you. @szetszwo

Contributor @slfan1989, Aug 21, 2024:
Regarding lifeline reporting, I understand that this is a standard operation in HDFS. However, I have concerns about the current implementation of this feature.

For example, if a pipeline causes a DataNode (DN) to become unavailable—meaning the DN cannot provide data services and the client cannot retrieve data from it—then marking the DN as DEAD is reasonable.
However, if there is a lifeline, the DN may appear to be healthy even though it is actually not, which can prevent maintenance personnel from detecting the issue.

Lifeline reporting is more suitable for scenarios where heavy operations impact the heartbeat, but the heartbeat can recover once the heavy operation is complete. Both pipeline and container reporting are lightweight, and from my perspective, I haven't observed these reports causing any significant load on the SCM.

cc: @ChenSammi

-  private final Map<InetSocketAddress, Queue<PipelineAction>> pipelineActions;
+  private final Map<InetSocketAddress, PipelineActionMap> pipelineActions;
private DatanodeStateMachine.DatanodeStates state;
private boolean shutdownOnError = false;
private boolean shutdownGracefully = false;
@@ -178,7 +185,7 @@ public StateContext(ConfigurationSource conf,
pipelineReports = new AtomicReference<>();
endpoints = new HashSet<>();
containerActions = new HashMap<>();
-    pipelineActions = new HashMap<>();
+    pipelineActions = new ConcurrentHashMap<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
threadPoolNotAvailableCount = new AtomicLong(0);
@@ -518,47 +525,16 @@ public List<ContainerAction> getPendingContainerAction(
}
}

-  /**
-   * Helper function for addPipelineActionIfAbsent that check if inputs are the
-   * same close pipeline action.
-   *
-   * Important Note: Make sure to double check for correctness before using this
-   * helper function for other purposes!
-   *
-   * @return true if a1 and a2 are the same close pipeline action,
-   *         false otherwise
-   */
-  boolean isSameClosePipelineAction(PipelineAction a1, PipelineAction a2) {
-    return a1.getAction() == a2.getAction()
-        && a1.hasClosePipeline()
-        && a2.hasClosePipeline()
-        && a1.getClosePipeline().getPipelineID()
-            .equals(a2.getClosePipeline().getPipelineID());
-  }
-
/**
* Add PipelineAction to PipelineAction queue if it's not present.
*
* @param pipelineAction PipelineAction to be added
*/
public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
-    synchronized (pipelineActions) {
-      /**
-       * If pipelineAction queue already contains entry for the pipeline id
-       * with same action, we should just return.
-       * Note: We should not use pipelineActions.contains(pipelineAction) here
-       * as, pipelineAction has a msg string. So even if two msgs differ though
-       * action remains same on the given pipeline, it will end up adding it
-       * multiple times here.
-       */
-      for (InetSocketAddress endpoint : endpoints) {
-        final Queue<PipelineAction> actionsForEndpoint =
-            pipelineActions.get(endpoint);
-        if (actionsForEndpoint.stream().noneMatch(
-            action -> isSameClosePipelineAction(action, pipelineAction))) {
-          actionsForEndpoint.add(pipelineAction);
-        }
-      }
-    }
+    // Put only if the pipeline id with the same action is absent.
+    final PipelineKey key = new PipelineKey(pipelineAction);
+    for (InetSocketAddress endpoint : endpoints) {
+      pipelineActions.get(endpoint).putIfAbsent(key, pipelineAction);
+    }
}
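
A quick sketch of the dedup behavior this change introduces (an editor's illustration, not part of the PR: the pipeline values are hypothetical, and it assumes same-package access to the package-private PipelineActionMap and PipelineKey helpers added at the end of this file, plus the PipelineAction and ClosePipelineInfo builders from StorageContainerDatanodeProtocolProtos). Two CLOSE actions for the same pipeline that differ only in their detail message map to equal PipelineKeys, so the second putIfAbsent is a no-op:

// Sketch only: hypothetical values, e.g. inside a same-package unit test.
final HddsProtos.PipelineID pid =
    HddsProtos.PipelineID.newBuilder().setId("pipeline-1").build();
final PipelineAction close1 = PipelineAction.newBuilder()
    .setAction(PipelineAction.Action.CLOSE)
    .setClosePipeline(ClosePipelineInfo.newBuilder()
        .setPipelineID(pid)
        .setDetailedReason("follower is down"))
    .build();
// Same pipeline and same action, but a different detail message.
final PipelineAction close2 = close1.toBuilder()
    .setClosePipeline(close1.getClosePipeline().toBuilder()
        .setDetailedReason("log append failed"))
    .build();

final PipelineActionMap map = new PipelineActionMap();
map.putIfAbsent(new PipelineKey(close1), close1);
map.putIfAbsent(new PipelineKey(close2), close2); // no-op: equal keys
assert map.size() == 1;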

@@ -571,34 +547,17 @@ public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
public List<PipelineAction> getPendingPipelineAction(
InetSocketAddress endpoint,
int maxLimit) {
-    List<PipelineAction> pipelineActionList = new ArrayList<>();
-    List<PipelineAction> persistPipelineAction = new ArrayList<>();
-    synchronized (pipelineActions) {
-      if (!pipelineActions.isEmpty() &&
-          CollectionUtils.isNotEmpty(pipelineActions.get(endpoint))) {
-        Queue<PipelineAction> actionsForEndpoint =
-            this.pipelineActions.get(endpoint);
-        int size = actionsForEndpoint.size();
-        int limit = size > maxLimit ? maxLimit : size;
-        for (int count = 0; count < limit; count++) {
-          // Add closePipeline back to the pipelineAction queue until
-          // pipeline is closed and removed from the DN.
-          PipelineAction action = actionsForEndpoint.poll();
-          if (action.hasClosePipeline()) {
-            if (parentDatanodeStateMachine.getContainer().getPipelineReport()
-                .getPipelineReportList().stream().noneMatch(
-                    report -> action.getClosePipeline().getPipelineID()
-                        .equals(report.getPipelineID()))) {
-              continue;
-            }
-            persistPipelineAction.add(action);
-          }
-          pipelineActionList.add(action);
-        }
-        actionsForEndpoint.addAll(persistPipelineAction);
-      }
-      return pipelineActionList;
-    }
+    final PipelineActionMap map = pipelineActions.get(endpoint);
+    if (map == null) {
+      return Collections.emptyList();
+    }
+    final OzoneContainer ozoneContainer =
+        parentDatanodeStateMachine.getContainer();
+    if (ozoneContainer == null) {
+      return Collections.emptyList();
+    }
+    final PipelineReportsProto reports = ozoneContainer.getPipelineReport();
+    return map.getActions(reports.getPipelineReportList(), maxLimit);
}

/**
@@ -927,7 +886,7 @@ public void addEndpoint(InetSocketAddress endpoint) {
if (!endpoints.contains(endpoint)) {
this.endpoints.add(endpoint);
this.containerActions.put(endpoint, new LinkedList<>());
-      this.pipelineActions.put(endpoint, new LinkedList<>());
+      this.pipelineActions.put(endpoint, new PipelineActionMap());
this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
Map<String, AtomicBoolean> mp = new HashMap<>();
fullReportTypeList.forEach(e -> {
@@ -988,4 +947,79 @@ public DatanodeQueueMetrics getQueueMetrics() {
public String getThreadNamePrefix() {
return threadNamePrefix;
}

+  static class PipelineActionMap {
+    private final LinkedHashMap<PipelineKey, PipelineAction> map =
+        new LinkedHashMap<>();
+
+    synchronized int size() {
+      return map.size();
+    }
+
+    synchronized void putIfAbsent(PipelineKey key,
+        PipelineAction pipelineAction) {
+      map.putIfAbsent(key, pipelineAction);
+    }
+
+    synchronized List<PipelineAction> getActions(List<PipelineReport> reports,
+        int max) {
+      if (map.isEmpty()) {
+        return Collections.emptyList();
+      }
+      final List<PipelineAction> pipelineActionList = new ArrayList<>();
+      final int limit = Math.min(map.size(), max);
+      final Iterator<Map.Entry<PipelineKey, PipelineAction>> i =
+          map.entrySet().iterator();
+      for (int count = 0; count < limit && i.hasNext(); count++) {
+        final Map.Entry<PipelineKey, PipelineAction> entry = i.next();
+        final PipelineAction action = entry.getValue();
+        // Keep the closePipeline action in the map until the
+        // pipeline is closed and removed from the DN.
+        if (action.hasClosePipeline()) {
+          if (reports.stream().noneMatch(entry.getKey()::equalsId)) {
+            // pipeline is removed from the DN, this action is no longer needed.
+            i.remove();
+            continue;
+          }
+          // pipeline is closed but not yet removed from the DN.
+        } else {
+          i.remove();
+        }
+        pipelineActionList.add(action);
+      }
+      return pipelineActionList;
+    }
+  }
+
+  static class PipelineKey {
+    private final HddsProtos.PipelineID pipelineID;
+    private final PipelineAction.Action action;
+
+    PipelineKey(PipelineAction p) {
+      this.pipelineID = p.getClosePipeline().getPipelineID();
+      this.action = p.getAction();
+    }
+
+    boolean equalsId(PipelineReport report) {
+      return pipelineID.equals(report.getPipelineID());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(pipelineID);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      } else if (!(obj instanceof PipelineKey)) {
+        return false;
+      }
+      final PipelineKey that = (PipelineKey) obj;
+      return Objects.equals(this.action, that.action)
+          && Objects.equals(this.pipelineID, that.pipelineID);
+    }
+  }
}
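
To make the retention rule in getActions concrete, the earlier sketch continues below (still an editor's illustration with hypothetical values, reusing pid, close1, and map from the aside above; the PipelineReport builder usage is our reading of the .proto and may differ). A CLOSE action is returned but kept in the map for as long as the datanode's pipeline report still lists the pipeline, and it is purged on the first call after the pipeline disappears from the report:

// The DN still reports the pipeline: the CLOSE action is returned
// and retained, so it will be re-sent on the next heartbeat.
final PipelineReport stillPresent =
    PipelineReport.newBuilder().setPipelineID(pid).setIsLeader(false).build();
final List<PipelineAction> first =
    map.getActions(Collections.singletonList(stillPresent), 10);
assert first.size() == 1 && map.size() == 1;

// The pipeline is gone from the report: the stale action is dropped.
final List<PipelineAction> second =
    map.getActions(Collections.emptyList(), 10);
assert second.isEmpty() && map.size() == 0;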