Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ Pipeline createPipeline(ReplicationConfig replicationConfig,
List<DatanodeDetails> favoredNodes)
throws IOException, TimeoutException;

Pipeline buildECPipeline(ReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes,
List<DatanodeDetails> favoredNodes)
throws IOException, TimeoutException;

void addEcPipeline(Pipeline pipeline) throws IOException, TimeoutException;


Pipeline createPipeline(
ReplicationConfig replicationConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,48 @@ public static PipelineManagerImpl newPipelineManager(
return pipelineManager;
}

/**
* Build a new pipeline and return it, but do not add it to the pipeline
* manager. This new pipeline will be in ALLOCATED state, but also unavailable
* to clients in the system until it is added to the pipeline manager via the
* addPipeline method.
* @param replicationConfig
* @param excludedNodes
* @param favoredNodes
* @return The created pipeline.
* @throws IOException
*/
@Override
public Pipeline buildECPipeline(ReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException {
if (replicationConfig.getReplicationType() != ReplicationType.EC) {
throw new IllegalArgumentException("Replication type must be EC");
}
checkIfPipelineCreationIsAllowed(replicationConfig);
return pipelineFactory.create(replicationConfig, excludedNodes,
favoredNodes);
}

/**
* Add a previously built pipeline to the pipeline manager. This will allow
* the pipline to be used by clients in the system.
* @param pipeline
* @throws IOException
* @throws TimeoutException
*/
@Override
public void addEcPipeline(Pipeline pipeline)
throws IOException, TimeoutException {
if (pipeline.getReplicationConfig().getReplicationType()
!= ReplicationType.EC) {
throw new IllegalArgumentException(
"Pipeline replication type must be EC");
}
checkIfPipelineCreationIsAllowed(pipeline.getReplicationConfig());
addPipelineToManager(pipeline);
}

@Override
public Pipeline createPipeline(ReplicationConfig replicationConfig)
throws IOException, TimeoutException {
Expand All @@ -206,6 +248,27 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig)
public Pipeline createPipeline(ReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException, TimeoutException {
checkIfPipelineCreationIsAllowed(replicationConfig);

acquireWriteLock();
final Pipeline pipeline;
try {
try {
pipeline = pipelineFactory.create(replicationConfig,
excludedNodes, favoredNodes);
} catch (IOException e) {
metrics.incNumPipelineCreationFailed();
throw e;
}
addPipelineToManager(pipeline);
return pipeline;
} finally {
releaseWriteLock();
}
}

private void checkIfPipelineCreationIsAllowed(
ReplicationConfig replicationConfig) throws IOException {
if (!isPipelineCreationAllowed() && !factorOne(replicationConfig)) {
LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
"complete");
Expand All @@ -219,25 +282,24 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
LOG.info(message);
throw new IOException(message);
}
}

private void addPipelineToManager(Pipeline pipeline)
throws IOException, TimeoutException {
HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION);
acquireWriteLock();
final Pipeline pipeline;
try {
pipeline = pipelineFactory.create(replicationConfig,
excludedNodes, favoredNodes);
stateManager.addPipeline(pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION));
stateManager.addPipeline(pipelineProto);
} catch (IOException | TimeoutException ex) {
LOG.debug("Failed to create pipeline with replicationConfig {}.",
replicationConfig, ex);
LOG.debug("Failed to add pipeline {}.", pipeline, ex);
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
releaseWriteLock();
}
LOG.info("Created pipeline {}.", pipeline);
LOG.info("Added pipeline {}.", pipeline);
recordMetricsForPipeline(pipeline);
return pipeline;
}

private boolean factorOne(ReplicationConfig replicationConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,20 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig)
public Pipeline createPipeline(ReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException, TimeoutException {
Pipeline pipeline = buildECPipeline(replicationConfig, excludedNodes,
favoredNodes);

stateManager.addPipeline(pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION));
return pipeline;
}

@Override
public Pipeline buildECPipeline(ReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException, TimeoutException {
final List<DatanodeDetails> nodes = Stream.generate(
MockDatanodeDetails::randomDatanodeDetails)
MockDatanodeDetails::randomDatanodeDetails)
.limit(replicationConfig.getRequiredNodes())
.collect(Collectors.toList());
final Pipeline pipeline = Pipeline.newBuilder()
Expand All @@ -81,10 +93,14 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
.setNodes(nodes)
.setState(Pipeline.PipelineState.OPEN)
.build();
return pipeline;
}

@Override
public void addEcPipeline(Pipeline pipeline)
throws IOException, TimeoutException {
stateManager.addPipeline(pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION));
return pipeline;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -194,6 +195,16 @@ public void testCreatePipeline() throws Exception {
RatisReplicationConfig.getInstance(ReplicationFactor.ONE));
Assertions.assertEquals(2, pipelineManager.getPipelines().size());
Assertions.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));

Pipeline builtPipeline = pipelineManager.buildECPipeline(
new ECReplicationConfig(3, 2),
Collections.emptyList(), Collections.emptyList());
pipelineManager.addEcPipeline(builtPipeline);

Assertions.assertEquals(3, pipelineManager.getPipelines().size());
Assertions.assertTrue(pipelineManager.containsPipeline(
builtPipeline.getId()));

buffer1.close();
pipelineManager.close();

Expand All @@ -203,11 +214,11 @@ public void testCreatePipeline() throws Exception {
createPipelineManager(true, buffer2);
// Should be able to load previous pipelines.
Assertions.assertFalse(pipelineManager2.getPipelines().isEmpty());
Assertions.assertEquals(2, pipelineManager.getPipelines().size());
Assertions.assertEquals(3, pipelineManager.getPipelines().size());
Pipeline pipeline3 = pipelineManager2.createPipeline(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
buffer2.close();
Assertions.assertEquals(3, pipelineManager2.getPipelines().size());
Assertions.assertEquals(4, pipelineManager2.getPipelines().size());
Assertions.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId()));

pipelineManager2.close();
Expand Down