@@ -226,6 +226,7 @@ private OzoneConsts() {
public static final String CHUNKS_PATH = "chunksPath";
public static final String CONTAINER_DB_TYPE = "containerDBType";
public static final String CHECKSUM = "checksum";
public static final String DATA_SCAN_TIMESTAMP = "dataScanTimestamp";
public static final String ORIGIN_PIPELINE_ID = "originPipelineId";
public static final String ORIGIN_NODE_ID = "originNodeId";

@@ -21,6 +21,7 @@
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
@@ -39,6 +40,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.CHECKSUM;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ID;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_TYPE;
import static org.apache.hadoop.ozone.OzoneConsts.DATA_SCAN_TIMESTAMP;
import static org.apache.hadoop.ozone.OzoneConsts.LAYOUTVERSION;
import static org.apache.hadoop.ozone.OzoneConsts.MAX_SIZE;
import static org.apache.hadoop.ozone.OzoneConsts.METADATA;
@@ -89,7 +91,9 @@ public abstract class ContainerData {
private HddsVolume volume;

private String checksum;
public static final Charset CHARSET_ENCODING = Charset.forName("UTF-8");
private Long dataScanTimestamp;
Contributor: Can you make this a Java Optional? Then instead of null we can check for Optional.absent.

Contributor: Also, can you add a comment stating what the number means? Is it Unix epoch?

Author: Thanks for the comments. I will address these and update the pull request in the new repo.
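For illustration, a minimal sketch of what the field could look like with both suggestions applied: java.util.Optional instead of a nullable Long (note that Optional.absent() is the Guava API; java.util.Optional uses Optional.empty()), plus a javadoc stating that the value is milliseconds since the Unix epoch. This is not part of the diff; the class name and javadoc wording are hypothetical.

import java.util.Optional;

/** Illustrative stand-in for the relevant part of ContainerData. */
public abstract class ContainerDataSketch {

  /**
   * Time of the last successful data scan of this container, in
   * milliseconds since the Unix epoch; empty if it was never scanned.
   */
  private Optional<Long> dataScanTimestamp = Optional.empty();

  public Optional<Long> getDataScanTimestamp() {
    return dataScanTimestamp;
  }

  public void setDataScanTimestamp(long timestampMillis) {
    this.dataScanTimestamp = Optional.of(timestampMillis);
  }
}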


public static final Charset CHARSET_ENCODING = StandardCharsets.UTF_8;
private static final String DUMMY_CHECKSUM = new String(new byte[64],
CHARSET_ENCODING);

@@ -103,6 +107,7 @@ public abstract class ContainerData {
METADATA,
MAX_SIZE,
CHECKSUM,
DATA_SCAN_TIMESTAMP,
ORIGIN_PIPELINE_ID,
ORIGIN_NODE_ID));

@@ -506,6 +511,13 @@ public String getChecksum() {
return this.checksum;
}

public long getDataScanTimestamp() {
return dataScanTimestamp != null ? dataScanTimestamp : 0;
}

public void setDataScanTimestamp(long timestamp) {
this.dataScanTimestamp = timestamp;
}

/**
* Returns the origin pipeline Id of this container.
@@ -53,6 +53,7 @@
import org.yaml.snakeyaml.introspector.PropertyUtils;
import org.yaml.snakeyaml.nodes.MappingNode;
import org.yaml.snakeyaml.nodes.Node;
import org.yaml.snakeyaml.nodes.NodeTuple;
import org.yaml.snakeyaml.nodes.ScalarNode;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;
@@ -217,6 +218,17 @@ protected Set<Property> getProperties(Class<? extends Object> type)
}
return filtered;
}

/**
* Omit properties with null value.
*/
@Override
protected NodeTuple representJavaBeanProperty(
Object bean, Property property, Object value, Tag tag) {
return value == null
? null
: super.representJavaBeanProperty(bean, property, value, tag);
}
}
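To illustrate the override above outside the Ozone codebase: returning null from representJavaBeanProperty makes SnakeYAML drop that property from the emitted mapping, so a container that was never scanned simply has no dataScanTimestamp key in its .container file, and the constructor below tolerates the missing key. A self-contained sketch, assuming SnakeYAML 1.x (no-arg Representer constructor) and a hypothetical Bean class; the printed format is approximate.

import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.introspector.Property;
import org.yaml.snakeyaml.nodes.NodeTuple;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;

public class NullOmittingYamlDemo {

  /** Hypothetical bean standing in for the container data. */
  public static class Bean {
    private String checksum = "abc";
    private Long dataScanTimestamp;          // never scanned -> null

    public String getChecksum() { return checksum; }
    public void setChecksum(String checksum) { this.checksum = checksum; }
    public Long getDataScanTimestamp() { return dataScanTimestamp; }
    public void setDataScanTimestamp(Long t) { this.dataScanTimestamp = t; }
  }

  public static void main(String[] args) {
    Representer representer = new Representer() {
      @Override
      protected NodeTuple representJavaBeanProperty(
          Object bean, Property property, Object value, Tag tag) {
        // Returning null tells SnakeYAML to omit the property entirely.
        return value == null
            ? null
            : super.representJavaBeanProperty(bean, property, value, tag);
      }
    };
    // Prints roughly: !!...Bean {checksum: abc}
    // dataScanTimestamp does not appear because its value is null.
    System.out.println(new Yaml(representer).dump(new Bean()));
  }
}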

/**
@@ -260,6 +272,10 @@ public Object construct(Node node) {
Map<String, String> meta = (Map) nodes.get(OzoneConsts.METADATA);
kvData.setMetadata(meta);
kvData.setChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
Long timestamp = (Long) nodes.get(OzoneConsts.DATA_SCAN_TIMESTAMP);
if (timestamp != null) {
kvData.setDataScanTimestamp(timestamp);
}
String state = (String) nodes.get(OzoneConsts.STATE);
kvData
.setState(ContainerProtos.ContainerDataProto.State.valueOf(state));
@@ -132,6 +132,7 @@ public Iterator<Container<?>> getContainerIterator() {

/**
* Return an iterator of containers associated with the specified volume.
* The iterator is sorted by last data scan timestamp in increasing order.
*
* @param volume the HDDS volume which should be used to filter containers
* @return {@literal Iterator<Container<?>>}
@@ -143,6 +144,7 @@ public Iterator<Container<?>> getContainerIterator(HddsVolume volume) {
return containerMap.values().stream()
.filter(x -> volumeUuid.equals(x.getContainerData().getVolume()
.getStorageID()))
.sorted(Container.DATA_SCAN_ORDER)
.iterator();
}

@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Comparator;
import java.util.Map;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -41,6 +42,10 @@
*/
public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {

Comparator<Container<?>> DATA_SCAN_ORDER = Comparator.<Container<?>>
comparingLong(c -> c.getContainerData().getDataScanTimestamp())
.thenComparingLong(c -> c.getContainerData().getContainerID());

/**
* Creates a container.
*
@@ -66,6 +71,9 @@ void create(VolumeSet volumeSet, VolumeChoosingPolicy volumeChoosingPolicy,
void update(Map<String, String> metaData, boolean forceUpdate)
throws StorageContainerException;

void updateDataScanTimestamp(long timestamp)
throws StorageContainerException;

/**
* Get metadata about the container.
*
@@ -337,6 +337,17 @@ public void close() throws StorageContainerException {
containerData.getBlockCommitSequenceId());
}

@Override
public void updateDataScanTimestamp(long timestamp)
throws StorageContainerException {
writeLock();
try {
updateContainerData(() -> containerData.setDataScanTimestamp(timestamp));
} finally {
writeUnlock();
}
}

/**
*
* Must be invoked with the writeLock held.
@@ -174,4 +174,10 @@ public Iterator<Container<?>> getContainers(HddsVolume volume) {
return containerSet.getContainerIterator(volume);
}

void updateDataScanTimestamp(long containerId, long timestamp)
throws IOException {
Container container = containerSet.getContainer(containerId);
container.updateDataScanTimestamp(timestamp);
}

}
@@ -18,12 +18,14 @@
package org.apache.hadoop.ozone.container.ozoneimpl;

import java.io.IOException;
import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.slf4j.Logger;
@@ -95,14 +97,19 @@ public void runIteration() {
while (!stopping && itr.hasNext()) {
Container c = itr.next();
if (c.shouldScanData()) {
ContainerData containerData = c.getContainerData();
long containerId = containerData.getContainerID();
try {
logScanStart(containerData);
if (!c.scanData(throttler, canceler)) {
metrics.incNumUnHealthyContainers();
controller.markContainerUnhealthy(
c.getContainerData().getContainerID());
controller.markContainerUnhealthy(containerId);
Contributor: We should also call logScanCompleted and updateDataScanTimestamp in the failure path.

Author: I would avoid this for two reasons:

  1. The full scan includes a scan of the metadata, too, and the failure may be due to a metadata problem, e.g. the .container file is missing or invalid. In that case we cannot update the timestamp in the file.
  2. Unhealthy containers are skipped during further iterations, so the timestamp would not make much difference anyway.

} else {
long now = System.currentTimeMillis();
logScanCompleted(containerData, now);
controller.updateDataScanTimestamp(containerId, now);
}
} catch (IOException ex) {
long containerId = c.getContainerData().getContainerID();
LOG.warn("Unexpected exception while scanning container "
+ containerId, ex);
} finally {
@@ -135,6 +142,23 @@ public void runIteration() {
}
}

private static void logScanStart(ContainerData containerData) {
if (LOG.isDebugEnabled()) {
long scanTimestamp = containerData.getDataScanTimestamp();
Object lastScanTime = scanTimestamp <= 0 ? "never"
: ("at " + Instant.ofEpochMilli(scanTimestamp));
LOG.debug("Scanning container {}, last scanned {}",
containerData.getContainerID(), lastScanTime);
}
}

private static void logScanCompleted(ContainerData containerData, long now) {
if (LOG.isDebugEnabled()) {
LOG.debug("Completed scan of container {} at {}",
containerData.getContainerID(), Instant.ofEpochMilli(now));
}
}

public synchronized void shutdown() {
this.stopping = true;
this.canceler.cancel("ContainerDataScanner("+volume+") is shutting down");
@@ -37,8 +37,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -178,6 +182,41 @@ public void testIteratorPerVolume() throws StorageContainerException {
assertEquals(5, count2);
}

@Test
public void iteratorIsOrderedByScanTime() throws StorageContainerException {
HddsVolume vol = Mockito.mock(HddsVolume.class);
Mockito.when(vol.getStorageID()).thenReturn("uuid-1");
Random random = new Random();
ContainerSet containerSet = new ContainerSet();
for (int i=0; i<10; i++) {
KeyValueContainerData kvData = new KeyValueContainerData(i,
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
UUID.randomUUID().toString());
kvData.setDataScanTimestamp(Math.abs(random.nextLong()));
kvData.setVolume(vol);
kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
KeyValueContainer kv = new KeyValueContainer(kvData, new
OzoneConfiguration());
containerSet.addContainer(kv);
}
List<Container<?>> expectedOrder =
new ArrayList<>(containerSet.getContainerMap().values());
expectedOrder.sort(Container.DATA_SCAN_ORDER);

List<Container<?>> containers = new ArrayList<>();
containerSet.getContainerIterator(vol).forEachRemaining(containers::add);

if (!Objects.equals(expectedOrder, containers)) {
Function<Container, ?> debug = c ->
c.getContainerData().getContainerID()
+ " " + c.getContainerData().getDataScanTimestamp();
assertEquals(
expectedOrder.stream().map(debug).collect(toList()),
containers.stream().map(debug).collect(toList())
);
}
}

@Test
public void testGetContainerReport() throws IOException {
