Chore/fifo tests cleanup #11616

Merged
merged 3 commits into from
Jul 23, 2024
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_optimizer/mod.rs
@@ -33,12 +33,12 @@ pub mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
pub mod sanity_checker;
mod sort_pushdown;
#[cfg(test)]
pub mod test_utils;
pub mod topk_aggregation;
pub mod update_aggr_exprs;
mod utils;

#[cfg(test)]
pub mod test_utils;
mod sort_pushdown;
mod utils;

pub use datafusion_physical_optimizer::*;
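
Everything in this first hunk is a pure reorder that keeps the module declarations sorted; `test_utils` keeps its `#[cfg(test)]` gate. As a reminder of what that gate buys, here is a minimal, self-contained sketch — the `make_fixture` helper is hypothetical, only the attribute usage mirrors the code above:

#[cfg(test)]
pub mod test_utils {
    // Compiled only for `cargo test` builds, so this `pub` item can never
    // leak into the crate's release API.
    pub fn make_fixture() -> Vec<u32> {
        (0..4).collect()
    }
}

#[cfg(test)]
mod tests {
    use super::test_utils;

    #[test]
    fn fixture_has_four_rows() {
        assert_eq!(test_utils::make_fixture().len(), 4);
    }
}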
174 changes: 86 additions & 88 deletions datafusion/core/tests/fifo/mod.rs
@@ -6,7 +6,7 @@
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
@@ -16,38 +16,37 @@
// under the License.

//! This test demonstrates the DataFusion FIFO capabilities.
//!

#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
use datafusion_common::instant::Instant;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::fs::File;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use arrow::array::Array;
use arrow::csv::ReaderBuilder;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::SchemaRef;
use futures::StreamExt;
use nix::sys::stat;
use nix::unistd;
use tempfile::TempDir;
use tokio::task::{spawn_blocking, JoinHandle};

use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion::datasource::TableProvider;
use datafusion::{
prelude::{CsvReadOptions, SessionConfig, SessionContext},
test_util::{aggr_test_schema, arrow_test_data},
};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_common::instant::Instant;
use datafusion_common::{exec_err, Result};
use datafusion_expr::Expr;

use futures::StreamExt;
use nix::sys::stat;
use nix::unistd;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use tokio::task::{spawn_blocking, JoinHandle};

/// Makes a TableProvider for a FIFO file
fn fifo_table(
schema: SchemaRef,
@@ -71,20 +70,20 @@ mod unix_test {
}
}

fn write_to_fifo(
mut file: &File,
async fn write_to_fifo(
file: &mut tokio::fs::File,
line: &str,
ref_time: Instant,
broken_pipe_timeout: Duration,
) -> Result<()> {
// We need to handle broken pipe errors until the reader is ready. This
// is why we use a timeout to limit the wait duration for the reader. If
// the error is different from a broken pipe, we fail immediately.
while let Err(e) = file.write_all(line.as_bytes()) {
while let Err(e) = file.write_all(line.as_bytes()).await {
if e.raw_os_error().unwrap() == 32 {
let interval = Instant::now().duration_since(ref_time);
if interval < broken_pipe_timeout {
thread::sleep(Duration::from_millis(100));
tokio::time::sleep(Duration::from_millis(50)).await;
continue;
}
}
@@ -93,28 +92,38 @@ mod unix_test {
Ok(())
}

fn create_writing_thread(
/// This function creates a writing task for the FIFO file. To verify
/// incremental processing, it waits for a signal to continue writing after
/// a certain number of lines are written.
#[allow(clippy::disallowed_methods)]
fn create_writing_task(
file_path: PathBuf,
header: String,
lines: Vec<String>,
waiting_lock: Arc<AtomicBool>,
wait_until: usize,
waiting_signal: Arc<AtomicBool>,
send_before_waiting: usize,
) -> JoinHandle<()> {
// Timeout that bounds how long we tolerate BrokenPipe errors
let broken_pipe_timeout = Duration::from_secs(10);
let sa = file_path.clone();
// Spawn a new thread to write to the FIFO file
#[allow(clippy::disallowed_methods)] // spawn allowed only in tests
spawn_blocking(move || {
let file = OpenOptions::new().write(true).open(sa).unwrap();
// Spawn a new task to write to the FIFO file
tokio::spawn(async move {
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.open(file_path)
.await
.unwrap();
// Reference time to use when deciding to fail the test
let execution_start = Instant::now();
write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
write_to_fifo(&mut file, &header, execution_start, broken_pipe_timeout)
.await
.unwrap();
for (cnt, line) in lines.iter().enumerate() {
while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
thread::sleep(Duration::from_millis(50));
while waiting_signal.load(Ordering::SeqCst) && cnt > send_before_waiting {
tokio::time::sleep(Duration::from_millis(50)).await;
}
write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
write_to_fifo(&mut file, line, execution_start, broken_pipe_timeout)
.await
.unwrap();
}
drop(file);
})
@@ -125,14 +134,16 @@
const TEST_BATCH_SIZE: usize = 20;
// Number of lines written to FIFO
const TEST_DATA_SIZE: usize = 20_000;
// Number of lines to write before waiting to verify incremental processing
const SEND_BEFORE_WAITING: usize = 2 * TEST_BATCH_SIZE;
// Number of lines that can be joined. Each joinable key produces 20 lines in
// the aggregate_test_100 dataset. We will use these joinable keys to observe
// incremental execution.
const TEST_JOIN_RATIO: f64 = 0.01;

// This test provides a relatively realistic end-to-end scenario where
// we swap join sides to accommodate a FIFO source.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
#[tokio::test]
async fn unbounded_file_with_swapped_join() -> Result<()> {
// Create session context
let config = SessionConfig::new()
@@ -162,8 +173,8 @@
.zip(0..TEST_DATA_SIZE)
.map(|(a1, a2)| format!("{a1},{a2}\n"))
.collect::<Vec<_>>();
// Create writing threads for the left and right FIFO files
let task = create_writing_thread(
// Create writing tasks for the left and right FIFO files
let task = create_writing_task(
fifo_path.clone(),
"a1,a2\n".to_owned(),
lines,
@@ -190,7 +201,16 @@
)
.await?;
// Execute the query
let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 JOIN right as t2 ON t1.a1 = t2.c1").await?;
let df = ctx
.sql(
"SELECT
t1.a2, t2.c1, t2.c4, t2.c5
FROM
left as t1, right as t2
WHERE
t1.a1 = t2.c1",
)
.await?;
let mut stream = df.execute_stream().await?;
while (stream.next().await).is_some() {
waiting.store(false, Ordering::SeqCst);
@@ -199,16 +219,9 @@
Ok(())
}

#[derive(Debug, PartialEq)]
enum JoinOperation {
LeftUnmatched,
RightUnmatched,
Equal,
}

// This test provides a relatively realistic end-to-end scenario where
// we change the join into a [SymmetricHashJoin] to accommodate two
// unbounded (FIFO) sources.
/// This test provides a relatively realistic end-to-end scenario where
/// we change the join into a `SymmetricHashJoinExec` to accommodate two
/// unbounded (FIFO) sources.
#[tokio::test]
async fn unbounded_file_with_symmetric_join() -> Result<()> {
// Create session context
@@ -247,19 +260,18 @@
let df = ctx
.sql(
"SELECT
t1.a1,
t1.a2,
t2.a1,
t2.a2
t1.a1, t1.a2, t2.a1, t2.a2
FROM
left as t1 FULL
JOIN right as t2 ON t1.a2 = t2.a2
AND t1.a1 > t2.a1 + 4
AND t1.a1 < t2.a1 + 9",
left as t1
FULL JOIN
right as t2
ON
t1.a2 = t2.a2 AND
t1.a1 > t2.a1 + 4 AND
t1.a1 < t2.a1 + 9",
)
.await?;
let mut stream = df.execute_stream().await?;
let mut operations = vec![];

// Tasks
let mut tasks: Vec<JoinHandle<()>> = vec![];
@@ -273,54 +285,43 @@
.map(|(a1, a2)| format!("{a1},{a2}\n"))
.collect::<Vec<_>>();

// Create writing threads for the left and right FIFO files
tasks.push(create_writing_thread(
// Create writing tasks for the left and right FIFO files
tasks.push(create_writing_task(
left_fifo,
"a1,a2\n".to_owned(),
lines.clone(),
waiting.clone(),
TEST_BATCH_SIZE,
SEND_BEFORE_WAITING,
));
tasks.push(create_writing_thread(
tasks.push(create_writing_task(
right_fifo,
"a1,a2\n".to_owned(),
lines.clone(),
lines,
waiting.clone(),
TEST_BATCH_SIZE,
SEND_BEFORE_WAITING,
));
// Partial.
// Collect output data:
let (mut equal, mut left, mut right) = (0, 0, 0);
while let Some(Ok(batch)) = stream.next().await {
waiting.store(false, Ordering::SeqCst);
let left_unmatched = batch.column(2).null_count();
let right_unmatched = batch.column(0).null_count();
let op = if left_unmatched == 0 && right_unmatched == 0 {
JoinOperation::Equal
} else if right_unmatched > left_unmatched {
JoinOperation::RightUnmatched
if left_unmatched == 0 && right_unmatched == 0 {
equal += 1;
} else if right_unmatched <= left_unmatched {
left += 1;
} else {
JoinOperation::LeftUnmatched
right += 1;
};
operations.push(op);
}
futures::future::try_join_all(tasks).await.unwrap();

// The SymmetricHashJoin executor produces FULL join results at every
// pruning, which happens before it reaches the end of input and more
// than once. In this test, we feed partially joinable data to both
// sides in order to ensure that left or right unmatched results are
// generated more than once during the test.
assert!(
operations
.iter()
.filter(|&n| JoinOperation::RightUnmatched.eq(n))
.count()
> 1
&& operations
.iter()
.filter(|&n| JoinOperation::LeftUnmatched.eq(n))
.count()
> 1
);
// The symmetric hash join algorithm produces FULL join results at
// every pruning, which happens before it reaches the end of input and
// more than once. In this test, we feed partially joinable data to
// both sides in order to ensure that left or right unmatched results
// are generated as expected.
assert!(equal >= 0 && left > 1 && right > 1);
Ok(())
}

@@ -341,17 +342,14 @@
(source_fifo_path.clone(), source_fifo_path.display());
// Tasks
let mut tasks: Vec<JoinHandle<()>> = vec![];
// TEST_BATCH_SIZE + 1 rows will be provided. However, after processing precisely
// TEST_BATCH_SIZE rows, the program will pause and wait for a batch to be read in another
// thread. This approach ensures that the pipeline remains unbroken.
tasks.push(create_writing_thread(
tasks.push(create_writing_task(
source_fifo_path_thread,
"a1,a2\n".to_owned(),
(0..TEST_DATA_SIZE)
.map(|_| "a,1\n".to_string())
.collect::<Vec<_>>(),
waiting,
TEST_BATCH_SIZE,
SEND_BEFORE_WAITING,
));
// Create a new temporary FIFO file
let sink_fifo_path = create_fifo_file(&tmp_dir, "sink.csv")?;
@@ -370,8 +368,8 @@

let mut reader = ReaderBuilder::new(schema)
.with_batch_size(TEST_BATCH_SIZE)
.with_header(true)
.build(file)
.map_err(|e| DataFusionError::Internal(e.to_string()))
.unwrap();

while let Some(Ok(_)) = reader.next() {
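
Taken together, the heart of this cleanup is replacing the blocking writer (`std::fs::File` plus `thread::sleep` inside `spawn_blocking`) with a fully async one. A condensed, self-contained sketch of that pattern, assuming a FIFO already created with mkfifo(1) at a hypothetical path (the real tests live in datafusion/core/tests/fifo/mod.rs):

use std::time::Duration;

use tokio::io::AsyncWriteExt;

async fn write_line(file: &mut tokio::fs::File, line: &str) -> std::io::Result<()> {
    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    // Retry on EPIPE (errno 32) until the reader opens its end of the pipe,
    // mirroring the timeout logic in `write_to_fifo` above.
    while let Err(e) = file.write_all(line.as_bytes()).await {
        if e.raw_os_error() == Some(32) && tokio::time::Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(50)).await;
            continue;
        }
        return Err(e);
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Hypothetical path; create it first with `mkfifo /tmp/example.fifo`.
    let mut file = tokio::fs::OpenOptions::new()
        .write(true)
        .open("/tmp/example.fifo")
        .await?;
    write_line(&mut file, "a1,a2\n").await
}

Because the writer now yields to the tokio scheduler instead of parking an OS thread, the tests no longer need a dedicated thread pool — which is why `unbounded_file_with_swapped_join` drops its `multi_thread, worker_threads = 8` annotation in this diff.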