diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/BlockFormat.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/BlockFormat.java new file mode 100644 index 0000000000000..447bdb30366eb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/BlockFormat.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.pipeline; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.elasticsearch.index.codec.tsdb.pipeline.numeric.PayloadDecoder; +import org.elasticsearch.index.codec.tsdb.pipeline.numeric.PayloadEncoder; + +import java.io.IOException; + +/** + * Defines how each block of encoded values is written to the data file. + * + *
Data file layout: + *
+ * +------------------+----------------------------------------+ + * | Block 0 | [bitmap][payload][stage metadata] | + * | Block 1 | [bitmap][payload][stage metadata] | + * | ... | ... | + * | Block N-1 | [bitmap][payload][stage metadata] | + * +------------------+----------------------------------------+ + * | Block Offsets | DirectMonotonicWriter encoded offsets | + * +------------------+----------------------------------------+ + *+ * + *
Each block contains: + *
The layout is designed for sequential decoding: the bitmap comes first so the + * decoder immediately knows which stages to reverse, followed by the payload and + * then stage metadata in reverse stage order (see {@link EncodingContext#writeStageMetadata}). + * This means the decoder can read every section in a single forward pass with no + * seeking or buffering. See {@link FieldDescriptor} for the metadata file format + * that describes pipeline configuration. + */ +public final class BlockFormat { + + private BlockFormat() {} + + /** + * Writes a block of encoded values to the data output. + * + * @param out the data output stream + * @param values the values to encode + * @param payloadStage the terminal payload encoder + * @param context the encoding context with block metadata + * @throws IOException if an I/O error occurs + */ + public static void writeBlock( + final DataOutput out, + final long[] values, + final PayloadEncoder payloadStage, + final EncodingContext context + ) throws IOException { + writeHeader(out, context); + payloadStage.encode(values, context.valueCount(), out, context); + context.writeStageMetadata(out); + } + + /** + * Reads a block of encoded values from the data input. 
+ * + * @param in the data input stream + * @param values the output array to populate + * @param payloadStage the terminal payload decoder + * @param context the decoding context with block metadata + * @param payloadPosition the pipeline position of the payload stage + * @return the number of values decoded + * @throws IOException if an I/O error occurs + */ + public static int readBlock( + final DataInput in, + final long[] values, + final PayloadDecoder payloadStage, + final DecodingContext context, + int payloadPosition + ) throws IOException { + readHeader(in, context); + if (context.isStageApplied(payloadPosition) == false) { + throw new IOException("Payload stage not applied - possible data corruption"); + } + return payloadStage.decode(values, in, context); + } + + static void writeHeader(final DataOutput out, final EncodingContext context) throws IOException { + final short bitmap = context.positionBitmap(); + if (context.pipelineLength() <= 8) { + out.writeByte((byte) bitmap); + } else { + out.writeShort(bitmap); + } + } + + static void readHeader(final DataInput in, final DecodingContext context) throws IOException { + final int pipelineLength = context.pipelineLength(); + if (pipelineLength <= 0) { + throw new IOException("Pipeline must be set for decoding"); + } + + final short bitmap; + if (pipelineLength <= 8) { + bitmap = (short) (in.readByte() & 0xFF); + } else { + bitmap = in.readShort(); + } + context.setPositionBitmap(bitmap); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/DecodingContext.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/DecodingContext.java new file mode 100644 index 0000000000000..b83970bc750ac --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/DecodingContext.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.pipeline; + +import org.apache.lucene.store.DataInput; + +import java.io.IOException; + +/** + * Mutable per-block context for decoding, tracking the position bitmap and + * delegating metadata reads to the underlying {@link DataInput}. Reused + * across blocks via {@link #clear()}. + */ +public final class DecodingContext implements MetadataReader { + + private final int pipelineLength; + private final int blockSize; + + private DataInput dataInput; + private short positionBitmap; + + /** + * Creates a decoding context. + * + * @param blockSize the number of values per block + * @param pipelineLength the number of stages in the pipeline + */ + public DecodingContext(int blockSize, int pipelineLength) { + this.blockSize = blockSize; + this.pipelineLength = pipelineLength; + } + + /** + * Sets the data input stream for reading block data and stage metadata. + * + * @param dataInput the data input stream + */ + public void setDataInput(final DataInput dataInput) { + this.dataInput = dataInput; + } + + /** + * Returns the number of stages in the pipeline. + * + * @return the pipeline length + */ + public int pipelineLength() { + return pipelineLength; + } + + /** + * Sets the bitmap of applied stage positions, read from the block header. + * + * @param bitmap the position bitmap + */ + void setPositionBitmap(short bitmap) { + this.positionBitmap = bitmap; + } + + /** + * Returns {@code true} if the stage at the given position was applied. 
+ * + * @param position the zero-based stage index + * @return whether the stage was applied + */ + public boolean isStageApplied(int position) { + assert position >= 0 && position < pipelineLength : "Position out of range: " + position; + return (positionBitmap & (1 << position)) != 0; + } + + /** + * Returns the metadata reader for accessing stage metadata. + * + * @return the metadata reader + */ + public MetadataReader metadata() { + return this; + } + + /** + * Returns the block size. + * + * @return the number of values per block + */ + public int blockSize() { + return blockSize; + } + + /** + * Resets this context for reuse with the next block. + * + *
NOTE: dataInput is intentionally nulled. Unlike EncodingContext which + * owns its MetadataBuffer, DecodingContext does not own the DataInput (it is injected). + * Nulling forces the caller to provide a fresh DataInput via {@link #setDataInput} before + * each block, which is a fail-fast against silently reading garbage from a stale stream. + */ + public void clear() { + positionBitmap = 0; + dataInput = null; + } + + @Override + public byte readByte() throws IOException { + return dataInput.readByte(); + } + + @Override + public int readZInt() throws IOException { + return dataInput.readZInt(); + } + + @Override + public long readZLong() throws IOException { + return dataInput.readZLong(); + } + + @Override + public long readLong() throws IOException { + return dataInput.readLong(); + } + + @Override + public int readInt() throws IOException { + return dataInput.readInt(); + } + + @Override + public int readVInt() throws IOException { + return dataInput.readVInt(); + } + + @Override + public long readVLong() throws IOException { + return dataInput.readVLong(); + } + + @Override + public void readBytes(final byte[] bytes, int offset, int length) throws IOException { + dataInput.readBytes(bytes, offset, length); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/EncodingContext.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/EncodingContext.java new file mode 100644 index 0000000000000..79149c9927bda --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/EncodingContext.java @@ -0,0 +1,185 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.pipeline; + +import org.apache.lucene.store.DataOutput; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Mutable per-block context for encoding, tracking the position bitmap, metadata buffer, + * and position offsets. Reused across blocks via {@link #clear()}. + */ +public final class EncodingContext { + + private final MetadataBuffer metadataBuffer; + private final int[] positionOffsets; + private final int blockSize; + private final int pipelineLength; + + private int valueCount; + private short positionBitmap; + private int currentPosition; + + /** + * Creates an encoding context with default metadata buffer capacity. + * + * @param blockSize the number of values per block + * @param pipelineLength the number of stages in the pipeline + */ + public EncodingContext(int blockSize, int pipelineLength) { + this(blockSize, pipelineLength, new MetadataBuffer()); + } + + EncodingContext(int blockSize, int pipelineLength, final MetadataBuffer metadataBuffer) { + this.blockSize = blockSize; + this.pipelineLength = pipelineLength; + this.metadataBuffer = metadataBuffer; + this.positionOffsets = new int[PipelineDescriptor.MAX_PIPELINE_LENGTH]; + this.valueCount = 0; + this.positionBitmap = 0; + this.currentPosition = -1; + } + + /** + * Returns the number of stages in the pipeline. + * + * @return the pipeline length + */ + public int pipelineLength() { + return pipelineLength; + } + + /** + * Sets the current stage position before invoking each stage's encode method. + * + *
NOTE: Internal use only - called by the encode pipeline. + * + * @param position the zero-based stage index + */ + public void setCurrentPosition(int position) { + if (position < 0 || position >= pipelineLength) { + throw new IllegalArgumentException("Position out of range: " + position); + } + this.currentPosition = position; + } + + /** + * Records that the stage at the given position was applied. + * Tracks the metadata buffer offset on the first call for each position. + * + * @param position the zero-based stage index + */ + public void applyStage(int position) { + assert position >= 0 && position < pipelineLength : "Position out of range: " + position; + if ((positionBitmap & (1 << position)) == 0) { + positionOffsets[position] = metadataBuffer.size(); + } + positionBitmap = (short) (positionBitmap | (1 << position)); + } + + /** + * Returns {@code true} if the stage at the given position was applied. + * + * @param position the zero-based stage index + * @return whether the stage was applied + */ + public boolean isStageApplied(int position) { + assert position >= 0 && position < pipelineLength : "Position out of range: " + position; + return (positionBitmap & (1 << position)) != 0; + } + + /** + * Returns the bitmap of applied stage positions. + * + * @return the position bitmap + */ + public short positionBitmap() { + return positionBitmap; + } + + /** + * Returns the metadata writer, marking the current position as applied. + * + * @return the metadata writer + * @throws AssertionError if the current position has not been set + */ + public MetadataWriter metadata() { + assert currentPosition >= 0 : "Current position not set"; + applyStage(currentPosition); + return metadataBuffer; + } + + /** + * Flushes stage metadata to disk in reverse stage order (N-1 first, 0 last). + * During encoding, stages run forward so metadata accumulates in the buffer in + * forward order. 
During decoding, stages run in reverse and the decoder reads + * metadata directly from the stream with no buffering. By reordering on flush, + * the on-disk layout matches decode execution order, giving the decoder a free + * sequential read without seeking or knowing variable-length metadata sizes. + * + * @param out the data output stream + * @throws IOException if an I/O error occurs + */ + public void writeStageMetadata(final DataOutput out) throws IOException { + // NOTE: pipelineLength() - 1 excludes the payload stage (last position). + // Payload stages write directly to the DataOutput via BlockFormat.writeBlock, + // not through the MetadataBuffer, so there is nothing to flush for them. + final int numStages = pipelineLength() - 1; + int nextEndOffset = metadataBuffer.size(); + for (int pos = numStages - 1; pos >= 0; pos--) { + if (isStageApplied(pos)) { + final int startOffset = positionOffsets[pos]; + final int size = nextEndOffset - startOffset; + if (size > 0) { + metadataBuffer.writeTo(out, startOffset, size); + } + nextEndOffset = startOffset; + } + } + } + + /** + * Returns the block size. + * + * @return the number of values per block + */ + public int blockSize() { + return blockSize; + } + + /** + * Sets the number of values in the current block. + * + * @param count the value count + */ + public void setValueCount(int count) { + this.valueCount = count; + } + + /** + * Returns the number of values in the current block. + * + * @return the value count + */ + public int valueCount() { + return valueCount; + } + + /** Resets this context for reuse with the next block. 
*/ + public void clear() { + metadataBuffer.clear(); + Arrays.fill(positionOffsets, 0); + valueCount = 0; + positionBitmap = 0; + currentPosition = -1; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/FieldDescriptor.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/FieldDescriptor.java new file mode 100644 index 0000000000000..3028d7e7cb7ea --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/FieldDescriptor.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.pipeline; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; + +import java.io.IOException; + +/** + * Versioned envelope for the {@link PipelineDescriptor} wire format. + * + *
Every {@link PipelineDescriptor} written to {@code .dvm} metadata is prefixed + * with a format version VInt, enabling future format evolution without breaking + * segment compatibility. Block offsets and value counts are managed by the + * consumer/producer's {@code writeField}/{@code readNumeric} flow. + */ +public final class FieldDescriptor { + + static final int CURRENT_FORMAT_VERSION = 1; + + private FieldDescriptor() {} + + /** + * Writes a versioned pipeline descriptor to the metadata output. + * + * @param meta the metadata output stream + * @param pipeline the pipeline descriptor to persist + * @throws IOException if an I/O error occurs + */ + public static void write(final DataOutput meta, final PipelineDescriptor pipeline) throws IOException { + meta.writeVInt(CURRENT_FORMAT_VERSION); + pipeline.writeTo(meta); + } + + /** + * Reads a versioned pipeline descriptor from the metadata input. + * + * @param meta the metadata input stream + * @return the deserialized pipeline descriptor + * @throws IOException if the format version is unsupported or an I/O error occurs + */ + public static PipelineDescriptor read(final DataInput meta) throws IOException { + final int formatVersion = meta.readVInt(); + if (formatVersion != CURRENT_FORMAT_VERSION) { + throw new IOException( + "Unsupported FieldDescriptor format version: " + + formatVersion + + ". Maximum supported version is " + + CURRENT_FORMAT_VERSION + + ". This may indicate data written by a newer version of Elasticsearch." + ); + } + return PipelineDescriptor.readFrom(meta); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataBuffer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataBuffer.java new file mode 100644 index 0000000000000..f559e714d13c3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataBuffer.java @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.pipeline; + +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.util.ArrayUtil; + +import java.io.IOException; + +/** + * In-memory auto-growing buffer for stage metadata. Implements {@link MetadataWriter} + * for use during encoding, and provides {@link #writeTo} for flushing to a + * {@link DataOutput}. + * + *
The byte layout for {@link #writeLong} matches Lucene's {@code DataOutput.writeLong} + * format (two little-endian ints, low int first) for compatibility with + * {@code DataInput.readLong}. + */ +final class MetadataBuffer implements MetadataWriter { + + private static final int DEFAULT_CAPACITY = 64; + private static final int MAX_VINT_BYTES = 5; + private static final int MAX_VLONG_BYTES = 10; + + private byte[] data; + private int dataSize; + + MetadataBuffer() { + this(DEFAULT_CAPACITY); + } + + /** + * Creates a metadata buffer with the specified initial capacity. + * + * @param initialCapacity the initial buffer capacity in bytes + */ + MetadataBuffer(int initialCapacity) { + this.data = new byte[initialCapacity]; + this.dataSize = 0; + } + + /** + * Returns the number of bytes currently stored. + * + * @return the buffer size in bytes + */ + public int size() { + return dataSize; + } + + /** + * Writes a slice of the buffer to the given output. + * + * @param out the output to write to + * @param offset the start offset within this buffer + * @param length the number of bytes to write + */ + public void writeTo(final DataOutput out, int offset, int length) throws IOException { + assert offset >= 0 && length >= 0 && offset + length <= dataSize + : "Invalid slice [offset=" + offset + ", length=" + length + ", size=" + dataSize + "]"; + if (length > 0) { + out.writeBytes(data, offset, length); + } + } + + /** Resets the buffer for reuse without releasing the backing array. 
*/ + public void clear() { + dataSize = 0; + } + + private void ensureCapacity(int additional) { + if (data.length < dataSize + additional) { + data = ArrayUtil.grow(data, dataSize + additional); + } + } + + @Override + public MetadataWriter writeByte(byte value) { + ensureCapacity(Byte.BYTES); + data[dataSize++] = value; + return this; + } + + @Override + public MetadataWriter writeZInt(int value) { + return encodeVInt((value >> 31) ^ (value << 1)); + } + + @Override + public MetadataWriter writeZLong(long value) { + return encodeVLong((value >> 63) ^ (value << 1)); + } + + @Override + public MetadataWriter writeLong(long value) { + ensureCapacity(Long.BYTES); + final int lo = (int) value; + final int hi = (int) (value >> 32); + data[dataSize++] = (byte) lo; + data[dataSize++] = (byte) (lo >> 8); + data[dataSize++] = (byte) (lo >> 16); + data[dataSize++] = (byte) (lo >> 24); + data[dataSize++] = (byte) hi; + data[dataSize++] = (byte) (hi >> 8); + data[dataSize++] = (byte) (hi >> 16); + data[dataSize++] = (byte) (hi >> 24); + return this; + } + + @Override + public MetadataWriter writeInt(int value) { + ensureCapacity(Integer.BYTES); + data[dataSize++] = (byte) value; + data[dataSize++] = (byte) (value >> 8); + data[dataSize++] = (byte) (value >> 16); + data[dataSize++] = (byte) (value >> 24); + return this; + } + + @Override + public MetadataWriter writeVInt(int value) { + assert value >= 0 : "writeVInt requires non-negative value, got: " + value + ". Use writeZInt for signed values"; + return encodeVInt(value); + } + + @Override + public MetadataWriter writeVLong(long value) { + assert value >= 0 : "writeVLong requires non-negative value, got: " + value + ". 
Use writeZLong for signed values"; + return encodeVLong(value); + } + + private MetadataWriter encodeVInt(int value) { + ensureCapacity(MAX_VINT_BYTES); + while ((value & ~0x7F) != 0) { + data[dataSize++] = (byte) ((value & 0x7F) | 0x80); + value >>>= 7; + } + data[dataSize++] = (byte) value; + return this; + } + + private MetadataWriter encodeVLong(long value) { + ensureCapacity(MAX_VLONG_BYTES); + for (int i = 0; i < 9 && (value & ~0x7FL) != 0; i++) { + data[dataSize++] = (byte) ((value & 0x7FL) | 0x80L); + value >>>= 7; + } + data[dataSize++] = (byte) value; + return this; + } + + @Override + public MetadataWriter writeBytes(final byte[] bytes, int offset, int length) { + ensureCapacity(length); + System.arraycopy(bytes, offset, data, dataSize, length); + dataSize += length; + return this; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataReader.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataReader.java new file mode 100644 index 0000000000000..78d51e6b28c17 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataReader.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.pipeline; + +import java.io.IOException; + +/** + * Reads stage metadata values from a buffer during decoding. + * + *
This is the decode-side counterpart of {@link MetadataWriter}. See that interface for the + * rationale behind using a dedicated interface rather than Lucene's {@code DataInput} or + * Elasticsearch's {@code StreamInput}. + * + * @see MetadataWriter + * @see DecodingContext + */ +public interface MetadataReader { + + /** + * Reads a single byte. + * + * @return the byte value + * @throws IOException if an I/O error occurs + */ + byte readByte() throws IOException; + + /** + * Reads a zigzag-encoded variable-length integer. + * + * @return the decoded integer + * @throws IOException if an I/O error occurs + */ + int readZInt() throws IOException; + + /** + * Reads a zigzag-encoded variable-length long. + * + * @return the decoded long + * @throws IOException if an I/O error occurs + */ + long readZLong() throws IOException; + + /** + * Reads a fixed 8-byte long in Lucene format. + * + * @return the long value + * @throws IOException if an I/O error occurs + */ + long readLong() throws IOException; + + /** + * Reads a fixed 4-byte integer in Lucene format. + * + * @return the integer value + * @throws IOException if an I/O error occurs + */ + int readInt() throws IOException; + + /** + * Reads a variable-length integer. + * + * @return the decoded integer + * @throws IOException if an I/O error occurs + */ + int readVInt() throws IOException; + + /** + * Reads a variable-length long. + * + * @return the decoded long + * @throws IOException if an I/O error occurs + */ + long readVLong() throws IOException; + + /** + * Reads bytes into the given array. + * + * @param bytes the destination array + * @param offset the offset in the array + * @param length the number of bytes to read + * @throws IOException if an I/O error occurs + */ + void readBytes(byte[] bytes, int offset, int length) throws IOException; + + /** + * Reads bytes into the given array starting at offset 0. 
+ * + * @param bytes the destination array + * @throws IOException if an I/O error occurs + */ + default void readBytes(final byte[] bytes) throws IOException { + readBytes(bytes, 0, bytes.length); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataWriter.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataWriter.java new file mode 100644 index 0000000000000..8378eb464dd2f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataWriter.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.pipeline; + +/** + * Writes stage metadata values to a buffer during encoding. Supports method chaining. + * + *
This is a dedicated interface rather than Lucene's {@code DataOutput} or Elasticsearch's + * {@code StreamOutput} because the block layout places stage metadata after the payload + * on disk ({@code [bitmap][payload][stage metadata]}), while transform stages produce metadata + * before the payload during encoding. The encode pipeline buffers metadata in memory + * and flushes it after the payload is written (see {@link EncodingContext#writeStageMetadata}). + * + *
This design favors the decode path: the decoder reads bitmap, payload, then metadata in a + * single forward pass with no buffering or seeking. Since decoding is far more frequent than + * encoding, the buffering cost is pushed to the encode side (once per block). + * + *
By decoupling stages from the buffering strategy, each stage writes metadata through this + * minimal interface without knowledge of the block layout or metadata ordering. This also + * simplifies backwards compatibility: the block layout or metadata ordering can change without + * modifying any stage implementation. + * + * @see MetadataReader + * @see EncodingContext#writeStageMetadata(org.apache.lucene.store.DataOutput) + */ +public interface MetadataWriter { + + /** + * Writes a single byte. + * + * @param value the byte to write + * @return this writer for chaining + */ + MetadataWriter writeByte(byte value); + + /** + * Writes a zigzag-encoded variable-length integer. + * + * @param value the integer to write + * @return this writer for chaining + */ + MetadataWriter writeZInt(int value); + + /** + * Writes a zigzag-encoded variable-length long. + * + * @param value the long to write + * @return this writer for chaining + */ + MetadataWriter writeZLong(long value); + + /** + * Writes a fixed 8-byte long in Lucene format. + * + * @param value the long to write + * @return this writer for chaining + */ + MetadataWriter writeLong(long value); + + /** + * Writes a fixed 4-byte integer in Lucene format. + * + * @param value the integer to write + * @return this writer for chaining + */ + MetadataWriter writeInt(int value); + + /** + * Writes a variable-length integer. + * + * @param value the integer to write + * @return this writer for chaining + */ + MetadataWriter writeVInt(int value); + + /** + * Writes a variable-length long. + * + * @param value the long to write + * @return this writer for chaining + */ + MetadataWriter writeVLong(long value); + + /** + * Writes bytes from the given array. 
+ * + * @param bytes the source array + * @param offset the offset in the array + * @param length the number of bytes to write + * @return this writer for chaining + */ + MetadataWriter writeBytes(byte[] bytes, int offset, int length); + + /** + * Writes all bytes from the given array. + * + * @param bytes the source array + * @return this writer for chaining + */ + default MetadataWriter writeBytes(final byte[] bytes) { + return writeBytes(bytes, 0, bytes.length); + } + + /** + * Marks the stage as applied without writing any metadata. + * Use this for stages that always transform values but don't need + * to store any parameters for decoding. + */ + default void empty() {} +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/PipelineConfig.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/PipelineConfig.java new file mode 100644 index 0000000000000..39d8fe1bf2801 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/PipelineConfig.java @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.pipeline; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Immutable specification for a field's encoding pipeline. + * + *
Captures the data type, block size, and ordered stage specifications for
+ * deferred codec construction. Use {@link #forLongs} to start building a
+ * configuration via the fluent builder API.
+ *
+ * @param dataType the numeric data type
+ * @param blockSize the number of values per block (must be a positive power of 2)
+ * @param specs the ordered stage specifications
+ */
+public record PipelineConfig(PipelineDescriptor.DataType dataType, int blockSize, List {@code PipelineDescriptor} is the contract between encoder and decoder. The
+ * {@link #writeTo}/{@link #readFrom} methods are used by {@link FieldDescriptor}
+ * to create self-describing formats. Format evolution is handled by the format
+ * version VInt in {@link FieldDescriptor}.
+ *
+ * Wire format: {@code [VInt stageCount] [byte blockShift] [byte dataType] [byte[] stageIds]}.
+ * Scalars come before the variable-length stage array so the decoder can configure
+ * block size and data type before iterating stage IDs.
+ *
+ * Example: a {@code delta>gcd>bitPack} pipeline on longs with {@code blockSize=128}
+ * (blockShift=7) serializes as {@code [03] [07] [00] [01 03 A1]}: stage count 3,
+ * blockShift 7, data type LONG (0x00), then the three stage ids in pipeline order.
+ */
+public final class PipelineDescriptor {
+
+ /** Maximum number of stages in a pipeline. */
+ public static final int MAX_PIPELINE_LENGTH = 16;
+
+ /** The numeric data type stored in encoded blocks. */
+ public enum DataType {
+ /** 64-bit signed integer values. */
+ LONG((byte) 0x00),
+ /** 64-bit IEEE 754 floating-point values. */
+ DOUBLE((byte) 0x01),
+ /** 32-bit IEEE 754 floating-point values. */
+ FLOAT((byte) 0x02);
+
+ /** Persisted byte identifier for this data type. */
+ public final byte id;
+
+ DataType(byte id) {
+ this.id = id;
+ }
+
+ /**
+ * Resolves a persisted byte identifier back to its {@link DataType}.
+ *
+ * @param id the byte identifier read from the encoded data
+ * @return the corresponding {@link DataType}
+ * @throws IOException if the identifier is unknown (corrupt data)
+ */
+ public static DataType fromId(byte id) throws IOException {
+ return switch (id) {
+ case 0x00 -> LONG;
+ case 0x01 -> DOUBLE;
+ case 0x02 -> FLOAT;
+ default -> throw new IOException("Unknown DataType: 0x" + Integer.toHexString(id & 0xFF));
+ };
+ }
+ }
+
+ private final byte[] stageIds;
+ private final byte blockShift;
+ private final DataType dataType;
+
+ /**
+ * Creates a descriptor for a long (integral) pipeline.
+ *
+ * @param stageIds the ordered stage identifiers
+ * @param blockSize the number of values per block (must be a power of 2)
+ */
+ public PipelineDescriptor(final byte[] stageIds, int blockSize) {
+ this(stageIds, blockSize, DataType.LONG);
+ }
+
+ /**
+ * Creates a descriptor with the specified data type.
+ *
+ * @param stageIds the ordered stage identifiers
+ * @param blockSize the number of values per block (must be a power of 2)
+ * @param dataType the numeric data type this pipeline operates on
+ */
+ public PipelineDescriptor(final byte[] stageIds, int blockSize, final DataType dataType) {
+ if (stageIds == null || stageIds.length == 0) {
+ throw new IllegalArgumentException("Pipeline must have at least one stage");
+ }
+ if (stageIds.length > MAX_PIPELINE_LENGTH) {
+ throw new IllegalArgumentException("Pipeline length " + stageIds.length + " exceeds maximum " + MAX_PIPELINE_LENGTH);
+ }
+ if (blockSize <= 0 || (blockSize & (blockSize - 1)) != 0) {
+ throw new IllegalArgumentException("Block size must be a positive power of 2: " + blockSize);
+ }
+ this.stageIds = stageIds.clone();
+ this.blockShift = (byte) Integer.numberOfTrailingZeros(blockSize);
+ this.dataType = dataType;
+ }
+
+ private PipelineDescriptor(final byte[] stageIds, byte blockShift, final DataType dataType) {
+ this.stageIds = stageIds;
+ this.blockShift = blockShift;
+ this.dataType = dataType;
+ }
+
+ /**
+ * Returns the number of stages in this pipeline.
+ *
+ * @return the pipeline length
+ */
+ public int pipelineLength() {
+ return stageIds.length;
+ }
+
+ /**
+ * Returns the stage identifier at the given position.
+ *
+ * @param position the zero-based stage index
+ * @return the byte identifier at that position
+ */
+ public byte stageIdAt(int position) {
+ return stageIds[position];
+ }
+
+ /**
+ * Returns the block size (number of values per block).
+ *
+ * @return the block size
+ */
+ public int blockSize() {
+ return 1 << blockShift;
+ }
+
+ /**
+ * Returns the block shift (log2 of block size).
+ *
+ * @return the block shift
+ */
+ int blockShift() {
+ return blockShift;
+ }
+
+ /**
+ * Returns the number of bytes needed for the per-block position bitmap.
+ * Uses 1 byte for pipelines with 8 or fewer stages, 2 bytes otherwise.
+ *
+ * @return the bitmap size in bytes (1 or 2)
+ */
+ int bitmapBytes() {
+ return pipelineLength() <= 8 ? 1 : 2;
+ }
+
+ /**
+ * Returns a defensive copy of the stage identifier array.
+ *
+ * @return a copy of the stage identifiers
+ */
+ byte[] stageIds() {
+ return stageIds.clone();
+ }
+
+ /**
+ * Returns the data type this pipeline operates on.
+ *
+ * @return the data type
+ */
+ public DataType dataType() {
+ return dataType;
+ }
+
+ /**
+ * Returns a new descriptor with the specified block size, or this instance if unchanged.
+ *
+ * @param blockSize the desired block size (must be a power of 2)
+ * @return a descriptor with the given block size
+ */
+ public PipelineDescriptor withBlockSize(int blockSize) {
+ final byte newBlockShift = (byte) Integer.numberOfTrailingZeros(blockSize);
+ if (newBlockShift == this.blockShift) {
+ return this;
+ }
+ return new PipelineDescriptor(stageIds.clone(), blockSize, dataType);
+ }
+
+ /**
+ * Serializes this descriptor to the given output.
+ *
+ * @param out the data output stream
+ * @throws IOException if an I/O error occurs
+ */
+ void writeTo(final DataOutput out) throws IOException {
+ out.writeVInt(stageIds.length);
+ out.writeByte(blockShift);
+ out.writeByte(dataType.id);
+ out.writeBytes(stageIds, 0, stageIds.length);
+ }
+
+ /**
+ * Deserializes a descriptor from the given input.
+ *
+ * @param in the data input stream
+ * @return the deserialized descriptor
+ * @throws IOException if an I/O error occurs or the data is invalid
+ */
+ static PipelineDescriptor readFrom(final DataInput in) throws IOException {
+ final int length = in.readVInt();
+ if (length <= 0 || length > MAX_PIPELINE_LENGTH) {
+ throw new IOException("Invalid pipeline length: " + length);
+ }
+ final byte blockShift = in.readByte();
+ if (blockShift < 0 || blockShift > 30) {
+ throw new IOException("Invalid block shift: " + blockShift);
+ }
+ final DataType dataType = DataType.fromId(in.readByte());
+ final byte[] stageIds = new byte[length];
+ in.readBytes(stageIds, 0, length);
+ return new PipelineDescriptor(stageIds, blockShift, dataType);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final PipelineDescriptor that = (PipelineDescriptor) o;
+ return blockShift == that.blockShift && dataType == that.dataType && Arrays.equals(stageIds, that.stageIds);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Arrays.hashCode(stageIds);
+ result = 31 * result + blockShift;
+ result = 31 * result + dataType.id;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("PipelineDescriptor{stages=[");
+ for (int i = 0; i < stageIds.length; i++) {
+ if (i > 0) sb.append(", ");
+ try {
+ sb.append(StageId.fromId(stageIds[i]).displayName);
+ } catch (final IllegalArgumentException e) {
+ sb.append("0x").append(Integer.toHexString(stageIds[i] & 0xFF));
+ }
+ }
+ sb.append("], blockSize=").append(blockSize()).append(", dataType=").append(dataType).append("}");
+ return sb.toString();
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/StageId.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/StageId.java
new file mode 100644
index 0000000000000..914e21280f70d
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/StageId.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline;
+
/**
 * Unique byte identifiers for pipeline stages, persisted in the encoded data.
 *
 * <p>Stage IDs are the wire format contract between encoder and decoder. Once assigned,
 * an ID must never change or be reused, as doing so would break the
 * ability to decode existing data. ID {@code 0x00} is reserved for test-only stages
 * (see {@code TestPayloadCodecStage}).
 *
 * <p>Transform stages (IDs below {@code 0xA0}) are applied in sequence and are reversible.
 * Terminal payload stages (IDs at or above {@code 0xA0}) serialize values to bytes as the
 * final step of the encode pipeline. Within the payload range, {@code 0xA0} itself is
 * reserved for test-only payload stages, so {@code 0xA1} is the first production payload ID.
 */
public enum StageId {
    /** Delta encoding transform stage. */
    DELTA_STAGE((byte) 0x01, "delta"),
    /** Offset removal transform stage. */
    OFFSET_STAGE((byte) 0x02, "offset"),
    /** GCD factoring transform stage. */
    GCD_STAGE((byte) 0x03, "gcd"),

    /** Bit-packing terminal payload stage. */
    BITPACK_PAYLOAD((byte) 0xA1, "bitPack");

    /** Persisted byte identifier. Must never change once assigned. */
    public final byte id;

    /** Human-readable name for logging and diagnostics. */
    public final String displayName;

    /**
     * @param id the persisted byte identifier
     * @param displayName the human-readable name for logging
     */
    StageId(byte id, final String displayName) {
        this.id = id;
        this.displayName = displayName;
    }

    /**
     * Resolves a persisted byte identifier back to its {@link StageId} constant.
     *
     * <p>Scans {@link #values()} instead of duplicating the id mapping in a
     * switch, so newly added constants resolve without this method needing a
     * matching (and easily forgotten) case.
     *
     * @param id the byte identifier read from the encoded data
     * @return the corresponding {@link StageId}
     * @throws IllegalArgumentException if the identifier is unknown
     */
    public static StageId fromId(byte id) {
        for (final StageId stage : values()) {
            if (stage.id == id) {
                return stage;
            }
        }
        throw new IllegalArgumentException("Unknown stage ID: 0x" + Integer.toHexString(id & 0xFF));
    }
}
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/StageSpec.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/StageSpec.java
new file mode 100644
index 0000000000000..2dda0dc4050f9
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/StageSpec.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline;
+
+/**
+ * Sealed hierarchy capturing pipeline stage specifications.
+ *
+ * Each record represents a stage type. References {@link StageId} as the
+ * source of truth for persisted byte identifiers.
+ */
+public sealed interface StageSpec {
+
+ /**
+ * Returns the persisted stage identifier for this specification.
+ *
+ * @return the {@link StageId} for this stage
+ */
+ StageId stageId();
+
+ /** Marker for transform stages that can be chained in the pipeline. */
+ sealed interface TransformSpec extends StageSpec {}
+
+ /** Marker for terminal payload stages that serialize values to bytes. */
+ sealed interface PayloadSpec extends StageSpec {}
+
+ /** Delta encoding: stores differences between consecutive values. */
+ record DeltaStage() implements TransformSpec {
+ @Override
+ public StageId stageId() {
+ return StageId.DELTA_STAGE;
+ }
+ }
+
+ /** Offset removal: subtracts the minimum value from all entries. */
+ record OffsetStage() implements TransformSpec {
+ @Override
+ public StageId stageId() {
+ return StageId.OFFSET_STAGE;
+ }
+ }
+
+ /** GCD factoring: divides all values by their greatest common divisor. */
+ record GcdStage() implements TransformSpec {
+ @Override
+ public StageId stageId() {
+ return StageId.GCD_STAGE;
+ }
+ }
+
+ /** Bit-packing payload: packs values using the minimum number of bits. */
+ record BitPackPayload() implements PayloadSpec {
+ @Override
+ public StageId stageId() {
+ return StageId.BITPACK_PAYLOAD;
+ }
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/PayloadDecoder.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/PayloadDecoder.java
new file mode 100644
index 0000000000000..897d532b7941c
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/PayloadDecoder.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline.numeric;
+
+import org.apache.lucene.store.DataInput;
+import org.elasticsearch.index.codec.tsdb.pipeline.DecodingContext;
+
+import java.io.IOException;
+
+/**
+ * Deserializes bytes back to values as the terminal stage of the decode pipeline.
+ */
public interface PayloadDecoder {

    /**
     * Returns the unique stage identifier.
     *
     * <p>NOTE(review): test-only implementations (see {@code TestPayloadCodecStage})
     * throw {@link UnsupportedOperationException} instead of returning an ID —
     * confirm callers only invoke this for persistable stages.
     *
     * @return the stage ID byte
     */
    byte id();

    /**
     * Deserializes bytes from the input stream back to values, reversing the
     * corresponding {@code PayloadEncoder}.
     *
     * @param values the output array to populate; must hold at least the number
     *               of values encoded in the block
     * @param in the data input stream, positioned at the payload section
     * @param context the decoding context with block metadata
     * @return the number of values decoded
     * @throws IOException if an I/O error occurs
     */
    int decode(long[] values, DataInput in, DecodingContext context) throws IOException;
}
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/PayloadEncoder.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/PayloadEncoder.java
new file mode 100644
index 0000000000000..b754fcddf773d
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/PayloadEncoder.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline.numeric;
+
+import org.apache.lucene.store.DataOutput;
+import org.elasticsearch.index.codec.tsdb.pipeline.EncodingContext;
+
+import java.io.IOException;
+
+/**
+ * Serializes values to bytes as the terminal stage of the encode pipeline.
+ */
public interface PayloadEncoder {

    /**
     * Returns the unique stage identifier.
     *
     * <p>NOTE(review): test-only implementations (see {@code TestPayloadCodecStage})
     * throw {@link UnsupportedOperationException} instead of returning an ID —
     * confirm callers only invoke this for persistable stages.
     *
     * @return the stage ID byte
     */
    byte id();

    /**
     * Serializes values to the output stream as the final step of the encode
     * pipeline; the matching {@code PayloadDecoder} reverses this exactly.
     *
     * @param values the values to encode; only the first {@code valueCount} entries are read
     * @param valueCount the number of valid values in the array
     * @param out the data output stream
     * @param context the encoding context with block metadata
     * @throws IOException if an I/O error occurs
     */
    void encode(long[] values, int valueCount, DataOutput out, EncodingContext context) throws IOException;
}
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/package-info.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/package-info.java
new file mode 100644
index 0000000000000..78d5683abdbab
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/numeric/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+/**
+ * Payload encoder/decoder contracts for numeric pipeline stages.
+ *
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.numeric.PayloadEncoder} and
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.numeric.PayloadDecoder} define
+ * the terminal stage contract: serialize transformed {@code long[]} values to bytes
+ * and read them back. Each implementation corresponds to a
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.StageSpec.PayloadSpec} and is
+ * identified by a {@link org.elasticsearch.index.codec.tsdb.pipeline.StageId} for
+ * wire-format lookup during decoding.
+ */
+package org.elasticsearch.index.codec.tsdb.pipeline.numeric;
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/package-info.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/package-info.java
new file mode 100644
index 0000000000000..3c24b463b7bf1
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/package-info.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+/**
+ * Composable encoding pipeline for TSDB numeric doc values.
+ *
+ * A pipeline is an ordered sequence of transform stages followed by a terminal
+ * payload stage. Transform stages (delta, offset, GCD) reduce value entropy;
+ * the payload stage (bit-packing) serializes the result.
+ *
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.StageSpec} is a sealed hierarchy
+ * with two markers:
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.StageSpec.TransformSpec TransformSpec}
+ * for chainable transform stages and
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.StageSpec.PayloadSpec PayloadSpec}
+ * for terminal stages that serialize values to bytes. Each stage type has a
+ * persisted byte identifier ({@link org.elasticsearch.index.codec.tsdb.pipeline.StageId})
+ * that serves as the wire-format contract.
+ *
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.PipelineConfig} provides a
+ * type-safe fluent builder for assembling pipelines at construction time.
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.PipelineDescriptor} is the compact
+ * wire-format representation (stage IDs, block size, data type) persisted in the
+ * metadata file, enabling per-field codec selection. The two are deliberately
+ * separate: {@code PipelineConfig} carries rich stage specifications
+ * ({@link org.elasticsearch.index.codec.tsdb.pipeline.StageSpec}) while
+ * {@code PipelineDescriptor} stores only the byte identifiers needed for decoding.
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.FieldDescriptor} wraps the
+ * descriptor with a format version for segment compatibility.
+ *
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.BlockFormat} is the block-level
+ * entry point for writing and reading encoded values.
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.EncodingContext} and
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.DecodingContext} carry per-block
+ * state (bitmap, value count) and provide metadata I/O via
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.MetadataWriter} /
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.MetadataReader}, decoupling
+ * individual stages from the block layout.
+ *
+ * Metadata is written after the payload so the decoder can read every section
+ * in a single forward pass with no seeking or buffering. See
+ * {@link org.elasticsearch.index.codec.tsdb.pipeline.BlockFormat} for details.
+ *
+ * Building a pipeline configuration:
+ * This stage is used by framework-level tests (e.g., {@link BlockFormatTests}) that need
+ * a concrete payload stage without depending on production stage implementations (PR 2+).
+ * ID {@code 0x00} is reserved for test-only stages and must never be persisted.
+ */
+public final class TestPayloadCodecStage implements PayloadEncoder, PayloadDecoder {
+
+ /** Singleton instance. */
+ public static final TestPayloadCodecStage INSTANCE = new TestPayloadCodecStage();
+
+ private TestPayloadCodecStage() {}
+
+ @Override
+ public byte id() {
+ throw new UnsupportedOperationException("Test-only stage cannot be persisted");
+ }
+
+ @Override
+ public void encode(final long[] values, final int valueCount, final DataOutput out, final EncodingContext context) throws IOException {
+ out.writeVInt(valueCount);
+ for (int i = 0; i < valueCount; i++) {
+ out.writeLong(values[i]);
+ }
+ }
+
+ @Override
+ public int decode(final long[] values, final DataInput in, final DecodingContext context) throws IOException {
+ final int valueCount = in.readVInt();
+ for (int i = 0; i < valueCount; i++) {
+ values[i] = in.readLong();
+ }
+ return valueCount;
+ }
+}
Stage types
+ * Construction vs wire format
+ * Encoding and decoding
+ * Block layout
+ *
+ * [bitmap][payload][stage metadata (reverse stage order)]
+ *
+ * Usage
+ *
+ * {@code
+ * PipelineConfig config = PipelineConfig.forLongs(128)
+ * .delta()
+ * .offset()
+ * .gcd()
+ * .bitPack();
+ * }
+ */
+package org.elasticsearch.index.codec.tsdb.pipeline;
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/BlockFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/BlockFormatTests.java
new file mode 100644
index 0000000000000..3886830555b42
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/BlockFormatTests.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline;
+
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.ByteArrayDataOutput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
/**
 * Unit tests for {@code BlockFormat} write/read round-trips, exercising the
 * bitmap, payload, and stage-metadata sections of a block with the test-only
 * {@code TestPayloadCodecStage} as the terminal payload codec.
 */
public class BlockFormatTests extends ESTestCase {

    // Smallest block size exercised; randomBlockSize() scales it by a power of two.
    private static final int BASE_BLOCK_SIZE = 128;

    // Returns a random power-of-2 block size in [128, 16384].
    private static int randomBlockSize() {
        return BASE_BLOCK_SIZE << randomIntBetween(0, 7);
    }

    public void testWriteReadBlockRoundTrip() throws IOException {
        final int blockSize = randomBlockSize();
        final byte[] stageIds = { StageId.DELTA_STAGE.id, StageId.OFFSET_STAGE.id, StageId.BITPACK_PAYLOAD.id };
        final EncodingContext encodingContext = createEncodingContext(stageIds, blockSize);

        // Writing metadata at position 0 implicitly marks stage 0 as applied
        // (asserted below); stage 2 is the payload stage and is applied explicitly.
        encodingContext.setCurrentPosition(0);
        encodingContext.metadata().writeZLong(1000L);
        encodingContext.applyStage(2);

        final long[] values = new long[blockSize];
        for (int i = 0; i < blockSize; i++) {
            values[i] = randomLong();
        }

        final long[] decoded = new long[blockSize];
        final DecodingContext decodingContext = writeAndRead(stageIds, values, decoded, encodingContext, blockSize, 2);

        // Stage 1 was never touched, so its bitmap bit must be clear after the round trip.
        assertEquals(blockSize, decodingContext.blockSize());
        assertTrue(decodingContext.isStageApplied(0));
        assertFalse(decodingContext.isStageApplied(1));
        assertTrue(decodingContext.isStageApplied(2));
        assertArrayEquals(values, decoded);
    }

    public void testBitmapRoundTrip() throws IOException {
        final int blockSize = randomBlockSize();
        final int numStages = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
        final byte[] stageIds = new byte[numStages];
        for (int i = 0; i < numStages; i++) {
            stageIds[i] = (byte) (i + 1);
        }
        final EncodingContext encodingContext = createEncodingContext(stageIds, blockSize);

        // The last stage acts as the payload and must always be applied;
        // the remaining stages are applied at random to exercise the bitmap.
        final boolean[] applied = new boolean[numStages];
        applied[numStages - 1] = true;
        encodingContext.applyStage(numStages - 1);
        for (int i = 0; i < numStages - 1; i++) {
            applied[i] = randomBoolean();
            if (applied[i]) {
                encodingContext.applyStage(i);
            }
        }

        final DecodingContext decodingContext = writeAndRead(
            stageIds,
            new long[blockSize],
            new long[blockSize],
            encodingContext,
            blockSize,
            numStages - 1
        );

        // Every randomly chosen bit must survive the write/read round trip.
        for (int i = 0; i < numStages; i++) {
            assertEquals("Position " + i, applied[i], decodingContext.isStageApplied(i));
        }
    }

    public void testReadBlockThrowsWhenPipelineNotSet() {
        final int blockSize = randomBlockSize();
        // pipelineLength == 0 models a decoding context whose pipeline was never configured.
        final DecodingContext decodingContext = new DecodingContext(blockSize, 0);

        final byte[] buffer = new byte[256];
        final ByteArrayDataInput in = new ByteArrayDataInput(buffer);
        final long[] values = new long[blockSize];

        final IOException e = expectThrows(
            IOException.class,
            () -> BlockFormat.readBlock(in, values, TestPayloadCodecStage.INSTANCE, decodingContext, 0)
        );
        assertEquals("Pipeline must be set for decoding", e.getMessage());
    }

    public void testEmptyBitmapThrowsOnMissingPayload() throws IOException {
        final int blockSize = randomBlockSize();
        final byte[] stageIds = { 0x01, 0x02 };
        final EncodingContext encodingContext = createEncodingContext(stageIds, blockSize);

        // No stage is applied before writing, so the persisted bitmap is empty
        // and the reader must reject the block as corrupt.
        final byte[] buffer = new byte[blockSize * Long.BYTES + 256];
        final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
        BlockFormat.writeBlock(out, new long[blockSize], TestPayloadCodecStage.INSTANCE, encodingContext);

        final DecodingContext decodingContext = new DecodingContext(blockSize, stageIds.length);
        final ByteArrayDataInput in = new ByteArrayDataInput(buffer, 0, out.getPosition());

        final IOException e = expectThrows(
            IOException.class,
            () -> BlockFormat.readBlock(in, new long[blockSize], TestPayloadCodecStage.INSTANCE, decodingContext, 1)
        );
        assertEquals("Payload stage not applied - possible data corruption", e.getMessage());
    }

    // Builds an EncodingContext whose value count equals the block size.
    private EncodingContext createEncodingContext(final byte[] stageIds, final int blockSize) {
        final PipelineDescriptor pipeline = new PipelineDescriptor(stageIds, blockSize);
        final EncodingContext context = new EncodingContext(blockSize, pipeline.pipelineLength());
        context.setValueCount(blockSize);
        return context;
    }

    // Writes one block to an in-memory buffer, reads it back, asserts the
    // decoded count, and returns the DecodingContext for further assertions.
    private DecodingContext writeAndRead(
        final byte[] stageIds,
        final long[] values,
        final long[] decoded,
        final EncodingContext encodingContext,
        final int blockSize,
        final int payloadPosition
    ) throws IOException {
        final byte[] buffer = new byte[blockSize * Long.BYTES + 256];
        final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
        BlockFormat.writeBlock(out, values, TestPayloadCodecStage.INSTANCE, encodingContext);

        final DecodingContext decodingContext = new DecodingContext(blockSize, stageIds.length);
        final ByteArrayDataInput in = new ByteArrayDataInput(buffer, 0, out.getPosition());
        final int decodedCount = BlockFormat.readBlock(in, decoded, TestPayloadCodecStage.INSTANCE, decodingContext, payloadPosition);

        assertEquals(blockSize, decodedCount);
        return decodingContext;
    }
}
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/DecodingContextTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/DecodingContextTests.java
new file mode 100644
index 0000000000000..f2bdd75c1c394
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/DecodingContextTests.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline;
+
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.ByteArrayDataOutput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
/**
 * Unit tests for {@code DecodingContext}: bitmap handling, clear/reset
 * semantics, accessor behavior, and metadata reading against an in-memory
 * {@code ByteArrayDataInput}.
 */
public class DecodingContextTests extends ESTestCase {

    // Smallest block size exercised; randomBlockSize() scales it by a power of two.
    private static final int BASE_BLOCK_SIZE = 128;

    // Returns a random power-of-2 block size in [128, 16384].
    private static int randomBlockSize() {
        return BASE_BLOCK_SIZE << randomIntBetween(0, 7);
    }

    // Picks a valid position different from `position` (requires pipelineLength >= 2).
    private static int randomPositionExcluding(final int position, final int pipelineLength) {
        return (position + 1 + randomIntBetween(0, pipelineLength - 2)) % pipelineLength;
    }

    public void testSetPositionBitmap() {
        final int pipelineLength = randomIntBetween(2, PipelineDescriptor.MAX_PIPELINE_LENGTH);
        final DecodingContext context = new DecodingContext(randomBlockSize(), pipelineLength);

        // Exactly one bit set: that position reads as applied, any other as not applied.
        final int applied = randomIntBetween(0, pipelineLength - 1);
        final int notApplied = randomPositionExcluding(applied, pipelineLength);
        context.setPositionBitmap((short) (1 << applied));
        assertTrue(context.isStageApplied(applied));
        assertFalse(context.isStageApplied(notApplied));
    }

    public void testClearResetsBitmap() {
        final int pipelineLength = randomIntBetween(2, PipelineDescriptor.MAX_PIPELINE_LENGTH);
        final DecodingContext context = new DecodingContext(randomBlockSize(), pipelineLength);

        short bitmap = 0;
        for (int i = 0; i < pipelineLength; i++) {
            if (randomBoolean()) {
                bitmap |= (short) (1 << i);
            }
        }
        final byte[] buffer = new byte[16];
        context.setDataInput(new ByteArrayDataInput(buffer, 0, buffer.length));
        context.setPositionBitmap(bitmap);

        context.clear();

        // clear() must reset every bitmap bit but preserve the configured pipeline length.
        for (int i = 0; i < pipelineLength; i++) {
            assertFalse(context.isStageApplied(i));
        }
        assertEquals(pipelineLength, context.pipelineLength());
    }

    public void testBlockSize() {
        final int blockSize = randomBlockSize();
        final DecodingContext context = new DecodingContext(blockSize, 1);
        assertEquals(blockSize, context.blockSize());
    }

    public void testPipelineLength() {
        final int pipelineLength = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
        final DecodingContext context = new DecodingContext(randomBlockSize(), pipelineLength);
        assertEquals(pipelineLength, context.pipelineLength());
    }

    public void testMetadataReaderReadsFromDataInput() throws Exception {
        final int pipelineLength = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
        final DecodingContext context = new DecodingContext(randomBlockSize(), pipelineLength);

        final int numBytes = randomIntBetween(1, 16);
        final byte[] buffer = randomByteArrayOfLength(numBytes);
        final ByteArrayDataInput in = new ByteArrayDataInput(buffer, 0, buffer.length);
        context.setDataInput(in);

        // At least one bit is guaranteed set before the random OR loop.
        short bitmap = (short) (1 << randomIntBetween(0, pipelineLength - 1));
        for (int i = 0; i < pipelineLength; i++) {
            if (randomBoolean()) {
                bitmap |= (short) (1 << i);
            }
        }
        context.setPositionBitmap(bitmap);

        // The reader must stream the underlying input byte-for-byte.
        final MetadataReader reader = context.metadata();
        for (int i = 0; i < numBytes; i++) {
            assertEquals(buffer[i], reader.readByte());
        }
    }

    public void testReadBytesConvenienceMethod() throws Exception {
        final int numBytes = randomIntBetween(1, 64);
        final byte[] source = randomByteArrayOfLength(numBytes);
        final int pipelineLength = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
        final DecodingContext context = new DecodingContext(randomBlockSize(), pipelineLength);
        context.setDataInput(new ByteArrayDataInput(source, 0, source.length));
        context.setPositionBitmap((short) (1 << randomIntBetween(0, pipelineLength - 1)));

        final byte[] dest = new byte[numBytes];
        context.metadata().readBytes(dest);
        assertArrayEquals(source, dest);
    }

    public void testReverseWrittenMetadataIsReadSequentially() throws IOException {
        final int numTransformStages = randomIntBetween(2, 8);
        final int pipelineLength = numTransformStages + 1;
        final EncodingContext encodingContext = new EncodingContext(randomBlockSize(), pipelineLength);

        // Each transform stage records its own position as metadata.
        for (int pos = 0; pos < numTransformStages; pos++) {
            encodingContext.setCurrentPosition(pos);
            encodingContext.metadata().writeVLong(pos);
        }

        final byte[] buffer = new byte[256];
        final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
        encodingContext.writeStageMetadata(out);

        final DecodingContext decodingContext = new DecodingContext(randomBlockSize(), pipelineLength);
        decodingContext.setDataInput(new ByteArrayDataInput(buffer, 0, out.getPosition()));
        decodingContext.setPositionBitmap(encodingContext.positionBitmap());

        // Metadata is persisted in reverse stage order, so a forward read
        // yields the positions from last transform stage to first.
        for (int pos = numTransformStages - 1; pos >= 0; pos--) {
            assertEquals(pos, decodingContext.readVLong());
        }
    }

    public void testAllMetadataReaderMethodsRoundtrip() throws Exception {
        final byte byteVal = randomByte();
        final int intVal = randomInt();
        final long longVal = randomLong();
        final int vintVal = randomIntBetween(0, Integer.MAX_VALUE);
        final long vlongVal = randomLongBetween(0L, Long.MAX_VALUE);
        final int zintVal = randomIntBetween(Integer.MIN_VALUE, Integer.MAX_VALUE);
        final long zlongVal = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE);

        // Write one value through every writer method, then read them back in
        // the same order through the corresponding reader methods.
        final MetadataBuffer writer = new MetadataBuffer();
        writer.writeByte(byteVal);
        writer.writeInt(intVal);
        writer.writeLong(longVal);
        writer.writeVInt(vintVal);
        writer.writeVLong(vlongVal);
        writer.writeZInt(zintVal);
        writer.writeZLong(zlongVal);

        final byte[] buffer = new byte[256];
        final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
        writer.writeTo(out, 0, writer.size());

        final DecodingContext context = new DecodingContext(randomBlockSize(), 1);
        context.setDataInput(new ByteArrayDataInput(buffer, 0, out.getPosition()));
        context.setPositionBitmap((short) 0b1);

        assertEquals(byteVal, context.readByte());
        assertEquals(intVal, context.readInt());
        assertEquals(longVal, context.readLong());
        assertEquals(vintVal, context.readVInt());
        assertEquals(vlongVal, context.readVLong());
        assertEquals(zintVal, context.readZInt());
        assertEquals(zlongVal, context.readZLong());
    }
}
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/EncodingContextTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/EncodingContextTests.java
new file mode 100644
index 0000000000000..1fad645996436
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/EncodingContextTests.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline;
+
+import org.apache.lucene.store.ByteArrayDataOutput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class EncodingContextTests extends ESTestCase {
+
+ private static final int BASE_BLOCK_SIZE = 128;
+
+ private static int randomBlockSize() {
+ return BASE_BLOCK_SIZE << randomIntBetween(0, 7);
+ }
+
+ private static int randomPositionExcluding(final int position, final int pipelineLength) {
+ return (position + 1 + randomIntBetween(0, pipelineLength - 2)) % pipelineLength;
+ }
+
+ public void testNewContextStartsWithNoActivePositions() {
+ final int pipelineLength = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
+ final EncodingContext context = new EncodingContext(randomBlockSize(), pipelineLength);
+ assertEquals(0, context.positionBitmap());
+ }
+
+ public void testActivatePositionSetsBitmapBit() {
+ final int blockSize = randomBlockSize();
+ final int pipelineLength = randomIntBetween(2, PipelineDescriptor.MAX_PIPELINE_LENGTH);
+ final EncodingContext context = new EncodingContext(blockSize, pipelineLength);
+
+ final int pos1 = randomIntBetween(0, pipelineLength - 1);
+ context.applyStage(pos1);
+ assertTrue(context.isStageApplied(pos1));
+
+ final int pos2 = randomPositionExcluding(pos1, pipelineLength);
+ assertFalse(context.isStageApplied(pos2));
+
+ context.applyStage(pos2);
+ assertTrue(context.isStageApplied(pos1));
+ assertTrue(context.isStageApplied(pos2));
+ }
+
+ public void testMetadataActivatesPosition() {
+ final int pipelineLength = randomIntBetween(2, PipelineDescriptor.MAX_PIPELINE_LENGTH);
+ final EncodingContext context = new EncodingContext(randomBlockSize(), pipelineLength);
+
+ final int pos = randomIntBetween(0, pipelineLength - 1);
+ context.setCurrentPosition(pos);
+ context.metadata().writeVLong(randomLongBetween(0, Long.MAX_VALUE));
+ assertTrue(context.isStageApplied(pos));
+
+ for (int i = 0; i < pipelineLength; i++) {
+ if (i != pos) {
+ assertFalse(context.isStageApplied(i));
+ }
+ }
+ }
+
+ public void testMetadataFailsWhenPositionNotSet() {
+ final int pipelineLength = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
+ final EncodingContext context = new EncodingContext(randomBlockSize(), pipelineLength);
+
+ expectThrows(AssertionError.class, context::metadata);
+ }
+
+ public void testClearResetsBitmapAndMetadata() {
+ final int pipelineLength = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
+ final int blockSize = randomBlockSize();
+ final EncodingContext context = new EncodingContext(blockSize, pipelineLength);
+
+ final int positions = randomIntBetween(1, pipelineLength);
+ for (int p = 0; p < positions; p++) {
+ context.setCurrentPosition(p);
+ context.metadata().writeVLong(randomLongBetween(0, Long.MAX_VALUE));
+ }
+ context.setValueCount(blockSize);
+
+ context.clear();
+
+ assertEquals(0, context.positionBitmap());
+ assertEquals(0, context.valueCount());
+ assertEquals(pipelineLength, context.pipelineLength());
+ }
+
+ public void testWriteStageMetadata() throws IOException {
+ final int pipelineLength = randomIntBetween(2, PipelineDescriptor.MAX_PIPELINE_LENGTH);
+ final EncodingContext context = new EncodingContext(randomBlockSize(), pipelineLength);
+
+ final int positions = randomIntBetween(1, pipelineLength - 1);
+ for (int p = 0; p < positions; p++) {
+ context.setCurrentPosition(p);
+ context.metadata().writeVLong(randomLongBetween(0, Long.MAX_VALUE));
+ }
+
+ final byte[] buffer = new byte[256];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
+ context.writeStageMetadata(out);
+
+ assertTrue(out.getPosition() > 0);
+ }
+
+ public void testContextReusableAcrossMultipleBlocks() {
+ final int blockSize = randomBlockSize();
+ final EncodingContext context = new EncodingContext(blockSize, 3);
+
+ for (int block = 0; block < 10; block++) {
+ context.clear();
+
+ final int positions = randomIntBetween(1, 3);
+ for (int p = 0; p < positions; p++) {
+ context.setCurrentPosition(p);
+ context.metadata().writeVInt(block * 10 + p);
+ }
+
+ for (int p = 0; p < positions; p++) {
+ assertTrue(context.isStageApplied(p));
+ }
+ }
+ }
+
+ public void testValueCount() {
+ final int blockSize = randomBlockSize();
+ final EncodingContext context = new EncodingContext(blockSize, 1);
+
+ context.setValueCount(blockSize);
+ assertEquals(blockSize, context.valueCount());
+ }
+
+ public void testBlockSize() {
+ final int blockSize = randomBlockSize();
+ final EncodingContext context = new EncodingContext(blockSize, 1);
+ assertEquals(blockSize, context.blockSize());
+ }
+
+ public void testPipelineLength() {
+ final int pipelineLength = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
+ final EncodingContext context = new EncodingContext(randomBlockSize(), pipelineLength);
+ assertEquals(pipelineLength, context.pipelineLength());
+ }
+
+ public void testSingleBufferSharedAcrossPositions() throws IOException {
+ final int blockSize = randomBlockSize();
+ final EncodingContext context = new EncodingContext(blockSize, 4);
+
+ context.setCurrentPosition(0);
+ context.metadata().writeByte((byte) 0xAA);
+ context.setCurrentPosition(1);
+ context.metadata().writeByte((byte) 0xBB).writeByte((byte) 0xCC);
+ context.setCurrentPosition(2);
+ context.metadata().writeByte((byte) 0xDD);
+
+ final byte[] buffer = new byte[64];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
+ context.writeStageMetadata(out);
+
+ assertEquals(4, out.getPosition());
+ }
+
+ public void testWriteStageMetadataWritesInReverseOrder() throws IOException {
+ final int blockSize = randomBlockSize();
+ final EncodingContext context = new EncodingContext(blockSize, 3);
+
+ context.setCurrentPosition(0);
+ context.metadata().writeByte((byte) 0x00);
+ context.setCurrentPosition(1);
+ context.metadata().writeByte((byte) 0x11);
+
+ final byte[] buffer = new byte[64];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
+ context.writeStageMetadata(out);
+
+ assertEquals(2, out.getPosition());
+ assertEquals((byte) 0x11, buffer[0]);
+ assertEquals((byte) 0x00, buffer[1]);
+ }
+
+ public void testSetCurrentPositionOutOfRangeThrows() {
+ final int pipelineLength = randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH);
+ final EncodingContext context = new EncodingContext(randomBlockSize(), pipelineLength);
+ expectThrows(IllegalArgumentException.class, () -> context.setCurrentPosition(-1));
+ expectThrows(IllegalArgumentException.class, () -> context.setCurrentPosition(pipelineLength));
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/FieldDescriptorTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/FieldDescriptorTests.java
new file mode 100644
index 0000000000000..6dcd19d1d206e
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/FieldDescriptorTests.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline;
+
+import org.apache.lucene.store.ByteBuffersDirectory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class FieldDescriptorTests extends ESTestCase {
+
+ private static final int BASE_BLOCK_SIZE = 128;
+
+ private static int randomBlockSize() {
+ return BASE_BLOCK_SIZE << randomIntBetween(0, 7);
+ }
+
+ public void testFormatVersionIsWrittenAndRead() throws IOException {
+ final PipelineDescriptor pipeline = new PipelineDescriptor(
+ randomStageIds(randomIntBetween(1, 5)),
+ randomBlockSize(),
+ randomFrom(PipelineDescriptor.DataType.values())
+ );
+
+ try (Directory dir = new ByteBuffersDirectory()) {
+ try (IndexOutput meta = dir.createOutput("test.meta", IOContext.DEFAULT)) {
+ FieldDescriptor.write(meta, pipeline);
+ }
+
+ try (IndexInput meta = dir.openInput("test.meta", IOContext.DEFAULT)) {
+ final int version = meta.readVInt();
+ assertThat(version, equalTo(FieldDescriptor.CURRENT_FORMAT_VERSION));
+ }
+ }
+ }
+
+ private static byte[] randomStageIds(final int length) {
+ final StageId[] allStages = StageId.values();
+ final byte[] stageIds = new byte[length];
+ for (int i = 0; i < length; i++) {
+ stageIds[i] = allStages[randomIntBetween(0, allStages.length - 1)].id;
+ }
+ return stageIds;
+ }
+
+ public void testUnsupportedFormatVersionThrows() throws IOException {
+ try (Directory dir = new ByteBuffersDirectory()) {
+ try (IndexOutput meta = dir.createOutput("test.meta", IOContext.DEFAULT)) {
+ meta.writeVInt(999);
+ }
+
+ try (IndexInput meta = dir.openInput("test.meta", IOContext.DEFAULT)) {
+ final IOException e = expectThrows(IOException.class, () -> FieldDescriptor.read(meta));
+ assertThat(e.getMessage(), containsString("Unsupported FieldDescriptor format version: 999"));
+ assertThat(e.getMessage(), containsString("Maximum supported version is " + FieldDescriptor.CURRENT_FORMAT_VERSION));
+ }
+ }
+ }
+
+ public void testRoundTrip() throws IOException {
+ final int blockSize = randomBlockSize();
+ final PipelineDescriptor.DataType dataType = randomFrom(PipelineDescriptor.DataType.values());
+ final byte[] stageIds = randomStageIds(randomIntBetween(1, PipelineDescriptor.MAX_PIPELINE_LENGTH));
+ final PipelineDescriptor pipeline = new PipelineDescriptor(stageIds, blockSize, dataType);
+
+ try (Directory dir = new ByteBuffersDirectory()) {
+ try (IndexOutput meta = dir.createOutput("test.meta", IOContext.DEFAULT)) {
+ FieldDescriptor.write(meta, pipeline);
+ }
+
+ try (IndexInput meta = dir.openInput("test.meta", IOContext.DEFAULT)) {
+ final PipelineDescriptor desc = FieldDescriptor.read(meta);
+ assertEquals(pipeline, desc);
+ assertEquals(dataType, desc.dataType());
+ }
+ }
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataBufferTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataBufferTests.java
new file mode 100644
index 0000000000000..fff0a33da6f6a
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/MetadataBufferTests.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline;
+
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.ByteArrayDataOutput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class MetadataBufferTests extends ESTestCase {
+
+ public void testNewBufferIsEmpty() {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ assertEquals(0, buffer.size());
+ }
+
+ public void testClearResetsBuffer() {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ buffer.writeVInt(randomIntBetween(1, 10000));
+ buffer.writeVLong(randomLongBetween(1L, 100000L));
+
+ buffer.clear();
+
+ assertEquals(0, buffer.size());
+ }
+
+ public void testBufferReusableAfterClear() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ buffer.writeVInt(randomIntBetween(1, 10000));
+ buffer.clear();
+
+ final long expected = randomLongBetween(1L, 100000L);
+ buffer.writeVLong(expected);
+
+ final byte[] output = new byte[256];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ final ByteArrayDataInput in = new ByteArrayDataInput(output, 0, out.getPosition());
+ assertEquals(expected, in.readVLong());
+ }
+
+ public void testBufferGrowsWithLargeData() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ final int count = randomIntBetween(100, 200);
+
+ for (int i = 0; i < count; i++) {
+ buffer.writeVInt(i);
+ }
+
+ final int expectedSize = buffer.size();
+ assertTrue("Buffer should have grown beyond default capacity", expectedSize > 64);
+
+ final byte[] output = new byte[4096];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ assertEquals("writeTo should write exactly size() bytes", expectedSize, out.getPosition());
+
+ final ByteArrayDataInput in = new ByteArrayDataInput(output, 0, out.getPosition());
+ for (int i = 0; i < count; i++) {
+ assertEquals(i, in.readVInt());
+ }
+ }
+
+ public void testWriteToSlice() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+
+ final int first = randomIntBetween(1, 1000);
+ final int second = randomIntBetween(1001, 2000);
+ final int third = randomIntBetween(2001, 3000);
+
+ buffer.writeVInt(first);
+ final int offset1 = buffer.size();
+ buffer.writeVInt(second);
+ final int offset2 = buffer.size();
+ buffer.writeVInt(third);
+
+ final byte[] output = new byte[64];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, offset1, offset2 - offset1);
+
+ final ByteArrayDataInput in = new ByteArrayDataInput(output, 0, out.getPosition());
+ assertEquals(second, in.readVInt());
+ }
+
+ public void testWriteToEmptySlice() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ buffer.writeVInt(randomIntBetween(1, 10000));
+
+ final byte[] output = new byte[64];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, 0);
+
+ assertEquals(0, out.getPosition());
+ }
+
+ public void testWriteBytesWithOffsetAndLength() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ final byte[] source = randomByteArrayOfLength(randomIntBetween(10, 50));
+ final int offset = randomIntBetween(0, source.length / 2);
+ final int length = randomIntBetween(1, source.length - offset);
+
+ buffer.writeBytes(source, offset, length);
+
+ assertEquals(length, buffer.size());
+
+ final byte[] output = new byte[256];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ final ByteArrayDataInput in = new ByteArrayDataInput(output, 0, out.getPosition());
+ final byte[] read = new byte[length];
+ in.readBytes(read, 0, length);
+
+ final byte[] expected = new byte[length];
+ System.arraycopy(source, offset, expected, 0, length);
+ assertArrayEquals(expected, read);
+ }
+
+ public void testRoundtrip() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ final byte byteVal = randomByte();
+ final int intVal = randomIntBetween(0, Integer.MAX_VALUE);
+ final long longVal = randomLongBetween(0L, Long.MAX_VALUE);
+ final int zintVal = randomIntBetween(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ final long zlongVal = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE);
+
+ buffer.writeByte(byteVal);
+ buffer.writeVInt(intVal);
+ buffer.writeVLong(longVal);
+ buffer.writeZInt(zintVal);
+ buffer.writeZLong(zlongVal);
+
+ final byte[] output = new byte[256];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ final ByteArrayDataInput in = new ByteArrayDataInput(output, 0, out.getPosition());
+ assertEquals(byteVal, in.readByte());
+ assertEquals(intVal, in.readVInt());
+ assertEquals(longVal, in.readVLong());
+ assertEquals(zintVal, in.readZInt());
+ assertEquals(zlongVal, in.readZLong());
+ }
+
+ public void testWriteLongBoundaryValues() throws IOException {
+ final long[] values = new long[] { 0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 0x00000000FFFFFFFFL, 0xFFFFFFFF00000000L };
+
+ for (final long value : values) {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ buffer.writeLong(value);
+
+ final byte[] output = new byte[64];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ assertEquals(
+ "Roundtrip failed for value " + Long.toHexString(value),
+ value,
+ new ByteArrayDataInput(output, 0, out.getPosition()).readLong()
+ );
+
+ final byte[] luceneOutput = new byte[Long.BYTES];
+ new ByteArrayDataOutput(luceneOutput).writeLong(value);
+ final byte[] bufferBytes = new byte[Long.BYTES];
+ new ByteArrayDataOutput(bufferBytes).writeBytes(output, 0, Long.BYTES);
+ assertArrayEquals("Byte layout mismatch for value " + Long.toHexString(value), luceneOutput, bufferBytes);
+ }
+ }
+
+ public void testWriteLongMultipleValues() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ final int count = randomIntBetween(5, 20);
+ final long[] values = new long[count];
+
+ for (int i = 0; i < count; i++) {
+ values[i] = randomLong();
+ buffer.writeLong(values[i]);
+ }
+
+ assertEquals(count * Long.BYTES, buffer.size());
+
+ final byte[] output = new byte[count * Long.BYTES];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ final ByteArrayDataInput in = new ByteArrayDataInput(output, 0, out.getPosition());
+ for (int i = 0; i < count; i++) {
+ assertEquals("Value at index " + i, values[i], in.readLong());
+ }
+ }
+
+ public void testWriteLongMixedWithOtherTypes() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ final byte byteVal = randomByte();
+ final long longVal = randomLong();
+ final int vintVal = randomIntBetween(0, Integer.MAX_VALUE);
+ final long longVal2 = randomLong();
+
+ buffer.writeByte(byteVal);
+ buffer.writeLong(longVal);
+ buffer.writeVInt(vintVal);
+ buffer.writeLong(longVal2);
+
+ final byte[] output = new byte[256];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ final ByteArrayDataInput in = new ByteArrayDataInput(output, 0, out.getPosition());
+ assertEquals(byteVal, in.readByte());
+ assertEquals(longVal, in.readLong());
+ assertEquals(vintVal, in.readVInt());
+ assertEquals(longVal2, in.readLong());
+ }
+
+ public void testWriteBytesConvenienceMethod() throws IOException {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ final byte[] source = randomByteArrayOfLength(randomIntBetween(5, 20));
+
+ buffer.writeBytes(source);
+
+ assertEquals(source.length, buffer.size());
+
+ final byte[] output = new byte[256];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ final ByteArrayDataInput in = new ByteArrayDataInput(output, 0, out.getPosition());
+ final byte[] read = new byte[source.length];
+ in.readBytes(read, 0, source.length);
+ assertArrayEquals(source, read);
+ }
+
+ public void testEmptyMethod() {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ buffer.empty();
+ assertEquals(0, buffer.size());
+ }
+
+ public void testWriteIntBoundaryValues() throws IOException {
+ final int[] values = new int[] { 0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE };
+
+ for (final int value : values) {
+ final MetadataBuffer buffer = new MetadataBuffer();
+ buffer.writeInt(value);
+
+ final byte[] output = new byte[64];
+ final ByteArrayDataOutput out = new ByteArrayDataOutput(output);
+ buffer.writeTo(out, 0, buffer.size());
+
+ assertEquals(
+ "Roundtrip failed for value " + Integer.toHexString(value),
+ value,
+ new ByteArrayDataInput(output, 0, out.getPosition()).readInt()
+ );
+
+ final byte[] luceneOutput = new byte[Integer.BYTES];
+ new ByteArrayDataOutput(luceneOutput).writeInt(value);
+ final byte[] bufferBytes = new byte[Integer.BYTES];
+ new ByteArrayDataOutput(bufferBytes).writeBytes(output, 0, Integer.BYTES);
+ assertArrayEquals("Byte layout mismatch for value " + Integer.toHexString(value), luceneOutput, bufferBytes);
+ }
+ }
+
+}
diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/PipelineConfigTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/PipelineConfigTests.java
new file mode 100644
index 0000000000000..87eed02d25850
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/pipeline/PipelineConfigTests.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.tsdb.pipeline;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class PipelineConfigTests extends ESTestCase {
+
+ private static int randomBlockSize() {
+ return 128 << randomIntBetween(0, 7);
+ }
+
+ public void testLongBuilderFluency() {
+ final int blockSize = randomBlockSize();
+ final PipelineConfig config = PipelineConfig.forLongs(blockSize).delta().offset().gcd().bitPack();
+ assertEquals(PipelineDescriptor.DataType.LONG, config.dataType());
+ assertEquals(blockSize, config.blockSize());
+ assertEquals(4, config.specs().size());
+ }
+
+ public void testBlockSizePreserved() {
+ final int blockSize = randomBlockSize();
+ assertEquals(blockSize, PipelineConfig.forLongs(blockSize).delta().bitPack().blockSize());
+ }
+
+ public void testEquality() {
+ final int blockSize = randomBlockSize();
+ final PipelineConfig config1 = PipelineConfig.forLongs(blockSize).delta().bitPack();
+ final PipelineConfig config2 = PipelineConfig.forLongs(blockSize).delta().bitPack();
+
+ assertEquals(config1, config2);
+ assertEquals(config1.hashCode(), config2.hashCode());
+ }
+
+ public void testInequalityByStages() {
+ final int blockSize = randomBlockSize();
+ assertNotEquals(PipelineConfig.forLongs(blockSize).delta().bitPack(), PipelineConfig.forLongs(blockSize).offset().bitPack());
+ }
+
+ public void testInequalityByDataType() {
+ final int blockSize = randomBlockSize();
+ final List