Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* The information of the request of pipeline.
*/
public final class PipelineRequestInformation {
private long size;
private final long size;

/**
* Builder for PipelineRequestInformation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -79,97 +80,64 @@ public ContainerInfo getContainer(final long size,
So we can use different kind of policies.
*/

ContainerInfo containerInfo = null;
String failureReason = null;

//TODO we need to continue the refactor to use repConfig everywhere
//in downstream managers.

PipelineRequestInformation req =
PipelineRequestInformation.Builder.getBuilder().setSize(size).build();

while (true) {
List<Pipeline> availablePipelines;
Pipeline pipeline;
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
try {
availablePipelines =
findPipelinesByState(repConfig,
excludeList,
Pipeline.PipelineState.OPEN);
if (availablePipelines.size() != 0) {
containerInfo = selectContainer(availablePipelines, size, owner,
excludeList);
}
if (containerInfo != null) {
return containerInfo;
}
} finally {
pipelineManager.releaseReadLock();
}
ContainerInfo containerInfo =
getContainer(repConfig, owner, excludeList, req);
if (containerInfo != null) {
return containerInfo;
}

if (availablePipelines.size() == 0) {
try {
// TODO: #CLUTIL Remove creation logic when all replication types
// and factors are handled by pipeline creator
Pipeline pipeline = pipelineManager.createPipeline(repConfig);

// wait until pipeline is ready
pipelineManager.waitPipelineReady(pipeline.getId(), 0);

} catch (SCMException se) {
LOG.warn("Pipeline creation failed for repConfig {} " +
"Datanodes may be used up. Try to see if any pipeline is in " +
"ALLOCATED state, and then will wait for it to be OPEN",
repConfig, se);
List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig,
excludeList,
Pipeline.PipelineState.ALLOCATED);
if (!allocatedPipelines.isEmpty()) {
List<PipelineID> allocatedPipelineIDs =
allocatedPipelines.stream()
.map(p -> p.getId())
.collect(Collectors.toList());
try {
// TODO: #CLUTIL Remove creation logic when all replication types
// and factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(repConfig);

// wait until pipeline is ready
pipelineManager.waitPipelineReady(pipeline.getId(), 0);

} catch (SCMException se) {
LOG.warn("Pipeline creation failed for repConfig {} " +
"Datanodes may be used up. Try to see if any pipeline is in " +
"ALLOCATED state, and then will wait for it to be OPEN",
repConfig, se);
List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig,
excludeList,
Pipeline.PipelineState.ALLOCATED);
if (!allocatedPipelines.isEmpty()) {
List<PipelineID> allocatedPipelineIDs =
allocatedPipelines.stream()
.map(p -> p.getId())
.collect(Collectors.toList());
try {
pipelineManager
.waitOnePipelineReady(allocatedPipelineIDs, 0);
} catch (IOException e) {
LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ",
allocatedPipelineIDs, e);
failureReason = "Waiting for one of pipelines to be OPEN failed. "
+ e.getMessage();
}
} else {
failureReason = se.getMessage();
}
pipelineManager
.waitOnePipelineReady(allocatedPipelineIDs, 0);
} catch (IOException e) {
LOG.warn("Pipeline creation failed for repConfig: {}. "
+ "Retrying get pipelines call once.", repConfig, e);
failureReason = e.getMessage();
}

pipelineManager.acquireReadLock();
try {
// If Exception occurred or successful creation of pipeline do one
// final try to fetch pipelines.
availablePipelines = findPipelinesByState(repConfig,
excludeList,
Pipeline.PipelineState.OPEN);
if (availablePipelines.size() == 0) {
LOG.info("Could not find available pipeline of repConfig: {} "
+ "even after retrying", repConfig);
break;
}
containerInfo = selectContainer(availablePipelines, size, owner,
excludeList);
if (containerInfo != null) {
return containerInfo;
}
} finally {
pipelineManager.releaseReadLock();
LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ",
allocatedPipelineIDs, e);
failureReason = "Waiting for one of pipelines to be OPEN failed. "
+ e.getMessage();
}
} else {
failureReason = se.getMessage();
}
} catch (IOException e) {
LOG.warn("Pipeline creation failed for repConfig: {}. "
+ "Retrying get pipelines call once.", repConfig, e);
failureReason = e.getMessage();
}

// If Exception occurred or successful creation of pipeline do one
// final try to fetch pipelines.
containerInfo = getContainer(repConfig, owner, excludeList, req);
if (containerInfo != null) {
return containerInfo;
}

// we have tried all strategies we know but somehow we are not able
Expand All @@ -182,6 +150,22 @@ public ContainerInfo getContainer(final long size,
+ ", replicationConfig: " + repConfig + ". " + failureReason);
}

@Nullable
private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
ExcludeList excludeList, PipelineRequestInformation req) {
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
try {
List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
excludeList, Pipeline.PipelineState.OPEN);
return selectContainer(availablePipelines, req, owner, excludeList);
} finally {
pipelineManager.releaseReadLock();
}
}

private List<Pipeline> findPipelinesByState(
final ReplicationConfig repConfig,
final ExcludeList excludeList,
Expand All @@ -197,23 +181,26 @@ private List<Pipeline> findPipelinesByState(
return pipelines;
}

private ContainerInfo selectContainer(List<Pipeline> availablePipelines,
long size, String owner, ExcludeList excludeList) {
Pipeline pipeline;
ContainerInfo containerInfo;
private @Nullable ContainerInfo selectContainer(
List<Pipeline> availablePipelines, PipelineRequestInformation req,
String owner, ExcludeList excludeList) {

PipelineRequestInformation pri =
PipelineRequestInformation.Builder.getBuilder().setSize(size)
.build();
pipeline = pipelineChoosePolicy.choosePipeline(
availablePipelines, pri);
while (!availablePipelines.isEmpty()) {
Pipeline pipeline = pipelineChoosePolicy.choosePipeline(
availablePipelines, req);

// look for OPEN containers that match the criteria.
containerInfo = containerManager.getMatchingContainer(size, owner,
pipeline, excludeList.getContainerIds());
// look for OPEN containers that match the criteria.
final ContainerInfo containerInfo = containerManager.getMatchingContainer(
req.getSize(), owner, pipeline, excludeList.getContainerIds());

return containerInfo;
if (containerInfo != null) {
return containerInfo;
}

availablePipelines.remove(pipeline);
}

return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;

import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class TestWritableRatisContainerProvider {

private static final ReplicationConfig REPLICATION_CONFIG =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
private static final String OWNER = "owner";
private static final int CONTAINER_SIZE = 1234;
private static final ExcludeList NO_EXCLUSION = new ExcludeList();

private final OzoneConfiguration conf = new OzoneConfiguration();
private final PipelineChoosePolicy policy = new RandomPipelineChoosePolicy();
private final AtomicLong containerID = new AtomicLong(1);

@Mock
private PipelineManager pipelineManager;

@Mock
private ContainerManager containerManager;

@Test
void returnsExistingContainer() throws Exception {
Pipeline pipeline = MockPipeline.createPipeline(3);
ContainerInfo existingContainer = pipelineHasContainer(pipeline);

existingPipelines(pipeline);

ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);

assertSame(existingContainer, container);
verifyPipelineNotCreated();
}

@Test
void skipsPipelineWithoutContainer() throws Exception {
Pipeline pipeline = MockPipeline.createPipeline(3);
ContainerInfo existingContainer = pipelineHasContainer(pipeline);

Pipeline pipelineWithoutContainer = MockPipeline.createPipeline(3);
existingPipelines(pipelineWithoutContainer, pipeline);

ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);

assertSame(existingContainer, container);
verifyPipelineNotCreated();
}

@Test
void createsNewContainerIfNoneFound() throws Exception {
ContainerInfo newContainer = createNewContainerOnDemand();

ContainerInfo container = createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION);

assertSame(newContainer, container);
verifyPipelineCreated();
}

@Test
void failsIfContainerCannotBeCreated() throws Exception {
throwWhenCreatePipeline();

assertThrows(IOException.class,
() -> createSubject().getContainer(CONTAINER_SIZE, REPLICATION_CONFIG, OWNER, NO_EXCLUSION));

verifyPipelineCreated();
}

private void existingPipelines(Pipeline... pipelines) {
existingPipelines(asList(pipelines));
}

private void existingPipelines(List<Pipeline> pipelines) {
when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet()))
.thenReturn(pipelines);
}

private ContainerInfo pipelineHasContainer(Pipeline pipeline) {
ContainerInfo container = new ContainerInfo.Builder()
.setContainerID(containerID.getAndIncrement())
.setPipelineID(pipeline.getId())
.build();

when(containerManager.getMatchingContainer(CONTAINER_SIZE, OWNER, pipeline, emptySet()))
.thenReturn(container);

return container;
}

private ContainerInfo createNewContainerOnDemand() throws IOException {
Pipeline newPipeline = MockPipeline.createPipeline(3);
when(pipelineManager.createPipeline(REPLICATION_CONFIG))
.thenReturn(newPipeline);

when(pipelineManager.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet()))
.thenReturn(emptyList())
.thenReturn(singletonList(newPipeline));

return pipelineHasContainer(newPipeline);
}

private void throwWhenCreatePipeline() throws IOException {
when(pipelineManager.createPipeline(REPLICATION_CONFIG))
.thenThrow(new SCMException(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE));
}

private WritableRatisContainerProvider createSubject() {
return new WritableRatisContainerProvider(conf,
pipelineManager, containerManager, policy);
}

private void verifyPipelineCreated() throws IOException {
verify(pipelineManager, times(2))
.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet());
verify(pipelineManager)
.createPipeline(REPLICATION_CONFIG);
}

private void verifyPipelineNotCreated() throws IOException {
verify(pipelineManager, times(1))
.getPipelines(REPLICATION_CONFIG, OPEN, emptySet(), emptySet());
verify(pipelineManager, never())
.createPipeline(REPLICATION_CONFIG);
}

}
Loading