Skip to content
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 40 additions & 6 deletions rust/lance-core/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ type ArcAny = Arc<dyn Any + Send + Sync>;

#[derive(Clone)]
pub struct SizedRecord {
record: ArcAny,
size_accessor: Arc<dyn Fn(&ArcAny) -> usize + Send + Sync>,
pub record: ArcAny,
pub size_accessor: Arc<dyn Fn(&ArcAny) -> usize + Send + Sync>,
pub type_name: &'static str,
}

impl std::fmt::Debug for SizedRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SizedRecord")
.field("record", &self.record)
.field("type", &self.type_name)
.finish()
}
}
Expand All @@ -42,12 +43,14 @@ impl DeepSizeOf for SizedRecord {

impl SizedRecord {
fn new<T: DeepSizeOf + Send + Sync + 'static>(record: Arc<T>) -> Self {
// Calculate size once and store it
// +8 for the size of the Arc pointer itself
let size_accessor =
|record: &ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() + 8 };
Self {
record,
size_accessor: Arc::new(size_accessor),
type_name: std::any::type_name::<T>(),
}
}
}
Expand Down Expand Up @@ -277,6 +280,13 @@ impl LanceCache {
self.misses.store(0, Ordering::Relaxed);
}

// For testing: get all entries in the cache
#[doc(hidden)]
pub async fn entries(&self) -> Vec<((String, TypeId), SizedRecord)> {
self.cache.run_pending_tasks().await;
self.cache.iter().map(|(k, v)| ((*k).clone(), v)).collect()
}

// CacheKey-based methods
pub async fn insert_with_key<K>(&self, cache_key: &K, metadata: Arc<K::ValueType>)
where
Expand Down Expand Up @@ -340,16 +350,40 @@ pub struct WeakLanceCache {
misses: Arc<AtomicU64>,
}

impl WeakLanceCache {
/// Create a weak reference from a strong LanceCache
pub fn from(cache: &LanceCache) -> Self {
impl From<&LanceCache> for WeakLanceCache {
fn from(cache: &LanceCache) -> Self {
Self {
inner: Arc::downgrade(&cache.cache),
prefix: cache.prefix.clone(),
hits: cache.hits.clone(),
misses: cache.misses.clone(),
}
}
}

impl From<LanceCache> for WeakLanceCache {
fn from(cache: LanceCache) -> Self {
Self {
inner: Arc::downgrade(&cache.cache),
prefix: cache.prefix,
hits: cache.hits,
misses: cache.misses,
}
}
}

impl WeakLanceCache {
pub fn upgrade(self) -> LanceCache {
let Some(cache) = self.inner.upgrade() else {
return LanceCache::no_cache();
};
LanceCache {
cache,
prefix: self.prefix,
hits: self.hits,
misses: self.misses,
}
}

/// Appends a prefix to the cache key
pub fn with_key_prefix(&self, prefix: &str) -> Self {
Expand Down
23 changes: 22 additions & 1 deletion rust/lance-datafusion/src/datagen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

use std::sync::Arc;

use arrow_array::RecordBatchReader;
use datafusion::{
execution::SendableRecordBatchStream,
physical_plan::{stream::RecordBatchStreamAdapter, ExecutionPlan},
};
use datafusion_common::DataFusionError;
use futures::TryStreamExt;
use lance_datagen::{BatchCount, BatchGeneratorBuilder, RowCount};
use lance_core::Error;
use lance_datagen::{BatchCount, BatchGeneratorBuilder, ByteCount, RoundingBehavior, RowCount};

use crate::exec::OneShotExec;

Expand All @@ -20,6 +22,13 @@ pub trait DatafusionDatagenExt {
num_batches: BatchCount,
) -> SendableRecordBatchStream;

fn into_df_stream_bytes(
self,
batch_size: ByteCount,
num_batches: BatchCount,
rounding_behavior: RoundingBehavior,
) -> Result<SendableRecordBatchStream, Error>;

fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan>;
}

Expand All @@ -34,6 +43,18 @@ impl DatafusionDatagenExt for BatchGeneratorBuilder {
Box::pin(RecordBatchStreamAdapter::new(schema, stream))
}

fn into_df_stream_bytes(
self,
batch_size: ByteCount,
num_batches: BatchCount,
rounding_behavior: RoundingBehavior,
) -> Result<SendableRecordBatchStream, Error> {
let stream = self.into_reader_bytes(batch_size, num_batches, rounding_behavior)?;
let schema = stream.schema();
let stream = futures::stream::iter(stream).map_err(DataFusionError::from);
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan> {
let stream = self.into_df_stream(batch_size, num_batches);
Arc::new(OneShotExec::new(stream))
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ arrow-schema.workspace = true
arrow-select.workspace = true
lance-bitpacking = { workspace = true, optional = true }
bytes.workspace = true
deepsize.workspace = true
futures.workspace = true
fsst.workspace = true
hex = "0.4.3"
Expand Down
65 changes: 65 additions & 0 deletions rust/lance-encoding/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,71 @@ fn main() -> Result<()> {
prost_build.protoc_arg("--experimental_allow_proto3_optional");
prost_build.enable_type_names();
prost_build.bytes(["."]); // Enable Bytes type for all messages to avoid Vec clones.

// Implement DeepSizeOf so we can keep metadata in cache.
// Once https://github.com/nhtyy/deepsize2/pull/2 is merged and released,
// we can use that and just implement DeepSizeOf for `.`
for path in &[
"lance.encodings.ColumnEncoding",
"lance.encodings.Blob",
"lance.encodings.ZoneIndex",
"lance.encodings.ArrayEncoding",
"lance.encodings.Flat",
"lance.encodings.Nullable",
"lance.encodings.FixedSizeList",
"lance.encodings.List",
"lance.encodings.Struct",
"lance.encodings.Binary",
"lance.encodings.Dictionary",
"lance.encodings.PackedStruct",
"lance.encodings.SimpleStruct",
"lance.encodings.Bitpacked",
"lance.encodings.FixedSizeBinary",
"lance.encodings.BitpackedForNonNeg",
"lance.encodings.InlineBitpacking",
"lance.encodings.OutOfLineBitpacking",
"lance.encodings.Variable",
"lance.encodings.PackedStructFixedWidthMiniBlock",
"lance.encodings.Block",
"lance.encodings.Rle",
"lance.encodings.GeneralMiniBlock",
"lance.encodings.ByteStreamSplit",
"lance.encodings.Buffer",
"lance.encodings.Compression",
"lance.encodings.Nullable.NoNull",
"lance.encodings.Nullable.AllNull",
"lance.encodings.Nullable.SomeNull",
"lance.encodings21.MiniBlockLayout",
"lance.encodings21.CompressiveEncoding",
"lance.encodings21.FullZipLayout",
"lance.encodings21.AllNullLayout",
"lance.encodings21.BlobLayout",
"lance.encodings21.PageLayout",
"lance.encodings21.BufferCompression",
"lance.encodings21.Flat",
"lance.encodings21.Variable",
"lance.encodings21.OutOfLineBitpacking",
"lance.encodings21.InlineBitpacking",
"lance.encodings21.Dictionary",
"lance.encodings21.Rle",
"lance.encodings21.FixedSizeList",
"lance.encodings21.PackedStruct",
"lance.encodings21.General",
"lance.encodings21.ByteStreamSplit",
] {
prost_build.type_attribute(path, "#[derive(deepsize::DeepSizeOf)]");
}
for path in &[
"lance.encodings.ArrayEncoding.array_encoding",
"lance.encodings.ColumnEncoding.column_encoding",
"lance.encodings.Nullable.nullability",
"lance.encodings21.FullZipLayout.details",
"lance.encodings21.PageLayout.layout",
"lance.encodings21.CompressiveEncoding.compression",
] {
prost_build.enum_attribute(path, "#[derive(deepsize::DeepSizeOf)]");
}

prost_build.compile_protos(&["./protos/encodings_v2_0.proto"], &["./protos"])?;
prost_build.compile_protos(&["./protos/encodings_v2_1.proto"], &["./protos"])?;

Expand Down
33 changes: 29 additions & 4 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::cache::{DeepSizeOf, LanceCache};
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use log::{debug, trace, warn};
use snafu::location;
Expand Down Expand Up @@ -262,12 +262,37 @@ const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
/// A file should only use one or the other and never both.
/// 2.0 decoders can always assume this is pb::ArrayEncoding
/// and 2.1+ decoders can always assume this is pb::PageLayout
#[derive(Debug)]
#[derive(Debug, DeepSizeOf)]
pub enum PageEncoding {
Legacy(pb::ArrayEncoding),
Structural(pb21::PageLayout),
}

// Implement these manually because there isn't yet an implementation for bytes::Bytes
impl DeepSizeOf for pb::Fsst {
fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
self.symbol_table.len()
}
}

impl DeepSizeOf for pb::Constant {
fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
self.value.len()
}
}

impl DeepSizeOf for pb21::Fsst {
fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
self.symbol_table.len()
}
}

impl DeepSizeOf for pb21::Constant {
fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize {
self.value.as_ref().map(|v| v.len()).unwrap_or(0)
}
}

impl PageEncoding {
pub fn as_legacy(&self) -> &pb::ArrayEncoding {
match self {
Expand All @@ -291,7 +316,7 @@ impl PageEncoding {
/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug)]
#[derive(Debug, DeepSizeOf)]
pub struct PageInfo {
/// The number of rows in the page
pub num_rows: u64,
Expand All @@ -308,7 +333,7 @@ pub struct PageInfo {
/// Metadata describing a column in a file
///
/// This is typically created by reading the metadata section of a Lance file
#[derive(Debug, Clone)]
#[derive(Debug, Clone, DeepSizeOf)]
pub struct ColumnInfo {
/// The index of the column in the file
pub index: u32,
Expand Down
Loading
Loading