diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInput.java similarity index 68% rename from server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java rename to server/src/main/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInput.java index 94c9cf7edfe7f..3cd1e9eae87c2 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInput.java @@ -18,25 +18,28 @@ import java.io.EOFException; import java.io.IOException; import java.lang.ref.Cleaner; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Class acts as a virtual file mechanism for the accessed files and only fetches the required blocks of the actual file. - * Original/Main IndexInput file will be split using {@link OnDemandBlockIndexInput.Builder#DEFAULT_BLOCK_SIZE_SHIFT}. This class has all the - * logic of how and when to fetch specific block of the main file. Each block is identified by {@link OnDemandBlockIndexInput#currentBlockId}. + * Original/Main IndexInput file will be split using {@link AbstractBlockIndexInput.Builder#DEFAULT_BLOCK_SIZE_SHIFT}. This class has all the + * logic of how and when to fetch specific block of the main file. Each block is identified by {@link AbstractBlockIndexInput#currentBlockId}. *
* This class delegate the responsibility of actually fetching the block when demanded to its subclasses using - * {@link OnDemandBlockIndexInput#fetchBlock(int)}. + * {@link AbstractBlockIndexInput#fetchBlock(int)}. *

* Like {@link IndexInput}, this class may only be used from one thread as it is not thread safe. * However, a cleaning action may run from another thread triggered by the {@link Cleaner}, but - * this is okay because at that point the {@link OnDemandBlockIndexInput} instance is phantom + * this is okay because at that point the {@link AbstractBlockIndexInput} instance is phantom * reachable and therefore not able to be accessed by any other thread. * * @opensearch.internal */ -public abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput { - private static final Logger logger = LogManager.getLogger(OnDemandBlockIndexInput.class); +public abstract class AbstractBlockIndexInput extends IndexInput implements RandomAccessInput { + private static final Logger logger = LogManager.getLogger(AbstractBlockIndexInput.class); public static final String CLEANER_THREAD_NAME_PREFIX = "index-input-cleaner"; @@ -47,7 +50,7 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand * the cleaning action is a no-op. For an open IndexInput, the close action * will decrement a reference count. 
*/ - private static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory(CLEANER_THREAD_NAME_PREFIX)); + protected static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory(CLEANER_THREAD_NAME_PREFIX)); /** * Start offset of the virtual file : non-zero in the slice case @@ -75,11 +78,12 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand /** * ID of the current block */ - private int currentBlockId; + protected int currentBlockId; private final BlockHolder blockHolder = new BlockHolder(); + protected final Cleaner.Cleanable cleanable; - OnDemandBlockIndexInput(Builder builder) { + protected AbstractBlockIndexInput(Builder builder) { super(builder.resourceDescription); this.isClone = builder.isClone; this.offset = builder.offset; @@ -87,13 +91,13 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand this.blockSizeShift = builder.blockSizeShift; this.blockSize = builder.blockSize; this.blockMask = builder.blockMask; - CLEANER.register(this, blockHolder); + this.cleanable = CLEANER.register(this, blockHolder); } /** * Builds the actual sliced IndexInput (may apply extra offset in subclasses). 
**/ - protected abstract OnDemandBlockIndexInput buildSlice(String sliceDescription, long offset, long length); + protected abstract AbstractBlockIndexInput buildSlice(String sliceDescription, long offset, long length); /** * Given a blockId, fetch it's IndexInput which might be partial/split/cloned one @@ -103,7 +107,7 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand protected abstract IndexInput fetchBlock(int blockId) throws IOException; @Override - public abstract OnDemandBlockIndexInput clone(); + public abstract AbstractBlockIndexInput clone(); @Override public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { @@ -302,6 +306,107 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException { } + /** + * Utility method to get the blockSize given blockSizeShift. + * @param blockSizeShift blockSizeShift used to calculate blockSize. + * @return returns blockSize + */ + public static int getBlockSize(int blockSizeShift) { + return 1 << blockSizeShift; + } + + /** + * Utility method to get the blockId corresponding to the file offset passed. + * @param pos file offset whose blockId is requested. + * @param blockSizeShift blockSizeShift used to calculate blockSize. + * @return blockId for the given pos. + */ + public static int getBlock(long pos, int blockSizeShift) { + return (int) (pos >>> blockSizeShift); + } + + /** + * Utility method to convert file offset to block level offset. + * @param pos fileOffset whose block offset is requested. + * @param blockSizeShift blockSizeShift used to calculate blockSize. + * @return returns block offset for the given pos. + */ + public static long getBlockOffset(long pos, int blockSizeShift) { + return (long) (pos & (getBlockSize(blockSizeShift) - 1)); + } + + /** + * Utility method to get the starting file offset of the given block. + * @param blockId blockId whose start offset is requested. 
+ * @param blockSizeShift blockSizeShift used to calculate blockSize. + * @return returns the file offset corresponding to the start of the block. + */ + public static long getBlockStart(int blockId, int blockSizeShift) { + return (long) blockId << blockSizeShift; + } + + /** + * Utility method to get the number of blocks in a file. + * @param fileSize fileSize of the original file. + * @param blockSizeShift blockSizeShift used to calculate blockSize. + * @return returns the number of blocks in the file. + */ + public static int getNumberOfBlocks(long fileSize, int blockSizeShift) { + return (int) getBlock(fileSize - 1, blockSizeShift) + 1; + } + + /** + * Utility method to get the size of the given blockId. + * @param blockId blockId whose size is requested + * @param blockSizeShift blockSizeShift used to calculate blockSize. + * @param fileSize fileSize of the original file. + * @return returns the size of the block whose blockId is passed. + */ + public static long getActualBlockSize(int blockId, int blockSizeShift, long fileSize) { + assert blockId >= 0 : "blockId cannot be negative"; + return (blockId != getBlock(fileSize - 1, blockSizeShift)) + ? getBlockSize(blockSizeShift) + : getBlockOffset(fileSize - 1, blockSizeShift) + 1; + } + + /** + * Utility method to get a list of blockIds for a given fileSize. + * @param fileSize size of the file for which blockIds are requested. + * @param blockSizeShift blockSizeShift (used to calculate blockSize) used to create blocks. + * @return returns a list of integers representing blockIds. + */ + public static List getAllBlockIdsForFile(long fileSize, int blockSizeShift) { + return IntStream.rangeClosed(0, getNumberOfBlocks(fileSize, blockSizeShift) - 1).boxed().collect(Collectors.toList()); + } + + /** + * Utility method to validate if a given fileName is a blockFileName. + * @param fileName fileName to check + * @return returns true if the passed fileName is a valid block file name. 
+ */ + public static boolean isBlockFilename(String fileName) { + return fileName.contains("_block_"); + } + + /** + * Utility method to generate block file name for a given fileName and blockId as per naming convention. + * @param fileName fileName whose block file name is required + * @param blockId blockId of the file whose block file name is required + * @return returns the blockFileName + */ + public static String getBlockFileName(String fileName, int blockId) { + return fileName + "_block_" + blockId; + } + + /** + * Utility method to get the original file name given the block file name. + * @param blockFileName name of the block file whose original file name is required. + * @return returns the original file name; no-op if blockFileName is not a valid name for a block file. + */ + public static String getFileNameFromBlockFileName(String blockFileName) { + return blockFileName.contains("_block_") ? blockFileName.substring(0, blockFileName.indexOf("_block_")) : blockFileName; + } + /** * Seek to a block position, download the block if it's necessary * NOTE: the pos should be an adjusted position for slices @@ -341,7 +446,7 @@ private void demandBlock(int blockId) throws IOException { currentBlockId = blockId; } - protected void cloneBlock(OnDemandBlockIndexInput other) { + protected void cloneBlock(AbstractBlockIndexInput other) { if (other.blockHolder.block != null) { this.blockHolder.set(other.blockHolder.block.clone()); this.currentBlockId = other.currentBlockId; @@ -373,64 +478,69 @@ public static Builder builder() { } /** - * Builder for {@link OnDemandBlockIndexInput}. The default block size is 8MiB + * Builder for {@link AbstractBlockIndexInput}. The default block size is 8MiB * (see {@link Builder#DEFAULT_BLOCK_SIZE_SHIFT}). 
*/ - public static class Builder { + public static class Builder> { // Block size shift (default value is 23 == 2^23 == 8MiB) public static final int DEFAULT_BLOCK_SIZE_SHIFT = 23; public static final int DEFAULT_BLOCK_SIZE = 1 << DEFAULT_BLOCK_SIZE_SHIFT;; - private String resourceDescription; - private boolean isClone; - private long offset; - private long length; - private int blockSizeShift = DEFAULT_BLOCK_SIZE_SHIFT; - private int blockSize = 1 << blockSizeShift; - private int blockMask = blockSize - 1; + protected String resourceDescription; + protected boolean isClone; + protected long offset; + protected long length; + protected int blockSizeShift = DEFAULT_BLOCK_SIZE_SHIFT; + protected int blockSize = 1 << blockSizeShift; + protected int blockMask = blockSize - 1; - private Builder() {} + protected Builder() {} + + @SuppressWarnings("unchecked") + protected final T self() { + return (T) this; + } - public Builder resourceDescription(String resourceDescription) { - this.resourceDescription = resourceDescription; - return this; + public T resourceDescription(String resourceDescription) { + this.resourceDescription = Objects.requireNonNull(resourceDescription, "Resource description cannot be null"); + return self(); } - public Builder isClone(boolean clone) { - isClone = clone; - return this; + public T isClone(boolean clone) { + this.isClone = clone; + return self(); } - public Builder offset(long offset) { + public T offset(long offset) { this.offset = offset; - return this; + return self(); } - public Builder length(long length) { + public T length(long length) { this.length = length; - return this; + return self(); } - public Builder blockSizeShift(int blockSizeShift) { + public T blockSizeShift(int blockSizeShift) { assert blockSizeShift < 31 : "blockSizeShift must be < 31"; this.blockSizeShift = blockSizeShift; this.blockSize = 1 << blockSizeShift; this.blockMask = blockSize - 1; - return this; + return self(); } } /** * Simple class to hold the 
currently open IndexInput backing an instance - * of an {@link OnDemandBlockIndexInput}. Lucene may clone one of these + * of an {@link AbstractBlockIndexInput}. Lucene may clone one of these * instances, and per the contract[1], the clones will never be closed. * However, closing the instances is critical for our reference counting. * Therefore, we are using the {@link Cleaner} mechanism from the JDK to * close these clones when they become phantom reachable. The clean action - * must not hold a reference to the {@link OnDemandBlockIndexInput} itself + * must not hold a reference to the {@link AbstractBlockIndexInput} itself * (otherwise it would never become phantom reachable!) so we need a wrapper * instance to hold the current underlying IndexInput, while allowing it to - * be changed out with different instances as {@link OnDemandBlockIndexInput} + * be changed out with different instances as {@link AbstractBlockIndexInput} * reads through the data. *

* This class implements {@link Runnable} so that it can be passed directly diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index 374ddd5add696..7adf05c03989b 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -21,13 +21,13 @@ import java.util.List; /** - * This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files. + * This is an implementation of {@link AbstractBlockIndexInput} where this class provides the main IndexInput using shard snapshot files. *
* This class rely on {@link TransferManager} to really fetch the snapshot files from the remote blob store and maybe cache them * * @opensearch.internal */ -public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput { +public class OnDemandBlockSnapshotIndexInput extends AbstractBlockIndexInput { private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class); /** * Where this class fetches IndexInput parts from @@ -89,7 +89,7 @@ public OnDemandBlockSnapshotIndexInput( TransferManager transferManager ) { this( - OnDemandBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length), + AbstractBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length), fileInfo, directory, transferManager @@ -97,7 +97,7 @@ public OnDemandBlockSnapshotIndexInput( } protected OnDemandBlockSnapshotIndexInput( - OnDemandBlockIndexInput.Builder builder, + AbstractBlockIndexInput.Builder builder, FileInfo fileInfo, FSDirectory directory, TransferManager transferManager @@ -122,7 +122,7 @@ protected OnDemandBlockSnapshotIndexInput( @Override protected OnDemandBlockSnapshotIndexInput buildSlice(String sliceDescription, long offset, long length) { return new OnDemandBlockSnapshotIndexInput( - OnDemandBlockIndexInput.builder() + AbstractBlockIndexInput.builder() .blockSizeShift(blockSizeShift) .isClone(true) .offset(this.offset + offset) @@ -137,10 +137,10 @@ protected OnDemandBlockSnapshotIndexInput buildSlice(String sliceDescription, lo @Override protected IndexInput fetchBlock(int blockId) throws IOException { logger.trace("fetchBlock called with blockId -> {}", blockId); - final String blockFileName = fileName + "_block_" + blockId; + final String blockFileName = getBlockFileName(fileName, blockId); final long blockStart = getBlockStart(blockId); - final long blockEnd = blockStart + getActualBlockSize(blockId); + final long blockEnd = 
blockStart + getActualBlockSize(blockId, blockSizeShift, originalFileSize); logger.trace( "File: {} , Block File: {} , BlockStart: {} , BlockEnd: {} , OriginalFileSize: {}", fileName, @@ -196,8 +196,4 @@ public OnDemandBlockSnapshotIndexInput clone() { clone.cloneBlock(this); return clone; } - - protected long getActualBlockSize(int blockId) { - return (blockId != getBlock(originalFileSize - 1)) ? blockSize : getBlockOffset(originalFileSize - 1) + 1; - } } diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInputLifecycleTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInputLifecycleTests.java similarity index 79% rename from server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInputLifecycleTests.java rename to server/src/test/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInputLifecycleTests.java index b6e8c9c1b536a..1df325d3e3ad3 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInputLifecycleTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInputLifecycleTests.java @@ -24,12 +24,12 @@ import static org.hamcrest.Matchers.hasSize; /** - * Unit test to ensure that {@link OnDemandBlockIndexInput} properly closes + * Unit test to ensure that {@link AbstractBlockIndexInput} properly closes * all of its backing IndexInput instances, as the reference counting logic * relies on this behavior. 
*/ @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) -public class OnDemandBlockIndexInputLifecycleTests extends OpenSearchTestCase { +public class AbstractBlockIndexInputLifecycleTests extends OpenSearchTestCase { private static final int ONE_MB_SHIFT = 20; private static final int ONE_MB = 1 << ONE_MB_SHIFT; private static final int TWO_MB = ONE_MB * 2; @@ -47,13 +47,13 @@ public void tearDown() throws Exception { } public void testClose() throws IOException { - try (OnDemandBlockIndexInput indexInput = createTestOnDemandBlockIndexInput()) { + try (AbstractBlockIndexInput indexInput = createTestAbstractBlockIndexInput()) { indexInput.seek(0); } } public void testCloseWhenSeekingMultipleChunks() throws IOException { - try (OnDemandBlockIndexInput indexInput = createTestOnDemandBlockIndexInput()) { + try (AbstractBlockIndexInput indexInput = createTestAbstractBlockIndexInput()) { indexInput.seek(0); indexInput.seek(ONE_MB + 1); } @@ -61,7 +61,7 @@ public void testCloseWhenSeekingMultipleChunks() throws IOException { } public void testUnclosedCloneIsClosed() throws IOException { - try (OnDemandBlockIndexInput indexInput = createTestOnDemandBlockIndexInput()) { + try (AbstractBlockIndexInput indexInput = createTestAbstractBlockIndexInput()) { indexInput.seek(0); // Clone is abandoned without closing @@ -70,7 +70,7 @@ public void testUnclosedCloneIsClosed() throws IOException { } public void testUnclosedSliceIsClosed() throws IOException { - try (OnDemandBlockIndexInput indexInput = createTestOnDemandBlockIndexInput()) { + try (AbstractBlockIndexInput indexInput = createTestAbstractBlockIndexInput()) { indexInput.seek(0); // Clone is abandoned without closing @@ -78,8 +78,8 @@ public void testUnclosedSliceIsClosed() throws IOException { } } - private OnDemandBlockIndexInput createTestOnDemandBlockIndexInput() { - return new TestOnDemandBlockIndexInput(this::createCloseTrackingIndexInput, false); + private AbstractBlockIndexInput 
createTestAbstractBlockIndexInput() { + return new TestAbstractBlockIndexInput(this::createCloseTrackingIndexInput, false); } private IndexInput createCloseTrackingIndexInput() { @@ -89,27 +89,27 @@ private IndexInput createCloseTrackingIndexInput() { } /** - * Concrete implementation of {@link OnDemandBlockIndexInput} that creates + * Concrete implementation of {@link AbstractBlockIndexInput} that creates * {@link CloseTrackingIndexInput} index inputs when it needs to fetch a * new block. */ - private static class TestOnDemandBlockIndexInput extends OnDemandBlockIndexInput { + private static class TestAbstractBlockIndexInput extends AbstractBlockIndexInput { private final Supplier indexInputSupplier; - TestOnDemandBlockIndexInput(Supplier indexInputSupplier, boolean isClone) { + TestAbstractBlockIndexInput(Supplier indexInputSupplier, boolean isClone) { super( builder().blockSizeShift(ONE_MB_SHIFT) .offset(0) .length(TWO_MB) .isClone(isClone) - .resourceDescription(TestOnDemandBlockIndexInput.class.getName()) + .resourceDescription(TestAbstractBlockIndexInput.class.getName()) ); this.indexInputSupplier = indexInputSupplier; } @Override - protected OnDemandBlockIndexInput buildSlice(String sliceDescription, long offset, long length) { - return new TestOnDemandBlockIndexInput(this.indexInputSupplier, true); + protected AbstractBlockIndexInput buildSlice(String sliceDescription, long offset, long length) { + return new TestAbstractBlockIndexInput(this.indexInputSupplier, true); } @Override @@ -118,8 +118,8 @@ protected IndexInput fetchBlock(int blockId) throws IOException { } @Override - public OnDemandBlockIndexInput clone() { - return new TestOnDemandBlockIndexInput(this.indexInputSupplier, true); + public AbstractBlockIndexInput clone() { + return new TestAbstractBlockIndexInput(this.indexInputSupplier, true); } } diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInputTests.java 
b/server/src/test/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInputTests.java new file mode 100644 index 0000000000000..0696c91ed4877 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/file/AbstractBlockIndexInputTests.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.file; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.lucene.store.IndexInput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Assert; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +/** + * Unit tests for {@link AbstractBlockIndexInput} covering all static methods, + * builder validation, and core functionality. 
+ */ +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +public class AbstractBlockIndexInputTests extends OpenSearchTestCase { + private static final int ONE_MB_SHIFT = 20; + private static final int ONE_MB = 1 << ONE_MB_SHIFT; + private static final int TWO_MB = ONE_MB * 2; + + public void testBuilderDefaults() { + AbstractBlockIndexInput.Builder builder = AbstractBlockIndexInput.builder(); + assertEquals(builder.blockSizeShift, AbstractBlockIndexInput.Builder.DEFAULT_BLOCK_SIZE_SHIFT); + assertEquals(builder.blockSize, AbstractBlockIndexInput.Builder.DEFAULT_BLOCK_SIZE); + } + + public void testBuilderBlockSizeShiftValidation() { + expectThrows(AssertionError.class, () -> { AbstractBlockIndexInput.builder().blockSizeShift(31); }); + } + + public void testGetActualBlockSizeThrowsErrorForNegativeBlockId() { + expectThrows(AssertionError.class, () -> { AbstractBlockIndexInput.getActualBlockSize(-1, 23, 23); }); + } + + public void testStaticBlockCalculations() { + assertEquals(1024, AbstractBlockIndexInput.getBlockSize(10)); + assertEquals(0, AbstractBlockIndexInput.getBlock(512, 10)); + assertEquals(1, AbstractBlockIndexInput.getBlock(1024, 10)); + assertEquals(512, AbstractBlockIndexInput.getBlockOffset(512, 10)); + assertEquals(0, AbstractBlockIndexInput.getBlockStart(0, 10)); + assertEquals(1024, AbstractBlockIndexInput.getBlockStart(1, 10)); + } + + public void testGetNumberOfBlocks() { + assertEquals(1, AbstractBlockIndexInput.getNumberOfBlocks(1024, 10)); + assertEquals(2, AbstractBlockIndexInput.getNumberOfBlocks(1025, 10)); + assertEquals(2, AbstractBlockIndexInput.getNumberOfBlocks(2048, 10)); + } + + public void testGetActualBlockSize() { + assertEquals(1024, AbstractBlockIndexInput.getActualBlockSize(0, 10, 2048)); + assertEquals(1024, AbstractBlockIndexInput.getActualBlockSize(1, 10, 2048)); + assertEquals(1, AbstractBlockIndexInput.getActualBlockSize(1, 10, 1025)); + } + + public void testFileNameUtilities() { + 
assertTrue(AbstractBlockIndexInput.isBlockFilename("file_block_0")); + assertFalse(AbstractBlockIndexInput.isBlockFilename("file.txt")); + assertEquals("file_block_5", AbstractBlockIndexInput.getBlockFileName("file", 5)); + assertEquals("original", AbstractBlockIndexInput.getFileNameFromBlockFileName("original_block_3")); + assertEquals("noblock", AbstractBlockIndexInput.getFileNameFromBlockFileName("noblock")); + } + + public void testGetAllBlockIdsForFile() { + List blockIds = AbstractBlockIndexInput.getAllBlockIdsForFile(2048, 10); + assertEquals(2, blockIds.size()); + assertEquals(Integer.valueOf(0), blockIds.get(0)); + assertEquals(Integer.valueOf(1), blockIds.get(1)); + } + + public void testSeekBeyondLength() throws IOException { + try (AbstractBlockIndexInput indexInput = createTestIndexInput()) { + expectThrows(EOFException.class, () -> indexInput.seek(TWO_MB + 1)); + } + } + + public void testSliceValidation() throws IOException { + try (AbstractBlockIndexInput indexInput = createTestIndexInput()) { + expectThrows(IllegalArgumentException.class, () -> indexInput.slice("test", -1, 100)); + expectThrows(IllegalArgumentException.class, () -> indexInput.slice("test", 0, -1)); + expectThrows(IllegalArgumentException.class, () -> indexInput.slice("test", 0, TWO_MB + 1)); + } + } + + public void testReadOperationsWithoutSeek() throws IOException { + try (AbstractBlockIndexInput indexInput = createTestIndexInput()) { + indexInput.readByte(); + } + } + + public void testReadBytesAcrossBlocks() throws IOException { + try (TestAbstractBlockIndexInput indexInput = createTestIndexInput()) { + indexInput.seek(ONE_MB - 10); + assertEquals(0, indexInput.getCurrentBlockId()); + byte[] buffer = new byte[20]; + indexInput.readBytes(buffer, 0, 20); + assertEquals(1, indexInput.getCurrentBlockId()); + } + } + + public void testRandomAccessReads() throws IOException { + try (TestAbstractBlockIndexInput indexInput = createTestIndexInput()) { + indexInput.seek(0); + 
assertEquals(0, indexInput.getCurrentBlockId()); + indexInput.readByte(ONE_MB + 100); + } + } + + public void testGetFilePointer() throws IOException { + try (TestAbstractBlockIndexInput indexInput = createTestIndexInput()) { + assertEquals(0, indexInput.getFilePointer()); + indexInput.seek(100); + assertEquals(100, indexInput.getFilePointer()); + } + } + + public void testLength() throws IOException { + try (TestAbstractBlockIndexInput indexInput = createTestIndexInput()) { + assertEquals(TWO_MB, indexInput.length()); + } + } + + public void testMultiByteReads() throws IOException { + try (AbstractBlockIndexInput indexInput = createTestIndexInput()) { + indexInput.seek(0); + indexInput.readShort(); + indexInput.readInt(); + indexInput.readLong(); + indexInput.readVInt(); + indexInput.readVLong(); + } + } + + public void testRandomAccessMultiByteReads() throws IOException { + try (TestAbstractBlockIndexInput indexInput = createTestIndexInput()) { + indexInput.seek(0); + indexInput.readShort(100); + indexInput.readInt(200); + indexInput.readLong(300); + Assert.assertEquals(0, indexInput.getCurrentBlockId()); + } + } + + public void testBlockTransitionDuringRead() throws IOException { + try (TestAbstractBlockIndexInput indexInput = createTestIndexInput()) { + indexInput.seek(ONE_MB - 1); + Assert.assertEquals(0, indexInput.getCurrentBlockId()); + indexInput.readByte(); + indexInput.readByte(); + Assert.assertEquals(1, indexInput.getCurrentBlockId()); + } + } + + public void testSliceCreation() throws IOException { + try (TestAbstractBlockIndexInput indexInput = createTestIndexInput()) { + TestAbstractBlockIndexInput slice = (TestAbstractBlockIndexInput) indexInput.slice("test-slice", ONE_MB, 1000); + slice.readByte(); + Assert.assertEquals(1, slice.getCurrentBlockId()); + assertNotNull(slice); + slice.close(); + } + } + + public void testCloneAndOriginalIndependence() throws IOException { + try (AbstractBlockIndexInput indexInput = createTestIndexInput()) { + 
indexInput.seek(0); + AbstractBlockIndexInput clone = indexInput.clone(); + clone.seek(ONE_MB); + assertEquals(0, indexInput.getFilePointer()); + assertTrue(clone.getFilePointer() > 0); + assertEquals(ONE_MB, clone.getFilePointer()); + clone.close(); + } + } + + public void testEmptyBlockIds() { + List blockIds = AbstractBlockIndexInput.getAllBlockIdsForFile(0, 10); + assertEquals(0, blockIds.size()); + } + + public void testBlockCalculationEdgeCases() { + assertEquals(0, AbstractBlockIndexInput.getBlock(0, 10)); + assertEquals(0, AbstractBlockIndexInput.getBlockOffset(0, 10)); + assertEquals(1023, AbstractBlockIndexInput.getBlockOffset(1023, 10)); + } + + private TestAbstractBlockIndexInput createTestIndexInput() { + return new TestAbstractBlockIndexInput(false); + } + + private static class TestAbstractBlockIndexInput extends AbstractBlockIndexInput { + TestAbstractBlockIndexInput(boolean isClone) { + super( + builder().blockSizeShift(ONE_MB_SHIFT) + .offset(0) + .length(TWO_MB) + .isClone(isClone) + .resourceDescription(TestAbstractBlockIndexInput.class.getName()) + ); + cleanable.clean(); + } + + TestAbstractBlockIndexInput(boolean isClone, long offset, long length) { + super( + builder().blockSizeShift(ONE_MB_SHIFT) + .offset(0) + .length(TWO_MB) + .offset(offset) + .length(length) + .isClone(isClone) + .resourceDescription(TestAbstractBlockIndexInput.class.getName()) + ); + cleanable.clean(); + } + + @Override + protected IndexInput fetchBlock(int blockId) throws IOException { + return new ByteArrayIndexInput( + "", + new byte[(int) AbstractBlockIndexInput.getActualBlockSize(blockId, this.blockSizeShift, this.length)] + ); + } + + @Override + public TestAbstractBlockIndexInput clone() { + return buildSlice("", 0L, length); + } + + public TestAbstractBlockIndexInput buildSlice(String sliceDescription, long offset, long length) { + return new TestAbstractBlockIndexInput(true, this.offset + offset, length); + } + + private int getCurrentBlockId() { + return 
currentBlockId; + } + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index 5518b37d79dfd..40da4a3cd4c4b 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -107,7 +107,7 @@ private void verifyChunkedRepository(long blockSize, long repositoryChunkSize, l try ( FSDirectory directory = new MMapDirectory(path, lockFactory); IndexInput indexInput = new OnDemandBlockSnapshotIndexInput( - OnDemandBlockIndexInput.builder() + AbstractBlockIndexInput.builder() .resourceDescription(RESOURCE_DESCRIPTION) .offset(BLOCK_SNAPSHOT_FILE_OFFSET) .length(FILE_SIZE) @@ -136,7 +136,7 @@ private void runAllTestsFor(int blockSizeShift) throws Exception { TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE); TestGroup.testGetBlockOffset(blockedSnapshotFile, blockSize, FILE_SIZE); TestGroup.testGetBlockStart(blockedSnapshotFile, blockSize); - TestGroup.testGetBlobParts(blockedSnapshotFile); + TestGroup.testGetBlobParts(blockedSnapshotFile, blockSizeShift); TestGroup.testCurrentBlockStart(blockedSnapshotFile, blockSize); TestGroup.testCurrentBlockPosition(blockedSnapshotFile, blockSize); TestGroup.testClone(blockedSnapshotFile, blockSize); @@ -184,7 +184,7 @@ private OnDemandBlockSnapshotIndexInput createOnDemandBlockSnapshotIndexInput(in initBlockFiles(blockSize, directory); return new OnDemandBlockSnapshotIndexInput( - OnDemandBlockIndexInput.builder() + AbstractBlockIndexInput.builder() .resourceDescription(RESOURCE_DESCRIPTION) .offset(BLOCK_SNAPSHOT_FILE_OFFSET) .length(FILE_SIZE) @@ -274,11 +274,11 @@ public static void testGetBlockStart(OnDemandBlockSnapshotIndexInput blockedSnap assertEquals(blockSize * 2, 
blockedSnapshotFile.getBlockStart(2)); } - public static void testGetBlobParts(OnDemandBlockSnapshotIndexInput blockedSnapshotFile) { + public static void testGetBlobParts(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSizeShift) { // block id 0 int blockId = 0; long blockStart = blockedSnapshotFile.getBlockStart(blockId); - long blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + long blockEnd = blockStart + AbstractBlockIndexInput.getActualBlockSize(blockId, blockSizeShift, FILE_SIZE); assertEquals( (blockEnd - blockStart), blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum() @@ -287,7 +287,7 @@ public static void testGetBlobParts(OnDemandBlockSnapshotIndexInput blockedSnaps // block 1 blockId = 1; blockStart = blockedSnapshotFile.getBlockStart(blockId); - blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + blockEnd = blockStart + AbstractBlockIndexInput.getActualBlockSize(blockId, blockSizeShift, FILE_SIZE); assertEquals( (blockEnd - blockStart), blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum() @@ -296,7 +296,7 @@ public static void testGetBlobParts(OnDemandBlockSnapshotIndexInput blockedSnaps // block 2 blockId = 2; blockStart = blockedSnapshotFile.getBlockStart(blockId); - blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + blockEnd = blockStart + AbstractBlockIndexInput.getActualBlockSize(blockId, blockSizeShift, FILE_SIZE); assertEquals( (blockEnd - blockStart), blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum()