Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class AbstractContainerReportHandler {
*
* @throws IOException In case of any Exception while processing the report
*/
void processContainerReplica(final DatanodeDetails datanodeDetails,
protected void processContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerReplicaProto replicaProto)
throws IOException {
final ContainerID containerId = ContainerID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public interface ContainerManager extends Closeable {
ContainerInfo getContainer(ContainerID containerID)
throws ContainerNotFoundException;

boolean exists(ContainerID containerID);

/**
* Returns containers under certain conditions.
* Search container IDs from start ID(exclusive),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,22 @@ ContainerInfo allocateContainer(
.setReplicationFactor(pipeline.getFactor())
.setReplicationType(pipeline.getType())
.build();
addContainerInfo(containerID, containerInfo, pipelineManager, pipeline);
if (LOG.isTraceEnabled()) {
LOG.trace("New container allocated: {}", containerInfo);
}
return containerInfo;
}

public void addContainerInfo(long containerID,
ContainerInfo containerInfo,
PipelineManager pipelineManager,
Pipeline pipeline) throws IOException {
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
pipelineManager.addContainerToPipeline(pipeline.getId(),
ContainerID.valueof(containerID));
containerStateCount.incrementAndGet(containerInfo.getState());
if (LOG.isTraceEnabled()) {
LOG.trace("New container allocated: {}", containerInfo);
}
return containerInfo;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,7 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,

}

protected NodeManager getNodeManager() {
return this.nodeManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ public ContainerInfo getContainer(final ContainerID containerID)
return containerStateManager.getContainer(containerID);
}

@Override
public boolean exists(ContainerID containerID) {
lock.lock();
try {
return (containerStateManager.getContainer(containerID) != null);
} catch (ContainerNotFoundException e) {
return false;
} finally {
lock.unlock();
}
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -441,7 +453,7 @@ public ContainerInfo getMatchingContainer(final long sizeRequired,
* @param containerInfo
* @throws IOException
*/
private void addContainerToDB(ContainerInfo containerInfo)
protected void addContainerToDB(ContainerInfo containerInfo)
throws IOException {
try {
final byte[] containerIDBytes = Longs.toByteArray(
Expand Down Expand Up @@ -583,4 +595,12 @@ protected File getContainerDBPath(Configuration conf) {
File metaDir = ServerUtils.getScmDbDir(conf);
return new File(metaDir, SCM_CONTAINER_DB);
}

protected PipelineManager getPipelineManager() {
return pipelineManager;
}

public Lock getLock() {
return lock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.ozone.recon.ReconUtils;

Expand Down Expand Up @@ -55,4 +58,29 @@ protected File getContainerDBPath(Configuration conf) {
File metaDir = ReconUtils.getReconScmDbDir(conf);
return new File(metaDir, RECON_SCM_CONTAINER_DB);
}

/**
* Adds a new container to Recon's container manager.
* @param containerId id
* @param containerWithPipeline containerInfo with pipeline info
* @throws IOException on Error.
*/
public void addNewContainer(long containerId,
ContainerWithPipeline containerWithPipeline)
throws IOException {
ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
getLock().lock();
try {
getContainerStateManager().addContainerInfo(containerId, containerInfo,
getPipelineManager(), containerWithPipeline.getPipeline());
addContainerToDB(containerInfo);
} catch (IOException ex) {
getPipelineManager().removeContainerFromPipeline(
containerInfo.getPipelineID(),
new ContainerID(containerInfo.getContainerID()));
throw ex;
} finally {
getLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon.scm;

import java.io.IOException;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Recon ICR handler.
*/
public class ReconIncrementalContainerReportHandler
extends IncrementalContainerReportHandler {

private static final Logger LOG = LoggerFactory.getLogger(
ReconIncrementalContainerReportHandler.class);

private StorageContainerServiceProvider scmClient;

public ReconIncrementalContainerReportHandler(NodeManager nodeManager,
ContainerManager containerManager,
StorageContainerServiceProvider scmClient) {
super(nodeManager, containerManager);
this.scmClient = scmClient;
}

@Override
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing incremental container report from data node {}",
report.getDatanodeDetails());
}

ReconContainerManager containerManager =
(ReconContainerManager) getContainerManager();
boolean success = true;
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
try {
final DatanodeDetails dd = report.getDatanodeDetails();
final ContainerID id = ContainerID.valueof(
replicaProto.getContainerID());
if (!getContainerManager().exists(id)) {
LOG.info("New container {} got from {}.", id,
report.getDatanodeDetails());
try {
ContainerWithPipeline containerWithPipeline =
scmClient.getContainerWithPipeline(id.getId());
containerManager.addNewContainer(id.getId(), containerWithPipeline);
} catch (IOException ioEx) {
LOG.error("Exception while getting new container info from SCM",
ioEx);
return;
}
}
getNodeManager().addContainer(dd, id);
processContainerReplica(dd, replicaProto);
} catch (ContainerNotFoundException e) {
success = false;
LOG.warn("Container {} not found!", replicaProto.getContainerID());
} catch (NodeNotFoundException ex) {
success = false;
LOG.error("Received ICR from unknown datanode {} {}",
report.getDatanodeDetails(), ex);
} catch (IOException e) {
success = false;
LOG.error("Exception while processing ICR for container {}",
replicaProto.getContainerID());
}
}
getContainerManager().notifyContainerReportProcessing(false, success);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
ContainerReportHandler containerReportHandler =
new ContainerReportHandler(scmNodeManager, containerManager);
IncrementalContainerReportHandler icrHandler =
new IncrementalContainerReportHandler(scmNodeManager, containerManager);
new ReconIncrementalContainerReportHandler(scmNodeManager,
containerManager, scmServiceProvider);
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, containerManager);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

/**
Expand All @@ -43,4 +44,13 @@ public interface StorageContainerServiceProvider {
* @throws IOException in case of exception
*/
Pipeline getPipeline(HddsProtos.PipelineID pipelineID) throws IOException;

/**
* Requests SCM for a container given ID.
* @param containerId containerId
* @return ContainerInfo + Pipeline info
* @throws IOException in case of any exception.
*/
ContainerWithPipeline getContainerWithPipeline(long containerId)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.inject.Inject;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
Expand Down Expand Up @@ -54,4 +55,9 @@ public Pipeline getPipeline(HddsProtos.PipelineID pipelineID)
return scmClient.getPipeline(pipelineID);
}

@Override
public ContainerWithPipeline getContainerWithPipeline(long containerId)
throws IOException {
return scmClient.getContainerWithPipeline(containerId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon.scm;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;

import java.io.IOException;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

/**
* Abstract class for Recon Container Manager related tests.
*/
public class AbstractReconContainerManagerTest {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

private OzoneConfiguration conf;
private SCMStorageConfig scmStorageConfig;
private ReconPipelineManager pipelineManager;
private ReconContainerManager containerManager;

@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
conf.set(OZONE_METADATA_DIRS,
temporaryFolder.newFolder().getAbsolutePath());
conf.set(OZONE_SCM_NAMES, "localhost");
scmStorageConfig = new ReconStorageConfig(conf);
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
NodeManager nodeManager =
new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
pipelineManager = new ReconPipelineManager(conf, nodeManager, eventQueue);
containerManager = new ReconContainerManager(conf, pipelineManager);
}

@After
public void tearDown() throws IOException {
containerManager.close();
pipelineManager.close();
}

protected OzoneConfiguration getConf() {
return conf;
}

protected ReconPipelineManager getPipelineManager() {
return pipelineManager;
}

protected ReconContainerManager getContainerManager() {
return containerManager;
}

}
Loading