Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,40 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

/**
* Immutable specification for a field's encoding pipeline.
*
* <p>Captures the data type, block size, and ordered stage specifications for
* deferred codec construction. Use {@link #forLongs} to start building a
* configuration via the fluent builder API.
* <p>Captures the data type, block size, ordered transform stages, and terminal
* payload stage. The builder separates transforms from the payload at
* construction time, making illegal states (e.g. two payloads, payload in the
* middle) unrepresentable.
*
* @param dataType the numeric data type
* @param blockSize the number of values per block (must be a positive power of 2)
* @param specs the ordered stage specifications
* <p>Use {@link #forLongs} to start building a configuration via the fluent
* builder API.
*
* @param dataType the numeric data type
* @param blockSize the number of values per block (must be a positive power of 2)
* @param transforms the ordered transform stage specifications
* @param payload the terminal payload stage specification
*/
public record PipelineConfig(PipelineDescriptor.DataType dataType, int blockSize, List<StageSpec> specs) {
public record PipelineConfig(
PipelineDescriptor.DataType dataType,
int blockSize,
List<StageSpec.TransformSpec> transforms,
StageSpec.PayloadSpec payload
) {

/** Validates invariants and creates a defensive copy of the specs list. */
/** Validates invariants and creates a defensive copy of the transforms list. */
public PipelineConfig {
Objects.requireNonNull(dataType, "dataType must not be null");
if (blockSize <= 0 || (blockSize & (blockSize - 1)) != 0) {
throw new IllegalArgumentException("blockSize must be a positive power of 2, got: " + blockSize);
}
Objects.requireNonNull(specs, "specs must not be null");
specs = List.copyOf(specs);
Objects.requireNonNull(transforms, "transforms must not be null");
Objects.requireNonNull(payload, "payload must not be null");
transforms = List.copyOf(transforms);
}

/**
Expand All @@ -47,15 +59,12 @@ public static LongBuilder forLongs(int blockSize) {
}

/**
* Creates a pipeline configuration directly from components.
* Returns all stage specifications in order (transforms followed by payload).
*
* @param dataType the numeric data type
* @param blockSize the number of values per block
* @param specs the ordered stage specifications
* @return the pipeline configuration
* @return unmodifiable list of all specs
*/
public static PipelineConfig of(final PipelineDescriptor.DataType dataType, int blockSize, final List<StageSpec> specs) {
return new PipelineConfig(dataType, blockSize, specs);
public List<StageSpec> specs() {
return Stream.concat(transforms.stream().map(s -> (StageSpec) s), Stream.of(payload)).toList();
}

/**
Expand All @@ -64,15 +73,16 @@ public static PipelineConfig of(final PipelineDescriptor.DataType dataType, int
* @return the stage names joined by {@code >} delimiters
*/
public String describeStages() {
if (specs.isEmpty()) {
final List<StageSpec> allSpecs = specs();
if (allSpecs.isEmpty()) {
return "empty";
}
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < specs.size(); i++) {
for (int i = 0; i < allSpecs.size(); i++) {
if (i > 0) {
sb.append('>');
}
sb.append(specs.get(i).stageId().displayName);
sb.append(allSpecs.get(i).stageId().displayName);
}
return sb.toString();
}
Expand All @@ -83,7 +93,7 @@ public String describeStages() {
*/
public static final class LongBuilder {
private final int blockSize;
private final List<StageSpec> specs = new ArrayList<>();
private final List<StageSpec.TransformSpec> transforms = new ArrayList<>();

private LongBuilder(int blockSize) {
this.blockSize = blockSize;
Expand All @@ -95,7 +105,7 @@ private LongBuilder(int blockSize) {
* @return this builder
*/
public LongBuilder delta() {
specs.add(new StageSpec.DeltaStage());
transforms.add(new StageSpec.DeltaStage());
return this;
}

Expand All @@ -105,7 +115,7 @@ public LongBuilder delta() {
* @return this builder
*/
public LongBuilder offset() {
specs.add(new StageSpec.OffsetStage());
transforms.add(new StageSpec.OffsetStage());
return this;
}

Expand All @@ -115,7 +125,7 @@ public LongBuilder offset() {
* @return this builder
*/
public LongBuilder gcd() {
specs.add(new StageSpec.GcdStage());
transforms.add(new StageSpec.GcdStage());
return this;
}

Expand All @@ -125,8 +135,7 @@ public LongBuilder gcd() {
* @return the pipeline configuration
*/
public PipelineConfig bitPack() {
specs.add(new StageSpec.BitPackPayload());
return new PipelineConfig(PipelineDescriptor.DataType.LONG, blockSize, specs);
return new PipelineConfig(PipelineDescriptor.DataType.LONG, blockSize, transforms, new StageSpec.BitPackPayload());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline.numeric;

import org.apache.lucene.store.DataInput;
import org.elasticsearch.index.codec.tsdb.pipeline.DecodingContext;

import java.io.IOException;

/**
 * Decodes blocks of numeric values for a single field.
 *
 * <p>Owns a reusable {@link DecodingContext} that is reset (via
 * {@link DecodingContext#clear()}) at the start of every block and reused for
 * the next, so decoding allocates nothing per block on the hot path. Because
 * that context is mutable shared state, an instance is NOT thread-safe:
 * concurrent callers must each obtain their own decoder through
 * {@link NumericDecoder#newBlockDecoder()}.
 */
public final class NumericBlockDecoder {

    private final NumericDecodePipeline pipeline;
    private final DecodingContext context;

    NumericBlockDecoder(final NumericDecodePipeline pipeline) {
        this.pipeline = pipeline;
        this.context = new DecodingContext(pipeline.blockSize(), pipeline.size());
    }

    /**
     * Reads one encoded block from {@code in} and materializes it into {@code values},
     * reading the payload and reversing the transform stages.
     *
     * @param values destination array to populate with decoded values
     * @param count  the expected number of values in the block
     * @param in     the data input positioned at the start of the encoded block
     * @throws IOException if reading from the input fails
     */
    public void decode(final long[] values, int count, final DataInput in) throws IOException {
        // Reset per-block mutable state before delegating to the pipeline.
        context.clear();
        pipeline.decode(values, count, in, context);
    }

    /**
     * Returns the configured number of values per block.
     *
     * @return the block size of the underlying pipeline
     */
    public int blockSize() {
        return pipeline.blockSize();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline.numeric;

import org.apache.lucene.store.DataOutput;
import org.elasticsearch.index.codec.tsdb.pipeline.EncodingContext;

import java.io.IOException;

/**
 * Encodes blocks of numeric values for a single field.
 *
 * <p>Owns a reusable {@link EncodingContext} that is reset (via
 * {@link EncodingContext#clear()}) at the start of every block and reused for
 * the next, so encoding allocates nothing per block on the hot path. Because
 * that context is mutable shared state, an instance is NOT thread-safe:
 * concurrent callers must each obtain their own encoder through
 * {@link NumericEncoder#newBlockEncoder()}.
 */
public final class NumericBlockEncoder {

    private final NumericEncodePipeline pipeline;
    private final EncodingContext context;

    NumericBlockEncoder(final NumericEncodePipeline pipeline) {
        this.pipeline = pipeline;
        this.context = new EncodingContext(pipeline.blockSize(), pipeline.size());
    }

    /**
     * Runs one block of values through the pipeline and writes the encoded form.
     *
     * @param values     the values to encode; transform stages modify this array in place
     * @param valueCount the number of valid values in the block
     * @param out        the data output the encoded block is written to
     * @throws IOException if writing to the output fails
     */
    public void encode(final long[] values, int valueCount, final DataOutput out) throws IOException {
        // Reset per-block mutable state before delegating to the pipeline.
        context.clear();
        pipeline.encode(values, valueCount, out, context);
    }

    /**
     * Returns the configured number of values per block.
     *
     * @return the block size of the underlying pipeline
     */
    public int blockSize() {
        return pipeline.blockSize();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline.numeric;

import org.elasticsearch.index.codec.tsdb.pipeline.PipelineConfig;
import org.elasticsearch.index.codec.tsdb.pipeline.PipelineDescriptor;

/**
 * Factory abstraction for obtaining encoder and decoder instances.
 *
 * <p>The two creation methods deliberately consume different inputs:
 * {@link #createEncoder} takes a {@link PipelineConfig} — the fluent builder
 * API used at index time to specify which stages to apply — while
 * {@link #createDecoder} takes a {@link PipelineDescriptor} — the compact
 * byte-level representation read back from segment metadata at search time.
 * The encoder writes the descriptor; the decoder reads it.
 */
public interface NumericCodecFactory {

    /**
     * Creates an encoder for the stages described by the given configuration.
     *
     * @param config the pipeline configuration specifying which stages to apply
     * @return an encoder for the configured pipeline
     */
    NumericEncoder createEncoder(PipelineConfig config);

    /**
     * Creates a decoder for the pipeline described by the given descriptor.
     *
     * @param descriptor the pipeline descriptor read from segment metadata
     * @return a decoder for the described pipeline
     */
    NumericDecoder createDecoder(PipelineDescriptor descriptor);

    /** Default factory delegating to {@link NumericEncoder#fromConfig} and {@link NumericDecoder#fromDescriptor}. */
    NumericCodecFactory DEFAULT = new NumericCodecFactory() {
        @Override
        public NumericEncoder createEncoder(final PipelineConfig config) {
            return NumericEncoder.fromConfig(config);
        }

        @Override
        public NumericDecoder createDecoder(final PipelineDescriptor descriptor) {
            return NumericDecoder.fromDescriptor(descriptor);
        }
    };
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
* deltas, removing offsets, or factoring out a GCD) so that the terminal payload
* stage can pack them into fewer bits.
*/
public interface NumericCodecStage extends NumericEncoder, NumericDecoder {}
// Marker interface: a codec stage must implement both directions — the encode-side
// transform (TransformEncoder) and its inverse on the decode side (TransformDecoder).
public interface NumericCodecStage extends TransformEncoder, TransformDecoder {}
Loading
Loading