feat(tsdb): add composable pipeline framework for ES94 TSDB codec #143589
Conversation
Introduce the foundation layer for the ES94 pipeline codec, which replaces the monolithic ES819 encoding with a composable pipeline of transform and payload stages. This is PR 1 of a series. It contains no concrete stage implementations - only the type system, wire format, metadata I/O, block format, and context objects that subsequent PRs build on.

Key components:
- `StageId`: wire format registry of stage byte identifiers
- `StageSpec`: sealed hierarchy of stage specifications with parameters
- `PipelineDescriptor`: serialization/deserialization of pipeline configuration
- `FieldDescriptor`: versioned envelope for pipeline descriptors
- `PipelineConfig`: fluent builder API for pipeline construction
- `BlockFormat`: per-block encode/decode layout (bitmap + payload + metadata)
- `EncodingContext` / `DecodingContext`: mutable per-block state, reused via `clear()`
- `MetadataBuffer` / `MetadataWriter` / `MetadataReader`: gather-scatter metadata I/O
- `PayloadEncoder` / `PayloadDecoder`: interfaces for terminal payload stages
…lp and extract magic numbers
Pinging @elastic/es-storage-engine (Team:StorageEngine)
```java
ALP_DOUBLE_STAGE((byte) 0x06, "alpDouble"),
ALP_FLOAT_STAGE((byte) 0x07, "alpFloat"),
FPC_DOUBLE_STAGE((byte) 0x08, "fpcDouble"),
FPC_FLOAT_STAGE((byte) 0x09, "fpcFloat"),

BITPACK_PAYLOAD((byte) 0xA1, "bitPack"),
ZSTD_PAYLOAD((byte) 0xA2, "zstd"),
LZ4_PAYLOAD((byte) 0xA3, "lz4"),
GORILLA_DOUBLE_PAYLOAD((byte) 0xA4, "gorillaDouble"),
GORILLA_FLOAT_PAYLOAD((byte) 0xA5, "gorillaFloat"),
CHIMP_DOUBLE_PAYLOAD((byte) 0xA6, "chimpDouble"),
CHIMP_FLOAT_PAYLOAD((byte) 0xA7, "chimpFloat"),
CHIMP128_DOUBLE_PAYLOAD((byte) 0xA8, "chimp128Double"),
CHIMP128_FLOAT_PAYLOAD((byte) 0xA9, "chimp128Float");
```
I understand that this PR is preparing for the es94 doc values format, but can we leave these stages and related code out for now and add them back when needed?
Given that we first focus on getting es94 into a state that performs the same encoding techniques that es819 is using, this keeps the PR smaller and easier to review.
I can keep a limited set to start with, but I need a few of them to be able to do some testing with actual values.
martijnvg
left a comment
Thanks Salvatore, I did a first review round.
```java
/**
 * Writes stage metadata values to a buffer during encoding. Supports method chaining.
 */
public interface MetadataWriter {
```
Can you explain why we can't use StreamOutput here, or some other existing writable interface from ES or Lucene? Same question for the MetadataReader.
The block layout is [bitmap][payload][stage metadata] (see BlockFormat Javadoc). Metadata comes after the payload on disk, but transform stages like delta, offset, GCD, produce metadata before the payload during encoding (e.g. we do delta>offset>gcd>bitpack). So we buffer metadata in memory and flush it after the payload is written.
We chose this layout deliberately: if metadata came first on disk, encoding would be simpler (write directly to DataOutput) but the decoder would need to buffer or seek past the metadata to reach the payload on every block read. Since decoding happens far more often than encoding, we push the buffering to the encode side (once) to give the decoder a clean single forward pass with no buffering or seeking. With this layout, decoding is always sequential: we read and cache the bitmap (1-2 bytes), then the full payload, which we decode using BitPack (first on decode but last during encode). The metadata for delta>offset>gcd is already ordered so that decoding first sees GCD metadata, then offset metadata, then delta metadata. No jumps, no buffering, no going back and forth. This is important because:
- it favors SIMD-friendly decoders (loops with no branches or jumps)
- fewer jumps in the code help CPU prefetching and caching
MetadataWriter/MetadataReader are minimal interfaces (8 methods each) that decouple stages from this buffering strategy. Each stage just writes/reads metadata without knowing what goes on under the hood; stages don't need to know anything about the block format or the ordering. This also makes BWC far easier: since stages only see MetadataWriter/MetadataReader, we can change the block layout or metadata ordering without touching any stage implementation.
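The encode-side buffering described above can be sketched roughly as follows. This is an illustrative sketch, not the actual implementation: the class name, `writeSegment`, and `flush` are hypothetical, and the real MetadataWriter works at the level of individual values, not whole segments.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the idea: each transform stage appends its metadata to an
// in-memory buffer while the payload is written to disk first; the
// buffered segments are then flushed in reverse stage order, so the
// decoder (which runs the pipeline backwards) reads them sequentially.
class MetadataBufferSketch {
    private final Deque<byte[]> segments = new ArrayDeque<>();

    // A stage records its metadata as one segment (hypothetical API).
    void writeSegment(byte[] stageMetadata) {
        segments.push(stageMetadata); // push to the front = reverse order on iteration
    }

    // After the payload is written, flush metadata in reverse stage order.
    byte[] flush() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] segment : segments) {
            out.writeBytes(segment);
        }
        segments.clear(); // buffer is reused per block, mirroring clear()
        return out.toByteArray();
    }
}
```

With delta, offset, and gcd writing in encode order, the flushed bytes come out gcd-first, which is exactly the order the decoder consumes them in.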
I see, I agree that this is a minimal interface. Can you add your explanation of why you chose not to rely on e.g. DataOutput as class-level javadoc?
server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/PipelineDescriptor.java
```java
}

/** Zstandard block compression payload. */
record ZstdPayload(int compressionLevel) implements StageSpec {
```
Some of these stage specs, like this one, should always be the last stage spec, right? If that is the case, maybe we should have a different interface for these payloads? Like a marker interface that extends StageSpec? Then we can add a build() method to the builders that accepts this marker interface.
Same reasoning as my reply for add(...): named terminal methods like bitPack() keep callers from depending on any StageSpec type directly. The builder is the only public contract.
```java
public LongBuilder delta() {
    specs.add(new StageSpec.DeltaStage());
    return this;
}

public LongBuilder offset() {
    specs.add(new StageSpec.OffsetStage());
    return this;
}

public LongBuilder gcd() {
    specs.add(new StageSpec.GcdStage());
    return this;
}

public LongBuilder patchedPFor() {
    specs.add(new StageSpec.PatchedPForStage());
    return this;
}

public LongBuilder xor() {
    specs.add(new StageSpec.XorStage());
    return this;
}
```
Maybe add one add(...) method that accepts a stage? That way we can reduce the number of methods here.
I kept named transform methods (delta(), offset(), gcd()) because when we add type-specific transforms later, the compiler can enforce that they only appear on the right builder. For instance, we don't want users to use ALP or CHIMP for integer encoding. This way we ensure codec construction is safe and reduce the likelihood of mistakes. With 3 transforms currently, the boilerplate is minimal.
For example, ALP is a double-specific transform. With named methods on typed builders, a caller physically cannot apply ALP to an integer pipeline: the code won't compile. A generic add(TransformSpec) would instead accept new AlpStage() on a LongBuilder, since both are just TransformSpec, silently producing an invalid pipeline that only fails at runtime. So I prefer the builder to restrict construction via the type system. Detecting these mismatches at runtime is much harder, and they could easily slip into production unnoticed.
Also, named methods keep the concrete StageSpec record types out of the caller's API. Callers just write .delta() without importing or constructing specific records. With a generic add(TransformSpec), every call site would need new StageSpec.DeltaStage(), leaking internal types into the public surface. This could create unwanted dependencies for consumers of the API. Beyond the type-safety argument, named methods keep the mapping layer and the codec layer dependency-free: the mapping side just expresses intent (.delta().offset().gcd().bitPack()) without importing or constructing codec-internal types.
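The compile-time restriction argued for here can be illustrated with a minimal standalone sketch (the class names and string-based stage list are hypothetical, not the actual PipelineConfig code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the typed-builder idea: transforms valid only for doubles
// (e.g. ALP) are exposed only on a double builder, so applying them to
// an integer pipeline is a compile error rather than a runtime failure.
class TypedBuilderSketch {
    static class LongBuilder {
        private final List<String> stages = new ArrayList<>();
        LongBuilder delta() { stages.add("delta"); return this; }
        LongBuilder offset() { stages.add("offset"); return this; }
        LongBuilder gcd() { stages.add("gcd"); return this; }
        // Terminal payload method; no further transforms can be chained.
        List<String> bitPack() { stages.add("bitPack"); return stages; }
        // No alp() here: new LongBuilder().alp() simply does not compile.
    }

    static class DoubleBuilder {
        private final List<String> stages = new ArrayList<>();
        DoubleBuilder alp() { stages.add("alp"); return this; } // double-only transform
        List<String> bitPack() { stages.add("bitPack"); return stages; }
    }
}
```

A generic `add(TransformSpec)` would collapse both builders into one and move this check to runtime.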
```java
public PipelineConfig bitPack() {
    specs.add(new StageSpec.BitPackPayload());
    return new PipelineConfig(PipelineDescriptor.DataType.LONG, blockSize, specs);
}

public PipelineConfig zstd() {
    specs.add(new StageSpec.ZstdPayload());
    return new PipelineConfig(PipelineDescriptor.DataType.LONG, blockSize, specs);
}

public PipelineConfig zstd(int compressionLevel) {
    specs.add(new StageSpec.ZstdPayload(compressionLevel));
    return new PipelineConfig(PipelineDescriptor.DataType.LONG, blockSize, specs);
}

public PipelineConfig lz4() {
    specs.add(new StageSpec.Lz4Payload());
    return new PipelineConfig(PipelineDescriptor.DataType.LONG, blockSize, specs);
}

public PipelineConfig lz4HighCompression() {
    specs.add(new StageSpec.Lz4Payload(true));
    return new PipelineConfig(PipelineDescriptor.DataType.LONG, blockSize, specs);
```
These are like build() methods, right? Based on my previous comment about final stage specs, let's have one build() method here?
server/src/main/java/org/elasticsearch/index/codec/tsdb/pipeline/BlockFormat.java
Remove transform and payload stages not needed for ES819 codec parity: PatchedPFor, Xor, ALP, FPC, Zstd, Lz4, Gorilla, Chimp, Chimp128. Retain only delta, offset, gcd, and bitpack. Additional stages will be introduced in subsequent PRs alongside their implementations.
Split StageSpec into TransformSpec (chainable transforms) and PayloadSpec (terminal payload stages) for internal type safety.
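The split described in this commit could look roughly like the following. This is a sketch under the assumption that the sealed hierarchy uses nested records named after the stages kept in the PR; the actual permits lists and record shapes may differ.

```java
import java.util.List;

// Sketch of splitting the sealed StageSpec hierarchy: chainable
// transforms and terminal payload stages become distinct subtypes, so a
// builder can accept any number of transforms but exactly one payload.
sealed interface StageSpec permits TransformSpec, PayloadSpec {}

sealed interface TransformSpec extends StageSpec
        permits DeltaStage, OffsetStage, GcdStage {}

sealed interface PayloadSpec extends StageSpec
        permits BitPackPayload {}

record DeltaStage() implements TransformSpec {}
record OffsetStage() implements TransformSpec {}
record GcdStage() implements TransformSpec {}
record BitPackPayload() implements PayloadSpec {}
```

With this shape, a hypothetical `build(PayloadSpec)` method can require a terminal stage at the type level, which is the marker-interface idea from the review discussion.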
martijnvg
left a comment
Thanks Salvatore, some minor comments, LGTM otherwise.
```java
 *
 * @return the pipeline configuration
 */
public PipelineConfig bitPack() {
```
Maybe rename to buildUsingBitpack() (or something else) to indicate that this is a terminal payload and that no other transform stages can be added?
```java
/**
 * Writes stage metadata values to a buffer during encoding. Supports method chaining.
 */
public interface MetadataWriter {
```
I see, I agree that this is a minimal interface. Can you add your explanation of why you chose not to rely on e.g. DataOutput as class-level javadoc?
```java
 * @param pipelineLength the number of stages in the pipeline
 * @param metadataCapacity the initial metadata buffer capacity in bytes
 */
EncodingContext(int blockSize, int pipelineLength, int metadataCapacity) {
```
This constructor is unused, maybe remove it?
…ning design rationale

Explains why we use dedicated interfaces instead of Lucene's DataOutput/DataInput or Elasticsearch's StreamOutput/StreamInput: the block layout places metadata after the payload on disk, requiring encode-side buffering to give the decoder a clean single forward pass with no seeking or buffering.
Summary

This PR introduces the foundation of the ES94 Deepstore Pipeline Codec, which replaces the monolithic ES819 encoding with a composable pipeline of transform and payload stages. It includes the type system, wire format, metadata I/O, block format, and context objects. No concrete stage implementations are included; those will land in subsequent PRs that build on this base.

What's included

Type system
- `StageId`: wire format registry of stage byte identifiers
- `StageSpec`: sealed hierarchy of stage specifications with parameters
- `PipelineDescriptor`: pipeline configuration with wire format `[stageCount][blockShift][dataType][stageIds]`
- `FieldDescriptor`: versioned envelope around `PipelineDescriptor` for forward compatibility
- `PipelineConfig`: fluent builder API for pipeline construction

Encode/decode infrastructure
- `BlockFormat`: per-block layout `[bitmap][payload][stage metadata]` - designed for single-pass sequential decoding
- `EncodingContext`: mutable per-block encode state, reused via `clear()`
- `DecodingContext`: mutable per-block decode state over a `DataInput`
- `MetadataBuffer` / `MetadataWriter` / `MetadataReader`: gather-scatter metadata I/O
- `PayloadEncoder` / `PayloadDecoder`: interfaces for terminal payload stages

Design highlights
- `EncodingContext.writeStageMetadata` flushes in reverse order so the decoder reads sequentially with no seeking or buffering
- `EncodingContext` / `DecodingContext` are cleared and reused per block to avoid per-block GC pressure
- `StageId` byte assignments are declared upfront as the encoder/decoder contract; `FieldDescriptor` adds a version byte for future format evolution

Testing