Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
becfab8
feat(tsdb): add composable pipeline framework for ES94 TSDB codec
salvatore-campagna Mar 4, 2026
4f5a092
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 4, 2026
4e15b19
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 4, 2026
6822cd0
[CI] Auto commit changes from spotless
Mar 4, 2026
669d8df
fix(tsdb): rename describeStages empty fallback from "default" to "em…
salvatore-campagna Mar 4, 2026
d92933e
test(tsdb): add reverse metadata read test for gather-scatter contract
salvatore-campagna Mar 4, 2026
770902c
feat(tsdb): add configurable compression level to ZstdPayload
salvatore-campagna Mar 4, 2026
2f63f5a
refactor(tsdb): remove final from primitive method parameters
salvatore-campagna Mar 4, 2026
da88804
[CI] Auto commit changes from spotless
Mar 4, 2026
544d8a5
refactor(tsdb): rename fpcStage/alpDoubleStage/alpFloatStage to fpc/a…
salvatore-campagna Mar 4, 2026
f7e2447
refactor(tsdb): tighten visibility across pipeline framework
salvatore-campagna Mar 4, 2026
dbcf4e6
refactor(tsdb): make ZstdPayload compression level constants package-…
salvatore-campagna Mar 4, 2026
250f9b7
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 4, 2026
7c42b01
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 4, 2026
20d7d4a
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 5, 2026
2599725
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 5, 2026
6bfcfee
refactor: strip stages to ES819-parity (delta, offset, gcd, bitpack)
salvatore-campagna Mar 5, 2026
e01c35b
docs: fix missing Javadoc across pipeline classes
salvatore-campagna Mar 5, 2026
52a5ba5
refactor: introduce TransformSpec/PayloadSpec marker interfaces
salvatore-campagna Mar 5, 2026
a76ad35
docs: add package-level Javadoc for pipeline and numeric packages
salvatore-campagna Mar 5, 2026
5e4e692
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 5, 2026
48d6532
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 6, 2026
9d90c31
fix: remove redundant public modifier from MetadataBuffer constructors
salvatore-campagna Mar 5, 2026
79a0c4f
docs: add class-level Javadoc to MetadataWriter/MetadataReader explai…
salvatore-campagna Mar 6, 2026
c47448e
fix: remove unused EncodingContext constructor with metadataCapacity …
salvatore-campagna Mar 6, 2026
3c6da27
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 9, 2026
ed336af
Merge branch 'main' into feature/es94-pipeline-framework
salvatore-campagna Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline;

import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.elasticsearch.index.codec.tsdb.pipeline.numeric.PayloadDecoder;
import org.elasticsearch.index.codec.tsdb.pipeline.numeric.PayloadEncoder;

import java.io.IOException;

/**
* Defines how each block of encoded values is written to the data file.
*
* <p>Data file layout:
* <pre>
* +------------------+----------------------------------------+
* | Block 0 | [bitmap][payload][stage metadata] |
* | Block 1 | [bitmap][payload][stage metadata] |
* | ... | ... |
* | Block N-1 | [bitmap][payload][stage metadata] |
* +------------------+----------------------------------------+
* | Block Offsets | DirectMonotonicWriter encoded offsets |
* +------------------+----------------------------------------+
* </pre>
*
* <p>Each block contains:
* <ul>
* <li><strong>bitmap</strong>: 1 byte ({@code <= 8} stages) or 2 bytes ({@code > 8} stages)
* indicating which stages were applied</li>
* <li><strong>payload</strong>: the encoded values written by the terminal payload stage</li>
* <li><strong>stage metadata</strong>: per-stage metadata written by transformation stages
* (e.g., GCD divisor)</li>
* </ul>
*
* <p>The layout is designed for sequential decoding: the bitmap comes first so the
* decoder immediately knows which stages to reverse, followed by the payload and
* then stage metadata in reverse stage order (see {@link EncodingContext#writeStageMetadata}).
* This means the decoder can read every section in a single forward pass with no
* seeking or buffering. See {@link FieldDescriptor} for the metadata file format
* that describes pipeline configuration.
*/
public final class BlockFormat {

private BlockFormat() {}

/**
* Writes a block of encoded values to the data output.
*
* @param out the data output stream
* @param values the values to encode
* @param payloadStage the terminal payload encoder
* @param context the encoding context with block metadata
* @throws IOException if an I/O error occurs
*/
public static void writeBlock(
final DataOutput out,
final long[] values,
final PayloadEncoder payloadStage,
final EncodingContext context
) throws IOException {
writeHeader(out, context);
payloadStage.encode(values, context.valueCount(), out, context);
context.writeStageMetadata(out);
}

/**
* Reads a block of encoded values from the data input.
*
* @param in the data input stream
* @param values the output array to populate
* @param payloadStage the terminal payload decoder
* @param context the decoding context with block metadata
* @param payloadPosition the pipeline position of the payload stage
* @return the number of values decoded
* @throws IOException if an I/O error occurs
*/
public static int readBlock(
final DataInput in,
final long[] values,
final PayloadDecoder payloadStage,
final DecodingContext context,
int payloadPosition
) throws IOException {
readHeader(in, context);
if (context.isStageApplied(payloadPosition) == false) {
throw new IOException("Payload stage not applied - possible data corruption");
}
return payloadStage.decode(values, in, context);
}

static void writeHeader(final DataOutput out, final EncodingContext context) throws IOException {
final short bitmap = context.positionBitmap();
if (context.pipelineLength() <= 8) {
out.writeByte((byte) bitmap);
} else {
out.writeShort(bitmap);
}
}

static void readHeader(final DataInput in, final DecodingContext context) throws IOException {
final int pipelineLength = context.pipelineLength();
if (pipelineLength <= 0) {
throw new IOException("Pipeline must be set for decoding");
}

final short bitmap;
if (pipelineLength <= 8) {
bitmap = (short) (in.readByte() & 0xFF);
} else {
bitmap = in.readShort();
}
context.setPositionBitmap(bitmap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline;

import org.apache.lucene.store.DataInput;

import java.io.IOException;

/**
* Mutable per-block context for decoding, tracking the position bitmap and
* delegating metadata reads to the underlying {@link DataInput}. Reused
* across blocks via {@link #clear()}.
*/
public final class DecodingContext implements MetadataReader {

private final int pipelineLength;
private final int blockSize;

private DataInput dataInput;
private short positionBitmap;

/**
* Creates a decoding context.
*
* @param blockSize the number of values per block
* @param pipelineLength the number of stages in the pipeline
*/
public DecodingContext(int blockSize, int pipelineLength) {
this.blockSize = blockSize;
this.pipelineLength = pipelineLength;
}

/**
* Sets the data input stream for reading block data and stage metadata.
*
* @param dataInput the data input stream
*/
public void setDataInput(final DataInput dataInput) {
this.dataInput = dataInput;
}

/**
* Returns the number of stages in the pipeline.
*
* @return the pipeline length
*/
public int pipelineLength() {
return pipelineLength;
}

/**
* Sets the bitmap of applied stage positions, read from the block header.
*
* @param bitmap the position bitmap
*/
void setPositionBitmap(short bitmap) {
this.positionBitmap = bitmap;
}

/**
* Returns {@code true} if the stage at the given position was applied.
*
* @param position the zero-based stage index
* @return whether the stage was applied
*/
public boolean isStageApplied(int position) {
assert position >= 0 && position < pipelineLength : "Position out of range: " + position;
return (positionBitmap & (1 << position)) != 0;
}

/**
* Returns the metadata reader for accessing stage metadata.
*
* @return the metadata reader
*/
public MetadataReader metadata() {
return this;
}

/**
* Returns the block size.
*
* @return the number of values per block
*/
public int blockSize() {
return blockSize;
}

/**
* Resets this context for reuse with the next block.
*
* <p>NOTE: dataInput is intentionally nulled. Unlike EncodingContext which
* owns its MetadataBuffer, DecodingContext does not own the DataInput (it is injected).
* Nulling forces the caller to provide a fresh DataInput via {@link #setDataInput} before
* each block, which is a fail-fast against silently reading garbage from a stale stream.
*/
public void clear() {
positionBitmap = 0;
dataInput = null;
}

@Override
public byte readByte() throws IOException {
return dataInput.readByte();
}

@Override
public int readZInt() throws IOException {
return dataInput.readZInt();
}

@Override
public long readZLong() throws IOException {
return dataInput.readZLong();
}

@Override
public long readLong() throws IOException {
return dataInput.readLong();
}

@Override
public int readInt() throws IOException {
return dataInput.readInt();
}

@Override
public int readVInt() throws IOException {
return dataInput.readVInt();
}

@Override
public long readVLong() throws IOException {
return dataInput.readVLong();
}

@Override
public void readBytes(final byte[] bytes, int offset, int length) throws IOException {
dataInput.readBytes(bytes, offset, length);
}
}
Loading