diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index cbfe9a6a076b..e7b6bf1f7820 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -235,7 +235,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig, } finally { releaseWriteLock(); } - + LOG.info("Created pipeline {}.", pipeline); recordMetricsForPipeline(pipeline); return pipeline; } @@ -403,7 +403,7 @@ protected void removePipeline(Pipeline pipeline) } finally { releaseWriteLock(); } - + LOG.info("Pipeline {} removed.", pipeline); metrics.incNumPipelineDestroyed(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java index 8d9b7744c1eb..8b8dd79d674d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java @@ -92,15 +92,14 @@ private void initialize() throws IOException { @Override public void addPipeline(HddsProtos.Pipeline pipelineProto) throws IOException { + Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto); lock.writeLock().lock(); try { - Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto); if (pipelineStore != null) { pipelineStateMap.addPipeline(pipeline); nodeManager.addPipeline(pipeline); transactionBuffer .addToBuffer(pipelineStore, pipeline.getId(), pipeline); - LOG.info("Created pipeline {}.", pipeline); } } finally { lock.writeLock().unlock(); @@ -234,44 +233,47 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException { @Override public void removePipeline(HddsProtos.PipelineID pipelineIDProto) throws IOException { - lock.writeLock().lock(); + PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto); try { - PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto); - Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID); - nodeManager.removePipeline(pipeline); - if (pipelineStore != null) { - transactionBuffer.removeFromBuffer(pipelineStore, pipelineID); + Pipeline pipeline; + lock.writeLock().lock(); + try { + pipeline = pipelineStateMap.removePipeline(pipelineID); + nodeManager.removePipeline(pipeline); + if (pipelineStore != null) { + transactionBuffer.removeFromBuffer(pipelineStore, pipelineID); + } + } finally { + lock.writeLock().unlock(); } - LOG.info("Pipeline {} removed.", pipeline); } catch (PipelineNotFoundException pnfe) { LOG.warn("Pipeline {} is not found in the pipeline Map. Pipeline" + " may have been deleted already.", pipelineIDProto.getId()); - } finally { - lock.writeLock().unlock(); } } - @Override public void removeContainerFromPipeline( PipelineID pipelineID, ContainerID containerID) throws IOException { - lock.writeLock().lock(); try { - // Typica;;y, SCM can send a pipeline close Action to datanode and receive - // pipelineCloseAction to close the pipeline which will remove the - // pipelineId both from the piplineStateMap as well as - // pipeline2containerMap Subsequently, close container handler event can - // also try to close the container as a part of which , it will also - // try to remove the container from the pipeline2container Map which will - // fail with PipelineNotFoundException. These are executed over ratis, and - // if the exception is propagated to SCMStateMachine., it will bring down - // the SCM. Ignoring it here. - pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID); + lock.writeLock().lock(); + try { + // Typically, SCM can send a pipeline close Action to datanode and + // receive pipelineCloseAction to close the pipeline which will remove + // the pipelineId both from the piplineStateMap as well as + // pipeline2containerMap Subsequently, close container handler event can + // also try to close the container as a part of which , it will also + // try to remove the container from the pipeline2container Map which + // will fail with PipelineNotFoundException. These are executed over + // ratis, and if the exception is propagated to SCMStateMachine, it will + // bring down the SCM. Ignoring it here. + pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID); + } finally { + lock.writeLock().unlock(); + } } catch (PipelineNotFoundException pnfe) { LOG.info("Pipeline {} is not found in the pipeline2ContainerMap. Pipeline" + " may have been closed already.", pipelineID); - } finally { - lock.writeLock().unlock(); } } @@ -280,16 +282,21 @@ public void updatePipelineState( HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState) throws IOException { PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto); - lock.writeLock().lock(); + Pipeline.PipelineState newPipelineState = + Pipeline.PipelineState.fromProtobuf(newState); try { - // null check is here to prevent the case where SCM store - // is closed but the staleNode handlers/pipeline creations - // still try to access it. - if (pipelineStore != null) { - pipelineStateMap.updatePipelineState(pipelineID, - Pipeline.PipelineState.fromProtobuf(newState)); - transactionBuffer - .addToBuffer(pipelineStore, pipelineID, getPipeline(pipelineID)); + lock.writeLock().lock(); + try { + // null check is here to prevent the case where SCM store + // is closed but the staleNode handlers/pipeline creations + // still try to access it. + if (pipelineStore != null) { + pipelineStateMap.updatePipelineState(pipelineID, newPipelineState); + transactionBuffer + .addToBuffer(pipelineStore, pipelineID, getPipeline(pipelineID)); + } + } finally { + lock.writeLock().unlock(); } } catch (PipelineNotFoundException pnfe) { LOG.warn("Pipeline {} is not found in the pipeline Map. Pipeline" @@ -297,8 +304,6 @@ public void updatePipelineState( } catch (IOException ex) { LOG.error("Pipeline {} state update failed", pipelineID); throw ex; - } finally { - lock.writeLock().unlock(); } } @@ -311,7 +316,7 @@ public void close() throws Exception { pipelineStore = null; } } catch (Exception ex) { - LOG.error("Pipeline store close failed", ex); + LOG.error("Pipeline store close failed", ex); } finally { lock.writeLock().unlock(); }