
feat(blob_v2): add ref_id deduplication (Plan A — input-only hint) #6600

Open
DanielMao1 wants to merge 1 commit into lance-format:main from fecet:feat/blob-v2-ref-id-dedup

Conversation


@DanielMao1 DanielMao1 commented Apr 23, 2026

feat(blob_v2): add ref_id deduplication across Inline / Packed / Dedicated

Zero on-disk format change. ref_id is a write-time input hint only —
consumed by the preprocessor and encoder, then dropped before any byte touches
disk. On-disk Blob v2 descriptor remains exactly the 5 fields it has today.

Motivation

Blob v2 today assumes 1 row = 1 blob. Every row owns its own bytes. There
is no API, no descriptor field, no internal cache to say "these rows reuse
that row's blob."

But real multimodal / time-series workloads routinely align rows at different
logical frequencies into a single table, where many rows reference the same
underlying object:

Low-frequency (shared)                                                 High-frequency (per-row)
Reference image / keyframe / video GOP / packaged archive (tar, zip)  per-frame label, bbox, annotation
LiDAR scan                                                             per-object detection
Video embedding                                                        caption / QA pair
RL observation                                                         action / reward

Without format-level dedup, the current options are all bad:

Approach                                        Storage          Query                       Programming model
Upsample (repeat low-freq to match high-freq)   10–1000× waste   fast                        simple
Downsample                                      compact          fast                        loses data
Two tables + JOIN                               compact          slow (runtime reassembly)   breaks columnar scans
Nested list column                              compact          medium                      awkward indexing
Shared blob (this PR)                           compact          fast                        every row stays whole

Concrete impact

Today — 20 rows carrying the same 6 MB payload produce 20 independent
sidecar files totaling 120 MB; all but one 6 MB copy is pure duplication.

With this PR — the same 20 rows produce 1 sidecar file of 6 MB. The 20
rows' descriptors all share one blob_id; read-back returns byte-identical
payloads per row.

Verified savings (python/test_ref_id_dedup.py, 20 rows, single ref_id):

inline_32kb   (payload 32 KB,   20 rows):  1.05× amplification  [✓ DEDUP]
packed_1mb    (payload 1 MB,    20 rows):  1.00× amplification  [✓ DEDUP, 1 sidecar]
dedicated_6mb (payload 6 MB,    20 rows):  1.00× amplification  [✓ DEDUP, 1 sidecar]

For real training workloads where 8 labels reference 1 GOP (video column),
expected savings on the video column are ~8×, which typically dominates
dataset size.


User-facing API

The only user-visible change is one new optional field on Blob:

from lance.blob import Blob, blob_array, blob_field
import lance, numpy as np, pyarrow as pa

shared_bytes = np.random.default_rng(42).integers(
    0, 256, size=(1450, 1450, 3), dtype=np.uint8
).tobytes()                                     # ~6 MB

# Every row carries ref_id=42 -> Lance dedups on write.
blobs = blob_array([Blob(data=shared_bytes, ref_id=42) for _ in range(20)])

batch = pa.RecordBatch.from_arrays(
    [pa.array(range(20), type=pa.int64()), blobs],
    schema=pa.schema([pa.field("row_idx", pa.int64()), blob_field("payload")]),
)

lance.write_dataset(batch, "ds.lance", mode="create", data_storage_version="2.2")

ref_id = None or 0 (the default) means "no sharing" — identical to pre-PR
behavior. Existing code without ref_id is unaffected.
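
For instance, rows with and without ref_id can be mixed in one column (a minimal
sketch building on the snippet above; mixing shared and unshared rows in a single
blob_array is my reading of the stated default rather than something the PR's test
exercises):

# Rows 0-9 share one payload via ref_id=7; rows 10-19 omit ref_id entirely,
# so each keeps its own independent blob, exactly as before this PR.
mixed = blob_array(
    [Blob(data=shared_bytes, ref_id=7) for _ in range(10)]
    + [Blob(data=shared_bytes) for _ in range(10)]
)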

On disk, the 20 rows share a single sidecar file:

ds.lance/
└── data/
    ├── <stem>.lance                    # 20 rows × descriptors, all same blob_id
    └── <stem>/
        └── blob_1.blob                 # 6 MB, single physical copy
    total sidecar storage: 6 MB         # 20× reduction vs. today's 120 MB

Dedup is invisible on read-back — every row sees the full 6 MB payload;
only the disk layout differs.
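
As a quick sanity check, on-disk amplification can be measured directly from the
sidecar files (a sketch in the spirit of the PR's test script, not part of the PR;
it assumes the data/<stem>/*.blob layout shown above and the shared_bytes payload
from the earlier snippet):

import os

def sidecar_bytes(ds_path):
    # Sum the sizes of all *.blob sidecar files under the dataset's data/ dir.
    total = 0
    for root, _dirs, files in os.walk(os.path.join(ds_path, "data")):
        for name in files:
            if name.endswith(".blob"):
                total += os.path.getsize(os.path.join(root, name))
    return total

amplification = sidecar_bytes("ds.lance") / len(shared_bytes)
print(f"amplification: {amplification:.2f}x")   # ~1.00x with dedup, ~20x without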


Design: where dedup happens

The PR hooks into two existing layers; it does not add a new pipeline
stage. Each hook consults one in-memory cache keyed by ref_id, then updates
it. The cache lives for exactly one fragment's write and is dropped at
finalization.

flowchart TD
    P["<b>Python</b><br/>20 × Blob(data=bytes, ref_id=42)<br/>lance.write_dataset(batch, ...)"]
    P -->|PyO3| R

    R["<b>Rust orchestration</b><br/>Dataset::write → InsertBuilder::execute_stream<br/>→ write_fragments_internal <i>(gate: version ≥ 2.2)</i><br/>→ do_write_fragments"]
    R --> V

    V["V2WriterAdapter::write<br/><i>one BlobPreprocessor per fragment</i>"]
    V --> H1

    H1(["<b>Hook 1 · BlobPreprocessor::preprocess_batch</b><br/>5-field Struct ──▶ 7-field Struct<br/>Packed / Dedicated dedup<br/><i>via ref_id_sidecar_cache</i>"])
    H1 --> F

    F["FileWriter::write_batch<br/>encode_batch — per-column dispatch"]
    F --> H2

    H2(["<b>Hook 2 · BlobV2StructuralEncoder::maybe_encode</b><br/>7-field Struct ──▶ 5-field descriptor<br/>Inline dedup<br/><i>via ref_dedup_tmp_map</i>"])
    H2 --> D

    D[("<b>On disk</b><br/>kind · position · size · blob_id · blob_uri<br/><i>ref_id not persisted</i>")]

    classDef hook fill:#fff3e0,stroke:#e65100,stroke-width:2.5px,color:#3e2723
    classDef normal fill:#fafafa,stroke:#9e9e9e,color:#212121
    classDef disk fill:#e0f2f1,stroke:#00695c,stroke-width:2px,color:#004d40

    class H1,H2 hook
    class P,R,V,F normal
    class D disk

The two amber pill nodes are the only new decision points. ref_id enters
the pipeline with the user's input StructArray, flows through the in-memory
7-field intermediate struct between preprocessor and encoder, and exits at
Hook 2 when the 5-field descriptor is constructed for disk.

Where the dedup decision happens

Hook 1 — BlobPreprocessor::preprocess_batch (rust/lance/src/dataset/blob.rs)

Owns a per-fragment cache covering the two preprocessor-written paths:

pub struct BlobPreprocessor {
    object_store: ObjectStore,
    local_counter: u32,                                // allocates blob_id
    pack_writer: PackWriter,                           // Packed rolling sidecar
    ...
    ref_id_sidecar_cache: HashMap<u32, SidecarRef>,    // <-- new in this PR
}

#[derive(Clone, Copy)]
enum SidecarRef {
    Dedicated { blob_id: u32, size: u64 },
    Packed    { blob_id: u32, position: u64, size: u64 },
}

Row-level loop consults the cache before routing by size:

let ref_id = ref_id_col.as_ref()
    .filter(|c| !c.is_null(i)).map(|c| c.value(i)).unwrap_or(0);

if ref_id > 0 {
    if let Some(cached) = self.ref_id_sidecar_cache.get(&ref_id).copied() {
        match cached {
            SidecarRef::Dedicated { blob_id, size } => {
                kind_builder.append_value(BlobKind::Dedicated as u8);
                blob_id_builder.append_value(blob_id);
                blob_size_builder.append_value(size);
                continue;                              // reuse; no write_dedicated
            }
            SidecarRef::Packed { blob_id, position, size } => { /* analogous */ }
        }
    }
}

// miss: normal write path + cache.insert
let blob_id = self.next_blob_id();
self.write_dedicated(blob_id, BlobWriteSource::Bytes(bytes)).await?;
if ref_id > 0 {
    self.ref_id_sidecar_cache.insert(
        ref_id, SidecarRef::Dedicated { blob_id, size: data_len as u64 },
    );
}

For the example (20 rows, ref_id=42, 6 MB each): row 0 misses and writes one
Dedicated sidecar file; rows 1..19 hit and reuse the cached (blob_id, size)
without any additional I/O.
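
In toy form, the decision reduces to one dictionary lookup per row (a simplified
Python model of the bookkeeping above, not the actual Rust; blob_id allocation is
faked with a write counter):

cache = {}   # ref_id -> (blob_id, size); lives for one fragment's write
writes = 0   # number of real sidecar writes performed

def route(ref_id, size):
    global writes
    if ref_id > 0 and ref_id in cache:
        return cache[ref_id]           # hit: reuse (blob_id, size), no I/O
    writes += 1                        # miss: one real write_dedicated call
    entry = (writes, size)             # toy blob_id = write count
    if ref_id > 0:
        cache[ref_id] = entry
    return entry

descriptors = [route(42, 6 << 20) for _ in range(20)]
assert writes == 1 and len(set(descriptors)) == 1   # 1 sidecar, 20 shared rows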

Hook 2 — BlobV2StructuralEncoder::maybe_encode (rust/lance-encoding/src/encodings/logical/blob.rs)

Owns a symmetric cache for Inline, because only the encoder knows the
out-of-line buffer offset:

pub struct BlobV2StructuralEncoder {
    descriptor_encoder: Box<dyn FieldEncoder>,
    ref_dedup_tmp_map: HashMap<u32, (u64, u64)>,   // ref_id -> (position, size)
}

BlobKind::Inline => {
    if ref_id > 0 {
        if let Some(&(pos, sz)) = self.ref_dedup_tmp_map.get(&ref_id) {
            (pos, sz)                                   // reuse, no add_buffer
        } else {
            let pos = external_buffers.add_buffer(bytes);
            self.ref_dedup_tmp_map.insert(ref_id, (pos, bytes.len() as u64));
            (pos, bytes.len() as u64)
        }
    } else { /* unconditional add_buffer */ }
}

The two caches partition the problem cleanly:

Cache                 Owner                    Covers
ref_id_sidecar_cache  BlobPreprocessor         Packed + Dedicated (sidecar files)
ref_dedup_tmp_map     BlobV2StructuralEncoder  Inline (main-file out-of-line buffer)

What lands on disk

BLOB_V2_DESC_FIELDS is unchanged from upstream:

pub static BLOB_V2_DESC_FIELDS: LazyLock<Fields> = LazyLock::new(|| {
    Fields::from(vec![
        ArrowField::new("kind", DataType::UInt8, false),
        ArrowField::new("position", DataType::UInt64, false),
        ArrowField::new("size", DataType::UInt64, false),
        ArrowField::new("blob_id", DataType::UInt32, false),
        ArrowField::new("blob_uri", DataType::Utf8, false),
    ])
});

The encoder constructs the descriptor StructArray with exactly 5 children;
ref_id is read in Hook 2 for cache lookup but intentionally not appended
to the children vector.

Verified via LanceFileReader.metadata().schema on a dataset written by this
PR:

kind:     uint8 not null
position: uint64 not null
size:     uint64 not null
blob_id:  uint32 not null
blob_uri: string not null

Byte-identical structure to any dataset written by upstream.


Non-intrusiveness

Readers: zero change required

Old readers see 20 rows with the same blob_id in the same 5-field
descriptor. Each row independently resolves blob_id=1 to the same sidecar
file — 20 reads, 20 correct byte payloads. No coordination needed. No
schema version bump.

Compaction / GC: unaffected

  • GC (#5473, data-file-bound GC): sidecars are collected when their owning
    data file is gone. Shared rows all live in one data file, so deleting any
    subset leaves the fragment alive; deleting the whole fragment triggers the
    blob's GC. No changes.
  • Compaction operates on descriptors. Rewriting 20 rows sharing blob_id=1
    will either preserve the sharing (if the compactor treats equal blob_id
    as a single blob) or produce 20 independent blob_ids (regressing to
    upstream behavior). Correctness is preserved either way; only storage
    efficiency may regress. No compactor change needed for this PR.

Old files work unchanged

The schema validator in blob_version_from_descriptions accepts the 5-field
V2 descriptor exactly as upstream — this PR doesn't widen or narrow that
check. Files written by any prior Lance version continue to work.


Implementation footprint

 python/python/lance/blob.py                       |  14 +/-  5
 python/test_ref_id_dedup.py                       |  97 +  0    (new)
 rust/lance-encoding/src/decoder.rs                |  17 +/-  1
 rust/lance-encoding/src/encodings/logical/blob.rs |  60 +/-  9
 rust/lance/src/dataset/blob.rs                    |  97 +/-  3
 5 files changed, 285 insertions(+), 17 deletions(-)

Production code (excluding the 97-line test script): ~188 lines added.
All changes are additive: two HashMap fields, one SidecarRef enum, two
cache lookup/insert call sites, and the Python Blob.ref_id plumbing.


Testing

cd python
python test_ref_id_dedup.py

Expected output — all three size classes dedup, read-back is byte-identical:

=== inline_32kb (payload=32,768B, ref_id=101, rows=20) ===
  amplification:   1.05x   [✓ DEDUP]
  read-back OK:    True (1 unique contents out of 20 rows)

=== packed_1mb (payload=1,048,576B, ref_id=102, rows=20) ===
  amplification:   1.00x   [✓ DEDUP]
  read-back OK:    True (1 unique contents out of 20 rows)

=== dedicated_6mb (payload=6,291,456B, ref_id=103, rows=20) ===
  amplification:   1.00x   [✓ DEDUP]
  read-back OK:    True (1 unique contents out of 20 rows)

Explicitly not in scope

  • Content-hash auto-dedup. Users explicitly pick when to share.
  • Cross-write sharing. Caches live for one write_dataset invocation /
    one fragment. Cross-fragment / cross-write sharing is natural future work.
  • Post-write observability. SELECT ref_id, COUNT(*) GROUP BY ref_id is
    not possible because ref_id is not persisted. The discussion thread
    includes "Option B" (persist ref_id in the descriptor) as a candidate
    for users who need this.
  • Read-side fetch coalescing by ref_id. Lance's FileScheduler already
    merges adjacent reads to the same file, so the penalty is modest. Explicit
    ref_id-aware read caching can be layered on separately.

Built on

Multiple rows with the same positive `ref_id` share one physical blob.
`ref_id = 0` or null means no sharing (existing behavior — unchanged).

Plan A design: on-disk Blob v2 descriptor stays at 5 fields. `ref_id`
flows through the write-time pipeline as an in-memory input hint only:
  - BlobPreprocessor::preprocess_batch (Packed/Dedicated dedup cache)
  - BlobV2StructuralEncoder::maybe_encode (Inline dedup cache)
It is dropped before the descriptor is emitted to disk.

Benefits:
  - Zero on-disk format change; full compatibility with existing readers
  - User API is new but additive: Blob(data=..., ref_id=42) + blob_array(...)
  - Dedup works across Inline (≤64KB), Packed (64KB–4MB), Dedicated (>4MB)

Trade-offs vs persisting ref_id in descriptor:
  - No post-write observability (cannot SELECT ref_id, COUNT(*) GROUP BY)
  - No compaction hint (future compactors must rehash or fall back)
These can be added later as a separate opt-in feature without breaking
the format.

Verification (test_ref_id_dedup.py, 20 rows × 1 ref_id):
  - inline_32kb:  1.05x amplification
  - packed_1mb:   1.00x amplification  (1 sidecar file)
  - dedicated_6mb: 1.00x amplification (1 sidecar file instead of 20)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@claude claude Bot left a comment


Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

@github-actions github-actions Bot added the enhancement and python labels Apr 23, 2026