Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.NonNull;
import software.amazon.s3.analyticsaccelerator.common.Metrics;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
import software.amazon.s3.analyticsaccelerator.util.BlockKey;
import software.amazon.s3.analyticsaccelerator.util.MetricKey;

/**
* Represents a block of data from an object stream, identified by a {@link BlockKey} and a
Expand All @@ -42,6 +44,9 @@ public class DataBlock implements Closeable {

@Getter private final BlockKey blockKey;
@Getter private final long generation;

private final BlobStoreIndexCache indexCache;
private final Metrics aggregatingMetrics;
/**
* A synchronization aid that allows threads to wait until the block's data is available.
*
Expand All @@ -65,8 +70,14 @@ public class DataBlock implements Closeable {
*
* @param blockKey the key identifying the object and byte range
* @param generation the generation number of this block in a sequential read pattern
* @param indexCache blobstore index cache
* @param aggregatingMetrics blobstore metrics
*/
public DataBlock(@NonNull BlockKey blockKey, long generation) {
public DataBlock(
@NonNull BlockKey blockKey,
long generation,
@NonNull BlobStoreIndexCache indexCache,
@NonNull Metrics aggregatingMetrics) {
long start = blockKey.getRange().getStart();
long end = blockKey.getRange().getEnd();
Preconditions.checkArgument(
Expand All @@ -76,6 +87,8 @@ public DataBlock(@NonNull BlockKey blockKey, long generation) {

this.blockKey = blockKey;
this.generation = generation;
this.indexCache = indexCache;
this.aggregatingMetrics = aggregatingMetrics;
}

/**
Expand All @@ -88,6 +101,7 @@ public DataBlock(@NonNull BlockKey blockKey, long generation) {
public int read(long pos) throws IOException {
  Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
  // Block until setData() has populated the buffer, then mark the block as
  // recently used so the index cache keeps it warm.
  awaitData();
  indexCache.recordAccess(this.blockKey);
  final int offset = posToOffset(pos);
  final byte value = this.data[offset];
  // Zero-extend to an int in [0, 255], matching InputStream.read() semantics.
  return Byte.toUnsignedInt(value);
}
Expand All @@ -111,6 +125,7 @@ public int read(byte @NonNull [] buf, int off, int len, long pos) throws IOExcep

awaitData();

indexCache.recordAccess(this.blockKey);
int contentOffset = posToOffset(pos);
int available = this.data.length - contentOffset;
int bytesToCopy = Math.min(len, available);
Expand All @@ -120,6 +135,15 @@ public int read(byte @NonNull [] buf, int off, int len, long pos) throws IOExcep
return bytesToCopy;
}

/**
 * Checks whether this block's data has been populated.
 *
 * <p>The latch reaches zero once {@code setData} has stored the buffer, so a zero count means
 * the data is safe to read without waiting.
 *
 * @return {@code true} if data is ready, {@code false} otherwise
 */
public boolean isDataReady() {
  final long pending = dataReadyLatch.getCount();
  return pending == 0L;
}

/**
* Converts an absolute object position to an offset within this block's data.
*
Expand All @@ -138,6 +162,8 @@ private int posToOffset(long pos) {
*/
public void setData(final byte[] data) {
this.data = data;
this.aggregatingMetrics.add(MetricKey.MEMORY_USAGE, data.length);
Copy link
Contributor

@rajdchak rajdchak Jun 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting the entry into the index cache and adding the metrics should be done before `dataReadyLatch.countDown()`; as per the current logic, we do everything inside the data future:

            this.source.thenApply(
                objectContent -> {
                  try {
                    byte[] bytes =
                        StreamUtils.toByteArray(
                            objectContent,
                            this.blockKey.getObjectKey(),
                            this.blockKey.getRange(),
                            this.readTimeout);
                    int blockRange = blockKey.getRange().getLength();
                    this.aggregatingMetrics.add(MetricKey.MEMORY_USAGE, blockRange);
                    this.indexCache.put(blockKey, blockRange);
                    return bytes;
                  } catch (IOException | TimeoutException e) {
                    throw new RuntimeException(
                        "Error while converting InputStream to byte array", e);
                  }

and the read starts after we join on the data.

This is likely to cause an issue in the cleanup logic, where we are doing:

if (block.isDataReady() && !indexCache.contains(block.getBlockKey())) {
        try {
          iterator.remove();
          BlockKey blockKey = block.getBlockKey();
          aggregatingMetrics.reduce(MetricKey.MEMORY_USAGE, blockKey.getRange().getLength());
          LOG.debug(
              "Removed block with key {}-{}-{} from block store during cleanup",
              blockKey.getObjectKey().getS3URI(),
              blockKey.getRange().getStart(),
              blockKey.getRange().getEnd());
        } catch (Exception e) {
          LOG.error("Error in removing block {}", e.getMessage());
        }
      }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — addressed in the next revision.

this.indexCache.put(this.blockKey, this.blockKey.getRange().getLength());
dataReadyLatch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,9 @@ public class DataBlockManager implements Closeable {
private final Telemetry telemetry;

private final PhysicalIOConfiguration configuration;

@SuppressFBWarnings(
value = "URF_UNREAD_FIELD",
justification = "Field is injected and may be used in the future")
private final Metrics aggregatingMetrics;

@SuppressFBWarnings(
value = "URF_UNREAD_FIELD",
justification = "Field is injected and may be used in the future")
private final BlobStoreIndexCache indexCache;

private final StreamReader streamReader;

private final DataBlockStore blockStore;

/**
Expand Down Expand Up @@ -89,7 +79,7 @@ public DataBlockManager(
this.indexCache = indexCache;
this.streamReader =
new StreamReader(objectClient, objectKey, threadPool, openStreamInformation);
this.blockStore = new DataBlockStore(configuration);
this.blockStore = new DataBlockStore(indexCache, aggregatingMetrics, configuration);
}

/**
Expand All @@ -100,9 +90,6 @@ public DataBlockManager(
*/
public synchronized void makePositionAvailable(long pos, ReadMode readMode) {
Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");

if (getBlock(pos).isPresent()) return;

makeRangeAvailable(pos, 1, readMode);
}

Expand Down Expand Up @@ -134,7 +121,7 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod
blockIndex * configuration.getReadBufferSize(),
Math.min((blockIndex + 1) * configuration.getReadBufferSize(), getLastObjectByte()));
BlockKey blockKey = new BlockKey(objectKey, range);
DataBlock block = new DataBlock(blockKey, 0);
DataBlock block = new DataBlock(blockKey, 0, this.indexCache, this.aggregatingMetrics);
blockStore.add(block);
blocksToFill.add(block);
}
Expand Down Expand Up @@ -191,7 +178,9 @@ public boolean isBlockStoreEmpty() {
}

/** cleans data from memory */
public void cleanUp() {}
public void cleanUp() {
this.blockStore.cleanUp();
}

/** Closes the {@link DataBlockManager} and frees up all resources it holds */
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,31 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.s3.analyticsaccelerator.common.Metrics;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration;
import software.amazon.s3.analyticsaccelerator.util.BlockKey;
import software.amazon.s3.analyticsaccelerator.util.MetricKey;

/**
* A container that manages a collection of {@link DataBlock} instances. Each {@code DataBlock}
* corresponds to a fixed-size chunk of data based on the configured block size. This class provides
* methods to retrieve, add, and track missing blocks within a specified data range.
*/
public class DataBlockStore implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(DataBlockStore.class);

private final BlobStoreIndexCache indexCache;
private final Metrics aggregatingMetrics;
private final PhysicalIOConfiguration configuration;
// It is safe to use Integer as key since maximum single file size is 5TB in S3
// and if we assume that block size will be 8KB, total number of blocks is within range
Expand All @@ -44,8 +55,15 @@ public class DataBlockStore implements Closeable {
*
* @param configuration the {@link PhysicalIOConfiguration} used to define block size and other
* I/O settings
* @param indexCache blobstore index cache
* @param aggregatingMetrics blobstore metrics
*/
public DataBlockStore(@NonNull PhysicalIOConfiguration configuration) {
public DataBlockStore(
@NonNull BlobStoreIndexCache indexCache,
@NonNull Metrics aggregatingMetrics,
@NonNull PhysicalIOConfiguration configuration) {
this.indexCache = indexCache;
this.aggregatingMetrics = aggregatingMetrics;
this.configuration = configuration;
blocks = new ConcurrentHashMap<>();
}
Expand All @@ -69,6 +87,7 @@ public Optional<DataBlock> getBlock(long pos) {
* {@link Optional}
*/
public Optional<DataBlock> getBlockByIndex(int index) {
  Preconditions.checkArgument(0 <= index, "`index` must not be negative");
  // ConcurrentHashMap.get returns null for an absent key; wrap so callers
  // never have to null-check.
  final DataBlock block = blocks.get(index);
  return Optional.ofNullable(block);
}

Expand Down Expand Up @@ -98,11 +117,44 @@ private List<Integer> getMissingBlockIndexesInRange(int startIndex, int endIndex
List<Integer> missingBlockIndexes = new ArrayList<>();

for (int i = startIndex; i <= endIndex; i++) {
if (!blocks.containsKey(i)) missingBlockIndexes.add(i);
if (!blocks.containsKey(i)) {
missingBlockIndexes.add(i);
aggregatingMetrics.add(MetricKey.CACHE_MISS, 1L);
} else {
aggregatingMetrics.add(MetricKey.CACHE_HIT, 1L);
}
}
return missingBlockIndexes;
}

/**
 * Cleans data from memory by removing blocks that are no longer needed.
 *
 * <p>A block is evicted when both conditions hold:
 *
 * <ol>
 *   <li>its data has been fully loaded ({@code isDataReady()}), and
 *   <li>it is no longer present in the index cache.
 * </ol>
 *
 * <p>For each evicted block the method removes it from the internal block store and reduces the
 * {@link MetricKey#MEMORY_USAGE} metric by the block's range length.
 */
public void cleanUp() {
  Iterator<Map.Entry<Integer, DataBlock>> iterator = blocks.entrySet().iterator();
  while (iterator.hasNext()) {
    DataBlock block = iterator.next().getValue();
    BlockKey blockKey = block.getBlockKey();
    // Skip blocks still being filled (latch open) so we never evict a block a
    // reader is about to consume; they are reconsidered on the next pass.
    if (block.isDataReady() && !indexCache.contains(blockKey)) {
      try {
        iterator.remove();
        // NOTE(review): setData() adds MEMORY_USAGE using the loaded data length, while this
        // reduces by the range length — confirm the two always agree (e.g. for a short final
        // block), otherwise the gauge will drift.
        aggregatingMetrics.reduce(MetricKey.MEMORY_USAGE, blockKey.getRange().getLength());
        LOG.debug(
            "Removed block with key {}-{}-{} from block store during cleanup",
            blockKey.getObjectKey().getS3URI(),
            blockKey.getRange().getStart(),
            blockKey.getRange().getEnd());
      } catch (Exception e) {
        // Pass the throwable itself (not just getMessage()) so SLF4J logs the stack trace.
        LOG.error("Error in removing block {}", blockKey, e);
      }
    }
  }
}

private int getBlockIndex(DataBlock block) {
  // A block's store index is derived from the absolute start position of its byte range.
  final long rangeStart = block.getBlockKey().getRange().getStart();
  return getPositionIndex(rangeStart);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package software.amazon.s3.analyticsaccelerator.io.physical.data;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;

import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import software.amazon.s3.analyticsaccelerator.common.Metrics;
import software.amazon.s3.analyticsaccelerator.request.Range;
import software.amazon.s3.analyticsaccelerator.util.BlockKey;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
Expand All @@ -36,7 +38,8 @@ public void testValidConstructor() {
Range range = new Range(0, 10);
BlockKey blockKey = new BlockKey(TEST_OBJECT_KEY, range);

DataBlock block = new DataBlock(blockKey, 2);
DataBlock block =
new DataBlock(blockKey, 2, mock(BlobStoreIndexCache.class), mock(Metrics.class));

assertEquals(block.getBlockKey(), blockKey);
assertEquals(block.getGeneration(), 2);
Expand All @@ -47,6 +50,8 @@ void testNegativeGenerationThrows() {
Range range = new Range(0, 10);
BlockKey blockKey = new BlockKey(TEST_OBJECT_KEY, range);

assertThrows(IllegalArgumentException.class, () -> new DataBlock(blockKey, -1));
assertThrows(
IllegalArgumentException.class,
() -> new DataBlock(blockKey, -1, mock(BlobStoreIndexCache.class), mock(Metrics.class)));
}
}