Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ public List<DatanodeDetails> chooseDatanodes(
placementStatus.expectedPlacementCount();
throw new SCMException(errorMsg, null);
}
if (nodesRequiredToChoose != chosenNodes.size()) {
String reason = "Chosen nodes size: " + chosenNodes
.size() + ", but required nodes to choose: " + nodesRequiredToChoose
+ " do not match.";
LOG.warn("Placement policy could not choose the enough nodes."
+ " {} Available nodes count: {}, Excluded nodes count: {}",
reason, totalNodesCount, excludedNodesCount);
throw new SCMException(reason,
SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
}
Comment on lines +230 to +239
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if validateContainerPlacement should verify the number of chosen nodes vs. required nodes (either in base class or this specific subclass).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed that is more validating the placement with respective to racks. Some how we are still seeing the issue of getting less nodes than requested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I see the message is referencing racks.

return chosenNodes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,27 @@ public Pipeline create(
ReplicationConfig replicationConfig, List<DatanodeDetails> excludedNodes,
List<DatanodeDetails> favoredNodes)
throws IOException {
return providers
.get(replicationConfig.getReplicationType())
Pipeline pipeline = providers.get(replicationConfig.getReplicationType())
.create(replicationConfig, excludedNodes, favoredNodes);
checkPipeline(pipeline);
return pipeline;
}

private void checkPipeline(Pipeline pipeline) throws IOException {
// In case in case if provided pipeline provider returns null.
if (pipeline == null) {
throw new SCMException("Pipeline cannot be null",
SCMException.ResultCodes.INTERNAL_ERROR);
}
// In case if provided pipeline returns less number of nodes than
// required.
if (pipeline.getNodes().size() != pipeline.getReplicationConfig()
.getRequiredNodes()) {
throw new SCMException("Nodes size= " + pipeline.getNodes()
.size() + ", replication factor= " + pipeline.getReplicationConfig()
.getRequiredNodes() + " do not match",
SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
}
}

public Pipeline create(ReplicationConfig replicationConfig,
Expand Down