HddsVolume.java
@@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
@@ -41,6 +42,7 @@
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
@@ -89,6 +91,7 @@ public class HddsVolume extends StorageVolume {
private ContainerController controller;

private final AtomicLong committedBytes = new AtomicLong(); // space committed to open containers until they become full
private Function<HddsVolume, Long> gatherContainerUsages = (K) -> 0L;

// The type of this volume
private final VolumeType type = VolumeType.DATA_VOLUME;
@@ -405,6 +408,38 @@ public long getFreeSpaceToSpare(long volumeCapacity) {
return getDatanodeConfig().getMinFreeSpace(volumeCapacity);
}

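/**
 * Registers the callback used to compute the space consumed by containers on
 * this volume. OzoneContainer supplies the real implementation once the
 * container set is available.
 */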
@Override
public void setGatherContainerUsages(Function<HddsVolume, Long> gatherContainerUsages) {
this.gatherContainerUsages = gatherContainerUsages;
}

@Override
protected long containerUsedSpace() {
return gatherContainerUsages.apply(this);
}

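/**
 * Returns the directory that holds container data for this volume
 * ({@code <volumeRoot>/<clusterID>/current}), or null if the volume is not in
 * NORMAL state. The path is supplied to the space-usage check as its
 * exclusion provider.
 */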
@Override
public File getContainerDirsPath() {
if (getStorageState() != VolumeState.NORMAL) {
return null;
}
File hddsVolumeRootDir = getHddsRootDir();
// Filter to the storage directories under the volume root.
File[] storageDirs = hddsVolumeRootDir.listFiles(File::isDirectory);
if (storageDirs == null) {
LOG.error("Could not list storage directories under volume {}", hddsVolumeRootDir);
return null;
}
File clusterIDDir = new File(hddsVolumeRootDir, getClusterID());
if (storageDirs.length == 1 && !clusterIDDir.exists()) {
// If this volume was formatted pre SCM HA, this will be the SCM ID.
// A cluster ID symlink will exist in this case only if this cluster is finalized for SCM HA.
// If the volume was formatted post SCM HA, this will be the cluster ID.
clusterIDDir = storageDirs[0];
}
return new File(clusterIDDir, Storage.STORAGE_DIR_CURRENT);
}

public void setDbVolume(DbVolume dbVolume) {
this.dbVolume = dbVolume;
}
MutableVolumeSet.java
@@ -29,6 +29,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
@@ -273,10 +274,20 @@ public void checkVolumeAsync(StorageVolume volume) {
});
}

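/** Starts the space-usage monitor on every volume in this set. */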
public void startAllVolume() throws IOException {
for (Map.Entry<String, StorageVolume> entry : volumeMap.entrySet()) {
entry.getValue().start();
}
}

public void refreshAllVolumeUsage() {
volumeMap.forEach((k, v) -> v.refreshVolumeUsage());
}

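/** Propagates the container-usage callback to every volume in this set. */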
public void setGatherContainerUsages(Function<HddsVolume, Long> gatherContainerUsages) {
volumeMap.forEach((k, v) -> v.setGatherContainerUsages(gatherContainerUsages));
}

/**
* Acquire Volume Set Read lock.
*/
StorageVolume.java
@@ -33,6 +33,8 @@
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -148,7 +150,8 @@ protected StorageVolume(Builder<?> b) throws IOException {
if (!b.failedVolume) {
StorageLocation location = StorageLocation.parse(volumeRoot);
storageDir = new File(location.getUri().getPath(), b.storageDirStr);
SpaceUsageCheckParams checkParams = getSpaceUsageCheckParams(b);
SpaceUsageCheckParams checkParams = getSpaceUsageCheckParams(b, this::getContainerDirsPath);
checkParams.setContainerUsedSpace(this::containerUsedSpace);
volumeUsage = Optional.of(new VolumeUsage(checkParams, b.conf));
this.volumeSet = b.volumeSet;
this.state = VolumeState.NOT_INITIALIZED;
@@ -176,13 +179,31 @@ protected StorageVolume(Builder<?> b) throws IOException {
this.storageDirStr = storageDir.getAbsolutePath();
}

protected long containerUsedSpace() {
// Container used space is tracked only for HddsVolume.
return 0;
}

public File getContainerDirsPath() {
// A container directory path exists only for HddsVolume.
return null;
}

public void setGatherContainerUsages(Function<HddsVolume, Long> gatherContainerUsages) {
// No-op: only HddsVolume, which holds container data, overrides this.
}

public void format(String cid) throws IOException {
Preconditions.checkNotNull(cid, "clusterID cannot be null while " +
"formatting Volume");
this.clusterID = cid;
initialize();
}

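/**
 * Starts background space-usage tracking for this volume. Previously the
 * VolumeUsage constructor started it eagerly; it is now an explicit lifecycle
 * step invoked after the volume passes its health check.
 */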
public void start() throws IOException {
volumeUsage.ifPresent(VolumeUsage::start);
}

/**
* Initializes the volume.
* Creates the Version file if not present,
@@ -723,7 +744,8 @@ public String toString() {
return getStorageDir().toString();
}

private static SpaceUsageCheckParams getSpaceUsageCheckParams(Builder b) throws IOException {
private static SpaceUsageCheckParams getSpaceUsageCheckParams(Builder b, Supplier<File> exclusionProvider)
throws IOException {
File root = new File(b.volumeRootStr);

boolean succeeded = root.isDirectory() || root.mkdirs();
@@ -738,6 +760,6 @@ private static SpaceUsageCheckParams getSpaceUsageCheckParams(Builder b) throws
usageCheckFactory = SpaceUsageCheckFactory.create(b.conf);
}

return usageCheckFactory.paramsFor(root);
return usageCheckFactory.paramsFor(root, exclusionProvider);
}
}
VolumeUsage.java
@@ -106,7 +106,6 @@ public class VolumeUsage {
source = new CachingSpaceUsageSource(checkParams);
reservedInBytes = getReserved(conf, checkParams.getPath(), source.getCapacity());
Preconditions.assertTrue(reservedInBytes >= 0, reservedInBytes + " < 0");
start(); // TODO should start only on demand
}

@VisibleForTesting
OzoneContainer.java
@@ -46,6 +46,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -195,6 +196,7 @@ public OzoneContainer(HddsDatanodeService hddsDatanodeService,
this.witnessedContainerMetadataStore = WitnessedContainerMetadataStoreImpl.get(conf);
containerSet = ContainerSet.newRwContainerSet(witnessedContainerMetadataStore.getContainerIdsTable(),
recoveringContainerTimeout);
volumeSet.setGatherContainerUsages(this::gatherContainerUsages);
metadataScanner = null;

metrics = ContainerMetrics.create(conf);
@@ -503,12 +505,14 @@ public void start(String clusterId) throws IOException {
// Do an immediate check of all volumes to ensure datanode health before
// proceeding.
volumeSet.checkAllVolumes();
volumeSet.startAllVolume();
metaVolumeSet.checkAllVolumes();
metaVolumeSet.startAllVolume();
// DB volume set may be null if dedicated DB volumes are not used.
if (dbVolumeSet != null) {
dbVolumeSet.checkAllVolumes();
dbVolumeSet.startAllVolume();
}

LOG.info("Attempting to start container services.");
startContainerScrub();

@@ -575,6 +579,15 @@ public ContainerSet getContainerSet() {
return containerSet;
}

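/**
 * Sums the bytes used by all containers residing on the given volume, matched
 * by storage ID. Registered with the volume set as the gatherContainerUsages
 * callback.
 */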
public Long gatherContainerUsages(HddsVolume storageVolume) {
AtomicLong usages = new AtomicLong();
containerSet.getContainerMapIterator().forEachRemaining(e -> {
if (e.getValue().getContainerData().getVolume().getStorageID().equals(storageVolume.getStorageID())) {
usages.addAndGet(e.getValue().getContainerData().getBytesUsed());
}
});
return usages.get();
}

/**
* Returns container report.
*
TestHddsDispatcher.java
@@ -90,7 +90,6 @@
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -145,8 +144,10 @@ public void testContainerCloseActionWhenFull(
conf.set(HDDS_DATANODE_DIR_KEY, testDirPath);
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath);
DatanodeDetails dd = randomDatanodeDetails();
MutableVolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf,
MutableVolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), "test", conf,
null, StorageVolume.VolumeType.DATA_VOLUME, null);
volumeSet.getVolumesList().forEach(e -> e.setState(StorageVolume.VolumeState.NORMAL));
volumeSet.startAllVolume();

try {
UUID scmId = UUID.randomUUID();
Expand Down Expand Up @@ -294,7 +295,7 @@ public void testContainerCloseActionWhenVolumeFull(

HddsVolume.Builder volumeBuilder =
new HddsVolume.Builder(testDirPath).datanodeUuid(dd.getUuidString())
.conf(conf).usageCheckFactory(MockSpaceUsageCheckFactory.NONE);
.conf(conf).usageCheckFactory(MockSpaceUsageCheckFactory.NONE).clusterID("test");
// state of cluster : available (160) > 100 ,datanode volume
// utilisation threshold not yet reached. container creates are successful.
AtomicLong usedSpace = new AtomicLong(340);
@@ -306,6 +307,8 @@
MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
when(volumeSet.getVolumesList())
.thenReturn(Collections.singletonList(volumeBuilder.build()));
volumeSet.getVolumesList().get(0).setState(StorageVolume.VolumeState.NORMAL);
volumeSet.getVolumesList().get(0).start();
try {
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = newContainerSet();
@@ -566,7 +569,7 @@ static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
OzoneConfiguration conf, TokenVerifier tokenVerifier) throws IOException {
ContainerSet containerSet = newContainerSet();
VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null,
MutableVolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
volumeSet.getVolumesList().stream().forEach(v -> {
try {
@@ -576,6 +579,7 @@ static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
throw new RuntimeException(e);
}
});
volumeSet.startAllVolume();
StateContext context = ContainerTestUtils.getMockContext(dd, conf);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerType, Handler> handlers = Maps.newHashMap();
TestHddsVolume.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
@@ -244,6 +245,7 @@ public void testShutdown() throws Exception {
volumeBuilder.usageCheckFactory(factory);

HddsVolume volume = volumeBuilder.build();
volume.start();

assertEquals(initialUsedSpace, savedUsedSpace.get());
assertEquals(expectedUsedSpace, volume.getCurrentUsage().getUsedSpace());
@@ -299,6 +301,7 @@ public void testReportUsedBiggerThanActualUsed() throws IOException {
volumeBuilder.usageCheckFactory(factory);

HddsVolume volume = volumeBuilder.build();
volume.start();

SpaceUsageSource usage = volume.getCurrentUsage();
assertEquals(400, usage.getCapacity());
@@ -354,6 +357,7 @@ public void testOverUsedReservedSpace() throws IOException {
volumeBuilder.usageCheckFactory(factory);

HddsVolume volume = volumeBuilder.build();
volume.start();

SpaceUsageSource usage = volume.getCurrentUsage();
assertEquals(400, usage.getCapacity());
@@ -381,6 +385,7 @@ public void testOverUsedHddsSpace() throws IOException {
volumeBuilder.usageCheckFactory(factory);

HddsVolume volume = volumeBuilder.build();
volume.start();

SpaceUsageSource usage = volume.getCurrentUsage();
assertEquals(400, usage.getCapacity());
@@ -538,6 +543,25 @@ public void testDBDirFailureDetected() throws Exception {
volume.shutdown();
}

@Test
public void testGetContainerDirsPath() throws Exception {
HddsVolume volume = volumeBuilder.build();
volume.format(CLUSTER_ID);
volume.createWorkingDir(CLUSTER_ID, null);

File expectedPath = new File(new File(volume.getStorageDir(), CLUSTER_ID), Storage.STORAGE_DIR_CURRENT);
assertEquals(expectedPath, volume.getContainerDirsPath());

volume.shutdown();
}

@Test
public void testGetContainerDirsPathWhenNotFormatted() throws Exception {
HddsVolume volume = volumeBuilder.build();
assertNull(volume.getContainerDirsPath());
volume.shutdown();
}

@Test
public void testVolumeUsagesMetrics() throws Exception {
// Build a volume with mocked usage, with reserved: 100B, Min free: 10B
TestOzoneContainer.java
@@ -99,6 +99,7 @@ private void setup() throws Exception {
clusterId, conf, null, StorageVolume.VolumeType.DATA_VOLUME, null);
createDbInstancesForTestIfNeeded(volumeSet, clusterId, clusterId, conf);
volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
volumeSet.startAllVolume();
}

@AfterEach
@@ -142,7 +143,7 @@ public void testBuildContainerMap(ContainerTestVersionInfo versionInfo)
keyValueContainer.create(volumeSet, volumeChoosingPolicy, clusterId);
myVolume = keyValueContainer.getContainerData().getVolume();

freeBytes = addBlocks(keyValueContainer, 2, 3);
freeBytes = addBlocks(keyValueContainer, 2, 3, 65536);

// update our expectation of volume committed space in the map
volCommitBytes = commitSpaceMap.get(getVolumeKey(myVolume)).longValue();
@@ -158,6 +159,8 @@
ContainerSet containerset = ozoneContainer.getContainerSet();
assertEquals(numTestContainers, containerset.containerCount());
verifyCommittedSpace(ozoneContainer);
// Expected usage on the volume: numTestContainers * blocks * chunksPerBlock * datalen = 10 * 2 * 3 * 65536.
assertEquals(10 * 2 * 3 * 65536, ozoneContainer.gatherContainerUsages(volumes.get(0)));
Set<Long> missingContainers = new HashSet<>();
for (int i = 0; i < numTestContainers; i++) {
if (i % 2 == 0) {
@@ -262,10 +265,9 @@ private void verifyCommittedSpace(OzoneContainer oc) {
}

private long addBlocks(KeyValueContainer container,
int blocks, int chunksPerBlock) throws Exception {
int blocks, int chunksPerBlock, int datalen) throws Exception {
String strBlock = "block";
String strChunk = "-chunkFile";
int datalen = 65536;
long usedBytes = 0;

long freeBytes = container.getContainerData().getMaxSize();
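Taken together, the changes make volume usage accounting container-aware and turn usage monitoring into an explicit lifecycle step: volumes are health-checked, then started, and each HddsVolume's containerUsedSpace() delegates to a callback registered by OzoneContainer. A minimal sketch of the resulting startup sequence, assuming a configured datanode environment; the lambda is a hypothetical stand-in for OzoneContainer#gatherContainerUsages:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;

public class VolumeStartupSketch {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    MutableVolumeSet volumeSet = new MutableVolumeSet(
        "datanode-uuid", "cluster-id", conf, null,
        StorageVolume.VolumeType.DATA_VOLUME, null);

    // In production OzoneContainer registers this::gatherContainerUsages;
    // a stub keeps the sketch self-contained.
    volumeSet.setGatherContainerUsages(volume -> 0L);

    // Health-check first, then start: usage refresh no longer begins in the
    // VolumeUsage constructor, so start() must be called explicitly.
    volumeSet.checkAllVolumes();
    volumeSet.startAllVolume();
  }
}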