Skip to content

Commit

Permalink
Add initial implementation of CLPForwardIndexCreatorV2 and the associ…
Browse files Browse the repository at this point in the history
…ated reader, unit test, table config updates, etc.
  • Loading branch information
jackluo923 committed Oct 23, 2024
1 parent 281478e commit 86aa832
Show file tree
Hide file tree
Showing 12 changed files with 874 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,13 @@ public CLPStats getCLPStats() {
throw new IllegalStateException(
"CLP stats not available for column: " + _dataSourceMetadata.getFieldSpec().getName());
}

/**
 * Returns the CLP V2 stats exposed by the forward index backing this column.
 *
 * @return the stats obtained from the {@link CLPMutableForwardIndexV2}
 * @throws IllegalStateException if the forward index is not a {@link CLPMutableForwardIndexV2}
 */
@Override
public CLPV2Stats getCLPV2Stats() {
  // Guard clause: only the CLP V2 mutable forward index can supply these stats.
  if (!(_forwardIndex instanceof CLPMutableForwardIndexV2)) {
    throw new IllegalStateException(
        "CLPV2 stats not available for column: " + _dataSourceMetadata.getFieldSpec().getName());
  }
  CLPMutableForwardIndexV2 clpForwardIndex = (CLPMutableForwardIndexV2) _forwardIndex;
  return clpForwardIndex.getCLPV2Stats();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,10 @@ public CLPStatsProvider.CLPStats getCLPStats() {
totalNumberOfEncodedVars, maxNumberOfEncodedVars);
}

/**
 * Creates a new {@link CLPStatsProvider.CLPV2Stats} that wraps this forward index instance.
 *
 * @return a stats object holding a reference to this index
 */
public CLPStatsProvider.CLPV2Stats getCLPV2Stats() {
  CLPStatsProvider.CLPV2Stats stats = new CLPStatsProvider.CLPV2Stats(this);
  return stats;
}

public String[] getSortedDictionaryValuesAsStrings(BytesOffHeapMutableDictionary dict, Charset charset) {
// Adapted from StringOffHeapMutableDictionary#getSortedValues()
int numValues = dict.length();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionTy
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
}

/**
* Creates a fixed-byte raw index creator that writes directly to the given index file, using
* {@code ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK} as the number of docs per chunk.
*
* @param indexFile Index file to write to
* @param compressionType Type of compression to use
* @param totalDocs Total number of documents to index
* @param valueType Type of the values; its {@code size()} is passed to the underlying writer
* @param writerVersion writer format version
* @throws IOException propagated from the underlying {@code FixedByteChunkForwardIndexWriter}
*/
public SingleValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs,
DataType valueType, int writerVersion)
throws IOException {
_indexWriter = new FixedByteChunkForwardIndexWriter(indexFile, compressionType, totalDocs,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK, valueType.size(), writerVersion);
_valueType = valueType;
}

/**
* Constructor for the class
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,57 @@ public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType
int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion,
int targetMaxChunkSizeBytes, int targetDocsPerChunk)
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
this(new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION), compressionType,
totalDocs, valueType, maxLength, deriveNumDocsPerChunk, writerVersion, targetMaxChunkSizeBytes,
targetDocsPerChunk);
}

/**
 * Creates a var-byte raw index creator that writes to the given index file, delegating to the
 * full constructor with {@code ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES} and
 * {@code ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK}.
 *
 * @param indexFile Index file to write to
 * @param compressionType Type of compression to use
 * @param totalDocs Total number of documents to index
 * @param valueType Type of the values
 * @param maxLength Length of the longest entry (in bytes)
 * @param deriveNumDocsPerChunk true if the writer should auto-derive the number of rows per chunk
 * @param writerVersion Writer format version
 * @throws IOException propagated from the delegated constructor
 */
public SingleValueVarByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs,
    DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion)
    throws IOException {
  this(
      indexFile,
      compressionType,
      totalDocs,
      valueType,
      maxLength,
      deriveNumDocsPerChunk,
      writerVersion,
      ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES,
      ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
}

/**
 * Creates a var-byte raw index creator that writes to the given index file.
 *
 * <p>For writer versions below {@code VarByteChunkForwardIndexWriterV4.VERSION} a
 * {@code VarByteChunkForwardIndexWriter} is used with a fixed number of docs per chunk: derived
 * from {@code maxLength} and {@code targetMaxChunkSizeBytes} when {@code deriveNumDocsPerChunk}
 * is true, otherwise {@code targetDocsPerChunk}. For later versions a
 * {@code VarByteChunkForwardIndexWriterV4} is used with a chunk size computed by
 * {@code ForwardIndexUtils.getDynamicTargetChunkSize}.
 *
 * @param indexFile Index file to write to
 * @param compressionType Type of compression to use
 * @param totalDocs Total number of documents to index
 * @param valueType Type of the values
 * @param maxLength Length of the longest entry (in bytes)
 * @param deriveNumDocsPerChunk true if the writer should auto-derive the number of rows per chunk
 * @param writerVersion Writer format version
 * @param targetMaxChunkSizeBytes Target max chunk size in bytes, applicable only for V4 & V5 or when
 *                                deriveNumDocsPerChunk is true
 * @param targetDocsPerChunk Target number of docs per chunk
 * @throws IOException propagated from the underlying index writer
 */
public SingleValueVarByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs,
    DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion, int targetMaxChunkSizeBytes,
    int targetDocsPerChunk)
    throws IOException {
  // Exactly one writer is created, always on indexFile; the earlier duplicate assignments that
  // referenced an undeclared `file` variable have been dropped.
  if (writerVersion < VarByteChunkForwardIndexWriterV4.VERSION) {
    int numDocsPerChunk =
        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength, targetMaxChunkSizeBytes) : targetDocsPerChunk;
    _indexWriter =
        new VarByteChunkForwardIndexWriter(indexFile, compressionType, totalDocs, numDocsPerChunk, maxLength,
            writerVersion);
  } else {
    int chunkSize =
        ForwardIndexUtils.getDynamicTargetChunkSize(maxLength, targetDocsPerChunk, targetMaxChunkSizeBytes);
    _indexWriter = new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, chunkSize);
  }
  _valueType = valueType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.stats;

import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;


public interface CLPStatsProvider {

CLPStats getCLPStats();

/**
 * Returns the CLP V2 stats. This default implementation always throws; implementations that can
 * supply the stats override it.
 *
 * @return never returns normally from this default implementation
 * @throws IllegalStateException always, unless overridden
 */
default CLPV2Stats getCLPV2Stats() {
  String message = "This method should only be implemented and used in MutableNoDictionaryColStatistics class.";
  throw new IllegalStateException(message);
}

class CLPStats {
int _totalNumberOfDictVars = 0;
int _totalNumberOfEncodedVars = 0;
Expand Down Expand Up @@ -63,4 +71,23 @@ public String[] getSortedDictVarValues() {
return _sortedDictVarValues;
}
}

/**
 * CLPV2Stats maintains a reference to a {@link CLPMutableForwardIndexV2}. In CLP V2 forward indexes,
 * converting a mutable forward index to an immutable one bypasses the need to decode and
 * re-encode the CLP-encoded data. Instead, the already encoded columnar data is transferred
 * directly from the mutable index to the immutable index along with its dictionary entries.
 */
class CLPV2Stats {
  // Assigned once in the constructor and never rebound, so it is declared final.
  private final CLPMutableForwardIndexV2 _clpMutableForwardIndexV2;

  /**
   * @param clpMutableForwardIndexV2 the mutable forward index this stats object refers to
   */
  public CLPV2Stats(CLPMutableForwardIndexV2 clpMutableForwardIndexV2) {
    _clpMutableForwardIndexV2 = clpMutableForwardIndexV2;
  }

  /**
   * @return the mutable forward index passed to the constructor
   */
  public CLPMutableForwardIndexV2 getClpMutableForwardIndexV2() {
    return _clpMutableForwardIndexV2;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
Expand Down Expand Up @@ -74,6 +75,9 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) {
return new CLPForwardIndexCreatorV1(indexDir, columnName, numTotalDocs, context.getColumnStatistics());
}
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2) {
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics());
}
ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType();
if (chunkCompressionType == null) {
chunkCompressionType = ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV2;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
Expand Down Expand Up @@ -95,6 +97,13 @@ public static ForwardIndexReader createIndexReader(PinotDataBuffer dataBuffer, C
return new CLPForwardIndexReaderV1(dataBuffer, metadata.getTotalDocs());
}
}
if (dataBuffer.size() >= CLPForwardIndexCreatorV2.MAGIC_BYTES.length) {
byte[] magicBytes = new byte[CLPForwardIndexCreatorV2.MAGIC_BYTES.length];
dataBuffer.copyTo(0, magicBytes);
if (Arrays.equals(magicBytes, CLPForwardIndexCreatorV2.MAGIC_BYTES)) {
return new CLPForwardIndexReaderV2(dataBuffer, metadata.getTotalDocs());
}
}
return createRawIndexReader(dataBuffer, metadata.getDataType().getStoredType(), metadata.isSingleValue());
}
}
Expand Down
Loading

0 comments on commit 86aa832

Please sign in to comment.