Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
Compaction,
CompactionMetrics,
DatasetBasePath,
IOStats,
LanceSchema,
ScanStatistics,
_Dataset,
Expand Down Expand Up @@ -1345,6 +1346,87 @@ def get_fragment(self, fragment_id: int) -> Optional[LanceFragment]:
return None
return LanceFragment(self, fragment_id=None, fragment=raw_fragment)

def io_stats_snapshot(self) -> IOStats:
    """
    Get a snapshot of current IO statistics without resetting counters.

    Returns the current IO statistics without modifying the internal state.
    Use this when you need to check stats without resetting them. Multiple
    calls will return the same values until IO operations are performed.

    Returns
    -------
    IOStats
        Object containing IO statistics with the following attributes:

        - read_iops: Number of read operations
        - read_bytes: Total bytes read
        - write_iops: Number of write operations
        - written_bytes: Total bytes written

    Examples
    --------
    >>> import lance
    >>> import pyarrow as pa
    >>> data = pa.table({"x": [1, 2, 3]})
    >>> dataset = lance.write_dataset(data, "memory://test_stats")
    >>> result = dataset.to_table()
    >>> # Check stats without resetting
    >>> stats = dataset.io_stats_snapshot()
    >>> print(f"Read {stats.read_bytes} bytes in {stats.read_iops} operations")
    Read ... bytes in ... operations
    >>> # Can check again and see the same values
    >>> stats2 = dataset.io_stats_snapshot()
    >>> assert stats.read_bytes == stats2.read_bytes

    See Also
    --------
    io_stats_incremental : Get stats and reset counters for incremental tracking
    """
    return self._ds.io_stats_snapshot()

def io_stats_incremental(self) -> IOStats:
    """
    Get incremental IO statistics and reset the counters.

    Returns IO statistics (number of operations and bytes) since the last
    time this method was called, then resets the internal counters to zero.
    This is useful for tracking IO operations between different stages of
    processing.

    Returns
    -------
    IOStats
        Object containing IO statistics with the following attributes:

        - read_iops: Number of read operations
        - read_bytes: Total bytes read
        - write_iops: Number of write operations
        - written_bytes: Total bytes written

    Examples
    --------
    >>> import lance
    >>> import pyarrow as pa
    >>> data = pa.table({"x": [1, 2, 3]})
    >>> dataset = lance.write_dataset(data, "memory://test_stats")
    >>> result = dataset.to_table()
    >>> # Get incremental stats (and reset)
    >>> stats = dataset.io_stats_incremental()
    >>> print(f"Read {stats.read_bytes} bytes in {stats.read_iops} operations")
    Read ... bytes in ... operations
    >>> # Next call returns only new stats since last call
    >>> more_data = dataset.to_table()
    >>> stats2 = dataset.io_stats_incremental()
    >>> print(f"Read {stats2.read_bytes} more bytes")
    Read ... more bytes

    See Also
    --------
    io_stats_snapshot : Get stats without resetting counters
    """
    return self._ds.io_stats_incremental()

def to_batches(
self,
columns: Optional[Union[List[str], Dict[str, str]]] = None,
Expand Down
14 changes: 14 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ use crate::{LanceReader, Scanner};

use self::cleanup::CleanupStats;
use self::commit::PyCommitLock;
use self::io_stats::IoStats;

pub mod blob;
pub mod cleanup;
pub mod commit;
pub mod io_stats;
pub mod optimize;
pub mod stats;

Expand Down Expand Up @@ -2045,6 +2047,18 @@ impl Dataset {
Session::new(self.ds.session())
}

/// Get a snapshot of current IO statistics without resetting counters.
fn io_stats_snapshot(&self) -> IoStats {
    // Read the counters from the object store and wrap them in the
    // Python-exposed type; the underlying counters are left untouched.
    IoStats::from_lance(self.ds.object_store().io_stats_snapshot())
}

/// Get incremental IO statistics for this dataset.
fn io_stats_incremental(&self) -> IoStats {
    // Fetch-and-reset: the object store hands back the totals accumulated
    // since the previous call and zeroes its internal counters.
    IoStats::from_lance(self.ds.object_store().io_stats_incremental())
}

#[staticmethod]
#[pyo3(signature = (dest, storage_options = None, ignore_not_found = None))]
fn drop(
Expand Down
48 changes: 48 additions & 0 deletions python/src/dataset/io_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! IO statistics tracking for dataset operations

use pyo3::{pyclass, pymethods};

/// IO statistics for dataset operations
///
/// This tracks the number of IO operations and bytes transferred for read and write
/// operations performed on the dataset's object store.
///
/// Note: Calling `io_stats()` returns the statistics accumulated since the last call
/// and resets the internal counters (incremental stats pattern).
#[pyclass(name = "IOStats", module = "_lib", get_all)]
#[derive(Clone, Debug)]
pub struct IoStats {
Comment on lines +8 to +17
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could include these docs in mkdocs. I'll make an issue to figure that out. Then we wouldn't need the lengthy Returns block on the python.

/// Number of read IO operations performed
pub read_iops: u64,
/// Total bytes read from storage
pub read_bytes: u64,
/// Number of write IO operations performed
pub write_iops: u64,
/// Total bytes written to storage
pub written_bytes: u64,
}

#[pymethods]
impl IoStats {
fn __repr__(&self) -> String {
format!(
"IOStats(read_iops={}, read_bytes={}, write_iops={}, write_bytes={})",
self.read_iops, self.read_bytes, self.write_iops, self.written_bytes
)
}
}

impl IoStats {
/// Convert from Lance's internal IoStats type
pub fn from_lance(stats: lance_io::utils::tracking_store::IoStats) -> Self {
Self {
read_iops: stats.read_iops,
read_bytes: stats.read_bytes,
write_iops: stats.write_iops,
written_bytes: stats.written_bytes,
}
}
}
2 changes: 2 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion_ffi::table_provider::FFI_TableProvider;
use datagen::register_datagen;
use dataset::blob::LanceBlobFile;
use dataset::cleanup::CleanupStats;
use dataset::io_stats::IoStats;
use dataset::optimize::{
PyCompaction, PyCompactionMetrics, PyCompactionPlan, PyCompactionTask, PyRewriteResult,
};
Expand Down Expand Up @@ -258,6 +259,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LanceBufferDescriptor>()?;
m.add_class::<BFloat16>()?;
m.add_class::<CleanupStats>()?;
m.add_class::<IoStats>()?;
m.add_class::<KMeans>()?;
m.add_class::<Hnsw>()?;
m.add_class::<PyCompactionTask>()?;
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ gcp = ["object_store/gcp", "dep:opendal", "opendal/services-gcs", "dep:object_st
aws = ["object_store/aws", "dep:aws-config", "dep:aws-credential-types", "dep:opendal", "opendal/services-s3", "dep:object_store_opendal"]
azure = ["object_store/azure", "dep:opendal", "opendal/services-azblob", "dep:object_store_opendal"]
oss = ["dep:opendal", "opendal/services-oss", "dep:object_store_opendal"]
test-util = []

[lints]
workspace = true
46 changes: 42 additions & 4 deletions rust/lance-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tracing::instrument;

use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
use crate::traits::{Reader, Writer};
use crate::utils::tracking_store::IOTracker;

/// Convert an [`object_store::path::Path`] to a [`std::path::Path`].
pub fn to_local_path(path: &Path) -> String {
Expand Down Expand Up @@ -86,6 +87,9 @@ pub struct LocalObjectReader {

/// Block size, in bytes.
block_size: usize,

/// IO tracker for monitoring read operations.
io_tracker: Arc<IOTracker>,
}

impl DeepSizeOf for LocalObjectReader {
Expand All @@ -107,11 +111,24 @@ impl LocalObjectReader {
}

/// Open a local object reader, with default prefetch size.
///
/// For backward compatibility with existing code that doesn't need tracking.
#[instrument(level = "debug")]
pub async fn open(
path: &Path,
block_size: usize,
known_size: Option<usize>,
) -> Result<Box<dyn Reader>> {
Self::open_with_tracker(path, block_size, known_size, Default::default()).await
}

/// Open a local object reader with optional IO tracking.
#[instrument(level = "debug")]
pub(crate) async fn open_with_tracker(
path: &Path,
block_size: usize,
known_size: Option<usize>,
io_tracker: Arc<IOTracker>,
) -> Result<Box<dyn Reader>> {
let path = path.clone();
let local_path = to_local_path(&path);
Expand All @@ -129,6 +146,7 @@ impl LocalObjectReader {
block_size,
size,
path,
io_tracker,
}) as Box<dyn Reader>)
})
.await?
Expand Down Expand Up @@ -171,7 +189,12 @@ impl Reader for LocalObjectReader {
#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
let file = self.file.clone();
tokio::task::spawn_blocking(move || {
let io_tracker = self.io_tracker.clone();
let path = self.path.clone();
let num_bytes = range.len() as u64;
let range_u64 = (range.start as u64)..(range.end as u64);

let result = tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
// Safety: `buf` is set with appropriate capacity above. It is
// written to below and we check all data is initialized at that point.
Expand All @@ -187,14 +210,23 @@ impl Reader for LocalObjectReader {
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
});

if result.is_ok() {
io_tracker.record_read("get_range", path, num_bytes, Some(range_u64));
}

result
}

/// Reads the entire file.
#[instrument(level = "debug", skip(self))]
async fn get_all(&self) -> object_store::Result<Bytes> {
let mut file = self.file.clone();
tokio::task::spawn_blocking(move || {
let io_tracker = self.io_tracker.clone();
let path = self.path.clone();

let result = tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
file.read_to_end(buf.as_mut())?;
Ok(Bytes::from(buf))
Expand All @@ -203,7 +235,13 @@ impl Reader for LocalObjectReader {
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
});

if let Ok(bytes) = &result {
io_tracker.record_read("get_all", path, bytes.len() as u64, None);
}

result
}
}

Expand Down
Loading
Loading