Make cargo test compile (apache#7)
* WIP: on making cargo test compile

* make cargo test compile

* fix
yjshen authored Sep 25, 2021
1 parent 7a5294b commit 4030615
Showing 22 changed files with 159 additions and 134 deletions.
2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-dataframe.rs
@@ -27,7 +27,7 @@ async fn main() -> Result<()> {
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);

let testdata = datafusion::arrow::util::test_util::parquet_test_data();
let testdata = datafusion::test_util::parquet_test_data();

let filename = &format!("{}/alltypes_plain.parquet", testdata);

2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-sql.rs
@@ -27,7 +27,7 @@ async fn main() -> Result<()> {
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);

let testdata = datafusion::arrow::util::test_util::arrow_test_data();
let testdata = datafusion::test_util::arrow_test_data();

// register csv file with the execution context
ctx.register_csv(
1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -31,6 +31,7 @@ simd = ["datafusion/simd"]
snmalloc = ["snmalloc-rs"]

[dependencies]
arrow = { package = "arrow2", version="0.5", features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "io_print", "ahash", "merge_sort", "compute", "regex"] }
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client" }
structopt = { version = "0.3", default-features = false }
2 changes: 1 addition & 1 deletion benchmarks/src/bin/nyctaxi.rs
@@ -124,7 +124,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Resu
let physical_plan = ctx.create_physical_plan(&plan)?;
let result = collect(physical_plan).await?;
if debug {
print::print(&result)?;
print::print(&result);
}
Ok(())
}
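The call above drops the `?` because, after the switch to arrow2, the print helper no longer returns a `Result`. A minimal sketch of the new call shape (the exact `print::print` signature is an assumption about the arrow2 0.5 build used here):

use datafusion::arrow::io::print;
use datafusion::arrow::record_batch::RecordBatch;

// Print query results only when debugging. No `?`: in this arrow2-based
// build, `print::print` is assumed not to return a Result.
fn maybe_print(batches: &[RecordBatch], debug: bool) {
    if debug {
        print::print(batches);
    }
}
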
37 changes: 15 additions & 22 deletions benchmarks/src/bin/tpch.rs
@@ -25,26 +25,18 @@ use std::{
time::Instant,
};

use futures::StreamExt;

//use ballista::context::BallistaContext;
use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::io::parquet::write::{CompressionCodec, WriteOptions};
use datafusion::arrow::io::print;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty;
use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::{CsvFile, MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;

use arrow::io::parquet::write::{Compression, Version, WriteOptions};
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
@@ -315,7 +307,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
millis.push(elapsed as f64);
println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
if opt.debug {
pretty::print_batches(&batches)?;
print::print(&batches);
}
}
@@ -369,7 +361,7 @@ async fn execute_query(
.indent()
.to_string()
);
print::print(&result)?;
print::print(&result);
}
Ok(result)
}
@@ -413,13 +405,13 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
"csv" => ctx.write_csv(csv, output_path).await?,
"parquet" => {
let compression = match opt.compression.as_str() {
"none" => CompressionCodec::Uncompressed,
"snappy" => CompressionCodec::Snappy,
"brotli" => CompressionCodec::Brotli,
"gzip" => CompressionCodec::Gzip,
"lz4" => CompressionCodec::Lz4,
"lz0" => CompressionCodec::Lzo,
"zstd" => CompressionCodec::Zstd,
"none" => Compression::Uncompressed,
"snappy" => Compression::Snappy,
"brotli" => Compression::Brotli,
"gzip" => Compression::Gzip,
"lz4" => Compression::Lz4,
"lz0" => Compression::Lzo,
"zstd" => Compression::Zstd,
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid compression format: {}",
@@ -431,8 +423,9 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
let options = WriteOptions {
compression,
write_statistics: false,
version: Version::V1,
};
ctx.write_parquet(csv, options, output_path).await?
ctx.write_parquet(csv, output_path, options).await?
}
other => {
return Err(DataFusionError::NotImplemented(format!(
@@ -590,8 +583,8 @@ mod tests {
use std::env;
use std::sync::Arc;

use arrow::array::get_display;
use datafusion::arrow::array::*;
use datafusion::arrow::util::display::array_value_to_string;
use datafusion::logical_plan::Expr;
use datafusion::logical_plan::Expr::Cast;

@@ -786,7 +779,7 @@ mod tests {
return format!("[{}]", r.join(","));
}

array_value_to_string(column, row_index).unwrap()
get_display(column)(row_index)
}

/// Converts the results into a 2d array of strings, `result[row][column]`
@@ -798,7 +791,7 @@ mod tests {
let row_vec = batch
.columns()
.iter()
.map(|column| col_str(column, row_index))
.map(|column| col_str(column.as_ref(), row_index))
.collect();
result.push(row_vec);
}
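Two of the changes in this file go together: the parquet conversion now uses arrow2's `Compression` enum and a `WriteOptions` struct with an explicit format `Version`, and `write_parquet` takes the options as its last argument instead of the second. A hedged sketch of that flow (names mirror the diff; treat the exact signatures as assumptions about arrow2 0.5):

use arrow::io::parquet::write::{Compression, Version, WriteOptions};
use datafusion::error::{DataFusionError, Result};

// Map a CLI string to an arrow2 parquet compression codec, mirroring the
// match in convert_tbl above; unknown values surface as NotImplemented.
fn parse_compression(name: &str) -> Result<Compression> {
    match name {
        "none" => Ok(Compression::Uncompressed),
        "snappy" => Ok(Compression::Snappy),
        "gzip" => Ok(Compression::Gzip),
        "zstd" => Ok(Compression::Zstd),
        other => Err(DataFusionError::NotImplemented(format!(
            "Invalid compression format: {}",
            other
        ))),
    }
}

// Options handed to ctx.write_parquet(plan, path, options): statistics off,
// Parquet format version 1, as in the benchmark code above.
fn conversion_options(compression: Compression) -> WriteOptions {
    WriteOptions {
        compression,
        write_statistics: false,
        version: Version::V1,
    }
}
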
6 changes: 6 additions & 0 deletions datafusion-cli/src/print_format.rs
@@ -78,6 +78,7 @@ fn print_batches_to_json<J: JsonFormat>(batches: &[RecordBatch]) -> Result<Strin
{
let mut writer = Writer::<_, J>::new(&mut bytes);
writer.write_batches(batches)?;
writer.finish()?;
}
let formatted = String::from_utf8(bytes)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
@@ -91,7 +92,12 @@ fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<Stri
.has_headers(true)
.delimiter(delimiter)
.from_writer(&mut bytes);
let mut is_first = true;
for batch in batches {
if is_first {
write::write_header(&mut writer, batches[0].schema())?;
is_first = false;
}
write::write_batch(&mut writer, batch, &write::SerializeOptions::default())?;
}
}
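Two fixes here: the JSON writer is now `finish()`ed so its buffered output actually lands in the byte vector, and the CSV path writes the header row exactly once before the first batch. A rough sketch of the corrected CSV path (the `WriterBuilder` import and the `write_header`/`write_batch` signatures are assumptions about the arrow2 0.5 CSV writer used in this file):

use datafusion::arrow::io::csv::write;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};

// Serialize batches to CSV with a single header row, mirroring
// print_batches_with_sep above.
fn batches_to_csv(batches: &[RecordBatch], delimiter: u8) -> Result<String> {
    let mut bytes = vec![];
    {
        // Builder as assumed from the unchanged lines of the original file.
        let mut writer = write::WriterBuilder::new()
            .has_headers(true)
            .delimiter(delimiter)
            .from_writer(&mut bytes);
        let mut is_first = true;
        for batch in batches {
            if is_first {
                // Emit the header from the first batch's schema, once.
                write::write_header(&mut writer, batches[0].schema())?;
                is_first = false;
            }
            write::write_batch(&mut writer, batch, &write::SerializeOptions::default())?;
        }
    }
    String::from_utf8(bytes).map_err(|e| DataFusionError::Execution(e.to_string()))
}
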
1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
@@ -34,6 +34,7 @@ required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "0.1" }
arrow = { package = "arrow2", version="0.5", features = ["io_ipc"] }
datafusion = { path = "../datafusion" }
prost = "0.8"
tonic = "0.5"
4 changes: 2 additions & 2 deletions datafusion-examples/examples/avro_sql.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::arrow::util::pretty;
use datafusion::arrow::io::print;

use datafusion::error::Result;
use datafusion::physical_plan::avro::AvroReadOptions;
@@ -43,7 +43,7 @@ async fn main() -> Result<()> {
let results = df.collect().await?;

// print the results
pretty::print_batches(&results)?;
print::print(&results);

Ok(())
}
2 changes: 1 addition & 1 deletion datafusion-examples/examples/dataframe.rs
@@ -25,7 +25,7 @@ async fn main() -> Result<()> {
// create local execution context
let mut ctx = ExecutionContext::new();

let testdata = datafusion::test::parquet_test_data();
let testdata = datafusion::test_util::parquet_test_data();

let filename = &format!("{}/alltypes_plain.parquet", testdata);

6 changes: 3 additions & 3 deletions datafusion-examples/examples/dataframe_in_memory.rs
@@ -17,7 +17,7 @@

use std::sync::Arc;

use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::array::{Int32Array, Utf8Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

@@ -38,8 +38,8 @@ async fn main() -> Result<()> {
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(Utf8Array::<i32>::from_slice(&["a", "b", "c", "d"])),
Arc::new(Int32Array::from_values(vec![1, 10, 10, 100])),
],
)?;

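This example shows the array-construction change most of the files share: strings become `Utf8Array` with an explicit offset width, and primitive arrays without nulls are built with `from_slice`/`from_values` rather than `From<Vec<_>>`. A small self-contained sketch (constructor names as in the diff; treat them as assumptions about arrow2 0.5):

use std::sync::Arc;

use datafusion::arrow::array::{Int32Array, Utf8Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

fn example_batch() -> Result<RecordBatch> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));
    // Utf8Array is generic over its offset type; i32 matches the old StringArray.
    let strings = Utf8Array::<i32>::from_slice(&["a", "b", "c", "d"]);
    // from_values builds a primitive array with no validity bitmap (no nulls).
    let ints = Int32Array::from_values(vec![1, 10, 10, 100]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(strings), Arc::new(ints)])?;
    Ok(batch)
}
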
7 changes: 4 additions & 3 deletions datafusion-examples/examples/flight_client.rs
@@ -24,14 +24,14 @@ use arrow_flight::flight_descriptor;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightDescriptor, Ticket};
use datafusion::arrow::util::pretty;
use datafusion::arrow::io::print;

/// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
/// Parquet files and executing SQL queries against them on a remote server.
/// This example is run along-side the example `flight_server`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let testdata = datafusion::crate::test::parquet_test_data();
let testdata = datafusion::test_util::parquet_test_data();

// Create Flight client
let mut client = FlightServiceClient::connect("http://localhost:50051").await?;
@@ -67,13 +67,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let record_batch = flight_data_to_arrow_batch(
&flight_data,
schema.clone(),
true,
&dictionaries_by_field,
)?;
results.push(record_batch);
}

// print the results
pretty::print_batches(&results)?;
print::print(&results);

Ok(())
}
6 changes: 4 additions & 2 deletions datafusion-examples/examples/flight_server.rs
@@ -25,6 +25,8 @@ use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::TableProvider;
use datafusion::prelude::*;

use arrow::io::ipc::write::IpcWriteOptions;
use arrow_flight::utils::flight_data_from_arrow_schema;
use arrow_flight::{
flight_service_server::FlightService, flight_service_server::FlightServiceServer,
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
@@ -106,9 +108,9 @@ impl FlightService for FlightServiceImpl {
}

// add an initial FlightData message that sends schema
let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let options = IpcWriteOptions::default();
let schema_flight_data =
SchemaAsIpc::new(&df.schema().clone().into(), &options).into();
flight_data_from_arrow_schema(&df.schema().clone().into(), &options);

let mut flights: Vec<Result<FlightData, Status>> =
vec![Ok(schema_flight_data)];
4 changes: 2 additions & 2 deletions datafusion-examples/examples/simple_udaf.rs
@@ -36,11 +36,11 @@ fn create_context() -> Result<ExecutionContext> {
// define data in two partitions
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
vec![Arc::new(Float32Array::from_values(vec![2.0, 4.0, 8.0]))],
)?;
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Float32Array::from(vec![64.0]))],
vec![Arc::new(Float32Array::from_values(vec![64.0]))],
)?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
7 changes: 4 additions & 3 deletions datafusion-examples/examples/simple_udf.rs
@@ -24,6 +24,7 @@ use datafusion::arrow::{
use datafusion::prelude::*;
use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
use std::sync::Arc;
use arrow::array::Array;

// create local execution context with an in-memory table
fn create_context() -> Result<ExecutionContext> {
@@ -39,8 +40,8 @@ fn create_context() -> Result<ExecutionContext> {
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
Arc::new(Float32Array::from_values(vec![2.1, 3.1, 4.1, 5.1])),
Arc::new(Float64Array::from_values(vec![1.0, 2.0, 3.0, 4.0])),
],
)?;

@@ -88,7 +89,7 @@ async fn main() -> Result<()> {
match (base, exponent) {
// in arrow, any value can be null.
// Here we decide to make our UDF to return null when either base or exponent is null.
(Some(base), Some(exponent)) => Some(base.powf(exponent)),
(Some(base), Some(exponent)) => Some(base.powf(*exponent)),
_ => None,
}
})
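The one-character change in the UDF body is a consequence of arrow2's iterators: `Float64Array::iter()` yields `Option<&f64>` rather than `Option<f64>`, so the receiver of `powf` auto-derefs but the argument needs an explicit `*`. A sketch of the zipped-iterator pattern (array types and iterator behaviour as assumed from the diff):

use datafusion::arrow::array::Float64Array;

// Element-wise base^exponent over two nullable arrays. With arrow2 the
// iterator items are Option<&f64>, hence the explicit `*exponent`.
fn pow_pairs(base: &Float64Array, exponent: &Float64Array) -> Vec<Option<f64>> {
    base.iter()
        .zip(exponent.iter())
        .map(|(base, exponent)| match (base, exponent) {
            (Some(base), Some(exponent)) => Some(base.powf(*exponent)),
            _ => None,
        })
        .collect()
}
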
4 changes: 2 additions & 2 deletions datafusion/benches/data_utils/mod.rs
@@ -122,8 +122,8 @@ fn create_record_batch(
vec![
Arc::new(Utf8Array::<i32>::from_slice(keys)),
Arc::new(Float32Array::from_slice(vec![i as f32; batch_size])),
Arc::new(Float64Array::from(values)),
Arc::new(UInt64Array::from(integer_values_wide)),
Arc::new(Float64Array::from_slice(values)),
Arc::new(UInt64Array::from_slice(integer_values_wide)),
Arc::new(UInt64Array::from_slice(integer_values_narrow)),
],
)
4 changes: 2 additions & 2 deletions datafusion/src/lib.rs
@@ -57,7 +57,7 @@
//! let results: Vec<RecordBatch> = df.collect().await?;
//!
//! // format the results
//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
//! let pretty_results = datafusion::arrow::io::print::write(&results);
//!
//! let expected = vec![
//! "+---+--------------------------+",
@@ -92,7 +92,7 @@
//! let results: Vec<RecordBatch> = df.collect().await?;
//!
//! // format the results
//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
//! let pretty_results = datafusion::arrow::io::print::write(&results);
//!
//! let expected = vec![
//! "+---+----------------+",
18 changes: 9 additions & 9 deletions datafusion/tests/custom_sources.rs
@@ -16,8 +16,7 @@
// under the License.

use arrow::array::{Int32Array, PrimitiveArray, UInt64Array};
use arrow::compute::kernels::aggregate;
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

@@ -44,6 +43,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::compute::aggregate;
use async_trait::async_trait;

//// Custom source dataframe tests ////
@@ -160,18 +160,18 @@ impl ExecutionPlan for CustomExecutionPlan {
.iter()
.map(|i| ColumnStatistics {
null_count: Some(batch.column(*i).null_count()),
min_value: Some(ScalarValue::Int32(aggregate::min(
min_value: Some(ScalarValue::Int32(aggregate::min_primitive(
batch
.column(*i)
.as_any()
.downcast_ref::<PrimitiveArray<Int32Type>>()
.downcast_ref::<PrimitiveArray<i32>>()
.unwrap(),
))),
max_value: Some(ScalarValue::Int32(aggregate::max(
max_value: Some(ScalarValue::Int32(aggregate::max_primitive(
batch
.column(*i)
.as_any()
.downcast_ref::<PrimitiveArray<Int32Type>>()
.downcast_ref::<PrimitiveArray<i32>>()
.unwrap(),
))),
..Default::default()
@@ -276,9 +276,9 @@ async fn optimizers_catch_all_statistics() {
Field::new("MAX(test.c1)", DataType::Int32, false),
])),
vec![
Arc::new(UInt64Array::from(vec![4])),
Arc::new(Int32Array::from(vec![1])),
Arc::new(Int32Array::from(vec![100])),
Arc::new(UInt64Array::from_values(vec![4])),
Arc::new(Int32Array::from_values(vec![1])),
Arc::new(Int32Array::from_values(vec![100])),
],
)
.unwrap();
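The statistics code switches from arrow's typed aggregate kernels (generic over `Int32Type`) to arrow2's `compute::aggregate`, where `PrimitiveArray` is parameterised by the native type (`i32`) and the entry points are `min_primitive`/`max_primitive`. A hedged sketch of the downcast-and-aggregate pattern used above (return types are assumptions):

use arrow::array::{Array, PrimitiveArray};
use arrow::compute::aggregate;

// Min/max of a dynamically typed column assumed to hold Int32 values;
// min_primitive/max_primitive are assumed to return Option<i32> (None if all null).
fn int32_min_max(column: &dyn Array) -> (Option<i32>, Option<i32>) {
    let array = column
        .as_any()
        .downcast_ref::<PrimitiveArray<i32>>()
        .expect("expected an Int32 column");
    (aggregate::min_primitive(array), aggregate::max_primitive(array))
}
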