diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index cb67d62d13f..7352f4a1e40 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -53,6 +53,7 @@ Compaction, CompactionMetrics, DatasetBasePath, + IOStats, LanceSchema, ScanStatistics, _Dataset, @@ -1345,6 +1346,87 @@ def get_fragment(self, fragment_id: int) -> Optional[LanceFragment]: return None return LanceFragment(self, fragment_id=None, fragment=raw_fragment) + def io_stats_snapshot(self) -> IOStats: + """ + Get a snapshot of current IO statistics without resetting counters. + + Returns the current IO statistics without modifying the internal state. + Use this when you need to check stats without resetting them. Multiple + calls will return the same values until IO operations are performed. + + Returns + ------- + IOStats + Object containing IO statistics with the following attributes: + - read_iops: Number of read operations + - read_bytes: Total bytes read + - write_iops: Number of write operations + - write_bytes: Total bytes written + - num_hops: Number of network hops (for remote storage) + + Examples + -------- + >>> import lance + >>> import pyarrow as pa + >>> data = pa.table({"x": [1, 2, 3]}) + >>> dataset = lance.write_dataset(data, "memory://test_stats") + >>> result = dataset.to_table() + >>> # Check stats without resetting + >>> stats = dataset.io_stats_snapshot() + >>> print(f"Read {stats.read_bytes} bytes in {stats.read_iops} operations") + Read ... bytes in ... operations + >>> # Can check again and see the same values + >>> stats2 = dataset.io_stats_snapshot() + >>> assert stats.read_bytes == stats2.read_bytes + + See Also + -------- + io_stats_incremental : Get stats and reset counters for incremental tracking + """ + return self._ds.io_stats_snapshot() + + def io_stats_incremental(self) -> IOStats: + """ + Get incremental IO statistics and reset the counters. 
+ + Returns IO statistics (number of operations and bytes) since the last + time this method was called, then resets the internal counters to zero. + This is useful for tracking IO operations between different stages of + processing. + + Returns + ------- + IOStats + Object containing IO statistics with the following attributes: + - read_iops: Number of read operations + - read_bytes: Total bytes read + - write_iops: Number of write operations + - write_bytes: Total bytes written + - num_hops: Number of network hops (for remote storage) + + Examples + -------- + >>> import lance + >>> import pyarrow as pa + >>> data = pa.table({"x": [1, 2, 3]}) + >>> dataset = lance.write_dataset(data, "memory://test_stats") + >>> result = dataset.to_table() + >>> # Get incremental stats (and reset) + >>> stats = dataset.io_stats_incremental() + >>> print(f"Read {stats.read_bytes} bytes in {stats.read_iops} operations") + Read ... bytes in ... operations + >>> # Next call returns only new stats since last call + >>> more_data = dataset.to_table() + >>> stats2 = dataset.io_stats_incremental() + >>> print(f"Read {stats2.read_bytes} more bytes") + Read ... 
more bytes + + See Also + -------- + io_stats_snapshot : Get stats without resetting counters + """ + return self._ds.io_stats_incremental() + def to_batches( self, columns: Optional[Union[List[str], Dict[str, str]]] = None, diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 7f80766dfba..b429913e7f7 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -96,10 +96,12 @@ use crate::{LanceReader, Scanner}; use self::cleanup::CleanupStats; use self::commit::PyCommitLock; +use self::io_stats::IoStats; pub mod blob; pub mod cleanup; pub mod commit; +pub mod io_stats; pub mod optimize; pub mod stats; @@ -2045,6 +2047,18 @@ impl Dataset { Session::new(self.ds.session()) } + /// Get a snapshot of current IO statistics without resetting counters + fn io_stats_snapshot(&self) -> IoStats { + let stats = self.ds.object_store().io_stats_snapshot(); + IoStats::from_lance(stats) + } + + /// Get incremental IO statistics for this dataset + fn io_stats_incremental(&self) -> IoStats { + let stats = self.ds.object_store().io_stats_incremental(); + IoStats::from_lance(stats) + } + #[staticmethod] #[pyo3(signature = (dest, storage_options = None, ignore_not_found = None))] fn drop( diff --git a/python/src/dataset/io_stats.rs b/python/src/dataset/io_stats.rs new file mode 100644 index 00000000000..fd6f10513c3 --- /dev/null +++ b/python/src/dataset/io_stats.rs @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! IO statistics tracking for dataset operations + +use pyo3::{pyclass, pymethods}; + +/// IO statistics for dataset operations +/// +/// This tracks the number of IO operations and bytes transferred for read and write +/// operations performed on the dataset's object store. +/// +/// Note: Calling `io_stats()` returns the statistics accumulated since the last call +/// and resets the internal counters (incremental stats pattern). 
+#[pyclass(name = "IOStats", module = "_lib", get_all)] +#[derive(Clone, Debug)] +pub struct IoStats { + /// Number of read IO operations performed + pub read_iops: u64, + /// Total bytes read from storage + pub read_bytes: u64, + /// Number of write IO operations performed + pub write_iops: u64, + /// Total bytes written to storage + pub written_bytes: u64, +} + +#[pymethods] +impl IoStats { + fn __repr__(&self) -> String { + format!( + "IOStats(read_iops={}, read_bytes={}, write_iops={}, write_bytes={})", + self.read_iops, self.read_bytes, self.write_iops, self.written_bytes + ) + } +} + +impl IoStats { + /// Convert from Lance's internal IoStats type + pub fn from_lance(stats: lance_io::utils::tracking_store::IoStats) -> Self { + Self { + read_iops: stats.read_iops, + read_bytes: stats.read_bytes, + write_iops: stats.write_iops, + written_bytes: stats.written_bytes, + } + } +} diff --git a/python/src/lib.rs b/python/src/lib.rs index 8d8196bd75d..7b85e06fe69 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -44,6 +44,7 @@ use datafusion_ffi::table_provider::FFI_TableProvider; use datagen::register_datagen; use dataset::blob::LanceBlobFile; use dataset::cleanup::CleanupStats; +use dataset::io_stats::IoStats; use dataset::optimize::{ PyCompaction, PyCompactionMetrics, PyCompactionPlan, PyCompactionTask, PyRewriteResult, }; @@ -258,6 +259,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index 06accbac6a4..3a5ddfefa44 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -69,6 +69,7 @@ gcp = ["object_store/gcp", "dep:opendal", "opendal/services-gcs", "dep:object_st aws = ["object_store/aws", "dep:aws-config", "dep:aws-credential-types", "dep:opendal", "opendal/services-s3", "dep:object_store_opendal"] azure = ["object_store/azure", 
"dep:opendal", "opendal/services-azblob", "dep:object_store_opendal"] oss = ["dep:opendal", "opendal/services-oss", "dep:object_store_opendal"] +test-util = [] [lints] workspace = true diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs index a882d49ecda..3d6b6c21881 100644 --- a/rust/lance-io/src/local.rs +++ b/rust/lance-io/src/local.rs @@ -26,6 +26,7 @@ use tracing::instrument; use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM; use crate::traits::{Reader, Writer}; +use crate::utils::tracking_store::IOTracker; /// Convert an [`object_store::path::Path`] to a [`std::path::Path`]. pub fn to_local_path(path: &Path) -> String { @@ -86,6 +87,9 @@ pub struct LocalObjectReader { /// Block size, in bytes. block_size: usize, + + /// IO tracker for monitoring read operations. + io_tracker: Arc, } impl DeepSizeOf for LocalObjectReader { @@ -107,11 +111,24 @@ impl LocalObjectReader { } /// Open a local object reader, with default prefetch size. + /// + /// For backward compatibility with existing code that doesn't need tracking. #[instrument(level = "debug")] pub async fn open( path: &Path, block_size: usize, known_size: Option, + ) -> Result> { + Self::open_with_tracker(path, block_size, known_size, Default::default()).await + } + + /// Open a local object reader with optional IO tracking. + #[instrument(level = "debug")] + pub(crate) async fn open_with_tracker( + path: &Path, + block_size: usize, + known_size: Option, + io_tracker: Arc, ) -> Result> { let path = path.clone(); let local_path = to_local_path(&path); @@ -129,6 +146,7 @@ impl LocalObjectReader { block_size, size, path, + io_tracker, }) as Box) }) .await? 
@@ -171,7 +189,12 @@ impl Reader for LocalObjectReader { #[instrument(level = "debug", skip(self))] async fn get_range(&self, range: Range) -> object_store::Result { let file = self.file.clone(); - tokio::task::spawn_blocking(move || { + let io_tracker = self.io_tracker.clone(); + let path = self.path.clone(); + let num_bytes = range.len() as u64; + let range_u64 = (range.start as u64)..(range.end as u64); + + let result = tokio::task::spawn_blocking(move || { let mut buf = BytesMut::with_capacity(range.len()); // Safety: `buf` is set with appropriate capacity above. It is // written to below and we check all data is initialized at that point. @@ -187,14 +210,23 @@ impl Reader for LocalObjectReader { .map_err(|err: std::io::Error| object_store::Error::Generic { store: "LocalFileSystem", source: err.into(), - }) + }); + + if result.is_ok() { + io_tracker.record_read("get_range", path, num_bytes, Some(range_u64)); + } + + result } /// Reads the entire file. #[instrument(level = "debug", skip(self))] async fn get_all(&self) -> object_store::Result { let mut file = self.file.clone(); - tokio::task::spawn_blocking(move || { + let io_tracker = self.io_tracker.clone(); + let path = self.path.clone(); + + let result = tokio::task::spawn_blocking(move || { let mut buf = Vec::new(); file.read_to_end(buf.as_mut())?; Ok(Bytes::from(buf)) @@ -203,7 +235,13 @@ impl Reader for LocalObjectReader { .map_err(|err: std::io::Error| object_store::Error::Generic { store: "LocalFileSystem", source: err.into(), - }) + }); + + if let Ok(bytes) = &result { + io_tracker.record_read("get_all", path, bytes.len() as u64, None); + } + + result } } diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 94967eb93b6..22ba273215b 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -38,6 +38,7 @@ pub mod storage_options; mod tracing; use crate::object_reader::SmallReader; use crate::object_writer::WriteResult; +use 
crate::utils::tracking_store::{IOTracker, IoStats}; use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader}; use lance_core::{Error, Result}; @@ -124,6 +125,8 @@ pub struct ObjectStore { io_parallelism: usize, /// Number of times to retry a failed download download_retry_count: usize, + /// IO tracker for monitoring read/write operations + io_tracker: IOTracker, } impl DeepSizeOf for ObjectStore { @@ -350,8 +353,13 @@ impl ObjectStore { if let Some(wrapper) = params.object_store_wrapper.as_ref() { inner = wrapper.wrap(&store_prefix, inner); } + + // Always wrap with IO tracking + let io_tracker = IOTracker::default(); + let tracked_store = io_tracker.wrap("", inner); + let store = Self { - inner, + inner: tracked_store, scheme: path.scheme().to_string(), block_size: params.block_size.unwrap_or(64 * 1024), max_iop_size: *DEFAULT_MAX_IOP_SIZE, @@ -359,6 +367,7 @@ impl ObjectStore { list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(), io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT, + io_tracker, }; let path = Path::parse(path.path())?; return Ok((Arc::new(store), path)); @@ -451,13 +460,46 @@ impl ObjectStore { .unwrap_or(self.io_parallelism) } + /// Get the IO tracker for this object store + /// + /// The IO tracker can be used to get statistics about read/write operations + /// performed on this object store. + pub fn io_tracker(&self) -> &IOTracker { + &self.io_tracker + } + + /// Get a snapshot of current IO statistics without resetting counters + /// + /// Returns the current IO statistics without modifying the internal state. + /// Use this when you need to check stats without resetting them. + pub fn io_stats_snapshot(&self) -> IoStats { + self.io_tracker.stats() + } + + /// Get incremental IO statistics since the last call to this method + /// + /// Returns the accumulated statistics since the last call and resets the + /// counters to zero. 
This is useful for tracking IO operations between + /// different stages of processing. + pub fn io_stats_incremental(&self) -> IoStats { + self.io_tracker.incremental_stats() + } + /// Open a file for path. /// /// Parameters /// - ``path``: Absolute path to the file. pub async fn open(&self, path: &Path) -> Result> { match self.scheme.as_str() { - "file" => LocalObjectReader::open(path, self.block_size, None).await, + "file" => { + LocalObjectReader::open_with_tracker( + path, + self.block_size, + None, + Arc::new(self.io_tracker.clone()), + ) + .await + } _ => Ok(Box::new(CloudObjectReader::new( self.inner.clone(), path.clone(), @@ -486,7 +528,15 @@ impl ObjectStore { } match self.scheme.as_str() { - "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await, + "file" => { + LocalObjectReader::open_with_tracker( + path, + self.block_size, + Some(known_size), + Arc::new(self.io_tracker.clone()), + ) + .await + } _ => Ok(Box::new(CloudObjectReader::new( self.inner.clone(), path.clone(), @@ -757,8 +807,12 @@ impl ObjectStore { None => store, }; + // Always wrap with IO tracking + let io_tracker = IOTracker::default(); + let tracked_store = io_tracker.wrap("", store); + Self { - inner: store, + inner: tracked_store, scheme: scheme.into(), block_size, max_iop_size: *DEFAULT_MAX_IOP_SIZE, @@ -766,6 +820,7 @@ impl ObjectStore { list_is_lexically_ordered, io_parallelism, download_retry_count, + io_tracker, } } } diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 633017d4aeb..062f064642f 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -10,6 +10,8 @@ use object_store::path::Path; use snafu::location; use url::Url; +use crate::object_store::WrappingObjectStore; + use super::{tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams}; use lance_core::error::{Error, LanceOptionExt, Result}; @@ -210,6 +212,9 @@ impl 
ObjectStoreRegistry { store.inner = wrapper.wrap(&cache_path, store.inner); } + // Always wrap with IO tracking + store.inner = store.io_tracker.wrap("", store.inner); + let store = Arc::new(store); { diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index 1883336feba..75cdab861ef 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -170,6 +170,7 @@ impl ObjectStoreProvider for AwsStoreProvider { list_is_lexically_ordered: !is_s3_express, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, + io_tracker: Default::default(), }) } } diff --git a/rust/lance-io/src/object_store/providers/azure.rs b/rust/lance-io/src/object_store/providers/azure.rs index ca65d267da7..c992cc491e4 100644 --- a/rust/lance-io/src/object_store/providers/azure.rs +++ b/rust/lance-io/src/object_store/providers/azure.rs @@ -122,6 +122,7 @@ impl ObjectStoreProvider for AzureBlobStoreProvider { list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, + io_tracker: Default::default(), }) } diff --git a/rust/lance-io/src/object_store/providers/gcp.rs b/rust/lance-io/src/object_store/providers/gcp.rs index 3d58a7d110d..038015d7f4e 100644 --- a/rust/lance-io/src/object_store/providers/gcp.rs +++ b/rust/lance-io/src/object_store/providers/gcp.rs @@ -123,6 +123,7 @@ impl ObjectStoreProvider for GcsStoreProvider { list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, + io_tracker: Default::default(), }) } } diff --git a/rust/lance-io/src/object_store/providers/local.rs b/rust/lance-io/src/object_store/providers/local.rs index 89bdfa5ab62..e0d98b4c66e 100644 --- a/rust/lance-io/src/object_store/providers/local.rs +++ b/rust/lance-io/src/object_store/providers/local.rs @@ -31,6 +31,7 @@ impl ObjectStoreProvider for FileStoreProvider { list_is_lexically_ordered: false, 
io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM, download_retry_count, + io_tracker: Default::default(), }) } diff --git a/rust/lance-io/src/object_store/providers/memory.rs b/rust/lance-io/src/object_store/providers/memory.rs index b03baea104f..2ec9a4f5c33 100644 --- a/rust/lance-io/src/object_store/providers/memory.rs +++ b/rust/lance-io/src/object_store/providers/memory.rs @@ -30,6 +30,7 @@ impl ObjectStoreProvider for MemoryStoreProvider { list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, + io_tracker: Default::default(), }) } diff --git a/rust/lance-io/src/object_store/providers/oss.rs b/rust/lance-io/src/object_store/providers/oss.rs index 1e1f4a5c594..3437ec8d1b6 100644 --- a/rust/lance-io/src/object_store/providers/oss.rs +++ b/rust/lance-io/src/object_store/providers/oss.rs @@ -102,6 +102,7 @@ impl ObjectStoreProvider for OssStoreProvider { list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or(true), io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count: storage_options.download_retry_count(), + io_tracker: Default::default(), }) } } diff --git a/rust/lance-io/src/utils/tracking_store.rs b/rust/lance-io/src/utils/tracking_store.rs index eebce400dcc..f1afb77990b 100644 --- a/rust/lance-io/src/utils/tracking_store.rs +++ b/rust/lance-io/src/utils/tracking_store.rs @@ -10,7 +10,9 @@ //! This modules provides [`IOTracker`] which can be used to wrap any object store. use std::fmt::{Display, Formatter}; use std::ops::Range; -use std::sync::{atomic::AtomicU16, Arc, Mutex}; +#[cfg(feature = "test-util")] +use std::sync::atomic::AtomicU16; +use std::sync::{Arc, Mutex}; use bytes::Bytes; use futures::stream::BoxStream; @@ -26,9 +28,43 @@ use crate::object_store::WrappingObjectStore; pub struct IOTracker(Arc>); impl IOTracker { + /// Get IO statistics and reset the counters (incremental pattern). 
+ /// + /// This returns the accumulated statistics since the last call and resets + /// the internal counters to zero. pub fn incremental_stats(&self) -> IoStats { std::mem::take(&mut *self.0.lock().unwrap()) } + + /// Get a snapshot of current IO statistics without resetting counters. + /// + /// This returns a clone of the current statistics without modifying the + /// internal state. Use this when you need to check stats without resetting. + pub fn stats(&self) -> IoStats { + self.0.lock().unwrap().clone() + } + + /// Record a read operation for tracking. + /// + /// This is used by readers that bypass the ObjectStore layer (like LocalObjectReader) + /// to ensure their IO operations are still tracked. + pub fn record_read( + &self, + #[allow(unused_variables)] method: &'static str, + #[allow(unused_variables)] path: Path, + num_bytes: u64, + #[allow(unused_variables)] range: Option>, + ) { + let mut stats = self.0.lock().unwrap(); + stats.read_iops += 1; + stats.read_bytes += num_bytes; + #[cfg(feature = "test-util")] + stats.requests.push(IoRequestRecord { + method, + path, + range, + }); + } } impl WrappingObjectStore for IOTracker { @@ -37,14 +73,17 @@ impl WrappingObjectStore for IOTracker { } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct IoStats { pub read_iops: u64, pub read_bytes: u64, pub write_iops: u64, - pub write_bytes: u64, + pub written_bytes: u64, + // This is only really meaningful in tests where there isn't any concurrent IO. + #[cfg(feature = "test-util")] /// Number of disjoint periods where at least one IO is in-flight. - pub num_hops: u64, + pub num_stages: u64, + #[cfg(feature = "test-util")] pub requests: Vec, } @@ -52,6 +91,7 @@ pub struct IoStats { /// assert_io_eq!(io_stats, read_iops, 1); /// assert_io_eq!(io_stats, write_iops, 0, "should be no writes"); /// assert_io_eq!(io_stats, num_hops, 1, "should be just {}", "one hop"); +#[cfg(feature = "test-util")] #[macro_export] macro_rules! 
assert_io_eq { ($io_stats:expr, $field:ident, $expected:expr) => { @@ -77,6 +117,7 @@ macro_rules! assert_io_eq { }; } +#[cfg(feature = "test-util")] #[macro_export] macro_rules! assert_io_gt { ($io_stats:expr, $field:ident, $expected:expr) => { @@ -102,6 +143,7 @@ macro_rules! assert_io_gt { }; } +#[cfg(feature = "test-util")] #[macro_export] macro_rules! assert_io_lt { ($io_stats:expr, $field:ident, $expected:expr) => { @@ -163,6 +205,7 @@ impl Display for IoStats { pub struct IoTrackingStore { target: Arc, stats: Arc>, + #[cfg(feature = "test-util")] active_requests: Arc, } @@ -173,10 +216,11 @@ impl Display for IoTrackingStore { } impl IoTrackingStore { - fn new(target: Arc, stats: Arc>) -> Self { + pub fn new(target: Arc, stats: Arc>) -> Self { Self { target, stats, + #[cfg(feature = "test-util")] active_requests: Arc::new(AtomicU16::new(0)), } } @@ -191,26 +235,38 @@ impl IoTrackingStore { let mut stats = self.stats.lock().unwrap(); stats.read_iops += 1; stats.read_bytes += num_bytes; + #[cfg(feature = "test-util")] stats.requests.push(IoRequestRecord { method, path, range, }); + #[cfg(not(feature = "test-util"))] + let _ = (method, path, range); // Suppress unused variable warnings } fn record_write(&self, method: &'static str, path: Path, num_bytes: u64) { let mut stats = self.stats.lock().unwrap(); stats.write_iops += 1; - stats.write_bytes += num_bytes; + stats.written_bytes += num_bytes; + #[cfg(feature = "test-util")] stats.requests.push(IoRequestRecord { method, path, range: None, }); + #[cfg(not(feature = "test-util"))] + let _ = (method, path); // Suppress unused variable warnings + } + + #[cfg(feature = "test-util")] + fn stage_guard(&self) -> StageGuard { + StageGuard::new(self.active_requests.clone(), self.stats.clone()) } - fn hop_guard(&self) -> HopGuard { - HopGuard::new(self.active_requests.clone(), self.stats.clone()) + #[cfg(not(feature = "test-util"))] + fn stage_guard(&self) -> StageGuard { + StageGuard } } @@ -218,7 +274,7 @@ impl 
IoTrackingStore { #[deny(clippy::missing_trait_methods)] impl ObjectStore for IoTrackingStore { async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_write("put", location.to_owned(), bytes.content_length() as u64); self.target.put(location, bytes).await } @@ -229,7 +285,7 @@ impl ObjectStore for IoTrackingStore { bytes: PutPayload, opts: PutOptions, ) -> OSResult { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_write( "put_opts", location.to_owned(), @@ -239,12 +295,14 @@ impl ObjectStore for IoTrackingStore { } async fn put_multipart(&self, location: &Path) -> OSResult> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); let target = self.target.put_multipart(location).await?; Ok(Box::new(IoTrackingMultipartUpload { target, stats: self.stats.clone(), + #[cfg(feature = "test-util")] path: location.to_owned(), + #[cfg(feature = "test-util")] _guard, })) } @@ -254,18 +312,20 @@ impl ObjectStore for IoTrackingStore { location: &Path, opts: PutMultipartOptions, ) -> OSResult> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); let target = self.target.put_multipart_opts(location, opts).await?; Ok(Box::new(IoTrackingMultipartUpload { target, stats: self.stats.clone(), + #[cfg(feature = "test-util")] path: location.to_owned(), + #[cfg(feature = "test-util")] _guard, })) } async fn get(&self, location: &Path) -> OSResult { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); let result = self.target.get(location).await; if let Ok(result) = &result { let num_bytes = result.range.end - result.range.start; @@ -275,7 +335,7 @@ impl ObjectStore for IoTrackingStore { } async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); let range = match &options.range { Some(GetRange::Bounded(range)) => Some(range.clone()), 
_ => None, // TODO: fill in other options. @@ -290,7 +350,7 @@ impl ObjectStore for IoTrackingStore { } async fn get_range(&self, location: &Path, range: Range) -> OSResult { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); let result = self.target.get_range(location, range.clone()).await; if let Ok(result) = &result { self.record_read( @@ -304,7 +364,7 @@ impl ObjectStore for IoTrackingStore { } async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> OSResult> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); let result = self.target.get_ranges(location, ranges).await; if let Ok(result) = &result { self.record_read( @@ -318,13 +378,13 @@ impl ObjectStore for IoTrackingStore { } async fn head(&self, location: &Path) -> OSResult { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_read("head", location.to_owned(), 0, None); self.target.head(location).await } async fn delete(&self, location: &Path) -> OSResult<()> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_write("delete", location.to_owned(), 0); self.target.delete(location).await } @@ -337,7 +397,7 @@ impl ObjectStore for IoTrackingStore { } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_read("list", prefix.cloned().unwrap_or_default(), 0, None); self.target.list(prefix) } @@ -357,7 +417,7 @@ impl ObjectStore for IoTrackingStore { } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_read( "list_with_delimiter", prefix.cloned().unwrap_or_default(), @@ -368,25 +428,25 @@ impl ObjectStore for IoTrackingStore { } async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_write("copy", from.to_owned(), 0); 
self.target.copy(from, to).await } async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_write("rename", from.to_owned(), 0); self.target.rename(from, to).await } async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_write("rename_if_not_exists", from.to_owned(), 0); self.target.rename_if_not_exists(from, to).await } async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { - let _guard = self.hop_guard(); + let _guard = self.stage_guard(); self.record_write("copy_if_not_exists", from.to_owned(), 0); self.target.copy_if_not_exists(from, to).await } @@ -395,9 +455,11 @@ impl ObjectStore for IoTrackingStore { #[derive(Debug)] struct IoTrackingMultipartUpload { target: Box, + #[cfg(feature = "test-util")] path: Path, stats: Arc>, - _guard: HopGuard, + #[cfg(feature = "test-util")] + _guard: StageGuard, } #[async_trait::async_trait] @@ -414,7 +476,8 @@ impl MultipartUpload for IoTrackingMultipartUpload { { let mut stats = self.stats.lock().unwrap(); stats.write_iops += 1; - stats.write_bytes += payload.content_length() as u64; + stats.written_bytes += payload.content_length() as u64; + #[cfg(feature = "test-util")] stats.requests.push(IoRequestRecord { method: "put_part", path: self.path.to_owned(), @@ -425,13 +488,18 @@ impl MultipartUpload for IoTrackingMultipartUpload { } } +#[cfg(feature = "test-util")] #[derive(Debug)] -struct HopGuard { +struct StageGuard { active_requests: Arc, stats: Arc>, } -impl HopGuard { +#[cfg(not(feature = "test-util"))] +struct StageGuard; + +#[cfg(feature = "test-util")] +impl StageGuard { fn new(active_requests: Arc, stats: Arc>) -> Self { active_requests.fetch_add(1, std::sync::atomic::Ordering::SeqCst); Self { @@ -441,7 +509,8 @@ impl HopGuard { } } -impl Drop for HopGuard { +#[cfg(feature = "test-util")] +impl Drop for 
StageGuard { fn drop(&mut self) { if self .active_requests @@ -449,7 +518,7 @@ impl Drop for HopGuard { == 1 { let mut stats = self.stats.lock().unwrap(); - stats.num_hops += 1; + stats.num_stages += 1; } } } diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 44093e2f6fa..d91b9237069 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -99,6 +99,7 @@ dirs = "5.0.0" all_asserts = "2.3.1" mock_instant.workspace = true lance-testing = { workspace = true } +lance-io = { workspace = true, features = ["test-util"] } tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } env_logger = "0.11.7" tempfile.workspace = true diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 4329ffdf003..c2b76ad2fc3 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2643,7 +2643,6 @@ mod tests { use lance_index::scalar::FullTextSearchQuery; use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, IndexType}; use lance_io::assert_io_eq; - use lance_io::utils::tracking_store::IOTracker; use lance_io::utils::CachedFileSize; use lance_linalg::distance::MetricType; use lance_table::feature_flags; @@ -2897,9 +2896,6 @@ mod tests { #[tokio::test] async fn test_load_manifest_iops() { - // Need to use in-memory for accurate IOPS tracking. - let io_tracker = Arc::new(IOTracker::default()); - // Use consistent session so memory store can be reused. 
let session = Arc::new(Session::default()); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -2917,10 +2913,6 @@ mod tests { batches, "memory://test", Some(WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(io_tracker.clone()), - ..Default::default() - }), session: Some(session.clone()), ..Default::default() }), @@ -2928,17 +2920,10 @@ mod tests { .await .unwrap(); - let _ = io_tracker.incremental_stats(); //reset + let _ = _original_ds.object_store().io_stats_incremental(); //reset let _dataset = DatasetBuilder::from_uri("memory://test") - .with_read_params(ReadParams { - store_options: Some(ObjectStoreParams { - object_store_wrapper: Some(io_tracker.clone()), - ..Default::default() - }), - session: Some(session), - ..Default::default() - }) + .with_session(session) .load() .await .unwrap(); @@ -2947,7 +2932,7 @@ mod tests { // 1. List _versions directory to get the latest manifest location // 2. Read the manifest file. (The manifest is small enough to be read in one go. // Larger manifests would result in more IOPS.) - let io_stats = io_tracker.incremental_stats(); + let io_stats = _dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 2); } @@ -8884,7 +8869,6 @@ mod tests { } let session = Arc::new(Session::default()); - let io_tracker = Arc::new(IOTracker::default()); // Case 1: Default write_flag=true, delete external transaction file, read should use inline transaction let ds = create_dataset(5).await; @@ -8901,23 +8885,15 @@ mod tests { // Case 2: reading small manifest caches transaction data, eliminating transaction reading IO. 
let read_ds2 = DatasetBuilder::from_uri(ds2.uri.clone()) .with_session(session.clone()) - .with_read_params(ReadParams { - store_options: Some(ObjectStoreParams { - object_store_wrapper: Some(io_tracker.clone()), - ..Default::default() - }), - session: Some(session.clone()), - ..Default::default() - }) .load() .await .unwrap(); - let stats = io_tracker.incremental_stats(); // Reset + let stats = read_ds2.object_store().io_stats_incremental(); // Reset assert!(stats.read_bytes < 64 * 1024); // Because the manifest is so small, we should have opportunistically // cached the transaction in memory already. let inline_tx = read_ds2.read_transaction().await.unwrap().unwrap(); - let stats = io_tracker.incremental_stats(); + let stats = read_ds2.object_store().io_stats_incremental(); assert_eq!(stats.read_iops, 0); assert_eq!(stats.read_bytes, 0); assert_eq!(inline_tx, tx); diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 1a1ae67116f..b90d0b7b81b 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2604,11 +2604,7 @@ mod tests { use lance_datagen::{array, gen_batch, RowCount}; use lance_file::version::LanceFileVersion; use lance_file::writer::FileWriterOptions; - use lance_io::{ - assert_io_eq, assert_io_lt, - object_store::{ObjectStore, ObjectStoreParams}, - utils::tracking_store::IOTracker, - }; + use lance_io::{assert_io_eq, assert_io_lt, object_store::ObjectStore}; use pretty_assertions::assert_eq; use rstest::rstest; @@ -3883,12 +3879,7 @@ mod tests { ) .unwrap(); let session = Arc::new(Session::default()); - let io_stats = Arc::new(IOTracker::default()); let write_params = WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(io_stats.clone()), - ..Default::default() - }), session: Some(session.clone()), ..Default::default() }; @@ -3901,9 +3892,9 @@ mod tests { // Assert file is small (< 4300 bytes) { - let stats = io_stats.incremental_stats(); + let stats 
= dataset.object_store().io_stats_incremental(); assert_io_eq!(stats, write_iops, 3); - assert_io_lt!(stats, write_bytes, 4300); + assert_io_lt!(stats, write_bytes, 4300); } // Measure IOPS needed to scan all data first time. @@ -3926,7 +3917,7 @@ mod tests { assert_eq!(data.num_rows(), 1); assert_eq!(data.num_columns(), 7); - let stats = io_stats.incremental_stats(); + let stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(stats, read_iops, 1); assert_io_lt!(stats, read_bytes, 4096); } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 68c63a994d1..49fc91b3d68 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3911,7 +3911,7 @@ mod test { use lance_index::{scalar::ScalarIndexParams, IndexType}; use lance_io::assert_io_gt; use lance_io::object_store::ObjectStoreParams; - use lance_io::utils::tracking_store::IOTracker; + use lance_linalg::distance::DistanceType; use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector}; use object_store::throttle::ThrottleConfig; @@ -6461,15 +6461,10 @@ mod test { .col("not_indexed", array::step::()) .into_reader_rows(RowCount::from(1000), BatchCount::from(20)); - let io_tracker = Arc::new(IOTracker::default()); let mut dataset = Dataset::write( data, "memory://test", Some(WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(io_tracker.clone()), - ..Default::default() - }), commit_handler: Some(Arc::new(RenameCommitHandler)), data_storage_version: Some(data_storage_version), ..Default::default() }), @@ -6489,9 +6484,9 @@ mod test { .unwrap(); // First run a full scan to get a baseline - let _ = io_tracker.incremental_stats(); // reset + let _ = dataset.object_store().io_stats_incremental(); // reset dataset.scan().try_into_batch().await.unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); let full_scan_bytes =
io_stats.read_bytes; // Next do a scan without pushdown, we should still see a benefit from late materialization @@ -6503,7 +6498,7 @@ mod test { .try_into_batch() .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_lt!(io_stats, read_bytes, full_scan_bytes); let filtered_scan_bytes = io_stats.read_bytes; @@ -6517,7 +6512,7 @@ mod test { .try_into_batch() .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_lt!(io_stats, read_bytes, filtered_scan_bytes); } @@ -6531,7 +6526,7 @@ mod test { .try_into_batch() .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_lt!(io_stats, read_bytes, full_scan_bytes); let index_scan_bytes = io_stats.read_bytes; @@ -6544,7 +6539,7 @@ mod test { .try_into_batch() .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_lt!(io_stats, read_bytes, index_scan_bytes); } @@ -7608,15 +7603,10 @@ mod test { .col("not_indexed", array::step::()) .into_reader_rows(RowCount::from(100), BatchCount::from(5)); - let io_tracker = Arc::new(IOTracker::default()); let mut dataset = Dataset::write( data, "memory://test", Some(WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(io_tracker.clone()), - ..Default::default() - }), data_storage_version: Some(data_storage_version), ..Default::default() }), @@ -7681,7 +7671,7 @@ mod test { .unwrap(); // First pass will need to perform some IOPs to determine what scalar indices are available - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_gt!(io_stats, read_iops, 0); // Second planning cycle should not perform any I/O @@ -7694,7 +7684,7 @@ mod test { .await 
.unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); dataset @@ -7706,7 +7696,7 @@ mod test { .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); dataset @@ -7719,7 +7709,7 @@ mod test { .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); dataset @@ -7732,7 +7722,7 @@ mod test { .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); } diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index ba7bf04a5df..0be1688bbc8 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -474,9 +474,9 @@ pub struct BatchCommitResult { mod tests { use arrow::array::{Int32Array, RecordBatch}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; - use lance_io::utils::tracking_store::IOTracker; + + use lance_io::utils::CachedFileSize; use lance_io::{assert_io_eq, assert_io_gt}; - use lance_io::{object_store::ChainedWrappingObjectStore, utils::CachedFileSize}; use lance_table::format::{DataFile, Fragment}; use std::time::Duration; @@ -524,7 +524,6 @@ mod tests { #[tokio::test] async fn test_reuse_session() { // Need to use in-memory for accurate IOPS tracking. 
- let io_tracker = IOTracker::default(); let session = Arc::new(Session::default()); // Create new dataset let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -537,13 +536,8 @@ mod tests { vec![Arc::new(Int32Array::from_iter_values(0..10_i32))], ) .unwrap(); - let store_params = ObjectStoreParams { - object_store_wrapper: Some(Arc::new(io_tracker.clone())), - ..Default::default() - }; let dataset = InsertBuilder::new("memory://test") .with_params(&WriteParams { - store_params: Some(store_params.clone()), session: Some(session.clone()), enable_v2_manifest_paths: true, ..Default::default() @@ -553,7 +547,7 @@ mod tests { .unwrap(); let dataset = Arc::new(dataset); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_gt!(io_stats, read_iops, 0); assert_io_gt!(io_stats, write_iops, 0); @@ -569,7 +563,7 @@ mod tests { // we shouldn't need to read anything from disk. Except we do need // to check for the latest version to see if we need to do conflict // resolution. - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 1, "check latest version, i = {} ", i); // Should see 2 IOPs: // 1. Write the transaction files @@ -579,7 +573,6 @@ mod tests { // Commit transaction with URI and session let new_ds = CommitBuilder::new("memory://test") - .with_store_params(store_params.clone()) .with_session(dataset.session.clone()) .execute(sample_transaction(1)) .await @@ -588,7 +581,7 @@ mod tests { // Session should still be re-used // However, the dataset needs to be loaded and the read version checked out, // so an additional 4 IOPs are needed. 
- let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 5, "load dataset + check version"); assert_io_eq!(io_stats, write_iops, 2, "write txn + manifest"); @@ -596,7 +589,6 @@ mod tests { // registry so we see the same store. let new_session = Arc::new(Session::new(0, 0, session.store_registry())); let new_ds = CommitBuilder::new("memory://test") - .with_store_params(store_params) .with_session(new_session) .execute(sample_transaction(1)) .await @@ -604,7 +596,7 @@ mod tests { assert_eq!(new_ds.manifest().version, 8); // Now we have to load all previous transactions. - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_gt!(io_stats, read_iops, 10); assert_io_eq!(io_stats, write_iops, 2, "write txn + manifest"); } @@ -615,12 +607,7 @@ mod tests { // * write txn file (this could be optional one day) // * write manifest let session = Arc::new(Session::default()); - let io_tracker = IOTracker::default(); let write_params = WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(Arc::new(io_tracker.clone())), - ..Default::default() - }), session: Some(session.clone()), ..Default::default() }; @@ -639,15 +626,15 @@ mod tests { .await .unwrap(); - io_tracker.incremental_stats(); // Reset the stats + dataset.object_store().io_stats_incremental(); // Reset the stats let read_version = dataset.manifest().version; - let _ = CommitBuilder::new(Arc::new(dataset)) + let new_ds = CommitBuilder::new(Arc::new(dataset)) .execute(sample_transaction(read_version)) .await .unwrap(); // Assert io requests - let io_stats = io_tracker.incremental_stats(); + let io_stats = new_ds.object_store().io_stats_incremental(); // This could be zero, if we decided to be optimistic. However, that // would mean two wasted write requests (txn + manifest) if there was // a conflict. 
We choose to be pessimistic for more consistent performance. @@ -655,7 +642,7 @@ mod tests { assert_io_eq!(io_stats, write_iops, 2); // We can't write them in parallel. The transaction file must exist before // we can write the manifest. - assert_io_eq!(io_stats, num_hops, 3); + assert_io_eq!(io_stats, num_stages, 3); } #[tokio::test] @@ -663,7 +650,6 @@ mod tests { async fn test_commit_conflict_iops(#[values(true, false)] use_cache: bool) { let cache_size = if use_cache { 1_000_000 } else { 0 }; let session = Arc::new(Session::new(0, cache_size, Default::default())); - let io_tracker = Arc::new(IOTracker::default()); // We need throttled to correctly count num hops. Otherwise, memory store // returns synchronously, and each request is 1 hop. let throttled = Arc::new(ThrottledStoreWrapper { @@ -676,10 +662,7 @@ mod tests { }); let write_params = WriteParams { store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(Arc::new(ChainedWrappingObjectStore::new(vec![ - throttled, - io_tracker.clone(), - ]))), + object_store_wrapper: Some(throttled), ..Default::default() }), session: Some(session.clone()), @@ -709,14 +692,14 @@ mod tests { .await .unwrap(); } - io_tracker.incremental_stats(); + dataset.object_store().io_stats_incremental(); - let _ = CommitBuilder::new(original_dataset.clone()) + let new_ds = CommitBuilder::new(original_dataset.clone()) .execute(sample_transaction(original_dataset.manifest().version)) .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = new_ds.object_store().io_stats_incremental(); // If there is a conflict with two transaction, the retry should require io requests: // * 1 list version @@ -728,7 +711,7 @@ mod tests { // of those. We should be able to read in 5 hops. if use_cache { assert_io_eq!(io_stats, read_iops, 1); // Just list versions - assert_io_eq!(io_stats, num_hops, 3); + assert_io_eq!(io_stats, num_stages, 3); } else { // We need to read the other manifests and transactions. 
@@ -737,7 +720,7 @@ mod tests { // It's possible to read the txns for some versions before we // finish reading later versions and so the entire "read versions // and txs" may appear as 1 hop instead of 2. - assert_io_lt!(io_stats, num_hops, 6); + assert_io_lt!(io_stats, num_stages, 6); } assert_io_eq!(io_stats, write_iops, 2); // txn + manifest } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index c24c0744c98..3ca748e558a 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -1606,12 +1606,12 @@ fn is_vector_field(data_type: DataType) -> bool { mod tests { use crate::dataset::builder::DatasetBuilder; use crate::dataset::optimize::{compact_files, CompactionOptions}; - use crate::dataset::{ReadParams, WriteMode, WriteParams}; + use crate::dataset::{WriteMode, WriteParams}; use crate::index::vector::VectorIndexParams; use crate::session::Session; use crate::utils::test::{copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount}; use arrow_array::Int32Array; - use lance_io::utils::tracking_store::IOTracker; + use lance_io::{assert_io_eq, assert_io_lt}; use super::*; @@ -1630,7 +1630,7 @@ mod tests { use lance_index::vector::{ hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams, }; - use lance_io::object_store::ObjectStoreParams; + use lance_linalg::distance::{DistanceType, MetricType}; use lance_testing::datagen::generate_random_array; use rstest::rstest; @@ -2358,12 +2358,7 @@ mod tests { #[lance_test_macros::test(tokio::test)] async fn test_load_indices() { let session = Arc::new(Session::default()); - let io_tracker = Arc::new(IOTracker::default()); let write_params = WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(io_tracker.clone()), - ..Default::default() - }), session: Some(session.clone()), ..Default::default() }; @@ -2392,10 +2387,10 @@ mod tests { ) .await .unwrap(); - io_tracker.incremental_stats(); // Reset + 
dataset.object_store().io_stats_incremental(); // Reset let indices = dataset.load_indices().await.unwrap(); - let stats = io_tracker.incremental_stats(); + let stats = dataset.object_store().io_stats_incremental(); // We should already have this cached since we just wrote it. assert_io_eq!(stats, read_iops, 0); assert_io_eq!(stats, read_bytes, 0); @@ -2405,24 +2400,16 @@ mod tests { let dataset2 = DatasetBuilder::from_uri(test_uri) .with_session(session.clone()) - .with_read_params(ReadParams { - store_options: Some(ObjectStoreParams { - object_store_wrapper: Some(io_tracker.clone()), - ..Default::default() - }), - session: Some(session.clone()), - ..Default::default() - }) .load() .await .unwrap(); - let stats = io_tracker.incremental_stats(); // Reset + let stats = dataset2.object_store().io_stats_incremental(); // Reset assert_io_lt!(stats, read_bytes, 64 * 1024); // Because the manifest is so small, we should have opportunistically // cached the indices in memory already. let indices2 = dataset2.load_indices().await.unwrap(); - let stats = io_tracker.incremental_stats(); + let stats = dataset2.object_store().io_stats_incremental(); assert_io_eq!(stats, read_iops, 0); assert_io_eq!(stats, read_bytes, 0); assert_eq!(indices2.len(), 1); diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 5e98b634f4e..dfe0edc8d0e 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -1659,8 +1659,7 @@ mod tests { use lance_core::Error; use lance_file::version::LanceFileVersion; use lance_io::assert_io_eq; - use lance_io::object_store::ObjectStoreParams; - use lance_io::utils::tracking_store::IOTracker; + use lance_table::format::IndexMetadata; use lance_table::io::deletion::{deletion_file_path, read_deletion_file}; @@ -1672,13 +1671,8 @@ mod tests { io, }; - async fn test_dataset(num_rows: usize, num_fragments: usize) -> (Dataset, Arc) { - let io_tracker = 
Arc::new(IOTracker::default()); + async fn test_dataset(num_rows: usize, num_fragments: usize) -> Dataset { let write_params = WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(io_tracker.clone()), - ..Default::default() - }), max_rows_per_file: num_rows / num_fragments, ..Default::default() }; @@ -1700,7 +1694,7 @@ mod tests { .execute(vec![data]) .await .unwrap(); - (dataset, io_tracker) + dataset } /// Helper function for tests to create UpdateConfig operations using old-style parameters @@ -1752,7 +1746,7 @@ mod tests { #[tokio::test] async fn test_non_overlapping_rebase_delete_update() { - let (dataset, io_tracker) = test_dataset(5, 5).await; + let dataset = test_dataset(5, 5).await; let operation = Operation::Update { updated_fragments: vec![Fragment::new(0)], removed_fragment_ids: vec![], @@ -1793,12 +1787,12 @@ mod tests { .await .unwrap(); - io_tracker.incremental_stats(); // reset + dataset.object_store().io_stats_incremental(); // reset for (other_version, other_transaction) in other_transactions.iter().enumerate() { rebase .check_txn(other_transaction, other_version as u64) .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); assert_io_eq!(io_stats, write_iops, 0); } @@ -1812,7 +1806,7 @@ mod tests { let rebased_transaction = rebase.finish(&dataset).await.unwrap(); assert_eq!(rebased_transaction, expected_transaction); // We didn't need to do any IO, so the stats should be 0. - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); assert_io_eq!(io_stats, write_iops, 0); } @@ -1865,7 +1859,7 @@ mod tests { #[rstest::rstest] async fn test_non_conflicting_rebase_delete_update() { // 5 rows, all in one fragment. Each transaction modifies a different row. 
- let (mut dataset, io_tracker) = test_dataset(5, 1).await; + let mut dataset = test_dataset(5, 1).await; let mut fragment = dataset.fragments().as_slice()[0].clone(); // Other operations modify the 1st, 2nd, and 3rd rows sequentially. @@ -1915,12 +1909,12 @@ mod tests { .await .unwrap(); - io_tracker.incremental_stats(); // reset + dataset.object_store().io_stats_incremental(); // reset for (other_version, other_transaction) in previous_transactions.iter().enumerate() { rebase .check_txn(other_transaction, other_version as u64) .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); assert_io_eq!(io_stats, write_iops, 0); } @@ -1931,7 +1925,7 @@ mod tests { let rebased_transaction = rebase.finish(&dataset).await.unwrap(); assert_eq!(rebased_transaction.read_version, dataset.manifest.version); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); if expected_rewrite { // Read the current deletion file, and write the new one. assert_io_eq!(io_stats, read_iops, 0, "deletion file should be cached"); @@ -1976,7 +1970,7 @@ mod tests { ); assert!(dataset.object_store().exists(&new_path).await.unwrap()); - assert_io_eq!(io_stats, num_hops, 1); + assert_io_eq!(io_stats, num_stages, 1); } else { // No IO should have happened. assert_io_eq!(io_stats, read_iops, 0); @@ -1998,7 +1992,7 @@ mod tests { #[values("update_full", "update_partial", "delete_full", "delete_partial")] other: &str, ) { // 5 rows, all in one fragment. Each transaction modifies the same row. 
- let (dataset, io_tracker) = test_dataset(5, 1).await; + let dataset = test_dataset(5, 1).await; let mut fragment = dataset.fragments().as_slice()[0].clone(); let sample_file = Fragment::new(0) @@ -2078,12 +2072,12 @@ mod tests { let affected_rows = RowIdTreeMap::from_iter([0]); - io_tracker.incremental_stats(); // reset + dataset.object_store().io_stats_incremental(); // reset let mut rebase = TransactionRebase::try_new(&dataset, txn.clone(), Some(&affected_rows)) .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); assert_io_eq!(io_stats, write_iops, 0); @@ -2108,7 +2102,7 @@ mod tests { vec![(0, true)], ); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0); assert_io_eq!(io_stats, write_iops, 0); @@ -2118,7 +2112,7 @@ mod tests { Err(crate::Error::RetryableCommitConflict { .. 
}) )); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, read_iops, 0, "deletion file should be cached"); assert_io_eq!(io_stats, write_iops, 0, "failed before writing"); } @@ -2620,7 +2614,7 @@ mod tests { #[tokio::test] async fn test_add_bases_non_conflicting() { - let (dataset, _) = test_dataset(10, 2).await; + let dataset = test_dataset(10, 2).await; // Create two transactions adding different bases let txn1 = Transaction::new_from_version( @@ -2656,7 +2650,7 @@ mod tests { #[tokio::test] async fn test_add_bases_name_conflict() { - let (dataset, _) = test_dataset(10, 2).await; + let dataset = test_dataset(10, 2).await; // Create two transactions adding bases with the same name let txn1 = Transaction::new_from_version( @@ -2697,7 +2691,7 @@ mod tests { #[tokio::test] async fn test_add_bases_path_conflict() { - let (dataset, _) = test_dataset(10, 2).await; + let dataset = test_dataset(10, 2).await; // Create two transactions adding bases with the same path let txn1 = Transaction::new_from_version( @@ -2738,7 +2732,7 @@ mod tests { #[tokio::test] async fn test_add_bases_id_conflict() { - let (dataset, _) = test_dataset(10, 2).await; + let dataset = test_dataset(10, 2).await; // Create two transactions adding bases with the same non-zero ID let txn1 = Transaction::new_from_version( @@ -2779,7 +2773,7 @@ mod tests { #[tokio::test] async fn test_add_bases_no_conflict_with_data_operations() { - let (dataset, _) = test_dataset(10, 2).await; + let dataset = test_dataset(10, 2).await; let add_bases_txn = Transaction::new_from_version( 1, @@ -2827,7 +2821,7 @@ mod tests { #[tokio::test] async fn test_add_bases_multiple_bases() { - let (dataset, _) = test_dataset(10, 2).await; + let dataset = test_dataset(10, 2).await; // txn1 adds two bases let txn1 = Transaction::new_from_version( @@ -2877,7 +2871,7 @@ mod tests { #[tokio::test] async fn test_add_bases_with_none_name() { - let 
(dataset, _) = test_dataset(10, 2).await; + let dataset = test_dataset(10, 2).await; // Bases with None names should not conflict on name let txn1 = Transaction::new_from_version( @@ -2913,7 +2907,7 @@ mod tests { #[tokio::test] async fn test_add_bases_with_zero_id() { - let (dataset, _) = test_dataset(10, 2).await; + let dataset = test_dataset(10, 2).await; // Bases with zero IDs should not conflict on ID let txn1 = Transaction::new_from_version( diff --git a/rust/lance/src/io/commit/s3_test.rs b/rust/lance/src/io/commit/s3_test.rs index a6e848e0354..35e64703688 100644 --- a/rust/lance/src/io/commit/s3_test.rs +++ b/rust/lance/src/io/commit/s3_test.rs @@ -180,6 +180,7 @@ async fn test_concurrent_writers() { let datagen = gen_batch().col("values", array::step::()); let data = datagen.into_batch_rows(RowCount::from(100)).unwrap(); + // We want to track IOs prior to creating the dataset, so need to explicitly create the tracker let io_tracker = Arc::new(IOTracker::default()); // Create a table @@ -216,7 +217,7 @@ async fn test_concurrent_writers() { .await .unwrap(); // Commit: 2 IOPs. 
1 for transaction file, 1 for manifest file - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); assert_io_eq!(io_stats, write_iops, 2); let dataset = Arc::new(dataset); let old_version = dataset.manifest().version; @@ -292,7 +293,7 @@ async fn test_ddb_open_iops() { let io_stats = io_tracker.incremental_stats(); assert_io_eq!(io_stats, write_iops, 1); - let _ = CommitBuilder::new(&uri) + let committed_ds = CommitBuilder::new(&uri) .with_store_params(store_params.clone()) .execute(transaction) .await @@ -303,7 +304,7 @@ async fn test_ddb_open_iops() { // * write staged file // * copy to final file // * delete staged file - let io_stats = io_tracker.incremental_stats(); + let io_stats = committed_ds.object_store().io_stats_incremental(); assert_io_eq!(io_stats, write_iops, 4); assert_io_eq!(io_stats, read_iops, 1); @@ -315,7 +316,7 @@ async fn test_ddb_open_iops() { .load() .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); // Open dataset can be read with 1 IOP, just to read the manifest. // Looking up latest manifest is handled in dynamodb. 
assert_io_eq!(io_stats, read_iops, 1); @@ -330,7 +331,7 @@ async fn test_ddb_open_iops() { .execute(vec![data.clone()]) .await .unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); // Append: 5 IOPS: data file, transaction file, 3x manifest file assert_io_eq!(io_stats, write_iops, 5); // TODO: we can reduce this by implementing a specialized CommitHandler::list_manifest_locations() @@ -339,7 +340,7 @@ async fn test_ddb_open_iops() { // Checkout original version dataset.checkout_version(1).await.unwrap(); - let io_stats = io_tracker.incremental_stats(); + let io_stats = dataset.object_store().io_stats_incremental(); // Checkout: 1 IOPS: manifest file assert_io_eq!(io_stats, read_iops, 1); assert_io_eq!(io_stats, write_iops, 0);