Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod floats;
pub use floats::*;
pub mod cast;
pub mod list;
pub mod memory;

type Result<T> = std::result::Result<T, ArrowError>;

Expand Down
91 changes: 91 additions & 0 deletions rust/lance-arrow/src/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashSet;

use arrow_array::{Array, RecordBatch};
use arrow_data::ArrayData;

/// Counts memory used by buffers of Arrow arrays and RecordBatches.
///
/// This is meant to capture how much memory is being used by the Arrow data
/// structures as they are. It does not represent the memory used if the data
/// were to be serialized and then deserialized. In particular:
///
/// * This does not double count memory used by buffers shared by multiple
/// arrays or batches. Round-tripped data may use more memory because of this.
/// * This counts the **total** size of the buffers, even if the array is a slice.
/// Round-tripped data may use less memory because of this.
#[derive(Default)]
pub struct MemoryAccumulator {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation to use this instead of get_array_memory_size? Is it because you are worried about shared buffers?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, shared / sliced buffers. Worried we or the user might have something that sliced the input data into smaller batches. Naively using get_array_memory_size will double count in those cases, as SaintBacchus found while attempting this PR: #3435 (comment)

seen: HashSet<usize>,
total: usize,
}

impl MemoryAccumulator {
pub fn record_array(&mut self, array: &dyn Array) {
let data = array.to_data();
self.record_array_data(&data);
}

fn record_array_data(&mut self, data: &ArrayData) {
for buffer in data.buffers() {
let ptr = buffer.as_ptr();
if self.seen.insert(ptr as usize) {
self.total += buffer.capacity();
}
}

if let Some(nulls) = data.nulls() {
let null_buf = nulls.inner().inner();
let ptr = null_buf.as_ptr();
if self.seen.insert(ptr as usize) {
self.total += null_buf.capacity();
}
}

for child in data.child_data() {
self.record_array_data(child);
}
}

pub fn record_batch(&mut self, batch: &RecordBatch) {
for array in batch.columns() {
self.record_array(array);
}
}

pub fn total(&self) -> usize {
self.total
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow_array::Int32Array;
use arrow_schema::{DataType, Field, Schema};

use super::*;

#[test]
fn test_memory_accumulator() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let slice = batch.slice(1, 2);

let mut acc = MemoryAccumulator::default();

// Should record whole buffer, not just slice
acc.record_batch(&slice);
assert_eq!(acc.total(), 3 * std::mem::size_of::<i32>());

// Should not double count
acc.record_batch(&slice);
assert_eq!(acc.total(), 3 * std::mem::size_of::<i32>());
}
}
8 changes: 8 additions & 0 deletions rust/lance-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ pub enum Error {
source: BoxedError,
location: Location,
},
#[snafu(display("Retryable commit conflict for version {version}: {source}, {location}"))]
RetryableCommitConflict {
version: u64,
source: BoxedError,
location: Location,
},
#[snafu(display("Too many concurrent writers. {message}, {location}"))]
TooMuchWriteContention { message: String, location: Location },
#[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}, {location}"))]
Internal { message: String, location: Location },
#[snafu(display("A prerequisite task failed: {message}, {location}"))]
Expand Down
1 change: 1 addition & 0 deletions rust/lance-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors

pub mod address;
pub mod backoff;
pub mod bit;
pub mod cpu;
pub mod deletion;
Expand Down
92 changes: 92 additions & 0 deletions rust/lance-core/src/utils/backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use rand::Rng;
use std::time::Duration;

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

/// Computes backoff as
///
/// ```text
/// backoff = base^attempt * unit + jitter
/// ```
///
/// The defaults are base=2, unit=50ms, jitter=50ms, min=0ms, max=5s. This gives
/// a backoff of 50ms, 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 5s, (not including jitter).
///
/// You can have non-exponential backoff by setting base=1.
pub struct Backoff {
base: u32,
unit: u32,
jitter: i32,
min: u32,
max: u32,
attempt: u32,
}

impl Default for Backoff {
fn default() -> Self {
Self {
base: 2,
unit: 50,
jitter: 50,
min: 0,
max: 5000,
attempt: 0,
}
}
}

impl Backoff {
pub fn with_base(self, base: u32) -> Self {
Self { base, ..self }
}

pub fn with_jitter(self, jitter: i32) -> Self {
Self { jitter, ..self }
}

pub fn with_min(self, min: u32) -> Self {
Self { min, ..self }
}

pub fn with_max(self, max: u32) -> Self {
Self { max, ..self }
}

pub fn next_backoff(&mut self) -> Duration {
let backoff = self
.base
.saturating_pow(self.attempt)
.saturating_mul(self.unit);
let jitter = rand::thread_rng().gen_range(-self.jitter..=self.jitter);
let backoff = (backoff.saturating_add_signed(jitter)).clamp(self.min, self.max);
self.attempt += 1;
Duration::from_millis(backoff as u64)
}

pub fn attempt(&self) -> u32 {
self.attempt
}

pub fn reset(&mut self) {
self.attempt = 0;
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_backoff() {
let mut backoff = Backoff::default().with_jitter(0);
assert_eq!(backoff.next_backoff().as_millis(), 50);
assert_eq!(backoff.attempt(), 1);
assert_eq!(backoff.next_backoff().as_millis(), 100);
assert_eq!(backoff.attempt(), 2);
assert_eq!(backoff.next_backoff().as_millis(), 200);
assert_eq!(backoff.attempt(), 3);
assert_eq!(backoff.next_backoff().as_millis(), 400);
assert_eq!(backoff.attempt(), 4);
}
}
2 changes: 2 additions & 0 deletions rust/lance-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ lance-core = { workspace = true, features = ["datafusion"] }
lance-datagen.workspace = true
lazy_static.workspace = true
log.workspace = true
pin-project.workspace = true
prost.workspace = true
snafu.workspace = true
tempfile.workspace = true
tokio.workspace = true
tracing.workspace = true

Expand Down
1 change: 1 addition & 0 deletions rust/lance-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod expr;
pub mod logical_expr;
pub mod planner;
pub mod projection;
pub mod spill;
pub mod sql;
#[cfg(feature = "substrait")]
pub mod substrait;
Expand Down
Loading
Loading