Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ arrow = { workspace = true }
async-trait = { workspace = true }
aws-config = "1.8.6"
aws-credential-types = "1.2.7"
chrono = { workspace = true }
clap = { version = "4.5.47", features = ["derive", "cargo"] }
datafusion = { workspace = true, features = [
"avro",
Expand Down
149 changes: 147 additions & 2 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ use std::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::Duration,
};

use async_trait::async_trait;
use chrono::Utc;
use datafusion::{
common::instant::Instant,
error::DataFusionError,
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
use futures::stream::BoxStream;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
};
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use url::Url;

/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
Expand Down Expand Up @@ -81,6 +84,7 @@ impl From<u8> for InstrumentedObjectStoreMode {
pub struct InstrumentedObjectStore {
/// The wrapped object store that requests are forwarded to
inner: Arc<dyn ObjectStore>,
/// The current [`InstrumentedObjectStoreMode`], held as its `u8` representation in an atomic so
/// it can be changed through a shared reference
instrument_mode: AtomicU8,
/// Details of the requests recorded so far; emptied by `take_requests`
requests: Mutex<Vec<RequestDetails>>,
}

impl InstrumentedObjectStore {
Expand All @@ -89,12 +93,46 @@ impl InstrumentedObjectStore {
Self {
inner: object_store,
instrument_mode,
requests: Mutex::new(Vec::new()),
}
}

/// Switches this store to the given profiling `mode`
fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
    // The mode lives in an `AtomicU8`, so persist the enum's `u8` representation
    let encoded = mode as u8;
    self.instrument_mode.store(encoded, Ordering::Relaxed);
}

/// Returns all [`RequestDetails`] accumulated in this [`InstrumentedObjectStore`] and clears
/// the stored requests
pub fn take_requests(&self) -> Vec<RequestDetails> {
    let mut stored = self.requests.lock();

    // Hand the existing buffer to the caller and leave an empty `Vec` behind. This is a
    // constant-time swap, so the lock is not held while elements are moved one by one into a
    // second allocation.
    std::mem::take(&mut *stored)
}

/// Forwards a `get_opts` call to the wrapped store and, when it succeeds, records a
/// [`RequestDetails`] entry holding the start timestamp, elapsed time, returned size and the
/// range that was requested.
///
/// If the wrapped store returns an error it is propagated unchanged and nothing is recorded.
async fn instrumented_get_opts(
    &self,
    location: &Path,
    options: GetOptions,
) -> Result<GetResult> {
    // `options` is consumed by the inner call, so keep a copy of the requested range first
    let requested_range = options.range.clone();
    let issued_at = Utc::now();

    let timer = Instant::now();
    let result = self.inner.get_opts(location, options).await?;
    let took = timer.elapsed();

    // Number of bytes covered by the range the store actually returned
    let returned_bytes = (result.range.end - result.range.start) as usize;

    let details = RequestDetails {
        op: Operation::Get,
        path: location.clone(),
        timestamp: issued_at,
        duration: Some(took),
        size: Some(returned_bytes),
        range: requested_range,
        extra_display: None,
    };
    self.requests.lock().push(details);

    Ok(result)
}
}

impl fmt::Display for InstrumentedObjectStore {
Expand Down Expand Up @@ -129,6 +167,12 @@ impl ObjectStore for InstrumentedObjectStore {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
if self.instrument_mode.load(Ordering::Relaxed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make the code cleaner to move the atomic load and the mode comparison into their own method.

Something like this, maybe:

impl InstrumentedObjectStore {
  /// Returns `true` unless the current instrument mode is `Disabled`
  fn enabled(&self) -> bool {
    // The mode is stored as a raw `u8`, so compare against the enum's `u8` representation
    self.instrument_mode.load(Ordering::Relaxed)
      != InstrumentedObjectStoreMode::Disabled as u8
  }
}

!= InstrumentedObjectStoreMode::Disabled as u8
{
return self.instrumented_get_opts(location, options).await;
}

self.inner.get_opts(location, options).await
}

Expand Down Expand Up @@ -157,6 +201,55 @@ impl ObjectStore for InstrumentedObjectStore {
}
}

/// The kind of object store operation described by a [`RequestDetails`] entry
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
enum Operation {
// NOTE(review): only `Get` is constructed in the code visible here; the leading underscore on
// the other variants presumably silences dead-code warnings until they are recorded too -- confirm
_Copy,
_Delete,
Get,
_Head,
_List,
_Put,
}

/// Holds profiling details about individual requests made through an [`InstrumentedObjectStore`]
#[derive(Debug)]
pub struct RequestDetails {
/// The kind of operation that was performed
op: Operation,
/// Location of the object the request targeted
path: Path,
/// UTC wall-clock time associated with the request; for `Get` it is captured just before the
/// request is sent to the wrapped store
timestamp: chrono::DateTime<Utc>,
/// How long the request took, when it was measured
duration: Option<Duration>,
/// Size in bytes associated with the request; for `Get` this is the length of the byte range
/// returned by the wrapped store
size: Option<usize>,
/// The byte range the caller asked for, if any
range: Option<GetRange>,
/// Optional free-form text appended to the end of the `Display` output
extra_display: Option<String>,
}

/// Renders a request as a single line of space-separated parts:
/// `<rfc3339 timestamp> operation=<op> [duration=<secs>s] [size=<bytes>] [range: <range>] path=<path> [<extra>]`
///
/// Parts shown in brackets are omitted when the corresponding field is `None`.
impl fmt::Display for RequestDetails {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Write each part straight into the formatter rather than collecting intermediate
        // `String`s and joining them.
        write!(
            f,
            "{} operation={:?}",
            self.timestamp.to_rfc3339(),
            self.op
        )?;

        if let Some(d) = self.duration {
            // `f64` keeps all six printed decimals exact; `f32` only has about seven significant
            // digits, so the microsecond digits become noise once a request takes a few seconds.
            write!(f, " duration={:.6}s", d.as_secs_f64())?;
        }
        if let Some(s) = self.size {
            write!(f, " size={s}")?;
        }
        if let Some(r) = &self.range {
            write!(f, " range: {r}")?;
        }
        write!(f, " path={}", self.path)?;

        if let Some(ed) = &self.extra_display {
            write!(f, " {ed}")?;
        }

        Ok(())
    }
}

/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
Expand Down Expand Up @@ -275,4 +368,56 @@ mod tests {
assert!(fetched.is_ok());
assert_eq!(reg.stores().len(), 1);
}

#[tokio::test]
async fn instrumented_store() {
let store = Arc::new(object_store::memory::InMemory::new());
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
let instrumented = InstrumentedObjectStore::new(store, mode);

// Load the test store with some data we can read
let path = Path::from("test/data");
let payload = PutPayload::from_static(b"test_data");
instrumented.put(&path, payload).await.unwrap();

// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.get(&path).await.unwrap();
assert!(instrumented.requests.lock().is_empty());

instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Enabled);
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.get(&path).await.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);

let mut requests = instrumented.take_requests();
assert_eq!(requests.len(), 1);
assert!(instrumented.requests.lock().is_empty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good here to assert on the contents of the request too -- to make sure that all fields got captured

For example:

assert_eq!(requests[0].op, Operation::Get);
// check other fields, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 3c3e0ef


let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Get);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert_eq!(request.size, Some(9));
assert_eq!(request.range, None);
assert!(request.extra_display.is_none());
}

/// Checks the `Display` output of a [`RequestDetails`] with every optional field populated
#[test]
fn request_details() {
    let details = RequestDetails {
        op: Operation::Get,
        path: Path::from("test"),
        // The Unix epoch gives a stable, easily recognisable timestamp
        timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
        duration: Some(Duration::from_secs(5)),
        size: Some(10),
        range: Some((..10).into()),
        extra_display: Some("extra info".to_string()),
    };

    let rendered = details.to_string();
    let expected = "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info";

    assert_eq!(rendered, expected);
}
}
11 changes: 10 additions & 1 deletion datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,16 @@ impl PrintOptions {
{
writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
for store in self.instrumented_registry.stores() {
writeln!(writer, "{store}")?;
let requests = store.take_requests();

if !requests.is_empty() {
writeln!(writer, "{store}")?;
for req in requests.iter() {
writeln!(writer, "{req}")?;
}
// Add an extra blank line to help visually organize the output
writeln!(writer)?;
}
}
}
}
Expand Down
79 changes: 70 additions & 9 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::process::Command;

use rstest::rstest;

use async_trait::async_trait;
use insta::{glob, Settings};
use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
use std::path::PathBuf;
Expand Down Expand Up @@ -374,8 +375,6 @@ async fn test_s3_url_fallback() {
settings.set_snapshot_suffix("s3_url_fallback");
let _bound = settings.bind_to_scope();

let port = container.get_host_port_ipv4(9000).await.unwrap();

// Create a table using a prefix path (without trailing slash)
// This should trigger the fallback logic where head() fails on the prefix
// and list() is used to discover the actual files
Expand All @@ -389,11 +388,73 @@ OPTIONS (
SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
"#;

assert_cmd_snapshot!(cli()
.env_clear()
.env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
.env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
.env("AWS_ENDPOINT", format!("http://localhost:{port}"))
.env("AWS_ALLOW_HTTP", "true")
.pass_stdin(input));
assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
}

/// Validate object store profiling output
///
/// Runs the same query three times against a MinIO-backed table: before profiling is enabled,
/// while it is enabled, and after it has been disabled again. The CLI output is compared to a
/// snapshot. The test is skipped unless `TEST_STORAGE_INTEGRATION` is set.
#[tokio::test]
async fn test_object_store_profiling() {
if env::var("TEST_STORAGE_INTEGRATION").is_err() {
eprintln!("Skipping external storages integration tests");
return;
}

let container = setup_minio_container().await;

let mut settings = make_settings();
// NOTE(review): this is the same suffix that `test_s3_url_fallback` sets -- it looks like a
// copy-paste leftover. Changing it would also require renaming the stored snapshot file.
settings.set_snapshot_suffix("s3_url_fallback");

// as the object store profiling contains timestamps and durations, we must
// filter them out to have stable snapshots
//
// Example line to filter:
// 2025-10-11T12:02:59.722646+00:00 operation=Get duration=0.001495s size=1006 path=cars.csv
// Output:
// <TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
//
// The pattern expects `size=` to be followed directly by `path=`, so a profiling line that
// also carries a `range: ...` part is not rewritten by this filter.
settings.add_filter(
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s size=(\d+) path=(.*)",
"<TIMESTAMP> operation=$1 duration=[DURATION] size=$2 path=$3",
);

// Keep the settings bound for the rest of the test so the snapshot assertion below uses them
let _bound = settings.bind_to_scope();

let input = r#"
CREATE EXTERNAL TABLE CARS
STORED AS CSV
LOCATION 's3://data/cars.csv';

-- Initial query should not show any profiling as the object store is not instrumented yet
SELECT * from CARS LIMIT 1;
\object_store_profiling enabled
-- Query again to see the profiling output
SELECT * from CARS LIMIT 1;
\object_store_profiling disabled
-- Final query should not show any profiling as we disabled it again
SELECT * from CARS LIMIT 1;
"#;

assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
}

/// Extension trait that adds the MinIO connection information to a [`Command`]
#[async_trait]
trait MinioCommandExt {
/// Configures `self` to talk to the MinIO instance running in `container` and returns `self`
/// so that further builder calls can be chained
async fn with_minio(&mut self, container: &ContainerAsync<minio::MinIO>)
-> &mut Self;
}

#[async_trait]
impl MinioCommandExt for Command {
    /// Clears the command's environment, then sets the AWS credentials and endpoint variables
    /// so the spawned process talks to the MinIO container over plain HTTP.
    ///
    /// Panics if the host port mapped to the container's port 9000 cannot be determined.
    async fn with_minio(
        &mut self,
        container: &ContainerAsync<minio::MinIO>,
    ) -> &mut Self {
        let host_port = container.get_host_port_ipv4(9000).await.unwrap();
        let endpoint = format!("http://localhost:{host_port}");

        // Start from an empty environment so nothing leaks in from the host
        self.env_clear();
        self.env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin");
        self.env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword");
        self.env("AWS_ENDPOINT", endpoint);
        self.env("AWS_ALLOW_HTTP", "true");

        self
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
---
source: datafusion-cli/tests/cli_integration.rs
info:
program: datafusion-cli
args: []
env:
AWS_ACCESS_KEY_ID: TEST-DataFusionLogin
AWS_ALLOW_HTTP: "true"
AWS_ENDPOINT: "http://localhost:55031"
AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword
stdin: "\n CREATE EXTERNAL TABLE CARS\nSTORED AS CSV\nLOCATION 's3://data/cars.csv';\n\n-- Initial query should not show any profiling as the object store is not instrumented yet\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling enabled\n-- Query again to see the profiling output\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling disabled\n-- Final query should not show any profiling as we disabled it again\nSELECT * from CARS LIMIT 1;\n"
snapshot_kind: text
---
success: true
exit_code: 0
----- stdout -----
[CLI_VERSION]
0 row(s) fetched.
[ELAPSED]

+-----+-------+---------------------+
| car | speed | time |
+-----+-------+---------------------+
| red | 20.0 | 1996-04-12T12:05:03 |
+-----+-------+---------------------+
1 row(s) fetched.
[ELAPSED]

ObjectStore Profile mode set to Enabled
+-----+-------+---------------------+
| car | speed | time |
+-----+-------+---------------------+
| red | 20.0 | 1996-04-12T12:05:03 |
+-----+-------+---------------------+
1 row(s) fetched.
[ELAPSED]

Object Store Profiling
Instrumented Object Store: instrument_mode: Enabled, inner: AmazonS3(data)
<TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is pretty cool to see


ObjectStore Profile mode set to Disabled
+-----+-------+---------------------+
| car | speed | time |
+-----+-------+---------------------+
| red | 20.0 | 1996-04-12T12:05:03 |
+-----+-------+---------------------+
1 row(s) fetched.
[ELAPSED]

\q

----- stderr -----