Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import com.google.protobuf.Message;
import java.io.IOException;
import java.time.Clock;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand All @@ -41,7 +41,6 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.db.InMemoryTestTable;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
Expand All @@ -57,43 +56,39 @@ public class ContainerSet implements Iterable<Container<?>> {

private static final Logger LOG = LoggerFactory.getLogger(ContainerSet.class);

/**
 * Creates a read-only {@code ContainerSet} that has no backing container-IDs
 * table (the table reference is {@code null}).
 *
 * @param recoveringTimeout timeout for recovering containers
 * @return a read-only container set
 */
public static ContainerSet newReadOnlyContainerSet(long recoveringTimeout) {
return new ContainerSet(null, recoveringTimeout);
}

/**
 * Creates a read-write {@code ContainerSet} backed by the given
 * container-IDs table.
 *
 * @param containerIdsTable persistent table of container IDs; must not be null
 * @param recoveringTimeout timeout for recovering containers
 * @return a read-write container set
 * @throws NullPointerException if {@code containerIdsTable} is null
 */
public static ContainerSet newRwContainerSet(Table<Long, String> containerIdsTable, long recoveringTimeout) {
Objects.requireNonNull(containerIdsTable, "containerIdsTable == null");
return new ContainerSet(containerIdsTable, recoveringTimeout);
}

private final ConcurrentSkipListMap<Long, Container<?>> containerMap = new
ConcurrentSkipListMap<>();
private final ConcurrentSkipListSet<Long> missingContainerSet =
new ConcurrentSkipListSet<>();
private final ConcurrentSkipListMap<Long, Long> recoveringContainerMap =
new ConcurrentSkipListMap<>();
private Clock clock;
private final Clock clock;
private long recoveringTimeout;
private final Table<Long, String> containerIdsTable;

@VisibleForTesting
public ContainerSet(long recoveringTimeout) {
this(new InMemoryTestTable<>(), recoveringTimeout);
private ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout) {
this(continerIdsTable, recoveringTimeout, null);
}

public ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout) {
this(continerIdsTable, recoveringTimeout, false);
}

public ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout, boolean readOnly) {
this.clock = Clock.system(ZoneOffset.UTC);
ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout, Clock clock) {
this.clock = clock != null ? clock : Clock.systemUTC();
this.containerIdsTable = continerIdsTable;
this.recoveringTimeout = recoveringTimeout;
if (!readOnly && containerIdsTable == null) {
throw new IllegalArgumentException("Container table cannot be null when container set is not read only");
}
}

/**
 * Returns the current time in epoch milliseconds as reported by this set's
 * clock (the injected test clock, or the UTC system clock by default).
 */
public long getCurrentTime() {
return clock.millis();
}

@VisibleForTesting
public void setClock(Clock clock) {
this.clock = clock;
}

@VisibleForTesting
public void setRecoveringTimeout(long recoveringTimeout) {
this.recoveringTimeout = recoveringTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public OzoneContainer(HddsDatanodeService hddsDatanodeService,
OZONE_RECOVERING_CONTAINER_TIMEOUT,
OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
this.witnessedContainerMetadataStore = WitnessedContainerMetadataStoreImpl.get(conf);
containerSet = new ContainerSet(witnessedContainerMetadataStore.getContainerIdsTable(), recoveringContainerTimeout);
containerSet = ContainerSet.newRwContainerSet(witnessedContainerMetadataStore.getContainerIdsTable(),
recoveringContainerTimeout);
metadataScanner = null;

metrics = ContainerMetrics.create(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
Expand Down Expand Up @@ -335,7 +335,7 @@ public static ContainerDispatcher getNoopContainerDispatcher() {
}

private static final ContainerController EMPTY_CONTAINER_CONTROLLER
= new ContainerController(new ContainerSet(1000), Collections.emptyMap());
= new ContainerController(ContainerImplTestUtils.newContainerSet(), Collections.emptyMap());

public static ContainerController getEmptyContainerController() {
return EMPTY_CONTAINER_CONTROLLER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.hadoop.ozone.container.common;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.COMMIT_STAGE;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
import static org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask.LOG;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
Expand All @@ -50,6 +49,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -168,7 +168,8 @@ private KeyValueContainerData createToDeleteBlocks(ContainerSet containerSet,
} else {
chunkManager = new FilePerChunkStrategy(true, null);
}
byte[] arr = randomAlphanumeric(1048576).getBytes(UTF_8);
byte[] arr = new byte[1048576];
ThreadLocalRandom.current().nextBytes(arr);
ChunkBuffer buffer = ChunkBuffer.wrap(ByteBuffer.wrap(arr));
int txnID = 0;
long containerID = ContainerTestHelper.getTestContainerID();
Expand Down Expand Up @@ -425,7 +426,7 @@ public void testPendingDeleteBlockReset(ContainerTestVersionInfo versionInfo)
dnConf.setBlockDeletionLimit(blockDeleteLimit);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();

// Create one container with no actual pending delete blocks, but an
// incorrect metadata value indicating it has enough pending deletes to
Expand Down Expand Up @@ -533,7 +534,7 @@ public void testBlockDeletion(ContainerTestVersionInfo versionInfo)
dnConf.setBlockDeletionLimit(2);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
createToDeleteBlocks(containerSet, 1, 3, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
Expand Down Expand Up @@ -659,7 +660,7 @@ public void testWithUnrecordedBlocks(ContainerTestVersionInfo versionInfo)
dnConf.setBlockDeletionLimit(2);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();

createToDeleteBlocks(containerSet, numOfContainers, numOfBlocksPerContainer,
numOfChunksPerBlock);
Expand Down Expand Up @@ -767,7 +768,7 @@ public void testShutdownService(ContainerTestVersionInfo versionInfo)
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
TimeUnit.MILLISECONDS);

ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
// Create 1 container with 100 blocks
createToDeleteBlocks(containerSet, 1, 100, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Expand Down Expand Up @@ -798,7 +799,7 @@ public void testBlockDeletionTimeout(ContainerTestVersionInfo versionInfo)
blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);

ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
createToDeleteBlocks(containerSet, 1, 3, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
Expand Down Expand Up @@ -900,7 +901,7 @@ public void testContainerThrottle(ContainerTestVersionInfo versionInfo)
dnConf.setBlockDeletionLimit(1);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();

int containerCount = 2;
int chunksPerBlock = 10;
Expand Down Expand Up @@ -960,7 +961,7 @@ public void testContainerMaxLockHoldingTime(
dnConf.setBlockDeletingMaxLockHoldingTime(Duration.ofMillis(-1));
dnConf.setBlockDeletionLimit(3);
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();

int containerCount = 1;
int chunksPerBlock = 10;
Expand Down Expand Up @@ -1024,7 +1025,7 @@ public void testBlockThrottle(ContainerTestVersionInfo versionInfo)
dnConf.setBlockDeletionLimit(10);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
conf.setFromObject(dnConf);
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.container.common;

import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -564,7 +565,7 @@ private void runBlockDeletingService(KeyValueHandler keyValueHandler)
}

private ContainerSet makeContainerSet() throws Exception {
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
KeyValueContainer container = new KeyValueContainer(newKvData(), conf);
containerSet.addContainer(container);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.PENDING_DELETE_BLOCK_COUNT;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.COMMIT_STAGE;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -131,7 +132,7 @@ public void setup() throws Exception {
blockManager = new BlockManagerImpl(conf);
chunkManager = new FilePerBlockStrategy(true, blockManager);

containerSet = new ContainerSet(1000);
containerSet = newContainerSet();
keyValueHandler = new KeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, ContainerMetrics.create(conf), c -> { });
ozoneContainer = mock(OzoneContainer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.anyLong;
Expand Down Expand Up @@ -145,8 +146,7 @@ private List<Long> createTestContainers(
public void testScrubbingStaleRecoveringContainers(
ContainerTestVersionInfo versionInfo) throws Exception {
initVersionInfo(versionInfo);
ContainerSet containerSet = new ContainerSet(10);
containerSet.setClock(testClock);
ContainerSet containerSet = newContainerSet(10, testClock);
StaleRecoveringContainerScrubbingService srcss =
new StaleRecoveringContainerScrubbingService(
50, TimeUnit.MILLISECONDS, 10,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.container.common.impl;

import java.time.Clock;
import org.apache.hadoop.hdds.utils.db.InMemoryTestTable;

/**
* Helper utility to test container impl.
*/
/**
 * Test-only factory helpers for building {@link ContainerSet} instances that
 * are backed by an {@link InMemoryTestTable} instead of a persistent
 * container-IDs table.
 */
public final class ContainerImplTestUtils {

  /** Recovering-container timeout (ms) used by tests that do not care about it. */
  private static final long DEFAULT_RECOVERING_TIMEOUT = 1000;

  private ContainerImplTestUtils() {
    // utility class — no instances
  }

  /**
   * Creates a read-write container set with the default recovering timeout.
   */
  public static ContainerSet newContainerSet() {
    return newContainerSet(DEFAULT_RECOVERING_TIMEOUT);
  }

  /**
   * Creates a read-write container set with the given recovering timeout.
   *
   * @param recoveringTimeout timeout for recovering containers
   */
  public static ContainerSet newContainerSet(long recoveringTimeout) {
    return ContainerSet.newRwContainerSet(new InMemoryTestTable<>(), recoveringTimeout);
  }

  /**
   * Creates a container set driven by an explicit {@link Clock}, for tests
   * that need to control time (e.g. stale-recovering-container scrubbing).
   *
   * @param recoveringTimeout timeout for recovering containers
   * @param clock clock used for time-dependent behavior
   */
  public static ContainerSet newContainerSet(long recoveringTimeout, Clock clock) {
    return new ContainerSet(new InMemoryTestTable<>(), recoveringTimeout, clock);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.container.common.impl;

import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -81,7 +82,7 @@ public void testRandomChoosingPolicy(ContainerLayoutVersion layout)
conf.set(
ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
RandomContainerDeletionChoosingPolicy.class.getName());
containerSet = new ContainerSet(1000);
containerSet = newContainerSet();

int numContainers = 10;
for (int i = 0; i < numContainers; i++) {
Expand Down Expand Up @@ -142,7 +143,7 @@ public void testTopNOrderedChoosingPolicy(ContainerLayoutVersion layout)
conf.set(
ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
TopNOrderedContainerDeletionChoosingPolicy.class.getName());
containerSet = new ContainerSet(1000);
containerSet = newContainerSet();

int numContainers = 10;
Random random = new Random();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -151,7 +152,7 @@ public static void shutdown() throws IOException {

@BeforeEach
public void setupPaths() throws IOException {
containerSet = new ContainerSet(1000);
containerSet = newContainerSet();
volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
// Initialize volume directories.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.container.common.impl;

import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -66,7 +67,7 @@ private void setLayoutVersion(ContainerLayoutVersion layoutVersion) {
public void testAddGetRemoveContainer(ContainerLayoutVersion layout)
throws StorageContainerException {
setLayoutVersion(layout);
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
long containerId = 100L;
ContainerProtos.ContainerDataProto.State state = ContainerProtos
.ContainerDataProto.State.CLOSED;
Expand Down Expand Up @@ -155,7 +156,7 @@ public void testIteratorPerVolume(ContainerLayoutVersion layout)
HddsVolume vol2 = mock(HddsVolume.class);
when(vol2.getStorageID()).thenReturn("uuid-2");

ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
for (int i = 0; i < 10; i++) {
KeyValueContainerData kvData = new KeyValueContainerData(i,
layout,
Expand Down Expand Up @@ -198,7 +199,7 @@ public void iteratorIsOrderedByScanTime(ContainerLayoutVersion layout)
HddsVolume vol = mock(HddsVolume.class);
when(vol.getStorageID()).thenReturn("uuid-1");
Random random = new Random();
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
int containerCount = 50;
for (int i = 0; i < containerCount; i++) {
KeyValueContainerData kvData = new KeyValueContainerData(i,
Expand Down Expand Up @@ -296,7 +297,7 @@ private static void assertContainerIds(int startId, int count,
}

private ContainerSet createContainerSet() throws StorageContainerException {
ContainerSet containerSet = new ContainerSet(1000);
ContainerSet containerSet = newContainerSet();
for (int i = FIRST_ID; i < FIRST_ID + 10; i++) {
KeyValueContainerData kvData = new KeyValueContainerData(i,
layoutVersion,
Expand Down
Loading