874 changes: 446 additions & 428 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 6 additions & 10 deletions Cargo.toml
@@ -21,10 +21,6 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
 resolver = "3"
 
 [workspace.package]
-# edition to be changed to 2024 when we update
-# Minimum Supported Rust Version (MSRV) to 1.85.0
-# which is datafusion 49
-#
 edition = "2024"
 # we should try to follow datafusion version
 rust-version = "1.88.0"
@@ -33,11 +29,11 @@ rust-version = "1.88.0"
 arrow = { version = "57", features = ["ipc_compression"] }
 arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
 clap = { version = "4.5", features = ["derive", "cargo"] }
-datafusion = "51.0.0"
-datafusion-cli = "51.0.0"
-datafusion-proto = "51.0.0"
-datafusion-proto-common = "51.0.0"
-datafusion-substrait = "51.0.0"
+datafusion = "52.0.0"
+datafusion-cli = "52.0.0"
+datafusion-proto = "52.0.0"
+datafusion-proto-common = "52.0.0"
+datafusion-substrait = "52.0.0"
 object_store = "0.12"
 prost = "0.14"
 prost-types = "0.14"
@@ -51,7 +47,7 @@ tonic-prost-build = { version = "0.14" }
 tracing = "0.1"
 tracing-appender = "0.2.2"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
-ctor = { version = "0.5" }
+ctor = { version = "0.6" }
 mimalloc = { version = "0.1" }
 
 tokio = { version = "1" }
4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
@@ -18,7 +18,7 @@
 [package]
 name = "ballista-cli"
 description = "Command Line Client for Ballista distributed query engine."
-version = "51.0.0"
+version = "52.0.0"
 authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
 edition = { workspace = true }
 rust-version = { workspace = true }
@@ -29,7 +29,7 @@ repository = "https://github.com/apache/datafusion-ballista"
 readme = "README.md"
 
 [dependencies]
-ballista = { path = "../ballista/client", version = "51.0.0", features = ["standalone"] }
+ballista = { path = "../ballista/client", version = "52.0.0", features = ["standalone"] }
 clap = { workspace = true, features = ["derive", "cargo"] }
 datafusion = { workspace = true }
 datafusion-cli = { workspace = true }
12 changes: 6 additions & 6 deletions ballista/client/Cargo.toml
@@ -19,7 +19,7 @@
 name = "ballista"
 description = "Ballista Distributed Compute"
 license = "Apache-2.0"
-version = "51.0.0"
+version = "52.0.0"
 homepage = "https://datafusion.apache.org/ballista/"
 repository = "https://github.com/apache/datafusion-ballista"
 readme = "README.md"
@@ -29,18 +29,18 @@ rust-version = { workspace = true }
 
 [dependencies]
 async-trait = { workspace = true }
-ballista-core = { path = "../core", version = "51.0.0" }
-ballista-executor = { path = "../executor", version = "51.0.0", optional = true }
-ballista-scheduler = { path = "../scheduler", version = "51.0.0", optional = true }
+ballista-core = { path = "../core", version = "52.0.0" }
+ballista-executor = { path = "../executor", version = "52.0.0", optional = true }
+ballista-scheduler = { path = "../scheduler", version = "52.0.0", optional = true }
 datafusion = { workspace = true }
 log = { workspace = true }
 
 tokio = { workspace = true }
 url = { workspace = true }
 
 [dev-dependencies]
-ballista-executor = { path = "../executor", version = "51.0.0" }
-ballista-scheduler = { path = "../scheduler", version = "51.0.0" }
+ballista-executor = { path = "../executor", version = "52.0.0" }
+ballista-scheduler = { path = "../scheduler", version = "52.0.0" }
 ctor = { workspace = true }
 datafusion-proto = { workspace = true }
 env_logger = { workspace = true }
48 changes: 29 additions & 19 deletions ballista/client/tests/context_checks.rs
@@ -28,6 +28,7 @@ mod supported {
 use datafusion::prelude::*;
 use datafusion::{assert_batches_eq, prelude::SessionContext};
 use rstest::*;
+use std::path::PathBuf;
 
 #[rstest::fixture]
 fn test_data() -> String {
@@ -714,17 +715,22 @@ mod supported {
 ];
 
 let write_dir = tempfile::tempdir().expect("temporary directory to be created");
-let write_dir_path = write_dir
-    .path()
-    .to_str()
-    .expect("path to be converted to str");
+let write_dir_path = PathBuf::from(
+    write_dir
+        .path()
+        .to_str()
+        .expect("path to be converted to str"),
+);
+
+let parquet_file = write_dir_path.join("p_written_table.parquet");
+let parquet_file = parquet_file.to_str().expect("cannot create parquet file");
 
 ctx.sql("select * from test")
 .await?
-.write_parquet(write_dir_path, Default::default(), Default::default())
+.write_parquet(parquet_file, Default::default(), Default::default())
 .await?;
 
-ctx.register_parquet("p_written_table", write_dir_path, Default::default())
+ctx.register_parquet("p_written_table", parquet_file, Default::default())
 .await?;
 
 let result = ctx
@@ -735,12 +741,15 @@
 
 assert_batches_eq!(expected, &result);
 
+let csv_file = write_dir_path.join("c_written_table.csv");
+let csv_file = csv_file.to_str().expect("cannot create csv file");
+
 ctx.sql("select * from test")
 .await?
-.write_csv(write_dir_path, Default::default(), Default::default())
+.write_csv(csv_file, Default::default(), Default::default())
 .await?;
 
-ctx.register_csv("c_written_table", write_dir_path, Default::default())
+ctx.register_csv("c_written_table", csv_file, Default::default())
 .await?;
 
 let result = ctx
@@ -751,12 +760,15 @@
 
 assert_batches_eq!(expected, &result);
 
+let json_file = write_dir_path.join("j_written_table.json");
+let json_file = json_file.to_str().expect("cannot create json file");
+
 ctx.sql("select * from test")
 .await?
-.write_json(write_dir_path, Default::default(), Default::default())
+.write_json(json_file, Default::default(), Default::default())
 .await?;
 
-ctx.register_json("j_written_table", write_dir_path, Default::default())
+ctx.register_json("j_written_table", json_file, Default::default())
 .await?;
 
 let result = ctx
@@ -1048,13 +1060,12 @@ mod supported {
 "| | EmptyRelation: rows=1 |",
 "| physical_plan | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id] |",
 "| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))] |",
-"| | CoalesceBatchesExec: target_batch_size=8192 |",
-"| | RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1 |",
-"| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))] |",
-"| | ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id] |",
-"| | UnnestExec |",
-"| | ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))] |",
-"| | PlaceholderRowExec |",
+"| | RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1 |",
+"| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))] |",
+"| | ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id] |",
+"| | UnnestExec |",
+"| | ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))] |",
+"| | PlaceholderRowExec |",
 "| | |",
 "| distributed_plan | =========ResolvedStage[stage_id=1.0, partitions=1]========= |",
 "| | ShuffleWriterExec: partitioning: Hash([id@0], 16) |",
@@ -1069,8 +1080,7 @@ mod supported {
 "| | ShuffleWriterExec: partitioning: None |",
 "| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id] |",
 "| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))] |",
-"| | CoalesceBatchesExec: target_batch_size=8192 |",
-"| | UnresolvedShuffleExec: partitioning: Hash([id@0], 16) |",
+"| | UnresolvedShuffleExec: partitioning: Hash([id@0], 16) |",
 "| | |",
 "| | |",
 "+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
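Note: the test above now joins an explicit file name onto the temporary directory and passes that file path to `write_parquet`/`write_csv`/`write_json`, instead of passing the directory itself. A minimal sketch of that path-handling pattern, assuming only the `tempfile` crate (the names here are illustrative, not taken from the PR):

```rust
// Illustrative sketch only: build a concrete file path inside a temp dir,
// then borrow it as &str, the form DataFusion's write_*/register_* APIs accept.
use std::path::PathBuf;

fn main() {
    let write_dir = tempfile::tempdir().expect("temporary directory to be created");
    let write_dir_path = PathBuf::from(write_dir.path());

    // Join the file name onto the directory, then convert to &str.
    let parquet_file = write_dir_path.join("p_written_table.parquet");
    let parquet_file: &str = parquet_file.to_str().expect("path to be valid UTF-8");

    println!("would write parquet to {parquet_file}");
}
```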
2 changes: 1 addition & 1 deletion ballista/core/Cargo.toml
@@ -19,7 +19,7 @@
 name = "ballista-core"
 description = "Ballista Distributed Compute"
 license = "Apache-2.0"
-version = "51.0.0"
+version = "52.0.0"
 homepage = "https://datafusion.apache.org/ballista/"
 repository = "https://github.com/apache/datafusion-ballista"
 readme = "README.md"
1 change: 1 addition & 0 deletions ballista/core/proto/ballista.proto
@@ -305,6 +305,7 @@ message OperatorMetric {
 uint64 output_bytes = 12;
 NamedPruningMetrics pruning_metrics = 13;
 NamedRatio ratio = 14;
+uint64 output_batches = 15;
 }
 }
36 changes: 35 additions & 1 deletion ballista/core/proto/datafusion.proto
@@ -179,6 +179,8 @@ message CreateExternalTableNode {
 
 message PrepareNode {
 string name = 1;
+// We serialize both the data types and the fields for compatibility with
+// older versions (newer versions populate both).
 repeated datafusion_common.ArrowType data_types = 2;
 LogicalPlanNode input = 3;
 repeated datafusion_common.Field fields = 4;
@@ -413,6 +415,8 @@ message Wildcard {
 
 message PlaceholderNode {
 string id = 1;
+// We serialize the data type, metadata, and nullability separately to maintain
+// compatibility with older versions.
 datafusion_common.ArrowType data_type = 2;
 optional bool nullable = 3;
 map<string, string> metadata = 4;
@@ -744,6 +748,7 @@ message PhysicalPlanNode {
 GenerateSeriesNode generate_series = 33;
 SortMergeJoinExecNode sort_merge_join = 34;
 MemoryScanExecNode memory_scan = 35;
+AsyncFuncExecNode async_func = 36;
 }
 }
 
@@ -867,6 +872,8 @@ message PhysicalExprNode {
 PhysicalExtensionExprNode extension = 19;
 
 UnknownColumn unknown_column = 20;
+
+PhysicalHashExprNode hash_expr = 21;
 }
 }
 
@@ -985,6 +992,15 @@ message PhysicalExtensionExprNode {
 repeated PhysicalExprNode inputs = 2;
 }
 
+message PhysicalHashExprNode {
+repeated PhysicalExprNode on_columns = 1;
+uint64 seed0 = 2;
+uint64 seed1 = 3;
+uint64 seed2 = 4;
+uint64 seed3 = 5;
+string description = 6;
+}
+
 message FilterExecNode {
 PhysicalPlanNode input = 1;
 PhysicalExprNode expr = 2;
@@ -1005,6 +1021,15 @@ message PhysicalSortExprNodeCollection {
 repeated PhysicalSortExprNode physical_sort_expr_nodes = 1;
 }
 
+message ProjectionExpr {
+string alias = 1;
+PhysicalExprNode expr = 2;
+}
+
+message ProjectionExprs {
+repeated ProjectionExpr projections = 1;
+}
+
 message FileScanExecConf {
 repeated FileGroup file_groups = 1;
 datafusion_common.Schema schema = 2;
@@ -1020,6 +1045,8 @@ message FileScanExecConf {
 
 datafusion_common.Constraints constraints = 11;
 optional uint64 batch_size = 12;
+
+optional ProjectionExprs projection_exprs = 13;
 }
 
 message ParquetScanExecNode {
@@ -1207,6 +1234,7 @@ message AggregateExecNode {
 repeated bool groups = 9;
 repeated MaybeFilter filter_expr = 10;
 AggLimit limit = 11;
+bool has_grouping_set = 12;
 }
 
 message GlobalLimitExecNode {
@@ -1377,4 +1405,10 @@ message SortMergeJoinExecNode {
 JoinFilter filter = 5;
 repeated SortExprNode sort_options = 6;
 datafusion_common.NullEquality null_equality = 7;
-}
+}
+
+message AsyncFuncExecNode {
+PhysicalPlanNode input = 1;
+repeated PhysicalExprNode async_exprs = 2;
+repeated string async_expr_names = 3;
+}
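Note: the new `PhysicalHashExprNode` carries four 64-bit seeds so that every executor that decodes the plan hashes rows identically, which is what keeps hash-partitioned shuffles consistent across a cluster. A hypothetical sketch of how such seeds could be round-tripped, assuming an `ahash::RandomState`-style hasher (the actual decode path lives in datafusion-proto and is not shown in this PR):

```rust
// Hypothetical sketch, not the datafusion-proto implementation: rebuild a
// deterministic hasher from four serialized seeds (seed0..seed3 above),
// assuming the `ahash` crate.
use ahash::RandomState;
use std::hash::BuildHasher;

fn random_state_from_seeds(s0: u64, s1: u64, s2: u64, s3: u64) -> RandomState {
    RandomState::with_seeds(s0, s1, s2, s3)
}

fn main() {
    // Two executors decoding the same plan get the same seeds...
    let a = random_state_from_seeds(1, 2, 3, 4);
    let b = random_state_from_seeds(1, 2, 3, 4);
    // ...so they assign any given key the same hash, and therefore the same
    // shuffle partition.
    assert_eq!(a.hash_one("row-key"), b.hash_one("row-key"));
}
```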
18 changes: 12 additions & 6 deletions ballista/core/proto/datafusion_common.proto
@@ -351,11 +351,11 @@ message Decimal256{
 // Serialized data type
 message ArrowType{
 oneof arrow_type_enum {
-EmptyMessage NONE = 1; // arrow::Type::NA
-EmptyMessage BOOL = 2; // arrow::Type::BOOL
-EmptyMessage UINT8 = 3; // arrow::Type::UINT8
-EmptyMessage INT8 = 4; // arrow::Type::INT8
-EmptyMessage UINT16 = 5; // represents arrow::Type fields in src/arrow/type.h
+EmptyMessage NONE = 1; // arrow::Type::NA
+EmptyMessage BOOL = 2; // arrow::Type::BOOL
+EmptyMessage UINT8 = 3; // arrow::Type::UINT8
+EmptyMessage INT8 = 4; // arrow::Type::INT8
+EmptyMessage UINT16 = 5; // represents arrow::Type fields in src/arrow/type.h
 EmptyMessage INT16 = 6;
 EmptyMessage UINT32 = 7;
 EmptyMessage INT32 = 8;
@@ -461,12 +461,14 @@ message CsvOptions {
 bytes newlines_in_values = 16; // Indicates if newlines are supported in values
 bytes terminator = 17; // Optional terminator character as a byte
 bytes truncated_rows = 18; // Indicates if truncated rows are allowed
+optional uint32 compression_level = 19; // Optional compression level
 }
 
 // Options controlling JSON format
 message JsonOptions {
 CompressionTypeVariant compression = 1; // Compression type
 optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
+optional uint32 compression_level = 3; // Optional compression level
 }
 
 message TableParquetOptions {
@@ -519,6 +521,7 @@ message ParquetOptions {
 bool skip_metadata = 3; // default = true
 bool pushdown_filters = 5; // default = false
 bool reorder_filters = 6; // default = false
+bool force_filter_selections = 34; // default = false
 uint64 data_pagesize_limit = 7; // default = 1024 * 1024
 uint64 write_batch_size = 8; // default = 1024
 string writer_version = 9; // default = "1.0"
@@ -608,6 +611,8 @@ message Statistics {
 Precision num_rows = 1;
 Precision total_byte_size = 2;
 repeated ColumnStats column_stats = 3;
+// total_rows was removed - field 4 is reserved
+reserved 4;
 }
 
 message ColumnStats {
@@ -616,4 +621,5 @@ message ColumnStats {
 Precision sum_value = 5;
 Precision null_count = 3;
 Precision distinct_count = 4;
-}
+Precision byte_size = 6;
+}
7 changes: 4 additions & 3 deletions ballista/core/src/execution_plans/shuffle_writer.rs
@@ -262,10 +262,11 @@ impl ShuffleWriterExec {
 writers.push(None);
 }
 
-let mut partitioner = BatchPartitioner::try_new(
-Partitioning::Hash(exprs, num_output_partitions),
+let mut partitioner = BatchPartitioner::new_hash_partitioner(
+exprs,
+num_output_partitions,
 write_metrics.repart_time.clone(),
-)?;
+);
 
 while let Some(result) = stream.next().await {
 let input_batch = result?;
7 changes: 4 additions & 3 deletions ballista/core/src/execution_plans/sort_shuffle/writer.rs
@@ -227,10 +227,11 @@ impl SortShuffleWriterExec {
 .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
 
 // Create batch partitioner
-let mut partitioner = BatchPartitioner::try_new(
-Partitioning::Hash(exprs, num_output_partitions),
+let mut partitioner = BatchPartitioner::new_hash_partitioner(
+exprs,
+num_output_partitions,
 metrics.repart_time.clone(),
-)?;
+);
 
 // Process input stream
 while let Some(result) = stream.next().await {
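Note: both shuffle writers switch from the fallible `BatchPartitioner::try_new(Partitioning::Hash(..))` to a dedicated `new_hash_partitioner` constructor, which is why the trailing `?` disappears at both call sites. A minimal sketch of that call-site difference using stand-in types (the real signatures live in DataFusion's repartition code and are not reproduced here):

```rust
// Stand-in types only; shows why replacing a fallible constructor with an
// infallible one turns `)?;` into `);` at the call site.
#[derive(Debug)]
struct DataFusionError;

struct BatchPartitioner;

impl BatchPartitioner {
    // Old shape: had to check that the Partitioning variant was Hash,
    // so construction could fail.
    fn try_new(is_hash: bool) -> Result<Self, DataFusionError> {
        if is_hash { Ok(Self) } else { Err(DataFusionError) }
    }

    // New shape: hash partitioning is requested directly, so there is
    // no invalid input left to reject.
    fn new_hash_partitioner() -> Self {
        Self
    }
}

fn main() -> Result<(), DataFusionError> {
    let _old = BatchPartitioner::try_new(true)?; // fallible: needs `?`
    let _new = BatchPartitioner::new_hash_partitioner(); // infallible: no `?`
    Ok(())
}
```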