diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index 476bb5e10b2e..b8d77bd25457 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -125,4 +125,18 @@ public CLPStats getCLPStats() {
     throw new IllegalStateException(
         "CLP stats not available for column: " + _dataSourceMetadata.getFieldSpec().getName());
   }
+
+  /**
+   * Returns the CLP V2 stats exposed by the underlying forward index.
+   *
+   * @throws IllegalStateException if the forward index is not a {@link CLPMutableForwardIndexV2}
+   */
+  @Override
+  public CLPV2Stats getCLPV2Stats() {
+    if (!(_forwardIndex instanceof CLPMutableForwardIndexV2)) {
+      throw new IllegalStateException(
+          "CLPV2 stats not available for column: " + _dataSourceMetadata.getFieldSpec().getName());
+    }
+    return ((CLPMutableForwardIndexV2) _forwardIndex).getCLPV2Stats();
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
index 8cd940d2df24..0b5ca089d939 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
@@ -444,6 +444,13 @@ public CLPStatsProvider.CLPStats getCLPStats() {
         totalNumberOfEncodedVars, maxNumberOfEncodedVars);
   }
 
+  /**
+   * Wraps this mutable forward index in a new {@link CLPStatsProvider.CLPV2Stats} holder.
+   */
+  public CLPStatsProvider.CLPV2Stats getCLPV2Stats() {
+    return new CLPStatsProvider.CLPV2Stats(this);
+  }
+
   public String[] getSortedDictionaryValuesAsStrings(BytesOffHeapMutableDictionary dict, Charset charset) {
     // Adapted from
StringOffHeapMutableDictionary#getSortedValues() int numValues = dict.length(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java new file mode 100644 index 000000000000..92742215c739 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java @@ -0,0 +1,441 @@ +package org.apache.pinot.segment.local.segment.creator.impl.fwd; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.StandardOpenOption; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.pinot.segment.local.io.util.VarLengthValueWriter; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5; +import org.apache.pinot.segment.local.realtime.impl.dictionary.BytesOffHeapMutableDictionary; +import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2; +import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex; +import org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.creator.ColumnStatistics; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@code CLPForwardIndexCreatorV2} is responsible for creating the final immutable forward index + * from the {@link CLPMutableForwardIndexV2}. 
This forward index can be either dictionary-encoded using CLP + * or raw-bytes-encoded, depending on the configuration and the characteristics of the data being processed. + * + *
Compared to the previous version, {@link CLPForwardIndexCreatorV1}, this V2 implementation introduces several + * key improvements:
+ * + *Instead of using fixed-bit encoding (uncompressed), this version uses fixed-byte encoding with Zstandard + * chunk compression for dictionary-encoded IDs. In real-world log data, particularly for dictionary-encoded + * columns, the number of dictionary entries is often too large for fixed-bit encoding to achieve an optimal + * compression ratio. Using fixed-byte encoding with Zstandard compression significantly improves the compression + * ratio.
+ *This version uses the V5 writer for the forward index, which was introduced to improve the compression ratio + * for multi-value fixed-width data types (e.g., longs, ints). The compression efficiency of + * {@code CLPForwardIndexCreatorV2} heavily relies on the optimal storage of multi-valued columns for dictionary + * variable IDs and encoded variables.
+ *The conversion from mutable to immutable forward indexes is significantly optimized. In + * {@link CLPForwardIndexCreatorV1}, the conversion had to decode each row using CLP from the mutable forward index + * and re-encode it, introducing non-trivial serialization and deserialization (serdes) overhead. The new + * {@link CLPMutableForwardIndexV2} eliminates this process entirely, avoiding the need for redundant decoding and + * re-encoding. Additionally, primitive types (byte[]) are used for forward indexes to avoid boxing strings into + * {@link String} objects, which improves both performance and memory efficiency (by reducing garbage collection + * overhead on the heap).
+ *+ * The class manages intermediate files during the forward index creation process. These files are cleaned up once + * the index is sealed and written to the final segment file. + *
+ * + * @see CLPMutableForwardIndexV2 + * @see VarByteChunkForwardIndexWriterV5 + * @see ForwardIndexCreator + */ +public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator { + public static final Logger LOGGER = LoggerFactory.getLogger(CLPForwardIndexCreatorV2.class); + public static final byte[] MAGIC_BYTES = "CLP.v2".getBytes(StandardCharsets.UTF_8); + + public final String _column; + private final int _numDoc; + + private final File _intermediateFilesDir; + private final FileChannel _dataFile; + private final ByteBuffer _fileBuffer; + + private final boolean _isClpEncoded; + private int _logtypeDictSize; + private File _logtypeDictFile; + private VarLengthValueWriter _logtypeDict; + private int _dictVarDictSize; + private File _dictVarDictFile; + private VarLengthValueWriter _dictVarDict; + private File _logtypeIdFwdIndexFile; + private SingleValueFixedByteRawIndexCreator _logtypeIdFwdIndex; + private File _dictVarIdFwdIndexFile; + private MultiValueFixedByteRawIndexCreator _dictVarIdFwdIndex; + private File _encodedVarFwdIndexFile; + private MultiValueFixedByteRawIndexCreator _encodedVarFwdIndex; + private File _rawMsgFwdIndexFile; + private SingleValueVarByteRawIndexCreator _rawMsgFwdIndex; + + /** + * Initializes a forward index creator for the given column using the provided base directory and column statistics. + * This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other immutable forward index + * constructors, this one handles the entire process of converting a mutable forward index into an immutable one. + * + *The {@code columnStatistics} object passed into this constructor should contain a reference to the mutable + * forward index ({@link CLPMutableForwardIndexV2}). The data from the mutable index is efficiently copied over + * into this forward index, which helps minimize serdes overhead. 
Because of this design, the usual + * {@code putString(String value)} method used during the normal conversion process, is effectively a no-op in + * this class.
+   *
+   * @param baseIndexDir The base directory where the forward index files will be stored.
+   * @param columnStatistics The column statistics containing the CLP forward index information, including a reference
+   *        to the mutable forward index. It is cast to {@link CLPStatsProvider} without a type check, so it must
+   *        implement that interface and support {@code getCLPV2Stats()}.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics)
+      throws IOException {
+    // This entry point always uses ZSTANDARD chunk compression; the overloads below accept another codec.
+    this(baseIndexDir, ((CLPStatsProvider) columnStatistics).getCLPV2Stats().getClpMutableForwardIndexV2(),
+        ChunkCompressionType.ZSTANDARD);
+  }
+
+  /**
+   * Initializes a forward index creator for the given column using the provided mutable forward index and
+   * compression type. This constructor sets up the forward index for batch ingestion based on the provided CLP
+   * mutable forward index. Raw encoding is never forced on this path ({@code forceRawEncoding} is passed as
+   * {@code false}).
+   *
+   * @param baseIndexDir The base directory where the forward index files will be stored.
+   * @param clpMutableForwardIndex The mutable forward index containing the raw data to be ingested.
+   * @param chunkCompressionType The compression type to be used for encoding the forward index.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  public CLPForwardIndexCreatorV2(File baseIndexDir, CLPMutableForwardIndexV2 clpMutableForwardIndex,
+      ChunkCompressionType chunkCompressionType)
+      throws IOException {
+    this(baseIndexDir, clpMutableForwardIndex, chunkCompressionType, false);
+  }
+
+  /**
+   * Initializes a forward index creator for the given column using the provided mutable forward index, compression
+   * type, and an option to force raw encoding. If {@code forceRawEncoding} is true, the forward index will store raw
+   * bytes instead of using CLP encoding.
+   *
+   * @param baseIndexDir The base directory where the forward index files will be stored.
+   * @param clpMutableForwardIndex The mutable forward index containing the raw data to be ingested.
+   * @param chunkCompressionType The compression type used for encoding the forward index.
+   * @param forceRawEncoding If true, raw bytes encoding will be used, bypassing CLP encoding.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  public CLPForwardIndexCreatorV2(File baseIndexDir, CLPMutableForwardIndexV2 clpMutableForwardIndex,
+      ChunkCompressionType chunkCompressionType, boolean forceRawEncoding)
+      throws IOException {
+    _column = clpMutableForwardIndex.getColumnName();
+    _numDoc = clpMutableForwardIndex.getNumDoc();
+    _isClpEncoded = !forceRawEncoding && clpMutableForwardIndex.isClpEncoded();
+
+    // The intermediate directory must exist before any initialize*Mode() call: those methods create their files
+    // under _intermediateFilesDir, and a null parent would silently resolve them against the working directory.
+    _intermediateFilesDir =
+        new File(baseIndexDir, _column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION + ".clp.tmp");
+    if (_intermediateFilesDir.exists()) {
+      FileUtils.cleanDirectory(_intermediateFilesDir);
+    } else {
+      FileUtils.forceMkdir(_intermediateFilesDir);
+    }
+
+    // Final output file; seal() fills this mapping and then truncates the file to the bytes actually written.
+    _dataFile =
+        new RandomAccessFile(new File(baseIndexDir, _column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION),
+            "rw").getChannel();
+    _fileBuffer = _dataFile.map(FileChannel.MapMode.READ_WRITE, 0, Integer.MAX_VALUE);
+
+    if (_isClpEncoded) {
+      initializeDictionaryEncodingMode(chunkCompressionType, clpMutableForwardIndex.getLogtypeDict().length(),
+          clpMutableForwardIndex.getDictVarDict().length(), clpMutableForwardIndex.getMaxNumDictVarIdPerDoc(),
+          clpMutableForwardIndex.getMaxNumEncodedVarPerDoc());
+
+      // Perform columnar ingestion of the dictionaries and forward indexes
+      putLogtypeDict(clpMutableForwardIndex.getLogtypeDict());
+      putDictVarDict(clpMutableForwardIndex.getDictVarDict());
+      putLogtypeId(clpMutableForwardIndex.getLogtypeId(), clpMutableForwardIndex.getNumLogtype());
+      putDictVarIds(clpMutableForwardIndex.getDictVarOffset(), clpMutableForwardIndex.getDictVarId());
+      putEncodedVars(clpMutableForwardIndex.getEncodedVarOffset(), clpMutableForwardIndex.getEncodedVar());
+    } else {
+      // Raw encoding: copy every document's raw bytes, one row at a time
+      initializeRawEncodingMode(chunkCompressionType, clpMutableForwardIndex.getLengthOfLongestElement());
+      for (int i = 0; i < clpMutableForwardIndex.getNumDoc(); i++) {
+        putRawMsgBytes(clpMutableForwardIndex.getRawBytes(i));
+      }
+    }
+  }
+
+  /**
+   * Returns whether the current forward index is CLP-encoded.
+   *
+   * @return True if the forward index is CLP-encoded, false otherwise.
+   */
+  public boolean isClpEncoded() {
+    return _isClpEncoded;
+  }
+
+  /**
+   * Initializes the necessary components for raw encoding mode, including setting up the forward index file for raw
+   * message bytes. This method is called when CLP encoding is not used. The file is created inside
+   * {@code _intermediateFilesDir}, which therefore has to be assigned before this method runs.
+   *
+   * @param chunkCompressionType The compression type used for encoding the forward index.
+   * @param maxLength The maximum length of the raw byte messages.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  private void initializeRawEncodingMode(ChunkCompressionType chunkCompressionType, int maxLength)
+      throws IOException {
+    _rawMsgFwdIndexFile = new File(_intermediateFilesDir, _column + ".rawMsg");
+    _rawMsgFwdIndex = new SingleValueVarByteRawIndexCreator(_rawMsgFwdIndexFile, chunkCompressionType, _numDoc,
+        FieldSpec.DataType.BYTES, maxLength, true, VarByteChunkForwardIndexWriterV5.VERSION);
+  }
+
+  /**
+   * Initializes the necessary components for dictionary encoding mode, including setting up the forward index files for
+   * logtype IDs, dictionary variable IDs, and encoded variables. This method is called when CLP encoding is used.
+   *
+   * @param chunkCompressionType The compression type used for encoding the forward index.
+   * @param logtypeDictSize The size of the logtype dictionary.
+   * @param dictVarDictSize The size of the variable-length dictionary.
+   * @param maxNumDictVarIdPerDoc The maximum number of dictionary variable IDs per document.
+   * @param maxNumEncodedVarPerDoc The maximum number of encoded variables per document.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  private void initializeDictionaryEncodingMode(ChunkCompressionType chunkCompressionType, int logtypeDictSize,
+      int dictVarDictSize, int maxNumDictVarIdPerDoc, int maxNumEncodedVarPerDoc)
+      throws IOException {
+    // Every intermediate file is created under _intermediateFilesDir so that close() can remove them together.
+    _logtypeDictFile = new File(_intermediateFilesDir, _column + ".lt.dict");
+    _logtypeDict = new VarLengthValueWriter(_logtypeDictFile, logtypeDictSize);
+    _logtypeDictSize = logtypeDictSize;
+    _logtypeIdFwdIndexFile = new File(_intermediateFilesDir, _column + ".lt.id");
+    _logtypeIdFwdIndex = new SingleValueFixedByteRawIndexCreator(_logtypeIdFwdIndexFile, chunkCompressionType, _numDoc,
+        FieldSpec.DataType.INT, VarByteChunkForwardIndexWriterV5.VERSION);
+
+    _dictVarDictFile = new File(_intermediateFilesDir, _column + ".var.dict");
+    _dictVarDict = new VarLengthValueWriter(_dictVarDictFile, dictVarDictSize);
+    _dictVarDictSize = dictVarDictSize;
+    // The parent must be the intermediate directory, not the (still unassigned) field being initialized here.
+    _dictVarIdFwdIndexFile = new File(_intermediateFilesDir, _column + ".dictVars");
+    _dictVarIdFwdIndex = new MultiValueFixedByteRawIndexCreator(_dictVarIdFwdIndexFile, chunkCompressionType, _numDoc,
+        FieldSpec.DataType.INT, maxNumDictVarIdPerDoc, true, VarByteChunkForwardIndexWriterV5.VERSION);
+
+    _encodedVarFwdIndexFile = new File(_intermediateFilesDir, _column + ".encodedVars");
+    _encodedVarFwdIndex = new MultiValueFixedByteRawIndexCreator(_encodedVarFwdIndexFile, chunkCompressionType, _numDoc,
+        FieldSpec.DataType.LONG, maxNumEncodedVarPerDoc, true, VarByteChunkForwardIndexWriterV5.VERSION);
+  }
+
+  /**
+   * Appends every entry of the given mutable logtype dictionary, in index order, to the intermediate logtype
+   * dictionary writer.
+   *
+   * @param logtypeDict The mutable logtype dictionary to copy from.
+   * @throws IOException If writing an entry fails.
+   */
+  public void putLogtypeDict(BytesOffHeapMutableDictionary logtypeDict)
+      throws IOException {
+    for (int i = 0; i < logtypeDict.length(); i++) {
+      _logtypeDict.add(logtypeDict.get(i));
+    }
+  }
+
+  /**
+   * Appends every entry of the given mutable dictionary-variable dictionary, in index order, to the intermediate
+   * dictionary-variable dictionary writer.
+   *
+   * @param dictVarDict The mutable dictionary-variable dictionary to copy from.
+   * @throws IOException If writing an entry fails.
+   */
+  public void putDictVarDict(BytesOffHeapMutableDictionary dictVarDict)
+      throws IOException {
+    for (int i = 0; i < dictVarDict.length(); i++) {
+      _dictVarDict.add(dictVarDict.get(i));
+    }
+  }
+
+  public void
putLogtypeId(FixedByteSVMutableForwardIndex logtypeIdMutableFwdIndex, int numLogtype) {
+    // Append the first numLogtype logtype ids, in index order, to the intermediate logtype-id forward index.
+    int docId = 0;
+    while (docId < numLogtype) {
+      _logtypeIdFwdIndex.putInt(logtypeIdMutableFwdIndex.getInt(docId));
+      docId++;
+    }
+  }
+
+  /**
+   * Writes one multi-value row of dictionary-variable ids per document.
+   *
+   * Entry {@code docId} of the offset index is read as the exclusive end position of that document's ids in the
+   * flattened id index; the previous document's end (0 for the first document) is its start position. A document
+   * whose end does not exceed its start is written as an empty row.
+   *
+   * @param dictVarOffsetMutableFwdIndex Per-document end offsets into the flattened id index.
+   * @param dictVarIdMutableFwdIndex The flattened dictionary-variable ids of all documents.
+   */
+  public void putDictVarIds(FixedByteSVMutableForwardIndex dictVarOffsetMutableFwdIndex,
+      FixedByteSVMutableForwardIndex dictVarIdMutableFwdIndex) {
+    int begin = 0;
+    for (int docId = 0; docId < _numDoc; docId++) {
+      int end = dictVarOffsetMutableFwdIndex.getInt(docId);
+      int count = end - begin;
+      int[] ids;
+      if (count > 0) {
+        ids = new int[count];
+        for (int k = 0; k < count; k++) {
+          ids[k] = dictVarIdMutableFwdIndex.getInt(begin + k);
+        }
+      } else {
+        ids = ArrayUtils.EMPTY_INT_ARRAY;
+      }
+      _dictVarIdFwdIndex.putIntMV(ids);
+      begin = end;
+    }
+  }
+
+  /**
+   * Writes one multi-value row of encoded variables per document, using the same end-offset convention as
+   * {@link #putDictVarIds}: entry {@code docId} of the offset index is the exclusive end position of that document's
+   * values in the flattened index, and an empty range is written as an empty row.
+   *
+   * @param encodedVarOffset Per-document end offsets into the flattened encoded-variable index.
+   * @param encodedVarForwardIndex The flattened encoded variables of all documents.
+   */
+  public void putEncodedVars(FixedByteSVMutableForwardIndex encodedVarOffset,
+      FixedByteSVMutableForwardIndex encodedVarForwardIndex) {
+    int begin = 0;
+    for (int docId = 0; docId < _numDoc; docId++) {
+      int end = encodedVarOffset.getInt(docId);
+      int count = end - begin;
+      long[] values;
+      if (count > 0) {
+        values = new long[count];
+        for (int k = 0; k < count; k++) {
+          values[k] = encodedVarForwardIndex.getLong(begin + k);
+        }
+      } else {
+        values = ArrayUtils.EMPTY_LONG_ARRAY;
+      }
+      _encodedVarFwdIndex.putLongMV(values);
+      begin = end;
+    }
+  }
+
+  /**
+   * Appends one raw (non-CLP-encoded) message to the raw-message forward index.
+   *
+   * @param rawMsgBytes The bytes of the message to append.
+   */
+  public void putRawMsgBytes(byte[] rawMsgBytes) {
+    _rawMsgFwdIndex.putBytes(rawMsgBytes);
+  }
+
+  @Override
+  public void putString(String value) {
+    // Intentionally empty: every row of the mutable forward index was already ingested by the constructor.
+  }
+
+  /**
+   * Seals the forward index by finalizing and writing all the data to the underlying file storage.
This method
+   * closes all intermediate files and writes the final forward index to the memory-mapped buffer.
+   *
+   * Layout written to the buffer, in order: an int holding the length of {@code MAGIC_BYTES}, the magic bytes, an
+   * int version (2) and an int flag (1 if CLP-encoded, 0 otherwise). In CLP mode this is followed by seven ints
+   * (logtype dictionary size, dictionary-variable dictionary size, then the byte lengths of the logtype dictionary,
+   * dictionary-variable dictionary, logtype-id index, dictionary-variable-id index and encoded-variable index files)
+   * and the contents of those five files in the same order. In raw mode it is followed by one int (byte length of
+   * the raw-message index file) and that file's contents.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if closing an intermediate file or writing the final
+   *         file fails
+   */
+  @Override
+  public void seal() {
+    try {
+      // Seal the intermediate forward indexes that belong to the active encoding mode
+      if (isClpEncoded()) {
+        _logtypeIdFwdIndex.seal();
+        _dictVarIdFwdIndex.seal();
+        _encodedVarFwdIndex.seal();
+      } else {
+        _rawMsgFwdIndex.seal();
+      }
+
+      // Close the intermediate writers before their files are measured and copied below; close failures are
+      // rethrown unchecked with a mode-specific message
+      if (isClpEncoded()) {
+        try {
+          _logtypeDict.close();
+          _logtypeIdFwdIndex.close();
+          _dictVarDict.close();
+          _dictVarIdFwdIndex.close();
+          _encodedVarFwdIndex.close();
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to close dictionaries and forward indexes for column: " + _column, e);
+        }
+      } else {
+        try {
+          _rawMsgFwdIndex.close();
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to close raw message forward index for column: " + _column, e);
+        }
+      }
+
+      // Write the header and the intermediate files to the memory mapped buffer; totalSize tracks every byte
+      // written so the file can be truncated to its real size at the end
+      long totalSize = 0;
+      _fileBuffer.putInt(MAGIC_BYTES.length);
+      totalSize += Integer.BYTES;
+      _fileBuffer.put(MAGIC_BYTES);
+      totalSize += MAGIC_BYTES.length;
+
+      _fileBuffer.putInt(2); // version
+      totalSize += Integer.BYTES;
+
+      _fileBuffer.putInt(_isClpEncoded ? 1 : 0); // isClpEncoded
+      totalSize += Integer.BYTES;
+
+      if (_isClpEncoded) {
+        // Dictionary cardinalities first, then the byte length of each section
+        // NOTE(review): the (int) casts below assume every intermediate file is smaller than 2 GiB -- confirm
+        _fileBuffer.putInt(_logtypeDictSize);
+        totalSize += Integer.BYTES;
+
+        _fileBuffer.putInt(_dictVarDictSize);
+        totalSize += Integer.BYTES;
+
+        _fileBuffer.putInt((int) _logtypeDictFile.length());
+        totalSize += Integer.BYTES;
+
+        _fileBuffer.putInt((int) _dictVarDictFile.length());
+        totalSize += Integer.BYTES;
+
+        _fileBuffer.putInt((int) _logtypeIdFwdIndexFile.length());
+        totalSize += Integer.BYTES;
+
+        _fileBuffer.putInt((int) _dictVarIdFwdIndexFile.length());
+        totalSize += Integer.BYTES;
+
+        _fileBuffer.putInt((int) _encodedVarFwdIndexFile.length());
+        totalSize += Integer.BYTES;
+
+        // Section payloads, in the same order as the lengths written above
+        copyFileIntoBuffer(_logtypeDictFile);
+        totalSize += _logtypeDictFile.length();
+
+        copyFileIntoBuffer(_dictVarDictFile);
+        totalSize += _dictVarDictFile.length();
+
+        copyFileIntoBuffer(_logtypeIdFwdIndexFile);
+        totalSize += _logtypeIdFwdIndexFile.length();
+
+        copyFileIntoBuffer(_dictVarIdFwdIndexFile);
+        totalSize += _dictVarIdFwdIndexFile.length();
+
+        copyFileIntoBuffer(_encodedVarFwdIndexFile);
+        totalSize += _encodedVarFwdIndexFile.length();
+      } else {
+        // Raw mode: a single length-prefixed section holding the raw-message forward index
+        _fileBuffer.putInt((int) _rawMsgFwdIndexFile.length());
+        totalSize += Integer.BYTES;
+
+        copyFileIntoBuffer(_rawMsgFwdIndexFile);
+        totalSize += _rawMsgFwdIndexFile.length();
+      }
+
+      // Truncate memory mapped file to actual size (the constructor maps it with Integer.MAX_VALUE bytes)
+      _dataFile.truncate(totalSize);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to seal forward indexes for column: " + _column, e);
+    }
+  }
+
+  /**
+   * Closes the forward index creator, deleting all intermediate files and releasing any resources held by the class.
+   *
+   * @throws IOException If there is an error while closing the forward index or deleting the intermediate files.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    try {
+      // Delete all temp files
+      FileUtils.deleteDirectory(_intermediateFilesDir);
+    } finally {
+      // Release the output channel even when temp-file cleanup fails, so the file handle is never leaked
+      _dataFile.close();
+    }
+  }
+
+  /**
+   * Always {@code false}: this creator reports itself to the framework as a raw (non-dictionary) forward index.
+   */
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  /**
+   * Always {@code true}: the column is reported as single-valued.
+   */
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  /**
+   * Always {@link FieldSpec.DataType#STRING}.
+   */
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  /**
+   * Copies the contents of the given file into the memory-mapped buffer at the buffer's current position. The
+   * source file is mapped read-only for the duration of the copy and its channel is closed afterwards.
+   *
+   * @param file The file to be copied into the memory-mapped buffer.
+   * @throws IOException If there is an error while reading the file or writing to the buffer.
+   */
+  private void copyFileIntoBuffer(File file)
+      throws IOException {
+    try (FileChannel from = (FileChannel.open(file.toPath(), StandardOpenOption.READ))) {
+      _fileBuffer.put(from.map(FileChannel.MapMode.READ_ONLY, 0, file.length()));
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
index c509650ee215..a407b45f691a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
@@ -53,6 +53,24 @@ public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionTy
         ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
   }
 
+  /**
+   * Constructor for the class
+   *
+   * @param indexFile Index file to write to
+   * @param compressionType Type of compression to use
+   * @param totalDocs Total number of documents to index
+   * @param valueType Data type of the values to index
+   * @param writerVersion writer format version
+   * @throws IOException
+   */
+  public
SingleValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs, + DataType valueType, int writerVersion) + throws IOException { + _indexWriter = new FixedByteChunkForwardIndexWriter(indexFile, compressionType, totalDocs, + ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK, valueType.size(), writerVersion); + _valueType = valueType; + } + /** * Constructor for the class * diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java index 5b5a1ff0e335..6fe14b68ec2a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java @@ -77,16 +77,57 @@ public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion, int targetMaxChunkSizeBytes, int targetDocsPerChunk) throws IOException { - File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION); + this(new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION), compressionType, + totalDocs, valueType, maxLength, deriveNumDocsPerChunk, writerVersion, targetMaxChunkSizeBytes, + targetDocsPerChunk); + } + + /** + * Create a var-byte raw index creator for the given column + * @param indexFile Index file + * @param compressionType Type of compression to use + * @param totalDocs Total number of documents to index + * @param valueType Type of the values + * @param maxLength length of longest entry (in bytes) + * @param deriveNumDocsPerChunk true if writer should auto-derive the number 
of rows per chunk + * @param writerVersion writer format version + * @throws IOException + */ + public SingleValueVarByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs, + DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion) + throws IOException { + this(indexFile, compressionType, totalDocs, valueType, maxLength, deriveNumDocsPerChunk, writerVersion, + ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES, ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK); + } + + /** + * Create a var-byte raw index creator for the given column + * @param indexFile Index file + * @param compressionType Type of compression to use + * @param totalDocs Total number of documents to index + * @param valueType Type of the values + * @param maxLength length of longest entry (in bytes) + * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk + * @param writerVersion writer format version + * @param targetMaxChunkSizeBytes target max chunk size in bytes, applicable only for V4 & V5 or when + * deriveNumDocsPerChunk is true + * @param targetDocsPerChunk target number of docs per chunk + * @throws IOException + */ + public SingleValueVarByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs, + DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion, int targetMaxChunkSizeBytes, + int targetDocsPerChunk) + throws IOException { if (writerVersion < VarByteChunkForwardIndexWriterV4.VERSION) { int numDocsPerChunk = deriveNumDocsPerChunk ? 
getNumDocsPerChunk(maxLength, targetMaxChunkSizeBytes) : targetDocsPerChunk; - _indexWriter = new VarByteChunkForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength, - writerVersion); + _indexWriter = + new VarByteChunkForwardIndexWriter(indexFile, compressionType, totalDocs, numDocsPerChunk, maxLength, + writerVersion); } else { int chunkSize = ForwardIndexUtils.getDynamicTargetChunkSize(maxLength, targetDocsPerChunk, targetMaxChunkSizeBytes); - _indexWriter = new VarByteChunkForwardIndexWriterV4(file, compressionType, chunkSize); + _indexWriter = new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, chunkSize); } _valueType = valueType; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java index b8611886206b..f77613b70d21 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java @@ -18,10 +18,18 @@ */ package org.apache.pinot.segment.local.segment.creator.impl.stats; +import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2; + + public interface CLPStatsProvider { CLPStats getCLPStats(); + default CLPV2Stats getCLPV2Stats() { + throw new IllegalStateException( + "This method should only be implemented and used in MutableNoDictionaryColStatistics class."); + } + class CLPStats { int _totalNumberOfDictVars = 0; int _totalNumberOfEncodedVars = 0; @@ -63,4 +71,23 @@ public String[] getSortedDictVarValues() { return _sortedDictVarValues; } } + + /** + * CLPV2Stats maintains a reference to CLPMutableForwardIndexV2. 
In CLP V2 forward indexes,
+   * converting a mutable forward index to an immutable one does not decode and re-encode the
+   * CLP-encoded data. Instead, the already encoded columnar data is transferred directly from
+   * the mutable index to the immutable index along with its dictionary entries, which is why
+   * the creator needs access to the mutable index itself.
+   */
+  class CLPV2Stats {
+    // Set once in the constructor and never reassigned
+    private final CLPMutableForwardIndexV2 _clpMutableForwardIndexV2;
+
+    public CLPV2Stats(CLPMutableForwardIndexV2 clpMutableForwardIndexV2) {
+      _clpMutableForwardIndexV2 = clpMutableForwardIndexV2;
+    }
+
+    public CLPMutableForwardIndexV2 getClpMutableForwardIndexV2() {
+      return _clpMutableForwardIndexV2;
+    }
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index 7d51a09a3b89..87cb7262225f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -22,6 +22,7 @@
 import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
@@ -74,6 +75,9 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
         if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) {
           return new CLPForwardIndexCreatorV1(indexDir, columnName, numTotalDocs,
context.getColumnStatistics()); } + if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2) { + return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics()); + } ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType(); if (chunkCompressionType == null) { chunkCompressionType = ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java index cc7201ed985f..59a69047c0df 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java @@ -23,7 +23,9 @@ import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5; import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2; import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1; +import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV2; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; @@ -95,6 +97,13 @@ public static ForwardIndexReader createIndexReader(PinotDataBuffer dataBuffer, C return new CLPForwardIndexReaderV1(dataBuffer, metadata.getTotalDocs()); } } + if 
(dataBuffer.size() >= CLPForwardIndexCreatorV2.MAGIC_BYTES.length) { + byte[] magicBytes = new byte[CLPForwardIndexCreatorV2.MAGIC_BYTES.length]; + dataBuffer.copyTo(0, magicBytes); + if (Arrays.equals(magicBytes, CLPForwardIndexCreatorV2.MAGIC_BYTES)) { + return new CLPForwardIndexReaderV2(dataBuffer, metadata.getTotalDocs()); + } + } return createRawIndexReader(dataBuffer, metadata.getDataType().getStoredType(), metadata.isSingleValue()); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java new file mode 100644 index 000000000000..a828814bf181 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java @@ -0,0 +1,242 @@ +package org.apache.pinot.segment.local.segment.index.readers.forward; + +import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions; +import com.yscope.clp.compressorfrontend.FlattenedByteArrayFactory; +import com.yscope.clp.compressorfrontend.MessageDecoder; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.pinot.segment.local.io.util.PinotDataBitSet; +import org.apache.pinot.segment.local.io.util.VarLengthValueReader; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec; + + +/** + * {@code CLPForwardIndexReaderV2} is a forward index reader for CLP-encoded forward indexes. 
It supports reading both + * CLP-encoded and raw message forward indexes created by {@link CLPForwardIndexCreatorV2}. + * + * <p>This class supports two modes of reading: + *
This constructor reads the metadata from the data buffer and initializes the appropriate readers for either + * CLP-encoded or raw message forward indexes.
+ * + * @param pinotDataBuffer The data buffer containing the forward index. + * @param numDocs The number of documents in the forward index. + * @throws UnsupportedOperationException If the magic bytes do not match the expected CLP forward index format. + */ + public CLPForwardIndexReaderV2(PinotDataBuffer pinotDataBuffer, int numDocs) { + _numDocs = numDocs; + int offset = 0; + int magicBytesLength = pinotDataBuffer.getInt(offset); + offset += Integer.BYTES; + byte[] magicBytes = new byte[magicBytesLength]; + pinotDataBuffer.copyTo(offset, magicBytes); + + // Validate against supported version + if (!Arrays.equals(magicBytes, CLPForwardIndexCreatorV2.MAGIC_BYTES)) { + throw new UnsupportedOperationException("Unsupported magic bytes"); + } + offset += CLPForwardIndexCreatorV2.MAGIC_BYTES.length; + + _version = pinotDataBuffer.getInt(offset); + offset += Integer.BYTES; + + _isClpEncoded = pinotDataBuffer.getInt(offset) == 1; // 1 -> true, 0 -> false + offset += Integer.BYTES; + + if (_isClpEncoded) { + int logtypeDictSize = pinotDataBuffer.getInt(offset); + _logtypeDictNumBytesPerValue = PinotDataBitSet.getNumBitsPerValue(logtypeDictSize - 1); + offset += Integer.BYTES; + + int dictVarDictSize = pinotDataBuffer.getInt(offset); + _dictVarDictNumBytesPerValue = PinotDataBitSet.getNumBitsPerValue(dictVarDictSize - 1); + offset += Integer.BYTES; + + int logtypeDictLength = pinotDataBuffer.getInt(offset); + offset += Integer.BYTES; + int dictVarDictLength = pinotDataBuffer.getInt(offset); + offset += Integer.BYTES; + int logtypeIdFwdIndexLength = pinotDataBuffer.getInt(offset); + offset += Integer.BYTES; + int dictVarIdFwdIndexLength = pinotDataBuffer.getInt(offset); + offset += Integer.BYTES; + int encodedVarFwdIndexLength = pinotDataBuffer.getInt(offset); + offset += Integer.BYTES; + + _logTypeDictReader = new VarLengthValueReader(pinotDataBuffer.view(offset, offset + logtypeDictLength)); + offset += logtypeDictLength; + + _dictVarDictReader = new 
VarLengthValueReader(pinotDataBuffer.view(offset, offset + dictVarDictLength)); + offset += dictVarDictLength; + + _logTypeIdFwdIndexReader = + new FixedBytePower2ChunkSVForwardIndexReader(pinotDataBuffer.view(offset, offset + logtypeIdFwdIndexLength), + FieldSpec.DataType.INT); + offset += logtypeIdFwdIndexLength; + + _dictVarIdFwdIndexReader = + new VarByteChunkForwardIndexReaderV5(pinotDataBuffer.view(offset, offset + dictVarIdFwdIndexLength), + FieldSpec.DataType.INT, false); + offset += dictVarIdFwdIndexLength; + + _encodedVarFwdIndexReader = + new VarByteChunkForwardIndexReaderV5(pinotDataBuffer.view(offset, offset + encodedVarFwdIndexLength), + FieldSpec.DataType.LONG, false); + offset += encodedVarFwdIndexLength; + + _clpMessageDecoder = new MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2, + BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1); + } else { + int rawMsgFwdIndexLength = pinotDataBuffer.getInt(offset); + offset += Integer.BYTES; + + _rawMsgFwdIndexReader = + new VarByteChunkForwardIndexReaderV5(pinotDataBuffer.view(offset, offset + rawMsgFwdIndexLength), + FieldSpec.DataType.BYTES, false); + offset += rawMsgFwdIndexLength; + } + } + + /** + * Creates a new {@code CLPReaderContext} for reading data from the forward index. + * + * @return A new {@code CLPReaderContext} initialized with the appropriate reader contexts for the forward index. 
+ */ + public CLPForwardIndexReaderV2.CLPReaderContext createContext() { + if (_isClpEncoded) { + return new CLPReaderContext(_logTypeIdFwdIndexReader.createContext(), _dictVarIdFwdIndexReader.createContext(), + _encodedVarFwdIndexReader.createContext()); + } else { + return new CLPReaderContext(_rawMsgFwdIndexReader.createContext()); + } + } + + @Override + public boolean isDictionaryEncoded() { + return false; + } + + @Override + public boolean isSingleValue() { + return true; + } + + @Override + public FieldSpec.DataType getStoredType() { + return FieldSpec.DataType.STRING; + } + + @Override + public String getString(int docId, CLPReaderContext context) { + if (_isClpEncoded) { + byte[] logtype = + _logTypeDictReader.getBytes(_logTypeIdFwdIndexReader.getInt(docId, context._logTypeIdReaderContext), + _logtypeDictNumBytesPerValue); + + int[] dictVarIds = _dictVarIdFwdIndexReader.getIntMV(docId, context._dictVarIdReaderContext); + byte[][] dictVars = new byte[dictVarIds.length][]; + for (int i = 0; i < dictVars.length; i++) { + dictVars[i] = _dictVarDictReader.getBytes(dictVarIds[i], _dictVarDictNumBytesPerValue); + } + + long[] encodedVars = _encodedVarFwdIndexReader.getLongMV(docId, context._encodedVarReaderContext); + try { + return _clpMessageDecoder.decodeMessage(logtype, FlattenedByteArrayFactory.fromByteArrays(dictVars), + encodedVars); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + byte[] rawMsg = _rawMsgFwdIndexReader.getBytes(docId, context._rawMsgReaderContext); + return new String(rawMsg, StandardCharsets.UTF_8); + } + } + + @Override + public void close() + throws IOException { + } + + /** + * The {@code CLPReaderContext} is a context class used to hold reader-specific state during forward index reading. + * It contains references to reader contexts for logtype IDs, dictionary variable IDs, encoded variables, or raw + * messages. 
+ */ + public static final class CLPReaderContext implements ForwardIndexReaderContext { + private final ChunkReaderContext _logTypeIdReaderContext; + private final VarByteChunkForwardIndexReaderV5.ReaderContext _dictVarIdReaderContext; + private final VarByteChunkForwardIndexReaderV5.ReaderContext _encodedVarReaderContext; + private final VarByteChunkForwardIndexReaderV4.ReaderContext _rawMsgReaderContext; + + public CLPReaderContext(ChunkReaderContext logTypeIdReaderContext, + VarByteChunkForwardIndexReaderV5.ReaderContext dictVarIdReaderContext, + VarByteChunkForwardIndexReaderV5.ReaderContext encodedVarReaderContext) { + this(logTypeIdReaderContext, dictVarIdReaderContext, encodedVarReaderContext, null); + } + + public CLPReaderContext(VarByteChunkForwardIndexReaderV4.ReaderContext rawMsgReaderContext) { + this(null, null, null, rawMsgReaderContext); + } + + public CLPReaderContext(ChunkReaderContext logTypeIdReaderContext, + VarByteChunkForwardIndexReaderV5.ReaderContext dictVarIdReaderContext, + VarByteChunkForwardIndexReaderV5.ReaderContext encodedVarReaderContext, + VarByteChunkForwardIndexReaderV4.ReaderContext rawMsgReaderContext) { + _logTypeIdReaderContext = logTypeIdReaderContext; + _dictVarIdReaderContext = dictVarIdReaderContext; + _encodedVarReaderContext = encodedVarReaderContext; + _rawMsgReaderContext = rawMsgReaderContext; + } + + @Override + public void close() + throws IOException { + if (null != _logTypeIdReaderContext) { + _logTypeIdReaderContext.close(); + } + if (null != _dictVarIdReaderContext) { + _dictVarIdReaderContext.close(); + } + if (null != _encodedVarReaderContext) { + _encodedVarReaderContext.close(); + } + if (null != _rawMsgReaderContext) { + _rawMsgReaderContext.close(); + } + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java new file mode 100644 index 000000000000..3123eb2663bb --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java @@ -0,0 +1,71 @@ +package org.apache.pinot.segment.local.segment.index.creator; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; +import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2; +import org.apache.pinot.segment.local.segment.index.forward.mutable.VarByteSVMutableForwardIndexTest; +import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV2; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class CLPForwardIndexCreatorV2Test { + private static final File TEMP_DIR = + new File(FileUtils.getTempDirectory(), CLPForwardIndexCreatorV2Test.class.getSimpleName()); + private PinotDataBufferMemoryManager _memoryManager; + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR); + _memoryManager = new DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName()); + } + + @Test + public void testCLPWriter() + throws IOException { + List