diff --git a/.github/workflows/extended.yml b/.github/workflows/extended.yml index b98e0a1740cbe..1cc39ae76c89a 100644 --- a/.github/workflows/extended.yml +++ b/.github/workflows/extended.yml @@ -33,6 +33,42 @@ on: - main jobs: + # Check crate compiles and base cargo check passes + linux-build-lib: + name: linux build test + runs-on: ubuntu-latest + container: + image: amd64/rust + steps: + - uses: actions/checkout@v4 + - name: Setup Rust toolchain + uses: ./.github/actions/setup-builder + with: + rust-version: stable + - name: Prepare cargo build + run: cargo check --profile ci --all-targets + + # Run extended tests (with feature 'extended_tests') + linux-test-extended: + name: cargo test (amd64) + needs: linux-build-lib + runs-on: ubuntu-latest + container: + image: amd64/rust + steps: + - uses: actions/checkout@v4 + with: + submodules: true + fetch-depth: 1 + - name: Setup Rust toolchain + uses: ./.github/actions/setup-builder + with: + rust-version: stable + - name: Run tests (excluding doctests) + run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,extended_tests + - name: Verify Working Directory Clean + run: git diff --exit-code + # Check answers are correct when hash values collide hash-collisions: name: cargo test hash collisions (amd64) @@ -51,7 +87,8 @@ jobs: - name: Run tests run: | cd datafusion - cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib --tests --features=force_hash_collisions,avro + cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib --tests --features=force_hash_collisions,avro,extended_tests + sqllogictest-sqlite: name: "Run sqllogictests with the sqlite test suite" runs-on: ubuntu-latest diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index e341816b2b8a9..149bf8beb96e9 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -80,6 +80,7 @@ unicode_expressions = [ "datafusion-sql/unicode_expressions", "datafusion-functions/unicode_expressions", ] +extended_tests = [] [dependencies] apache-avro = { version = "0.17", optional = true } @@ -150,6 +151,7 @@ rand_distr = "0.4.3" regex = { workspace = true } rstest = { workspace = true } serde_json = { workspace = true } +sysinfo = "0.33.1" test-utils = { path = "../../test-utils" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] } diff --git a/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs b/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs new file mode 100644 index 0000000000000..32df6c5d62937 --- /dev/null +++ b/datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Validates query's actual memory usage is consistent with the specified memory +//! limit. + +mod sort_mem_validation; +mod utils; diff --git a/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs b/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs new file mode 100644 index 0000000000000..1789f37535a94 --- /dev/null +++ b/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory limit validation tests for the sort queries +//! +//! These tests must run in separate processes to accurately measure memory usage. +//! This file is organized as: +//! - Test runners that spawn individual test processes +//! - Test cases that contain the actual validation logic +use std::{process::Command, str}; + +use log::info; + +use crate::memory_limit::memory_limit_validation::utils; + +// =========================================================================== +// Test runners: +// Runners are splitted into multiple tests to run in parallel +// =========================================================================== + +#[test] +fn memory_limit_validation_runner_works_runner() { + spawn_test_process("memory_limit_validation_runner_works"); +} + +#[test] +fn sort_no_mem_limit_runner() { + spawn_test_process("sort_no_mem_limit"); +} + +#[test] +fn sort_with_mem_limit_1_runner() { + spawn_test_process("sort_with_mem_limit_1"); +} + +#[test] +fn sort_with_mem_limit_2_runner() { + spawn_test_process("sort_with_mem_limit_2"); +} + +#[test] +fn sort_with_mem_limit_3_runner() { + spawn_test_process("sort_with_mem_limit_3"); +} + +#[test] +fn sort_with_mem_limit_2_cols_1_runner() { + spawn_test_process("sort_with_mem_limit_2_cols_1"); +} + +#[test] +fn sort_with_mem_limit_2_cols_2_runner() { + spawn_test_process("sort_with_mem_limit_2_cols_2"); +} + +/// Helper function that executes a test in a separate process with the required environment +/// variable set. Memory limit validation tasks need to measure memory resident set +/// size (RSS), so they must run in a separate process. +fn spawn_test_process(test: &str) { + let test_path = format!( + "memory_limit::memory_limit_validation::sort_mem_validation::{}", + test + ); + info!("Running test: {}", test_path); + + // Run the test command + let output = Command::new("cargo") + .arg("test") + .arg("--package") + .arg("datafusion") + .arg("--test") + .arg("core_integration") + .arg("--features") + .arg("extended_tests") + .arg("--") + .arg(&test_path) + .arg("--exact") + .arg("--nocapture") + .env("DATAFUSION_TEST_MEM_LIMIT_VALIDATION", "1") + .output() + .expect("Failed to execute test command"); + + // Convert output to strings + let stdout = str::from_utf8(&output.stdout).unwrap_or(""); + let stderr = str::from_utf8(&output.stderr).unwrap_or(""); + + info!("{}", stdout); + + assert!( + output.status.success(), + "Test '{}' failed with status: {}\nstdout:\n{}\nstderr:\n{}", + test, + output.status, + stdout, + stderr + ); +} + +// =========================================================================== +// Test cases: +// All following tests need to be run through their individual test wrapper. +// When run directly, environment variable `DATAFUSION_TEST_MEM_LIMIT_VALIDATION` +// is not set, test will return with a no-op. +// +// If some tests consistently fail, suppress by setting a larger expected memory +// usage (e.g. 80_000_000 * 3 -> 80_000_000 * 4) +// =========================================================================== + +/// Test runner itself: if memory limit violated, test should fail. +#[tokio::test] +async fn memory_limit_validation_runner_works() { + if std::env::var("DATAFUSION_TEST_MEM_LIMIT_VALIDATION").is_err() { + println!("Skipping test because DATAFUSION_TEST_MEM_LIMIT_VALIDATION is not set"); + + return; + } + + let result = std::panic::catch_unwind(|| { + tokio::runtime::Runtime::new().unwrap().block_on(async { + utils::validate_query_with_memory_limits( + 20_000_000, // set an impossible limit: query requires at least 80MB + None, + "select * from generate_series(1,10000000) as t1(c1) order by c1", + "select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data + ) + .await; + }) + }); + + assert!( + result.is_err(), + "Expected the query to panic due to memory limit" + ); +} + +#[tokio::test] +async fn sort_no_mem_limit() { + utils::validate_query_with_memory_limits( + 80_000_000 * 3, + None, + "select * from generate_series(1,10000000) as t1(c1) order by c1", + "select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data + ) + .await; +} + +#[tokio::test] +async fn sort_with_mem_limit_1() { + utils::validate_query_with_memory_limits( + 40_000_000 * 5, + Some(40_000_000), + "select * from generate_series(1,10000000) as t1(c1) order by c1", + "select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data + ) + .await; +} + +#[tokio::test] +async fn sort_with_mem_limit_2() { + utils::validate_query_with_memory_limits( + 80_000_000 * 3, + Some(80_000_000), + "select * from generate_series(1,10000000) as t1(c1) order by c1", + "select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data + ) + .await; +} + +#[tokio::test] +async fn sort_with_mem_limit_3() { + utils::validate_query_with_memory_limits( + 80_000_000 * 3, + Some(80_000_000 * 10), // mem limit is large enough so that no spill happens + "select * from generate_series(1,10000000) as t1(c1) order by c1", + "select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data + ) + .await; +} + +#[tokio::test] +async fn sort_with_mem_limit_2_cols_1() { + let memory_usage_in_theory = 80_000_000 * 2; // 2 columns + let expected_max_mem_usage = memory_usage_in_theory * 4; + utils::validate_query_with_memory_limits( + expected_max_mem_usage, + None, + "select c1, c1 as c2 from generate_series(1,10000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST", + "select c1, c1 as c2 from generate_series(1,1000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST", // Baseline query with ~10% of data + ) + .await; +} + +// TODO: Query fails, fix it +// Issue: https://github.com/apache/datafusion/issues/14143 +#[ignore] +#[tokio::test] +async fn sort_with_mem_limit_2_cols_2() { + let memory_usage_in_theory = 80_000_000 * 2; // 2 columns + let expected_max_mem_usage = memory_usage_in_theory * 3; + let mem_limit = memory_usage_in_theory as f64 * 0.5; + + utils::validate_query_with_memory_limits( + expected_max_mem_usage, + Some(mem_limit as i64), + "select c1, c1 as c2 from generate_series(1,10000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST", + "select c1, c1 as c2 from generate_series(1,1000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST", // Baseline query with ~10% of data + ) + .await; +} diff --git a/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs b/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs new file mode 100644 index 0000000000000..bdf30c140afff --- /dev/null +++ b/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common_runtime::SpawnedTask; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use sysinfo::System; +use tokio::time::{interval, Duration}; + +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::{ + memory_pool::{human_readable_size, FairSpillPool}, + runtime_env::RuntimeEnvBuilder, +}; + +/// Measures the maximum RSS (in bytes) during the execution of an async task. RSS +/// will be sampled every 7ms. +/// +/// # Arguments +/// +/// * `f` - A closure that returns the async task to be measured. +/// +/// # Returns +/// +/// A tuple containing the result of the async task and the maximum RSS observed. +async fn measure_max_rss(f: F) -> (T, usize) +where + F: FnOnce() -> Fut, + Fut: std::future::Future, +{ + // Initialize system information + let mut system = System::new_all(); + system.refresh_all(); + + // Get the current process ID + let pid = sysinfo::get_current_pid().expect("Failed to get current PID"); + + // Shared atomic variable to store max RSS + let max_rss = Arc::new(AtomicUsize::new(0)); + + // Clone for the monitoring task + let max_rss_clone = Arc::clone(&max_rss); + + // Spawn a monitoring task + let monitor_handle = SpawnedTask::spawn(async move { + let mut sys = System::new_all(); + let mut interval = interval(Duration::from_millis(7)); + + loop { + interval.tick().await; + sys.refresh_all(); + if let Some(process) = sys.process(pid) { + let rss_bytes = process.memory(); + max_rss_clone + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { + if rss_bytes as usize > current { + Some(rss_bytes as usize) + } else { + None + } + }) + .ok(); + } else { + // Process no longer exists + break; + } + } + }); + + // Execute the async task + let result = f().await; + + // Give some time for the monitor to catch the final memory state + tokio::time::sleep(Duration::from_millis(200)).await; + + // Terminate the monitoring task + drop(monitor_handle); + + // Retrieve the maximum RSS + let peak_rss = max_rss.load(Ordering::Relaxed); + + (result, peak_rss) +} + +/// Query runner that validates the memory usage of the query. +/// +/// Note this function is supposed to run in a separate process for accurate memory +/// estimation. If environment variable `DATAFUSION_TEST_MEM_LIMIT_VALIDATION` is +/// not set, this function will return immediately, so test cases calls this function +/// should first set the environment variable, then create a new process to run. +/// See `sort_mem_validation.rs` for more details. +/// +/// # Arguments +/// +/// * `expected_mem_bytes` - The maximum expected memory usage for the query. +/// * `mem_limit_bytes` - The memory limit of the query in bytes. `None` means no +/// memory limit is presented. +/// * `query` - The SQL query to execute +/// * `baseline_query` - The SQL query to execute for estimating constant overhead. +/// This query should use 10% of the data of the main query. +/// +/// # Example +/// +/// utils::validate_query_with_memory_limits( +/// 40_000_000 * 2, +/// Some(40_000_000), +/// "SELECT * FROM generate_series(1, 100000000) AS t(i) ORDER BY i", +/// "SELECT * FROM generate_series(1, 10000000) AS t(i) ORDER BY i" +/// ); +/// +/// The above function call means: +/// Set the memory limit to 40MB, and the profiled memory usage of {query - baseline_query} +/// should be less than 40MB * 2. +pub async fn validate_query_with_memory_limits( + expected_mem_bytes: i64, + mem_limit_bytes: Option, + query: &str, + baseline_query: &str, +) { + if std::env::var("DATAFUSION_TEST_MEM_LIMIT_VALIDATION").is_err() { + println!("Skipping test because DATAFUSION_TEST_MEM_LIMIT_VALIDATION is not set"); + + return; + } + + println!("Current process ID: {}", std::process::id()); + + let runtime_builder = RuntimeEnvBuilder::new(); + + let runtime = match mem_limit_bytes { + Some(mem_limit_bytes) => runtime_builder + .with_memory_pool(Arc::new(FairSpillPool::new(mem_limit_bytes as usize))) + .build_arc() + .unwrap(), + None => runtime_builder.build_arc().unwrap(), + }; + + let session_config = SessionConfig::new().with_target_partitions(4); // Make sure the configuration is the same if test is running on different machines + + let ctx = SessionContext::new_with_config_rt(session_config, runtime); + + let df = ctx.sql(query).await.unwrap(); + // Run a query with 10% data to estimate the constant overhead + let df_small = ctx.sql(baseline_query).await.unwrap(); + + let (_, baseline_max_rss) = + measure_max_rss(|| async { df_small.collect().await.unwrap() }).await; + + let (_, max_rss) = measure_max_rss(|| async { df.collect().await.unwrap() }).await; + + println!( + "Memory before: {}, Memory after: {}", + human_readable_size(baseline_max_rss), + human_readable_size(max_rss) + ); + + let actual_mem_usage = max_rss as f64 - baseline_max_rss as f64; + + println!( + "Query: {}, Memory usage: {}, Memory limit: {}", + query, + human_readable_size(actual_mem_usage as usize), + human_readable_size(expected_mem_bytes as usize) + ); + + assert!( + actual_mem_usage < expected_mem_bytes as f64, + "Memory usage exceeded the theoretical limit. Actual: {}, Expected limit: {}", + human_readable_size(actual_mem_usage as usize), + human_readable_size(expected_mem_bytes as usize) + ); +} diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index c7514d1c24b1b..b6f2f8e9ac4aa 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -16,6 +16,8 @@ // under the License. //! This module contains tests for limiting memory at runtime in DataFusion +#[cfg(feature = "extended_tests")] +mod memory_limit_validation; use arrow::datatypes::{Int32Type, SchemaRef}; use arrow::record_batch::RecordBatch; diff --git a/parquet-testing b/parquet-testing index e45cd23f784aa..f4d7ed772a62a 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7 +Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882 diff --git a/testing b/testing index 98fceecd024dc..d2a1371230349 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4 +Subproject commit d2a13712303498963395318a4eb42872e66aead7