Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline.numeric;

import org.apache.lucene.store.DataInput;
import org.elasticsearch.index.codec.tsdb.pipeline.DecodingContext;

import java.io.IOException;

/**
* Per-field block decoder owning a mutable {@link DecodingContext}.
*
* <p>NOT thread-safe. Each instance owns a {@link DecodingContext} with mutable
* per-block state. Callers that may run concurrently must each hold their own
* decoder instance, obtained via {@link NumericDecoder#newBlockDecoder()}.
*/
public final class NumericBlockDecoder {

private final NumericDecodePipeline pipeline;
private final DecodingContext decodingContext;

NumericBlockDecoder(final NumericDecodePipeline pipeline) {
this.pipeline = pipeline;
this.decodingContext = new DecodingContext(pipeline.blockSize(), pipeline.size());
}

/**
* Decodes a block of values by reading the payload and reversing transforms.
*
* @param values the output array to populate
* @param count the expected number of values
* @param in the data input to read from
* @throws IOException if an I/O error occurs
*/
public void decode(final long[] values, int count, final DataInput in) throws IOException {
decodingContext.clear();
pipeline.decode(values, count, in, decodingContext);
}

/** Returns the number of values per block. */
public int blockSize() {
return pipeline.blockSize();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline.numeric;

import org.apache.lucene.store.DataOutput;
import org.elasticsearch.index.codec.tsdb.pipeline.EncodingContext;

import java.io.IOException;

/**
* Per-field block encoder owning a mutable {@link EncodingContext}.
*
* <p>NOT thread-safe. Each instance owns an {@link EncodingContext} with mutable
* per-block state. Callers that may run concurrently must each hold their own
* encoder instance, obtained via {@link NumericEncoder#newBlockEncoder()}.
*/
public final class NumericBlockEncoder {

private final NumericEncodePipeline pipeline;
private final EncodingContext encodingContext;

NumericBlockEncoder(final NumericEncodePipeline pipeline) {
this.pipeline = pipeline;
this.encodingContext = new EncodingContext(pipeline.blockSize(), pipeline.size());
}

/**
* Encodes a block of values through the pipeline.
*
* @param values the values to encode (modified in-place by transform stages)
* @param valueCount the number of valid values
* @param out the data output to write the encoded block to
* @throws IOException if an I/O error occurs
*/
public void encode(final long[] values, int valueCount, final DataOutput out) throws IOException {
encodingContext.clear();
pipeline.encode(values, valueCount, out, encodingContext);
}

/** Returns the number of values per block. */
public int blockSize() {
return pipeline.blockSize();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline.numeric;

import org.elasticsearch.index.codec.tsdb.pipeline.PipelineConfig;
import org.elasticsearch.index.codec.tsdb.pipeline.PipelineDescriptor;

/**
* Factory for creating encoder/decoder instances from pipeline configurations.
* The consumer calls {@link #createEncoder} with a {@link PipelineConfig}.
* The producer calls {@link #createDecoder} with a {@link PipelineDescriptor}.
*/
public interface NumericCodecFactory {

/** Default factory that delegates to {@link NumericEncoder#fromConfig} and {@link NumericDecoder#fromDescriptor}. */
NumericCodecFactory DEFAULT = new NumericCodecFactory() {
@Override
public NumericEncoder createEncoder(PipelineConfig config) {
return NumericEncoder.fromConfig(config);
}

@Override
public NumericDecoder createDecoder(PipelineDescriptor descriptor) {
return NumericDecoder.fromDescriptor(descriptor);
}
};

/** Creates an encoder from the given pipeline configuration. */
NumericEncoder createEncoder(PipelineConfig config);

/** Creates a decoder from the given pipeline descriptor. */
NumericDecoder createDecoder(PipelineDescriptor descriptor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
* deltas, removing offsets, or factoring out a GCD) so that the terminal payload
* stage can pack them into fewer bits.
*/
public interface NumericCodecStage extends NumericEncoder, NumericDecoder {}
public interface NumericCodecStage extends TransformEncoder, TransformDecoder {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.pipeline.numeric;

import org.apache.lucene.store.DataInput;
import org.elasticsearch.index.codec.tsdb.pipeline.BlockFormat;
import org.elasticsearch.index.codec.tsdb.pipeline.DecodingContext;
import org.elasticsearch.index.codec.tsdb.pipeline.PipelineDescriptor;
import org.elasticsearch.index.codec.tsdb.pipeline.StageId;
import org.elasticsearch.index.codec.tsdb.pipeline.StageSpec;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Immutable decoding pipeline: reads payload then reverses transform stages.
*
* <p>Reconstructed from a {@link PipelineDescriptor} read from segment metadata,
* making the format self-describing. The decoder does not need to know the
* pipeline configuration at compile time.
*
* <p>Instances are immutable and thread-safe. Mutable per-block state lives in
* {@link DecodingContext}, which must be provided by the caller.
*/
public final class NumericDecodePipeline {

private final NumericCodecStage[] transformStages;
private final PayloadCodecStage payloadStage;
private final int blockSize;
private final int payloadPosition;

NumericDecodePipeline(final NumericCodecStage[] transformStages, final PayloadCodecStage payloadStage, int blockSize) {
this.transformStages = transformStages;
this.payloadStage = payloadStage;
this.blockSize = blockSize;
this.payloadPosition = transformStages.length;
}

/**
* Reconstructs a decode pipeline from a persisted descriptor.
*
* @param descriptor the pipeline descriptor read from segment metadata
* @return the decode pipeline
*/
public static NumericDecodePipeline fromDescriptor(final PipelineDescriptor descriptor) {
final int blockSize = descriptor.blockSize();
final int stageCount = descriptor.pipelineLength();
final List<NumericCodecStage> transforms = new ArrayList<>();

for (int i = 0; i < stageCount - 1; i++) {
final StageSpec spec = StageFactory.specFromStageId(StageId.fromId(descriptor.stageIdAt(i)));
transforms.add(StageFactory.newTransformStage(spec));
}
final StageSpec payloadSpec = StageFactory.specFromStageId(StageId.fromId(descriptor.stageIdAt(stageCount - 1)));
final PayloadCodecStage payloadStage = StageFactory.newPayloadStage(payloadSpec, blockSize);

return new NumericDecodePipeline(transforms.toArray(NumericCodecStage[]::new), payloadStage, blockSize);
}

/**
* Decodes a block of values by reading the payload and reversing transforms.
*
* @param values the output array to populate
* @param count the expected number of values
* @param in the data input to read from
* @param context the mutable per-block decoding context
* @throws IOException if an I/O error occurs
*/
public void decode(final long[] values, int count, final DataInput in, final DecodingContext context) throws IOException {
context.setDataInput(in);
BlockFormat.readBlock(in, values, payloadStage, context, payloadPosition);
for (int i = transformStages.length - 1; i >= 0; i--) {
if (context.isStageApplied(i)) {
transformStages[i].decode(values, count, context);
}
}
}

/** Returns the number of values per block. */
public int blockSize() {
return blockSize;
}

/** Returns the total number of stages (transforms + payload). */
public int size() {
return transformStages.length + 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,47 @@

package org.elasticsearch.index.codec.tsdb.pipeline.numeric;

import org.elasticsearch.index.codec.tsdb.pipeline.DecodingContext;

import java.io.IOException;
import org.elasticsearch.index.codec.tsdb.pipeline.PipelineDescriptor;

/**
* Reverses an in-place transformation as a non-terminal stage of the decode pipeline.
* Read-path coordinator: owns a {@link NumericDecodePipeline} and produces
* {@link NumericBlockDecoder} instances for decoding blocks of values.
*
* <p>Instances are immutable and thread-safe. Per-field mutable state lives in
* {@link NumericBlockDecoder}, which callers obtain via {@link #newBlockDecoder()}.
*
* <p>Unlike {@link PayloadDecoder}, transform decoders do not read from a
* {@link org.apache.lucene.store.DataInput} directly. They read metadata written
* by the corresponding {@link NumericEncoder} via {@link DecodingContext#metadata()}.
* <p>Created via {@link #fromDescriptor} or via {@link NumericCodecFactory#createDecoder}.
*/
public interface NumericDecoder {
public final class NumericDecoder {

private final NumericDecodePipeline pipeline;

NumericDecoder(final NumericDecodePipeline pipeline) {
this.pipeline = pipeline;
}

/**
* Returns the unique stage identifier.
* Reconstructs a decoder from a persisted descriptor.
* Use {@link NumericCodecFactory#createDecoder} as the public entry point.
*
* @return the stage ID byte
* @param descriptor the pipeline descriptor read from segment metadata
* @return the decoder
*/
byte id();
static NumericDecoder fromDescriptor(final PipelineDescriptor descriptor) {
return new NumericDecoder(NumericDecodePipeline.fromDescriptor(descriptor));
}

/**
* Reverses the transformation on values in-place using metadata from the context.
* Creates a new block decoder with its own mutable decoding context.
*
* @param values the values to reverse-transform in-place
* @param valueCount the number of valid values in the array
* @param context the decoding context for reading stage metadata
* @throws IOException if an I/O error occurs while reading metadata
* @return a fresh block decoder
*/
void decode(long[] values, int valueCount, DecodingContext context) throws IOException;
public NumericBlockDecoder newBlockDecoder() {
return new NumericBlockDecoder(pipeline);
}

/** Returns the number of values per block. */
public int blockSize() {
return pipeline.blockSize();
}
}
Loading
Loading