
Merge remote-tracking branch 'apache/main' into alamb/advanced_parquet_index
alamb committed Jun 19, 2024
2 parents 9be984d + 61e2ddb commit e9caa0d
Showing 46 changed files with 2,194 additions and 873 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
@@ -76,6 +76,7 @@ prost-derive = { version = "0.12", default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
test-utils = { path = "../test-utils" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.11"
url = { workspace = true }
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -75,6 +75,7 @@ cargo run --example csv_sql
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
- [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures
- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`
- [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function
- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions
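
As a taste of what the UDF entries in the list above cover, here is a minimal sketch of defining and registering a scalar UDF. It is not the actual contents of `simple_udf.rs`: the `double` function is illustrative, and it assumes a DataFusion version of this era, where `create_udf` takes an `Arc<DataType>` return type and is re-exported at `datafusion::logical_expr`:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Illustrative scalar function: double each Float64 value
    let double = Arc::new(|args: &[ColumnarValue]| {
        let arrays = ColumnarValue::values_to_arrays(args)?;
        let input = arrays[0]
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("Float64 input");
        let doubled: Float64Array = input.iter().map(|v| v.map(|x| x * 2.0)).collect();
        Ok(ColumnarValue::Array(Arc::new(doubled) as ArrayRef))
    });

    // Name, argument types, return type, volatility, implementation
    let udf = create_udf(
        "double",
        vec![DataType::Float64],
        Arc::new(DataType::Float64),
        Volatility::Immutable,
        double,
    );
    ctx.register_udf(udf);

    // The registered UDF is now callable from SQL
    ctx.sql("SELECT double(1.5)").await?.show().await?;
    Ok(())
}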
326 changes: 171 additions & 155 deletions datafusion-examples/examples/file_stream_provider.rs
@@ -15,172 +15,188 @@
// specific language governing permissions and limitations
// under the License.

#[cfg(not(target_os = "windows"))]
mod non_windows {
    use datafusion::assert_batches_eq;
    use datafusion_common::instant::Instant;
    use std::fs::{File, OpenOptions};
    use std::io::Write;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_schema::SchemaRef;
    use futures::StreamExt;
    use nix::sys::stat;
    use nix::unistd;
    use tempfile::TempDir;
    use tokio::task::JoinSet;

    use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
    use datafusion::datasource::TableProvider;
    use datafusion::prelude::{SessionConfig, SessionContext};
    use datafusion_common::{exec_err, Result};
    use datafusion_expr::Expr;

    // Batch size used when reading from the FIFO
    const TEST_BATCH_SIZE: usize = 5;
    // Number of data lines written to the FIFO
    const TEST_DATA_SIZE: usize = 5;

    /// Makes a `TableProvider` for a FIFO file using `StreamTable` with the `StreamProvider` trait
    fn fifo_table(
        schema: SchemaRef,
        path: impl Into<PathBuf>,
        sort: Vec<Vec<Expr>>,
    ) -> Arc<dyn TableProvider> {
        let source = FileStreamProvider::new_file(schema, path.into())
            .with_batch_size(TEST_BATCH_SIZE)
            .with_header(true);
        let config = StreamConfig::new(Arc::new(source)).with_order(sort);
        Arc::new(StreamTable::new(Arc::new(config)))
    }

    fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
        let file_path = tmp_dir.path().join(file_name);
        // Simulate an infinite environment via a FIFO file
        if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
            exec_err!("{}", e)
        } else {
            Ok(file_path)
        }
    }

    fn write_to_fifo(
        mut file: &File,
        line: &str,
        ref_time: Instant,
        broken_pipe_timeout: Duration,
    ) -> Result<()> {
        // We need to handle the broken pipe error until the reader is ready.
        // This is why we use a timeout to limit the wait duration for the
        // reader. If the error is anything other than a broken pipe, we fail
        // immediately.
        while let Err(e) = file.write_all(line.as_bytes()) {
            if e.raw_os_error().unwrap() == 32 {
                let interval = Instant::now().duration_since(ref_time);
                if interval < broken_pipe_timeout {
                    thread::sleep(Duration::from_millis(100));
                    continue;
                }
            }
            return exec_err!("{}", e);
        }
        Ok(())
    }

    fn create_writing_thread(
        file_path: PathBuf,
        maybe_header: Option<String>,
        lines: Vec<String>,
        waiting_lock: Arc<AtomicBool>,
        wait_until: usize,
        tasks: &mut JoinSet<()>,
    ) {
        // Timeout for a long period of BrokenPipe errors
        let broken_pipe_timeout = Duration::from_secs(10);
        let sa = file_path.clone();
        // Spawn a new thread to write to the FIFO file
        #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
        tasks.spawn_blocking(move || {
            let file = OpenOptions::new().write(true).open(sa).unwrap();
            // Reference time to use when deciding to fail the test
            let execution_start = Instant::now();
            if let Some(header) = maybe_header {
                write_to_fifo(&file, &header, execution_start, broken_pipe_timeout)
                    .unwrap();
            }
            for (cnt, line) in lines.iter().enumerate() {
                while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
                    thread::sleep(Duration::from_millis(50));
                }
                write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
            }
            // Close the writer to signal EOF to the reader
            drop(file);
        });
    }

    /// This example demonstrates scanning a CSV FIFO file as an unbounded
    /// stream table and fetching the results
    pub async fn main() -> Result<()> {
        // Create session context
        let config = SessionConfig::new()
            .with_batch_size(TEST_BATCH_SIZE)
            .with_collect_statistics(false)
            .with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);
        let tmp_dir = TempDir::new()?;
        let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;

        let mut tasks: JoinSet<()> = JoinSet::new();
        let waiting = Arc::new(AtomicBool::new(true));

        let data_iter = 0..TEST_DATA_SIZE;
        let lines = data_iter
            .map(|i| format!("{},{}\n", i, i + 1))
            .collect::<Vec<_>>();

        create_writing_thread(
            fifo_path.clone(),
            Some("a1,a2\n".to_owned()),
            lines.clone(),
            waiting.clone(),
            TEST_DATA_SIZE,
            &mut tasks,
        );

        // Create schema
        let schema = Arc::new(Schema::new(vec![
            Field::new("a1", DataType::UInt32, false),
            Field::new("a2", DataType::UInt32, false),
        ]));

        // Specify the ordering:
        let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];

        let provider = fifo_table(schema.clone(), fifo_path, order.clone());
        ctx.register_table("fifo", provider)?;

        let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
        let mut stream = df.execute_stream().await.unwrap();

        let mut batches = Vec::new();
        if let Some(Ok(batch)) = stream.next().await {
            batches.push(batch)
        }

        let expected = vec![
            "+----+----+",
            "| a1 | a2 |",
            "+----+----+",
            "| 0  | 1  |",
            "| 1  | 2  |",
            "| 2  | 3  |",
            "| 3  | 4  |",
            "| 4  | 5  |",
            "+----+----+",
        ];

        assert_batches_eq!(&expected, &batches);

        Ok(())
    }
}

#[tokio::main]
async fn main() -> datafusion_common::Result<()> {
    #[cfg(target_os = "windows")]
    {
        println!("file_stream_provider example does not work on windows");
        Ok(())
    }
    #[cfg(not(target_os = "windows"))]
    {
        non_windows::main().await
    }
}
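
On a Unix-like system this example can be run with `cargo run --example file_stream_provider`; on Windows the `#[cfg]`-gated top-level `main` compiles to a stub that only prints the message above and exits, which keeps the example buildable on all platforms.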
6 changes: 5 additions & 1 deletion datafusion-examples/examples/parquet_index.rs
@@ -526,7 +526,11 @@ impl ParquetMetadataIndexBuilder
            reader.schema(),
            reader.parquet_schema(),
        )?;
        let row_counts = converter
            .row_group_row_counts(row_groups.iter())?
            .ok_or_else(|| {
                internal_datafusion_err!("Row group row counts are missing")
            })?;
        let value_column_mins = converter.row_group_mins(row_groups.iter())?;
        let value_column_maxes = converter.row_group_maxes(row_groups.iter())?;
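
The `ok_or_else` chain above turns the `Option` returned by `row_group_row_counts` into a proper error instead of unwrapping. A minimal standalone sketch of that Option-to-Result pattern, with a hypothetical helper name and element type:

use datafusion_common::{internal_datafusion_err, Result};

// Hypothetical helper: report missing per-row-group counts as an
// internal DataFusion error rather than panicking on `unwrap`.
fn require_row_counts(counts: Option<Vec<u64>>) -> Result<Vec<u64>> {
    counts.ok_or_else(|| internal_datafusion_err!("Row group row counts are missing"))
}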
