Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,12 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
}

acquireWriteLock();
final Pipeline pipeline;
try {
Pipeline pipeline = pipelineFactory.create(replicationConfig,
pipeline = pipelineFactory.create(replicationConfig,
excludedNodes, favoredNodes);
stateManager.addPipeline(pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION));
recordMetricsForPipeline(pipeline);
return pipeline;
} catch (IOException | TimeoutException ex) {
LOG.debug("Failed to create pipeline with replicationConfig {}.",
replicationConfig, ex);
Expand All @@ -236,6 +235,9 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
} finally {
releaseWriteLock();
}

recordMetricsForPipeline(pipeline);
return pipeline;
}

private boolean factorOne(ReplicationConfig replicationConfig) {
Expand Down Expand Up @@ -309,14 +311,14 @@ public List<Pipeline> getPipelines(
.getPipelines(replicationConfig, state, excludeDns, excludePipelines);
}

@Override
/**
* Returns the count of pipelines meeting the given ReplicationConfig and
* state.
* @param replicationConfig The ReplicationConfig of the pipelines to count
* @param config The ReplicationConfig of the pipelines to count
* @param state The current state of the pipelines to count
* @return The count of pipelines meeting the above criteria
*/
@Override
public int getPipelineCount(ReplicationConfig config,
Pipeline.PipelineState state) {
return stateManager.getPipelineCount(config, state);
Expand Down Expand Up @@ -357,22 +359,29 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
@Override
public void openPipeline(PipelineID pipelineId)
throws IOException, TimeoutException {
HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();
acquireWriteLock();
final Pipeline pipeline;
boolean opened = false;
try {
Pipeline pipeline = stateManager.getPipeline(pipelineId);
pipeline = stateManager.getPipeline(pipelineId);
if (pipeline.isClosed()) {
throw new IOException("Closed pipeline can not be opened");
}
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
LOG.info("Pipeline {} moved to OPEN state", pipeline);
stateManager.updatePipelineState(
pipelineId.getProtobuf(), HddsProtos.PipelineState.PIPELINE_OPEN);
stateManager.updatePipelineState(pipelineIdProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
opened = true;
}
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
} finally {
releaseWriteLock();
}

if (opened) {
LOG.info("Pipeline {} moved to OPEN state", pipeline);
}
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
}

/**
Expand All @@ -384,25 +393,26 @@ public void openPipeline(PipelineID pipelineId)
protected void removePipeline(Pipeline pipeline)
throws IOException, TimeoutException {
pipelineFactory.close(pipeline.getType(), pipeline);
PipelineID pipelineID = pipeline.getId();
HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
acquireWriteLock();
try {
stateManager.removePipeline(pipelineID.getProtobuf());
metrics.incNumPipelineDestroyed();
stateManager.removePipeline(pipelineID);
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
releaseWriteLock();
}

metrics.incNumPipelineDestroyed();
}

/**
* Fire events to close all containers related to the input pipeline.
* @param pipelineId - ID of the pipeline.
* @throws IOException
*/
protected void closeContainersForPipeline(final PipelineID pipelineId)
private void closeContainersForPipeline(final PipelineID pipelineId)
throws IOException, TimeoutException {
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
ContainerManager containerManager = scmContext.getScm()
Expand Down Expand Up @@ -432,19 +442,23 @@ protected void closeContainersForPipeline(final PipelineID pipelineId)
public void closePipeline(Pipeline pipeline, boolean onTimeout)
throws IOException, TimeoutException {
PipelineID pipelineID = pipeline.getId();
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);
acquireWriteLock();
try {
if (!pipeline.isClosed()) {
stateManager.updatePipelineState(pipelineID.getProtobuf(),

if (!pipeline.isClosed()) {
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_CLOSED);
LOG.info("Pipeline {} moved to CLOSED state", pipeline);
} finally {
releaseWriteLock();
}
metrics.removePipelineMetrics(pipelineID);
} finally {
releaseWriteLock();
LOG.info("Pipeline {} moved to CLOSED state", pipeline);
}

metrics.removePipelineMetrics(pipelineID);

if (!onTimeout) {
// close pipeline right away.
removePipeline(pipeline);
Expand Down Expand Up @@ -497,7 +511,7 @@ List<Pipeline> getStalePipelines(DatanodeDetails datanodeDetails) {
@Override
public void scrubPipelines() throws IOException, TimeoutException {
Instant currentTime = clock.instant();
Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
long pipelineScrubTimeoutInMills = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -595,10 +609,11 @@ public int minPipelineLimit(Pipeline pipeline) {
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException, TimeoutException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineID.getProtobuf(),
HddsProtos.PipelineState.PIPELINE_OPEN);
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
} finally {
releaseWriteLock();
}
Expand All @@ -613,9 +628,10 @@ public void activatePipeline(PipelineID pipelineID)
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException, TimeoutException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineID.getProtobuf(),
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_DORMANT);
} finally {
releaseWriteLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public final class SCMPipelineMetrics implements MetricsSource {
private @Metric MutableCounterLong numPipelineReportProcessed;
private @Metric MutableCounterLong numPipelineReportProcessingFailed;
private @Metric MutableCounterLong numPipelineContainSameDatanodes;
private Map<PipelineID, MutableCounterLong> numBlocksAllocated;
private Map<PipelineID, MutableCounterLong> numBytesWritten;
private final Map<PipelineID, MutableCounterLong> numBlocksAllocated;
private final Map<PipelineID, MutableCounterLong> numBytesWritten;

/** Private constructor. */
private SCMPipelineMetrics() {
Expand All @@ -84,7 +84,7 @@ public static synchronized SCMPipelineMetrics create() {
/**
* Unregister the metrics instance.
*/
public static void unRegister() {
public static synchronized void unRegister() {
instance = null;
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
Expand Down Expand Up @@ -112,9 +112,7 @@ void createPerPipelineMetrics(Pipeline pipeline) {
numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns
.info(getBlockAllocationMetricName(pipeline),
"Number of blocks allocated in pipeline " + pipeline.getId()), 0L));
numBytesWritten.put(pipeline.getId(), new MutableCounterLong(Interns
.info(getBytesWrittenMetricName(pipeline),
"Number of bytes written into pipeline " + pipeline.getId()), 0L));
numBytesWritten.put(pipeline.getId(), bytesWrittenCounter(pipeline, 0L));
}

public static String getBlockAllocationMetricName(Pipeline pipeline) {
Expand Down Expand Up @@ -159,9 +157,16 @@ void incNumPipelineCreated() {
* Increments the number of total bytes that write into the pipeline.
*/
void incNumPipelineBytesWritten(Pipeline pipeline, long bytes) {
numBytesWritten.put(pipeline.getId(), new MutableCounterLong(
Interns.info(getBytesWrittenMetricName(pipeline), "Number of" +
" bytes written into pipeline " + pipeline.getId()), bytes));
numBytesWritten.computeIfPresent(pipeline.getId(),
(k, v) -> bytesWrittenCounter(pipeline, bytes));
}

private static MutableCounterLong bytesWrittenCounter(
Pipeline pipeline, long bytes) {
return new MutableCounterLong(
Interns.info(getBytesWrittenMetricName(pipeline),
"Number of bytes written into pipeline " + pipeline.getId()),
bytes);
}

/**
Expand Down