diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 33c587995a32..cbfe9a6a076b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -221,13 +221,12 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig, } acquireWriteLock(); + final Pipeline pipeline; try { - Pipeline pipeline = pipelineFactory.create(replicationConfig, + pipeline = pipelineFactory.create(replicationConfig, excludedNodes, favoredNodes); stateManager.addPipeline(pipeline.getProtobufMessage( ClientVersion.CURRENT_VERSION)); - recordMetricsForPipeline(pipeline); - return pipeline; } catch (IOException | TimeoutException ex) { LOG.debug("Failed to create pipeline with replicationConfig {}.", replicationConfig, ex); @@ -236,6 +235,9 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig, } finally { releaseWriteLock(); } + + recordMetricsForPipeline(pipeline); + return pipeline; } private boolean factorOne(ReplicationConfig replicationConfig) { @@ -309,14 +311,14 @@ public List getPipelines( .getPipelines(replicationConfig, state, excludeDns, excludePipelines); } - @Override /** * Returns the count of pipelines meeting the given ReplicationConfig and * state. - * @param replicationConfig The ReplicationConfig of the pipelines to count + * @param config The ReplicationConfig of the pipelines to count * @param state The current state of the pipelines to count * @return The count of pipelines meeting the above criteria */ + @Override public int getPipelineCount(ReplicationConfig config, Pipeline.PipelineState state) { return stateManager.getPipelineCount(config, state); @@ -357,22 +359,29 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException { @Override public void openPipeline(PipelineID pipelineId) throws IOException, TimeoutException { + HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf(); acquireWriteLock(); + final Pipeline pipeline; + boolean opened = false; try { - Pipeline pipeline = stateManager.getPipeline(pipelineId); + pipeline = stateManager.getPipeline(pipelineId); if (pipeline.isClosed()) { throw new IOException("Closed pipeline can not be opened"); } if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { - LOG.info("Pipeline {} moved to OPEN state", pipeline); - stateManager.updatePipelineState( - pipelineId.getProtobuf(), HddsProtos.PipelineState.PIPELINE_OPEN); + stateManager.updatePipelineState(pipelineIdProtobuf, + HddsProtos.PipelineState.PIPELINE_OPEN); + opened = true; } - metrics.incNumPipelineCreated(); - metrics.createPerPipelineMetrics(pipeline); } finally { releaseWriteLock(); } + + if (opened) { + LOG.info("Pipeline {} moved to OPEN state", pipeline); + } + metrics.incNumPipelineCreated(); + metrics.createPerPipelineMetrics(pipeline); } /** @@ -384,17 +393,18 @@ public void openPipeline(PipelineID pipelineId) protected void removePipeline(Pipeline pipeline) throws IOException, TimeoutException { pipelineFactory.close(pipeline.getType(), pipeline); - PipelineID pipelineID = pipeline.getId(); + HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf(); acquireWriteLock(); try { - stateManager.removePipeline(pipelineID.getProtobuf()); - metrics.incNumPipelineDestroyed(); + stateManager.removePipeline(pipelineID); } catch (IOException ex) { metrics.incNumPipelineDestroyFailed(); throw ex; } finally { releaseWriteLock(); } + + metrics.incNumPipelineDestroyed(); } /** @@ -402,7 +412,7 @@ protected void removePipeline(Pipeline pipeline) * @param pipelineId - ID of the pipeline. * @throws IOException */ - protected void closeContainersForPipeline(final PipelineID pipelineId) + private void closeContainersForPipeline(final PipelineID pipelineId) throws IOException, TimeoutException { Set containerIDs = stateManager.getContainers(pipelineId); ContainerManager containerManager = scmContext.getScm() @@ -432,19 +442,23 @@ protected void closeContainersForPipeline(final PipelineID pipelineId) public void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException, TimeoutException { PipelineID pipelineID = pipeline.getId(); + HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf(); // close containers. closeContainersForPipeline(pipelineID); - acquireWriteLock(); - try { - if (!pipeline.isClosed()) { - stateManager.updatePipelineState(pipelineID.getProtobuf(), + + if (!pipeline.isClosed()) { + acquireWriteLock(); + try { + stateManager.updatePipelineState(pipelineIDProtobuf, HddsProtos.PipelineState.PIPELINE_CLOSED); - LOG.info("Pipeline {} moved to CLOSED state", pipeline); + } finally { + releaseWriteLock(); } - metrics.removePipelineMetrics(pipelineID); - } finally { - releaseWriteLock(); + LOG.info("Pipeline {} moved to CLOSED state", pipeline); } + + metrics.removePipelineMetrics(pipelineID); + if (!onTimeout) { // close pipeline right away. removePipeline(pipeline); @@ -497,7 +511,7 @@ List getStalePipelines(DatanodeDetails datanodeDetails) { @Override public void scrubPipelines() throws IOException, TimeoutException { Instant currentTime = clock.instant(); - Long pipelineScrubTimeoutInMills = conf.getTimeDuration( + long pipelineScrubTimeoutInMills = conf.getTimeDuration( ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); @@ -595,10 +609,11 @@ public int minPipelineLimit(Pipeline pipeline) { @Override public void activatePipeline(PipelineID pipelineID) throws IOException, TimeoutException { + HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf(); acquireWriteLock(); try { - stateManager.updatePipelineState(pipelineID.getProtobuf(), - HddsProtos.PipelineState.PIPELINE_OPEN); + stateManager.updatePipelineState(pipelineIDProtobuf, + HddsProtos.PipelineState.PIPELINE_OPEN); } finally { releaseWriteLock(); } @@ -613,9 +628,10 @@ public void activatePipeline(PipelineID pipelineID) @Override public void deactivatePipeline(PipelineID pipelineID) throws IOException, TimeoutException { + HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf(); acquireWriteLock(); try { - stateManager.updatePipelineState(pipelineID.getProtobuf(), + stateManager.updatePipelineState(pipelineIDProtobuf, HddsProtos.PipelineState.PIPELINE_DORMANT); } finally { releaseWriteLock(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java index af3c7b2c4142..a697e2e293cb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java @@ -56,8 +56,8 @@ public final class SCMPipelineMetrics implements MetricsSource { private @Metric MutableCounterLong numPipelineReportProcessed; private @Metric MutableCounterLong numPipelineReportProcessingFailed; private @Metric MutableCounterLong numPipelineContainSameDatanodes; - private Map numBlocksAllocated; - private Map numBytesWritten; + private final Map numBlocksAllocated; + private final Map numBytesWritten; /** Private constructor. */ private SCMPipelineMetrics() { @@ -84,7 +84,7 @@ public static synchronized SCMPipelineMetrics create() { /** * Unregister the metrics instance. */ - public static void unRegister() { + public static synchronized void unRegister() { instance = null; MetricsSystem ms = DefaultMetricsSystem.instance(); ms.unregisterSource(SOURCE_NAME); @@ -112,9 +112,7 @@ void createPerPipelineMetrics(Pipeline pipeline) { numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns .info(getBlockAllocationMetricName(pipeline), "Number of blocks allocated in pipeline " + pipeline.getId()), 0L)); - numBytesWritten.put(pipeline.getId(), new MutableCounterLong(Interns - .info(getBytesWrittenMetricName(pipeline), - "Number of bytes written into pipeline " + pipeline.getId()), 0L)); + numBytesWritten.put(pipeline.getId(), bytesWrittenCounter(pipeline, 0L)); } public static String getBlockAllocationMetricName(Pipeline pipeline) { @@ -159,9 +157,16 @@ void incNumPipelineCreated() { * Increments the number of total bytes that write into the pipeline. */ void incNumPipelineBytesWritten(Pipeline pipeline, long bytes) { - numBytesWritten.put(pipeline.getId(), new MutableCounterLong( - Interns.info(getBytesWrittenMetricName(pipeline), "Number of" + - " bytes written into pipeline " + pipeline.getId()), bytes)); + numBytesWritten.computeIfPresent(pipeline.getId(), + (k, v) -> bytesWrittenCounter(pipeline, bytes)); + } + + private static MutableCounterLong bytesWrittenCounter( + Pipeline pipeline, long bytes) { + return new MutableCounterLong( + Interns.info(getBytesWrittenMetricName(pipeline), + "Number of bytes written into pipeline " + pipeline.getId()), + bytes); } /**