Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@
import java.io.EOFException;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Class acts as a virtual file mechanism for the accessed files and only fetches the required blocks of the actual file.
* Original/Main IndexInput file will be split using {@link OnDemandBlockIndexInput.Builder#DEFAULT_BLOCK_SIZE_SHIFT}. This class has all the
* logic of how and when to fetch specific block of the main file. Each block is identified by {@link OnDemandBlockIndexInput#currentBlockId}.
* Original/Main IndexInput file will be split using {@link AbstractBlockIndexInput.Builder#DEFAULT_BLOCK_SIZE_SHIFT}. This class has all the
* logic of how and when to fetch specific block of the main file. Each block is identified by {@link AbstractBlockIndexInput#currentBlockId}.
* <br>
* This class delegates the responsibility of actually fetching the block when demanded to its subclasses using
* {@link OnDemandBlockIndexInput#fetchBlock(int)}.
* {@link AbstractBlockIndexInput#fetchBlock(int)}.
* <p>
* Like {@link IndexInput}, this class may only be used from one thread as it is not thread safe.
* However, a cleaning action may run from another thread triggered by the {@link Cleaner}, but
* this is okay because at that point the {@link OnDemandBlockIndexInput} instance is phantom
* this is okay because at that point the {@link AbstractBlockIndexInput} instance is phantom
* reachable and therefore not able to be accessed by any other thread.
*
* @opensearch.internal
*/
public abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput {
private static final Logger logger = LogManager.getLogger(OnDemandBlockIndexInput.class);
public abstract class AbstractBlockIndexInput extends IndexInput implements RandomAccessInput {
private static final Logger logger = LogManager.getLogger(AbstractBlockIndexInput.class);

public static final String CLEANER_THREAD_NAME_PREFIX = "index-input-cleaner";

Expand All @@ -47,7 +50,7 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand
* the cleaning action is a no-op. For an open IndexInput, the close action
* will decrement a reference count.
*/
private static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory(CLEANER_THREAD_NAME_PREFIX));
protected static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory(CLEANER_THREAD_NAME_PREFIX));

/**
* Start offset of the virtual file : non-zero in the slice case
Expand Down Expand Up @@ -75,25 +78,26 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand
/**
* ID of the current block
*/
private int currentBlockId;
protected int currentBlockId;

private final BlockHolder blockHolder = new BlockHolder();
protected final Cleaner.Cleanable cleanable;

OnDemandBlockIndexInput(Builder builder) {
protected AbstractBlockIndexInput(Builder builder) {
super(builder.resourceDescription);
this.isClone = builder.isClone;
this.offset = builder.offset;
this.length = builder.length;
this.blockSizeShift = builder.blockSizeShift;
this.blockSize = builder.blockSize;
this.blockMask = builder.blockMask;
CLEANER.register(this, blockHolder);
this.cleanable = CLEANER.register(this, blockHolder);
}

/**
* Builds the actual sliced IndexInput (may apply extra offset in subclasses).
**/
protected abstract OnDemandBlockIndexInput buildSlice(String sliceDescription, long offset, long length);
protected abstract AbstractBlockIndexInput buildSlice(String sliceDescription, long offset, long length);

/**
* Given a blockId, fetch its IndexInput which might be a partial/split/cloned one
Expand All @@ -103,7 +107,7 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand
protected abstract IndexInput fetchBlock(int blockId) throws IOException;

@Override
public abstract OnDemandBlockIndexInput clone();
public abstract AbstractBlockIndexInput clone();

@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
Expand Down Expand Up @@ -302,6 +306,107 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException {

}

/**
 * Computes the block size that corresponds to a block size shift, i.e. {@code 1 << blockSizeShift}.
 *
 * @param blockSizeShift number of bit positions the value 1 is shifted left by to obtain the block size.
 * @return the block size derived from {@code blockSizeShift}.
 */
public static int getBlockSize(int blockSizeShift) {
    final int blockSize = 1 << blockSizeShift;
    return blockSize;
}

/**
 * Maps a file offset to the id of the block that contains it.
 *
 * @param pos file offset whose blockId is requested.
 * @param blockSizeShift blockSizeShift used to calculate blockSize.
 * @return the unsigned right shift of {@code pos} by {@code blockSizeShift}, narrowed to an int.
 */
public static int getBlock(long pos, int blockSizeShift) {
    final long blockIndex = pos >>> blockSizeShift;
    return (int) blockIndex;
}

/**
 * Converts a file offset into the offset relative to the start of its block.
 *
 * @param pos file offset whose block-level offset is requested.
 * @param blockSizeShift blockSizeShift used to calculate blockSize.
 * @return {@code pos} masked with {@code blockSize - 1}.
 */
public static long getBlockOffset(long pos, int blockSizeShift) {
    // blockSize - 1 has all low bits set, so the AND keeps only the in-block part of pos.
    final int offsetMask = getBlockSize(blockSizeShift) - 1;
    return pos & offsetMask;
}

/**
 * Computes the file offset at which the given block begins.
 *
 * @param blockId blockId whose start offset is requested.
 * @param blockSizeShift blockSizeShift used to calculate blockSize.
 * @return {@code blockId} shifted left by {@code blockSizeShift}, evaluated in 64-bit arithmetic.
 */
public static long getBlockStart(int blockId, int blockSizeShift) {
    // Widen before shifting so that large block ids do not overflow int range.
    final long widenedBlockId = blockId;
    return widenedBlockId << blockSizeShift;
}

/**
 * Computes how many blocks a file of the given size is split into.
 *
 * @param fileSize fileSize of the original file.
 * @param blockSizeShift blockSizeShift used to calculate blockSize.
 * @return one more than the blockId that contains the offset {@code fileSize - 1}.
 */
public static int getNumberOfBlocks(long fileSize, int blockSizeShift) {
    final long lastOffset = fileSize - 1;
    final int lastBlockId = getBlock(lastOffset, blockSizeShift);
    return lastBlockId + 1;
}

/**
 * Utility method to get the size of the given blockId. Every block has the full block size except the
 * one containing the offset {@code fileSize - 1}, whose size is that offset's in-block position plus one.
 *
 * @param blockId blockId whose size is requested.
 * @param blockSizeShift blockSizeShift used to calculate blockSize.
 * @param fileSize fileSize of the original file.
 * @return the size of the block whose blockId is passed.
 */
public static long getActualBlockSize(int blockId, int blockSizeShift, long fileSize) {
    assert blockId >= 0 : "blockId cannot be negative";
    final long lastOffset = fileSize - 1;
    final boolean isLastBlock = blockId == getBlock(lastOffset, blockSizeShift);
    if (isLastBlock) {
        // The last block only extends up to (and including) the final offset of the file.
        return getBlockOffset(lastOffset, blockSizeShift) + 1;
    }
    return getBlockSize(blockSizeShift);
}

/**
 * Utility method to get a list of blockIds for a given fileSize.
 *
 * @param fileSize size of the file for which blockIds are requested.
 * @param blockSizeShift blockSizeShift (used to calculate blockSize) used to create blocks.
 * @return the blockIds from 0 up to and including the last blockId of the file, in ascending order.
 */
public static List<Integer> getAllBlockIdsForFile(long fileSize, int blockSizeShift) {
    final int lastBlockId = getNumberOfBlocks(fileSize, blockSizeShift) - 1;
    return IntStream.rangeClosed(0, lastBlockId).boxed().collect(Collectors.toList());
}

/**
 * Checks whether a file name looks like a block file name, i.e. whether it contains the
 * {@code _block_} marker.
 *
 * @param fileName fileName to check.
 * @return true if {@code fileName} contains {@code _block_}, false otherwise.
 */
public static boolean isBlockFilename(String fileName) {
    return fileName.indexOf("_block_") >= 0;
}

/**
 * Builds the block file name for a file and blockId, following the naming convention
 * {@code <fileName>_block_<blockId>}.
 *
 * @param fileName fileName whose block file name is required.
 * @param blockId blockId of the file whose block file name is required.
 * @return the block file name.
 */
public static String getBlockFileName(String fileName, int blockId) {
    final StringBuilder blockFileName = new StringBuilder();
    blockFileName.append(fileName).append("_block_").append(blockId);
    return blockFileName.toString();
}

/**
 * Utility method to get the original file name given the block file name.
 * <p>
 * Block file names have the form {@code <fileName>_block_<blockId>}, so the original name is everything
 * before the <em>last</em> {@code _block_} marker. Using the last occurrence keeps this method a true
 * inverse of the block file naming convention even when the original file name itself contains
 * {@code _block_}.
 *
 * @param blockFileName name of the block file whose original file name is required.
 * @return the original file name; {@code blockFileName} is returned unchanged if it does not contain
 *         the {@code _block_} marker.
 */
public static String getFileNameFromBlockFileName(String blockFileName) {
    // The marker that separates the file name from the block id is always the right-most one.
    final int markerIndex = blockFileName.lastIndexOf("_block_");
    return markerIndex >= 0 ? blockFileName.substring(0, markerIndex) : blockFileName;
}

/**
* Seek to a block position, download the block if it's necessary
* NOTE: the pos should be an adjusted position for slices
Expand Down Expand Up @@ -341,7 +446,7 @@ private void demandBlock(int blockId) throws IOException {
currentBlockId = blockId;
}

protected void cloneBlock(OnDemandBlockIndexInput other) {
protected void cloneBlock(AbstractBlockIndexInput other) {
if (other.blockHolder.block != null) {
this.blockHolder.set(other.blockHolder.block.clone());
this.currentBlockId = other.currentBlockId;
Expand Down Expand Up @@ -373,64 +478,69 @@ public static Builder builder() {
}

/**
* Builder for {@link OnDemandBlockIndexInput}. The default block size is 8MiB
* Builder for {@link AbstractBlockIndexInput}. The default block size is 8MiB
* (see {@link Builder#DEFAULT_BLOCK_SIZE_SHIFT}).
*/
public static class Builder {
public static class Builder<T extends Builder<T>> {
/** Default block size shift: 23, i.e. a default block size of 2^23 bytes == 8MiB. */
public static final int DEFAULT_BLOCK_SIZE_SHIFT = 23;
/** Default block size derived from {@link #DEFAULT_BLOCK_SIZE_SHIFT}. */
public static final int DEFAULT_BLOCK_SIZE = 1 << DEFAULT_BLOCK_SIZE_SHIFT;

private String resourceDescription;
private boolean isClone;
private long offset;
private long length;
private int blockSizeShift = DEFAULT_BLOCK_SIZE_SHIFT;
private int blockSize = 1 << blockSizeShift;
private int blockMask = blockSize - 1;
protected String resourceDescription;
protected boolean isClone;
protected long offset;
protected long length;
protected int blockSizeShift = DEFAULT_BLOCK_SIZE_SHIFT;
protected int blockSize = 1 << blockSizeShift;
protected int blockMask = blockSize - 1;

private Builder() {}
protected Builder() {}

@SuppressWarnings("unchecked")
protected final T self() {
return (T) this;
}

public Builder resourceDescription(String resourceDescription) {
this.resourceDescription = resourceDescription;
return this;
public T resourceDescription(String resourceDescription) {
this.resourceDescription = Objects.requireNonNull(resourceDescription, "Resource description cannot be null");
return self();
}

public Builder isClone(boolean clone) {
isClone = clone;
return this;
public T isClone(boolean clone) {
this.isClone = clone;
return self();
}

public Builder offset(long offset) {
public T offset(long offset) {
this.offset = offset;
return this;
return self();
}

public Builder length(long length) {
public T length(long length) {
this.length = length;
return this;
return self();
}

public Builder blockSizeShift(int blockSizeShift) {
public T blockSizeShift(int blockSizeShift) {
assert blockSizeShift < 31 : "blockSizeShift must be < 31";
this.blockSizeShift = blockSizeShift;
this.blockSize = 1 << blockSizeShift;
this.blockMask = blockSize - 1;
return this;
return self();
}
}

/**
* Simple class to hold the currently open IndexInput backing an instance
* of an {@link OnDemandBlockIndexInput}. Lucene may clone one of these
* of an {@link AbstractBlockIndexInput}. Lucene may clone one of these
* instances, and per the contract[1], the clones will never be closed.
* However, closing the instances is critical for our reference counting.
* Therefore, we are using the {@link Cleaner} mechanism from the JDK to
* close these clones when they become phantom reachable. The clean action
* must not hold a reference to the {@link OnDemandBlockIndexInput} itself
* must not hold a reference to the {@link AbstractBlockIndexInput} itself
* (otherwise it would never become phantom reachable!) so we need a wrapper
* instance to hold the current underlying IndexInput, while allowing it to
* be changed out with different instances as {@link OnDemandBlockIndexInput}
* be changed out with different instances as {@link AbstractBlockIndexInput}
* reads through the data.
* <p>
* This class implements {@link Runnable} so that it can be passed directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import java.util.List;

/**
* This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
* This is an implementation of {@link AbstractBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
* <br>
* This class relies on {@link TransferManager} to actually fetch the snapshot files from the remote blob store and possibly cache them
*
* @opensearch.internal
*/
public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
public class OnDemandBlockSnapshotIndexInput extends AbstractBlockIndexInput {
private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class);
/**
* Where this class fetches IndexInput parts from
Expand Down Expand Up @@ -89,15 +89,15 @@ public OnDemandBlockSnapshotIndexInput(
TransferManager transferManager
) {
this(
OnDemandBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length),
AbstractBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length),
fileInfo,
directory,
transferManager
);
}

protected OnDemandBlockSnapshotIndexInput(
OnDemandBlockIndexInput.Builder builder,
AbstractBlockIndexInput.Builder builder,
FileInfo fileInfo,
FSDirectory directory,
TransferManager transferManager
Expand All @@ -122,7 +122,7 @@ protected OnDemandBlockSnapshotIndexInput(
@Override
protected OnDemandBlockSnapshotIndexInput buildSlice(String sliceDescription, long offset, long length) {
return new OnDemandBlockSnapshotIndexInput(
OnDemandBlockIndexInput.builder()
AbstractBlockIndexInput.builder()
.blockSizeShift(blockSizeShift)
.isClone(true)
.offset(this.offset + offset)
Expand All @@ -137,10 +137,10 @@ protected OnDemandBlockSnapshotIndexInput buildSlice(String sliceDescription, lo
@Override
protected IndexInput fetchBlock(int blockId) throws IOException {
logger.trace("fetchBlock called with blockId -> {}", blockId);
final String blockFileName = fileName + "_block_" + blockId;
final String blockFileName = getBlockFileName(fileName, blockId);

final long blockStart = getBlockStart(blockId);
final long blockEnd = blockStart + getActualBlockSize(blockId);
final long blockEnd = blockStart + getActualBlockSize(blockId, blockSizeShift, originalFileSize);
logger.trace(
"File: {} , Block File: {} , BlockStart: {} , BlockEnd: {} , OriginalFileSize: {}",
fileName,
Expand Down Expand Up @@ -196,8 +196,4 @@ public OnDemandBlockSnapshotIndexInput clone() {
clone.cloneBlock(this);
return clone;
}

protected long getActualBlockSize(int blockId) {
return (blockId != getBlock(originalFileSize - 1)) ? blockSize : getBlockOffset(originalFileSize - 1) + 1;
}
}
Loading
Loading