Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
Compaction,
CompactionMetrics,
DatasetBasePath,
IOStats,
LanceSchema,
ScanStatistics,
_Dataset,
Expand Down Expand Up @@ -1345,6 +1346,87 @@ def get_fragment(self, fragment_id: int) -> Optional[LanceFragment]:
return None
return LanceFragment(self, fragment_id=None, fragment=raw_fragment)

def io_stats_snapshot(self) -> IOStats:
"""
Get a snapshot of current IO statistics without resetting counters.

Returns the current IO statistics without modifying the internal state.
Use this when you need to check stats without resetting them. Multiple
calls will return the same values until IO operations are performed.

Returns
-------
IOStats
Object containing IO statistics with the following attributes:
- read_iops: Number of read operations
- read_bytes: Total bytes read
- write_iops: Number of write operations
- write_bytes: Total bytes written
- num_hops: Number of disjoint IO periods (lower means more parallelism)

Examples
--------
>>> import lance
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3]})
>>> dataset = lance.write_dataset(data, "memory://test_stats")
>>> result = dataset.to_table()
>>> # Check stats without resetting
>>> stats = dataset.io_stats_snapshot()
>>> print(f"Read {stats.read_bytes} bytes in {stats.read_iops} operations")
Read ... bytes in ... operations
>>> # Can check again and see the same values
>>> stats2 = dataset.io_stats_snapshot()
>>> assert stats.read_bytes == stats2.read_bytes

See Also
--------
io_stats_incremental : Get stats and reset counters for incremental tracking
"""
return self._ds.io_stats_snapshot()

def io_stats_incremental(self) -> IOStats:
"""
Get incremental IO statistics and reset the counters.

Returns IO statistics (number of operations and bytes) since the last
time this method was called, then resets the internal counters to zero.
This is useful for tracking IO operations between different stages of
processing.

Returns
-------
IOStats
Object containing IO statistics with the following attributes:
- read_iops: Number of read operations
- read_bytes: Total bytes read
- write_iops: Number of write operations
- write_bytes: Total bytes written
- num_hops: Number of disjoint IO periods (lower means more parallelism)

Examples
--------
>>> import lance
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3]})
>>> dataset = lance.write_dataset(data, "memory://test_stats")
>>> result = dataset.to_table()
>>> # Get incremental stats (and reset)
>>> stats = dataset.io_stats_incremental()
>>> print(f"Read {stats.read_bytes} bytes in {stats.read_iops} operations")
Read ... bytes in ... operations
>>> # Next call returns only new stats since last call
>>> more_data = dataset.to_table()
>>> stats2 = dataset.io_stats_incremental()
>>> print(f"Read {stats2.read_bytes} more bytes")
Read ... more bytes

See Also
--------
io_stats_snapshot : Get stats without resetting counters
"""
return self._ds.io_stats_incremental()

def to_batches(
self,
columns: Optional[Union[List[str], Dict[str, str]]] = None,
Expand Down
64 changes: 64 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ use crate::{LanceReader, Scanner};

use self::cleanup::CleanupStats;
use self::commit::PyCommitLock;
use self::io_stats::IoStats;

pub mod blob;
pub mod cleanup;
pub mod commit;
pub mod io_stats;
pub mod optimize;
pub mod stats;

Expand Down Expand Up @@ -2045,6 +2047,68 @@ impl Dataset {
Session::new(self.ds.session())
}

/// Get a snapshot of current IO statistics without resetting counters
///
/// Returns the current IO statistics without modifying the internal state.
/// Use this when you need to check stats without resetting them.
///
/// Returns
/// -------
/// IOStats
/// Statistics about read/write operations including:
/// - read_iops: Number of read operations
/// - read_bytes: Total bytes read
/// - write_iops: Number of write operations
/// - write_bytes: Total bytes written
/// - num_hops: Number of disjoint IO periods (lower means more parallelism)
///
/// Examples
/// --------
/// >>> import lance
/// >>> dataset = lance.dataset("my_dataset")
/// >>> result = dataset.to_table()
/// >>> # Check stats without resetting
/// >>> stats = dataset.io_stats_snapshot()
/// >>> print(f"Read {stats.read_bytes} bytes in {stats.read_iops} operations")
/// >>> # Can check again and see the same values
/// >>> stats2 = dataset.io_stats_snapshot()
/// >>> assert stats.read_bytes == stats2.read_bytes
Comment thread
wjones127 marked this conversation as resolved.
Outdated
fn io_stats_snapshot(&self) -> IoStats {
    // Take the object store's snapshot of its IO counters and convert it
    // into the Python-facing `IoStats` class in a single expression.
    IoStats::from_lance(self.ds.object_store().io_stats_snapshot())
}

/// Get incremental IO statistics for this dataset
///
/// Returns the accumulated IO statistics since the last call to this method
/// and resets the internal counters (incremental stats pattern). This is useful
/// for tracking IO operations between different stages of processing.
///
/// Returns
/// -------
/// IOStats
/// Statistics about read/write operations including:
/// - read_iops: Number of read operations
/// - read_bytes: Total bytes read
/// - write_iops: Number of write operations
/// - write_bytes: Total bytes written
/// - num_hops: Number of disjoint IO periods (lower means more parallelism)
///
/// Examples
/// --------
/// >>> import lance
/// >>> dataset = lance.dataset("my_dataset")
/// >>> # Perform some operations...
/// >>> stats = dataset.io_stats_incremental()
/// >>> print(f"Read {stats.read_bytes} bytes in {stats.read_iops} operations")
/// >>> # Next call returns only new stats since last call
/// >>> more_data = dataset.to_table()
/// >>> stats2 = dataset.io_stats_incremental()
Comment thread
wjones127 marked this conversation as resolved.
Outdated
fn io_stats_incremental(&self) -> IoStats {
    // Fetch the object store's incremental IO counters and convert them
    // into the Python-facing `IoStats` class in a single expression.
    IoStats::from_lance(self.ds.object_store().io_stats_incremental())
}

#[staticmethod]
#[pyo3(signature = (dest, storage_options = None, ignore_not_found = None))]
fn drop(
Expand Down
54 changes: 54 additions & 0 deletions python/src/dataset/io_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! IO statistics tracking for dataset operations

use pyo3::{pyclass, pymethods};

/// IO statistics for dataset operations
///
/// This tracks the number of IO operations and bytes transferred for read and write
/// operations performed on the dataset's object store.
///
/// Note: `io_stats_incremental()` returns the statistics accumulated since the
/// last call and resets the internal counters (incremental stats pattern), while
/// `io_stats_snapshot()` returns the current statistics without resetting them.
#[pyclass(name = "IOStats", module = "_lib", get_all)]
#[derive(Clone, Debug)]
pub struct IoStats {
Comment on lines +8 to +17

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could include these docs in mkdocs. I'll make an issue to figure that out. Then we wouldn't need the lengthy Returns block on the Python side.

/// Number of read IO operations performed
pub read_iops: u64,
/// Total bytes read from storage
pub read_bytes: u64,
/// Number of write IO operations performed
pub write_iops: u64,
/// Total bytes written to storage
pub write_bytes: u64,
/// Number of disjoint periods where at least one IO is in-flight
///
/// This metric helps understand IO parallelism. A lower number indicates
/// more parallel IO operations.
pub num_hops: u64,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling these hops/network hops isn't clear to me. Would this be network hops behind the S3 endpoint? That's how I would interpret it but I don't think we have that info

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just network requests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I should give an example in the comment.

Imagine a process:

  1. Call list to get 10 files
  2. In parallel, call head on 10 files
  3. Read the largest file

That's a total of 12 requests (list, 10 heads, 1 get). But we do them in 3 "hops". Maybe that's not the best term. Could do "stages" or something else.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Makes me think of dependency chains - not sure if that's a helpful term.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that "hops" is not quite the correct term (though I am used to our usage of it in this way from the Rust unit tests).

I also am not aware of any standard term.

Is this something we want to expose? Should we mention it is likely meaningless if there are concurrent operations in flight? Or that it can be a somewhat noisy metric?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'll rename it to `num_stages`. You're also right that it's mostly useful for testing, so I'll gate it behind the `test-util` feature.

}

#[pymethods]
impl IoStats {
    // Python `repr()`: lists every counter as `name=value` inside `IOStats(...)`.
    fn __repr__(&self) -> String {
        // Destructure once so the argument list mirrors the placeholder order.
        let Self {
            read_iops,
            read_bytes,
            write_iops,
            write_bytes,
            num_hops,
        } = self;

        format!(
            "IOStats(read_iops={}, read_bytes={}, write_iops={}, write_bytes={}, num_hops={})",
            read_iops, read_bytes, write_iops, write_bytes, num_hops
        )
    }
}

impl IoStats {
    /// Build the Python-facing stats object by copying each counter out of
    /// Lance's internal `IoStats` value.
    pub fn from_lance(stats: lance_io::utils::tracking_store::IoStats) -> Self {
        let read_iops = stats.read_iops;
        let read_bytes = stats.read_bytes;
        let write_iops = stats.write_iops;
        let write_bytes = stats.write_bytes;
        let num_hops = stats.num_hops;

        Self {
            read_iops,
            read_bytes,
            write_iops,
            write_bytes,
            num_hops,
        }
    }
}
2 changes: 2 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion_ffi::table_provider::FFI_TableProvider;
use datagen::register_datagen;
use dataset::blob::LanceBlobFile;
use dataset::cleanup::CleanupStats;
use dataset::io_stats::IoStats;
use dataset::optimize::{
PyCompaction, PyCompactionMetrics, PyCompactionPlan, PyCompactionTask, PyRewriteResult,
};
Expand Down Expand Up @@ -258,6 +259,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LanceBufferDescriptor>()?;
m.add_class::<BFloat16>()?;
m.add_class::<CleanupStats>()?;
m.add_class::<IoStats>()?;
m.add_class::<KMeans>()?;
m.add_class::<Hnsw>()?;
m.add_class::<PyCompactionTask>()?;
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ gcp = ["object_store/gcp", "dep:opendal", "opendal/services-gcs", "dep:object_st
aws = ["object_store/aws", "dep:aws-config", "dep:aws-credential-types", "dep:opendal", "opendal/services-s3", "dep:object_store_opendal"]
azure = ["object_store/azure", "dep:opendal", "opendal/services-azblob", "dep:object_store_opendal"]
oss = ["dep:opendal", "opendal/services-oss", "dep:object_store_opendal"]
test-util = []

[lints]
workspace = true
48 changes: 44 additions & 4 deletions rust/lance-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tracing::instrument;

use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
use crate::traits::{Reader, Writer};
use crate::utils::tracking_store::IOTracker;

/// Convert an [`object_store::path::Path`] to a [`std::path::Path`].
pub fn to_local_path(path: &Path) -> String {
Expand Down Expand Up @@ -86,6 +87,9 @@ pub struct LocalObjectReader {

/// Block size, in bytes.
block_size: usize,

/// IO tracker for monitoring read operations.
io_tracker: Option<Arc<IOTracker>>,
}

impl DeepSizeOf for LocalObjectReader {
Expand All @@ -107,11 +111,24 @@ impl LocalObjectReader {
}

/// Open a local object reader, with default prefetch size.
///
/// Kept for existing callers that do not need IO tracking: this simply
/// forwards to `open_with_tracker` with no tracker attached.
#[instrument(level = "debug")]
pub async fn open(
    path: &Path,
    block_size: usize,
    known_size: Option<usize>,
) -> Result<Box<dyn Reader>> {
    // No tracker: reads through the returned reader are not recorded.
    let io_tracker = None;
    Self::open_with_tracker(path, block_size, known_size, io_tracker).await
}

/// Open a local object reader with optional IO tracking.
#[instrument(level = "debug")]
pub(crate) async fn open_with_tracker(
path: &Path,
block_size: usize,
known_size: Option<usize>,
io_tracker: Option<Arc<IOTracker>>,
) -> Result<Box<dyn Reader>> {
let path = path.clone();
let local_path = to_local_path(&path);
Expand All @@ -129,6 +146,7 @@ impl LocalObjectReader {
block_size,
size,
path,
io_tracker,
}) as Box<dyn Reader>)
})
.await?
Expand Down Expand Up @@ -171,7 +189,12 @@ impl Reader for LocalObjectReader {
#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
let file = self.file.clone();
tokio::task::spawn_blocking(move || {
let io_tracker = self.io_tracker.clone();
let path = self.path.clone();
let num_bytes = range.len() as u64;
let range_u64 = (range.start as u64)..(range.end as u64);

let result = tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
// Safety: `buf` is set with appropriate capacity above. It is
// written to below and we check all data is initialized at that point.
Expand All @@ -187,14 +210,24 @@ impl Reader for LocalObjectReader {
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
});

// Record the read operation if tracking is enabled
if let (Ok(_), Some(tracker)) = (&result, io_tracker.as_ref()) {
tracker.record_read("get_range", path, num_bytes, Some(range_u64));
}

result
}

/// Reads the entire file.
#[instrument(level = "debug", skip(self))]
async fn get_all(&self) -> object_store::Result<Bytes> {
let mut file = self.file.clone();
tokio::task::spawn_blocking(move || {
let io_tracker = self.io_tracker.clone();
let path = self.path.clone();

let result = tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
file.read_to_end(buf.as_mut())?;
Ok(Bytes::from(buf))
Expand All @@ -203,7 +236,14 @@ impl Reader for LocalObjectReader {
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
});

// Record the read operation if tracking is enabled

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the optionality required? Elsewhere in the PR it seems to indicate it'll always be enabled

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can remove it. There is a constructor that doesn't pass a tracker down (used for tests I think). But I can just make it create an empty stats instance and record unconditionally. That can simplify some code.

if let (Ok(bytes), Some(tracker)) = (&result, io_tracker.as_ref()) {
tracker.record_read("get_all", path, bytes.len() as u64, None);
}

result
}
}

Expand Down
Loading