
Commit 4d3a6a8

Merge remote-tracking branch 'apache/main' into alamb/exec_clone
2 parents: 13595e3 + 752561a

File tree: 192 files changed, 4370 additions and 1537 deletions


CHANGELOG.md

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@
   under the License.
 -->

-* [DataFusion CHANGELOG](./datafusion/CHANGELOG.md)
+Change logs for each release can be found [here](dev/changelog).
+

 For older versions, see [apache/arrow/CHANGELOG.md](https://github.com/apache/arrow/blob/master/CHANGELOG.md).

benchmarks/src/sort.rs

Lines changed: 3 additions & 3 deletions
@@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

 use arrow::util::pretty;
 use datafusion::common::Result;
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::prelude::{SessionConfig, SessionContext};
@@ -170,13 +170,13 @@ impl RunOpt {

 async fn exec_sort(
     ctx: &SessionContext,
-    expr: &[PhysicalSortExpr],
+    expr: LexOrderingRef<'_>,
     test_file: &TestParquetFile,
     debug: bool,
 ) -> Result<(usize, std::time::Duration)> {
     let start = Instant::now();
     let scan = test_file.create_scan(ctx, None).await?;
-    let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
+    let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), scan));
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
     let elapsed = start.elapsed();

datafusion/common/src/functional_dependencies.rs

Lines changed: 1 addition & 1 deletion
@@ -334,7 +334,7 @@ impl FunctionalDependencies {
                left_func_dependencies.extend(right_func_dependencies);
                left_func_dependencies
            }
-           JoinType::LeftSemi | JoinType::LeftAnti => {
+           JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                // These joins preserve functional dependencies of the left side:
                left_func_dependencies
            }

datafusion/common/src/join_type.rs

Lines changed: 21 additions & 0 deletions
@@ -44,6 +44,20 @@ pub enum JoinType {
     LeftAnti,
     /// Right Anti Join
     RightAnti,
+    /// Left Mark join
+    ///
+    /// Returns one record for each record from the left input. The output contains an additional
+    /// column "mark" which is true if there is at least one match in the right input where the
+    /// join condition evaluates to true. Otherwise, the mark column is false. For more details see
+    /// [1]. This join type is used to decorrelate EXISTS subqueries used inside disjunctive
+    /// predicates.
+    ///
+    /// Note: we currently do not implement the full null semantics for the mark join described
+    /// in [1], which will be needed if we add support for ANY subqueries. In our version the mark
+    /// column is only true when there was a match and false when no match was found, never null.
+    ///
+    /// [1]: http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf
+    LeftMark,
 }

 impl JoinType {
@@ -63,6 +77,7 @@ impl Display for JoinType {
             JoinType::RightSemi => "RightSemi",
             JoinType::LeftAnti => "LeftAnti",
             JoinType::RightAnti => "RightAnti",
+            JoinType::LeftMark => "LeftMark",
         };
         write!(f, "{join_type}")
     }
@@ -82,6 +97,7 @@ impl FromStr for JoinType {
             "RIGHTSEMI" => Ok(JoinType::RightSemi),
             "LEFTANTI" => Ok(JoinType::LeftAnti),
             "RIGHTANTI" => Ok(JoinType::RightAnti),
+            "LEFTMARK" => Ok(JoinType::LeftMark),
             _ => _not_impl_err!("The join type {s} does not exist or is not implemented"),
         }
     }
@@ -101,6 +117,7 @@ impl Display for JoinSide {
         match self {
             JoinSide::Left => write!(f, "left"),
             JoinSide::Right => write!(f, "right"),
+            JoinSide::None => write!(f, "none"),
         }
     }
 }
@@ -113,6 +130,9 @@ pub enum JoinSide {
     Left,
     /// Right side of the join
     Right,
+    /// Neither side of the join, used for Mark joins where the mark column does not belong to
+    /// either side of the join
+    None,
 }

 impl JoinSide {
@@ -121,6 +141,7 @@ impl JoinSide {
         match self {
             JoinSide::Left => JoinSide::Right,
             JoinSide::Right => JoinSide::Left,
+            JoinSide::None => JoinSide::None,
         }
     }
 }
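
To make the semantics described in the new `LeftMark` doc comment concrete, here is a minimal standalone Rust sketch (the `left_mark_join` helper and its inputs are illustrative only and are not part of DataFusion's join implementation): every row of the left input produces exactly one output row, and the mark is true exactly when at least one right row satisfies the join predicate. Under the simplified semantics above the mark is never null.

/// Sketch of LeftMark semantics over plain vectors: one output row per left
/// row plus a boolean "mark" that is true iff some right row matches.
fn left_mark_join<L: Copy, R: Copy>(
    left: &[L],
    right: &[R],
    predicate: impl Fn(L, R) -> bool,
) -> Vec<(L, bool)> {
    left.iter()
        .map(|&l| {
            // Simplified semantics: the mark is true on a match and false
            // otherwise, never null.
            let mark = right.iter().any(|&r| predicate(l, r));
            (l, mark)
        })
        .collect()
}

fn main() {
    let left = [1, 2, 3];
    let right = [2, 3, 3];
    // Left rows 2 and 3 have at least one match in `right`; row 1 does not.
    assert_eq!(
        left_mark_join(&left, &right, |l, r| l == r),
        vec![(1, false), (2, true), (3, true)]
    );
}

This is also the shape a decorrelated EXISTS inside a disjunctive predicate takes: the subquery becomes the right input and the mark column feeds the surrounding OR.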

datafusion/core/benches/physical_plan.rs

Lines changed: 2 additions & 1 deletion
@@ -36,6 +36,7 @@ use datafusion::physical_plan::{
     memory::MemoryExec,
 };
 use datafusion::prelude::SessionContext;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;

 // Initialise the operator using the provided record batches and the sort key
 // as inputs. All record batches must have the same schema.
@@ -52,7 +53,7 @@ fn sort_preserving_merge_operator(
             expr: col(name, &schema).unwrap(),
             options: Default::default(),
         })
-        .collect::<Vec<_>>();
+        .collect::<LexOrdering>();

     let exec = MemoryExec::try_new(
         &batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),

datafusion/core/benches/sort.rs

Lines changed: 2 additions & 1 deletion
@@ -89,6 +89,7 @@ use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};

 /// Benchmarks for SortPreservingMerge stream
 use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use futures::StreamExt;
 use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
@@ -257,7 +258,7 @@ impl BenchCase {
 }

 /// Make sort exprs for each column in `schema`
-fn make_sort_exprs(schema: &Schema) -> Vec<PhysicalSortExpr> {
+fn make_sort_exprs(schema: &Schema) -> LexOrdering {
     schema
         .fields()
         .iter()
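
Both benchmark diffs above migrate from `Vec<PhysicalSortExpr>` to `LexOrdering`. Here is a minimal sketch of the new calling convention, assuming (as the `.collect::<LexOrdering>()` call above implies) that `LexOrdering` implements `FromIterator<PhysicalSortExpr>`; the `sort_key_for` helper and the `my_col` column are illustrative, and the imports mirror the ones shown in these diffs:

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// Build a lexicographic sort key over every column of `schema`, mirroring
/// the updated `make_sort_exprs` in the benchmark.
fn sort_key_for(schema: &Schema) -> LexOrdering {
    schema
        .fields()
        .iter()
        .map(|field| PhysicalSortExpr {
            expr: col(field.name(), schema).unwrap(),
            options: Default::default(),
        })
        .collect::<LexOrdering>()
}

fn main() {
    let schema = Schema::new(vec![Field::new("my_col", DataType::Int64, false)]);
    // The resulting ordering is what SortExec::new(..) now expects instead of
    // a plain Vec<PhysicalSortExpr>.
    let _ordering = sort_key_for(&schema);
}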

datafusion/core/src/dataframe/mod.rs

Lines changed: 5 additions & 1 deletion
@@ -3864,6 +3864,7 @@ mod tests {
             JoinType::RightSemi,
             JoinType::LeftAnti,
             JoinType::RightAnti,
+            JoinType::LeftMark,
         ];

         let default_partition_count = SessionConfig::new().target_partitions();
@@ -3881,7 +3882,10 @@
             let join_schema = physical_plan.schema();

             match join_type {
-                JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
+                JoinType::Left
+                | JoinType::LeftSemi
+                | JoinType::LeftAnti
+                | JoinType::LeftMark => {
                     let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
                         Arc::new(Column::new_with_schema("c1", &join_schema)?),
                         Arc::new(Column::new_with_schema("c2", &join_schema)?),

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 135 additions & 41 deletions
@@ -2274,47 +2274,7 @@

     #[tokio::test]
     async fn parquet_sink_write() -> Result<()> {
-        let field_a = Field::new("a", DataType::Utf8, false);
-        let field_b = Field::new("b", DataType::Utf8, false);
-        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-
-        let file_sink_config = FileSinkConfig {
-            object_store_url: object_store_url.clone(),
-            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
-            table_paths: vec![ListingTableUrl::parse("file:///")?],
-            output_schema: schema.clone(),
-            table_partition_cols: vec![],
-            insert_op: InsertOp::Overwrite,
-            keep_partition_by_columns: false,
-        };
-        let parquet_sink = Arc::new(ParquetSink::new(
-            file_sink_config,
-            TableParquetOptions {
-                key_value_metadata: std::collections::HashMap::from([
-                    ("my-data".to_string(), Some("stuff".to_string())),
-                    ("my-data-bool-key".to_string(), None),
-                ]),
-                ..Default::default()
-            },
-        ));
-
-        // create data
-        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
-        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
-        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
-
-        // write stream
-        parquet_sink
-            .write_all(
-                Box::pin(RecordBatchStreamAdapter::new(
-                    schema,
-                    futures::stream::iter(vec![Ok(batch)]),
-                )),
-                &build_ctx(object_store_url.as_ref()),
-            )
-            .await
-            .unwrap();
+        let parquet_sink = create_written_parquet_sink("file:///").await?;

         // assert written
         let mut written = parquet_sink.written();
@@ -2366,6 +2326,140 @@
         Ok(())
     }

+    #[tokio::test]
+    async fn parquet_sink_write_with_extension() -> Result<()> {
+        let filename = "test_file.custom_ext";
+        let file_path = format!("file:///path/to/{}", filename);
+        let parquet_sink = create_written_parquet_sink(file_path.as_str()).await?;
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet file to be written, instead found {}",
+            written.len()
+        );
+
+        let (path, ..) = written.take(1).next().unwrap();
+
+        let path_parts = path.parts().collect::<Vec<_>>();
+        assert_eq!(
+            path_parts.len(),
+            3,
+            "Expected 3 path parts, instead found {}",
+            path_parts.len()
+        );
+        assert_eq!(path_parts.last().unwrap().as_ref(), filename);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn parquet_sink_write_with_directory_name() -> Result<()> {
+        let file_path = "file:///path/to";
+        let parquet_sink = create_written_parquet_sink(file_path).await?;
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet file to be written, instead found {}",
+            written.len()
+        );
+
+        let (path, ..) = written.take(1).next().unwrap();
+
+        let path_parts = path.parts().collect::<Vec<_>>();
+        assert_eq!(
+            path_parts.len(),
+            3,
+            "Expected 3 path parts, instead found {}",
+            path_parts.len()
+        );
+        assert!(path_parts.last().unwrap().as_ref().ends_with(".parquet"));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn parquet_sink_write_with_folder_ending() -> Result<()> {
+        let file_path = "file:///path/to/";
+        let parquet_sink = create_written_parquet_sink(file_path).await?;
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet file to be written, instead found {}",
+            written.len()
+        );
+
+        let (path, ..) = written.take(1).next().unwrap();
+
+        let path_parts = path.parts().collect::<Vec<_>>();
+        assert_eq!(
+            path_parts.len(),
+            3,
+            "Expected 3 path parts, instead found {}",
+            path_parts.len()
+        );
+        assert!(path_parts.last().unwrap().as_ref().ends_with(".parquet"));
+
+        Ok(())
+    }
+
+    async fn create_written_parquet_sink(table_path: &str) -> Result<Arc<ParquetSink>> {
+        let field_a = Field::new("a", DataType::Utf8, false);
+        let field_b = Field::new("b", DataType::Utf8, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+
+        let file_sink_config = FileSinkConfig {
+            object_store_url: object_store_url.clone(),
+            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+            table_paths: vec![ListingTableUrl::parse(table_path)?],
+            output_schema: schema.clone(),
+            table_partition_cols: vec![],
+            insert_op: InsertOp::Overwrite,
+            keep_partition_by_columns: false,
+        };
+        let parquet_sink = Arc::new(ParquetSink::new(
+            file_sink_config,
+            TableParquetOptions {
+                key_value_metadata: std::collections::HashMap::from([
+                    ("my-data".to_string(), Some("stuff".to_string())),
+                    ("my-data-bool-key".to_string(), None),
+                ]),
+                ..Default::default()
+            },
+        ));
+
+        // create data
+        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
+        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
+        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+
+        // write stream
+        parquet_sink
+            .write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::iter(vec![Ok(batch)]),
+                )),
+                &build_ctx(object_store_url.as_ref()),
+            )
+            .await
+            .unwrap();
+
+        Ok(parquet_sink)
+    }
+
     #[tokio::test]
     async fn parquet_sink_write_partitions() -> Result<()> {
         let field_a = Field::new("a", DataType::Utf8, false);

datafusion/core/src/datasource/file_format/write/demux.rs

Lines changed: 8 additions & 6 deletions
@@ -59,8 +59,9 @@ type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
 /// which should be contained within the same output file. The outer channel
 /// is used to send a dynamic number of inner channels, representing a dynamic
 /// number of total output files. The caller is also responsible to monitor
-/// the demux task for errors and abort accordingly. The single_file_output parameter
-/// overrides all other settings to force only a single file to be written.
+/// the demux task for errors and abort accordingly. A path with an extension will
+/// force only a single file to be written with the extension from the path. Otherwise
+/// the default extension will be used and the output will be split into multiple files.
 /// partition_by parameter will additionally split the input based on the unique
 /// values of a specific column `<https://github.com/apache/datafusion/issues/7744>``
 /// ┌───────────┐     ┌────────────┐    ┌─────────────┐
@@ -79,12 +80,13 @@ pub(crate) fn start_demuxer_task(
     context: &Arc<TaskContext>,
     partition_by: Option<Vec<(String, DataType)>>,
     base_output_path: ListingTableUrl,
-    file_extension: String,
+    default_extension: String,
     keep_partition_by_columns: bool,
 ) -> (SpawnedTask<Result<()>>, DemuxedStreamReceiver) {
     let (tx, rx) = mpsc::unbounded_channel();
     let context = context.clone();
-    let single_file_output = !base_output_path.is_collection();
+    let single_file_output =
+        !base_output_path.is_collection() && base_output_path.file_extension().is_some();
     let task = match partition_by {
         Some(parts) => {
             // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
@@ -96,7 +98,7 @@ pub(crate) fn start_demuxer_task(
                     context,
                     parts,
                     base_output_path,
-                    file_extension,
+                    default_extension,
                     keep_partition_by_columns,
                 )
                 .await
@@ -108,7 +110,7 @@ pub(crate) fn start_demuxer_task(
                 input,
                 context,
                 base_output_path,
-                file_extension,
+                default_extension,
                 single_file_output,
            )
            .await
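
The reworded doc comment and the new `single_file_output` expression change how the demuxer decides between one output file and many: the decision is now inferred from the base output path rather than passed in as a flag. Below is a standalone sketch of that decision using only the standard library; the `is_single_file_output` helper is hypothetical and stands in for `ListingTableUrl::is_collection` and `file_extension`, and treating a trailing slash as a collection is a simplification.

use std::path::Path;

/// Single-file output only when the base path is not a collection (here:
/// does not end in '/') and carries an explicit file extension; otherwise
/// the default extension is used and the output may be split into many files.
fn is_single_file_output(base_output_path: &str) -> bool {
    let is_collection = base_output_path.ends_with('/');
    let has_extension = Path::new(base_output_path).extension().is_some();
    !is_collection && has_extension
}

fn main() {
    // A concrete filename with an extension => exactly one file, name kept.
    assert!(is_single_file_output("file:///path/to/test_file.custom_ext"));
    // Directory-like paths => default extension, possibly many files.
    assert!(!is_single_file_output("file:///path/to"));
    assert!(!is_single_file_output("file:///path/to/"));
}

This mirrors the three new parquet sink tests above: an explicit `.custom_ext` filename is written as a single file with that name, while `file:///path/to` and `file:///path/to/` both produce generated names ending in `.parquet`.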
