Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
} finally {
releaseWriteLock();
}

LOG.info("Created pipeline {}.", pipeline);
recordMetricsForPipeline(pipeline);
return pipeline;
}
Expand Down Expand Up @@ -403,7 +403,7 @@ protected void removePipeline(Pipeline pipeline)
} finally {
releaseWriteLock();
}

LOG.info("Pipeline {} removed.", pipeline);
metrics.incNumPipelineDestroyed();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,14 @@ private void initialize() throws IOException {
@Override
public void addPipeline(HddsProtos.Pipeline pipelineProto)
throws IOException {
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
lock.writeLock().lock();
try {
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
if (pipelineStore != null) {
pipelineStateMap.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
transactionBuffer
.addToBuffer(pipelineStore, pipeline.getId(), pipeline);
LOG.info("Created pipeline {}.", pipeline);
}
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -234,44 +233,47 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
@Override
public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
throws IOException {
lock.writeLock().lock();
PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
try {
PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
nodeManager.removePipeline(pipeline);
if (pipelineStore != null) {
transactionBuffer.removeFromBuffer(pipelineStore, pipelineID);
Pipeline pipeline;
lock.writeLock().lock();
try {
pipeline = pipelineStateMap.removePipeline(pipelineID);
nodeManager.removePipeline(pipeline);
if (pipelineStore != null) {
transactionBuffer.removeFromBuffer(pipelineStore, pipelineID);
}
} finally {
lock.writeLock().unlock();
}
LOG.info("Pipeline {} removed.", pipeline);
} catch (PipelineNotFoundException pnfe) {
LOG.warn("Pipeline {} is not found in the pipeline Map. Pipeline"
+ " may have been deleted already.", pipelineIDProto.getId());
} finally {
lock.writeLock().unlock();
}
}


@Override
public void removeContainerFromPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
// Typica;;y, SCM can send a pipeline close Action to datanode and receive
// pipelineCloseAction to close the pipeline which will remove the
// pipelineId both from the piplineStateMap as well as
// pipeline2containerMap Subsequently, close container handler event can
// also try to close the container as a part of which , it will also
// try to remove the container from the pipeline2container Map which will
// fail with PipelineNotFoundException. These are executed over ratis, and
// if the exception is propagated to SCMStateMachine., it will bring down
// the SCM. Ignoring it here.
pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
lock.writeLock().lock();
try {
// Typically, SCM can send a pipeline close Action to datanode and
// receive pipelineCloseAction to close the pipeline which will remove
// the pipelineId both from the piplineStateMap as well as
// pipeline2containerMap Subsequently, close container handler event can
// also try to close the container as a part of which , it will also
// try to remove the container from the pipeline2container Map which
// will fail with PipelineNotFoundException. These are executed over
// ratis, and if the exception is propagated to SCMStateMachine, it will
// bring down the SCM. Ignoring it here.
pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
} catch (PipelineNotFoundException pnfe) {
LOG.info("Pipeline {} is not found in the pipeline2ContainerMap. Pipeline"
+ " may have been closed already.", pipelineID);
} finally {
lock.writeLock().unlock();
}
}

Expand All @@ -280,25 +282,28 @@ public void updatePipelineState(
HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
throws IOException {
PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
lock.writeLock().lock();
Pipeline.PipelineState newPipelineState =
Pipeline.PipelineState.fromProtobuf(newState);
try {
// null check is here to prevent the case where SCM store
// is closed but the staleNode handlers/pipeline creations
// still try to access it.
if (pipelineStore != null) {
pipelineStateMap.updatePipelineState(pipelineID,
Pipeline.PipelineState.fromProtobuf(newState));
transactionBuffer
.addToBuffer(pipelineStore, pipelineID, getPipeline(pipelineID));
lock.writeLock().lock();
try {
// null check is here to prevent the case where SCM store
// is closed but the staleNode handlers/pipeline creations
// still try to access it.
if (pipelineStore != null) {
pipelineStateMap.updatePipelineState(pipelineID, newPipelineState);
transactionBuffer
.addToBuffer(pipelineStore, pipelineID, getPipeline(pipelineID));
}
} finally {
lock.writeLock().unlock();
}
} catch (PipelineNotFoundException pnfe) {
LOG.warn("Pipeline {} is not found in the pipeline Map. Pipeline"
+ " may have been deleted already.", pipelineID);
} catch (IOException ex) {
LOG.error("Pipeline {} state update failed", pipelineID);
throw ex;
} finally {
lock.writeLock().unlock();
}
}

Expand All @@ -311,7 +316,7 @@ public void close() throws Exception {
pipelineStore = null;
}
} catch (Exception ex) {
LOG.error("Pipeline store close failed", ex);
LOG.error("Pipeline store close failed", ex);
} finally {
lock.writeLock().unlock();
}
Expand Down