From 0b85d0e204d25619025272b5969af69525376d62 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 11 Nov 2021 06:59:04 -0500 Subject: [PATCH 1/3] Test upgrade to arrow 6.x with new prost/tonic --- Cargo.toml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index c722851e72dec..38cdd3ce8f6df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,11 @@ exclude = ["python"] [profile.release] lto = true codegen-units = 1 + + +# experimentally try arrow 6.x with upgraded prost/tonic +# from https://github.com/apache/arrow-rs/pull/945 +[patch.crates-io] +arrow = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } +arrow-flight = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } +parquet = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } From f3a6769b38243b359f56d199b2380a0a7141bd9d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 11 Nov 2021 07:10:45 -0500 Subject: [PATCH 2/3] Update tonic/prost deps --- ballista-examples/Cargo.toml | 4 ++-- ballista/rust/core/Cargo.toml | 6 +++--- ballista/rust/executor/Cargo.toml | 2 +- ballista/rust/scheduler/Cargo.toml | 6 +++--- datafusion-examples/Cargo.toml | 4 ++-- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml index e6c15e0178eae..b94868a76ad86 100644 --- a/ballista-examples/Cargo.toml +++ b/ballista-examples/Cargo.toml @@ -32,8 +32,8 @@ rust-version = "1.56" arrow-flight = { version = "6.1.0" } datafusion = { path = "../datafusion" } ballista = { path = "../ballista/rust/client" } -prost = "0.8" -tonic = "0.5" +prost = "0.9" +tonic = "0.6" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } futures = "0.3" num_cpus = "1.13.0" diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index f90d03b07a2ab..5cb5a452bf2fd 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -35,11 +35,11 @@ async-trait = "0.1.36" futures = "0.3" hashbrown = "0.11" log = "0.4" -prost = "0.8" +prost = "0.9" serde = {version = "1", features = ["derive"]} sqlparser = "0.12.0" tokio = "1.0" -tonic = "0.5" +tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } chrono = "0.4" @@ -51,4 +51,4 @@ datafusion = { path = "../../../datafusion", version = "5.1.0" } tempfile = "3" [build-dependencies] -tonic-build = { version = "0.5" } +tonic-build = { version = "0.6" } diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 5c01f1c2d1470..68b0542030ec4 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -43,7 +43,7 @@ snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true} tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = "0.5" +tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index ac0d98738f804..0d1c6adc8c333 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -44,13 +44,13 @@ http-body = "0.4" hyper = "0.14.4" log = "0.4" parse_arg = "0.1.3" -prost = "0.8" +prost = "0.9" rand = "0.8" serde = {version = "1", features = ["derive"]} sled_package = { package = "sled", version = "0.34", optional = true } tokio = { version = "1.0", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"], optional = true } -tonic = "0.5" +tonic = "0.6" tower = { version = "0.4" } warp = "0.3" @@ -60,7 +60,7 @@ uuid = { version = "0.8", features = ["v4"] } [build-dependencies] configure_me_codegen = "0.4.1" -tonic-build = { version = "0.5" } +tonic-build = { version = "0.6" } [package.metadata.configure_me.bin] scheduler = "scheduler_config_spec.toml" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 86c0830d91746..ce7c9d006fe5e 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -36,8 +36,8 @@ required-features = ["datafusion/avro"] [dev-dependencies] arrow-flight = { version = "6.1.0" } datafusion = { path = "../datafusion" } -prost = "0.8" -tonic = "0.5" +prost = "0.9" +tonic = "0.6" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } futures = "0.3" num_cpus = "1.13.0" From f2cfa381ebbd8711885c93c1724b72a6b2ff3f23 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 11 Nov 2021 07:18:49 -0500 Subject: [PATCH 3/3] Fix flight compile error --- ballista/rust/core/src/client.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index 26c8d22b405d5..4b82c34644f90 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -17,7 +17,7 @@ //! Client API for sending requests to executors. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::{collections::HashMap, pin::Pin}; use std::{ convert::{TryFrom, TryInto}, @@ -135,13 +135,16 @@ impl BallistaClient { } struct FlightDataStream { - stream: Streaming, + stream: Mutex>, schema: SchemaRef, } impl FlightDataStream { pub fn new(stream: Streaming, schema: SchemaRef) -> Self { - Self { stream, schema } + Self { + stream: Mutex::new(stream), + schema, + } } } @@ -149,10 +152,11 @@ impl Stream for FlightDataStream { type Item = ArrowResult; fn poll_next( - mut self: std::pin::Pin<&mut Self>, + self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.stream.poll_next_unpin(cx).map(|x| match x { + let mut stream = self.stream.lock().expect("mutex is bad"); + stream.poll_next_unpin(cx).map(|x| match x { Some(flight_data_chunk_result) => { let converted_chunk = flight_data_chunk_result .map_err(|e| ArrowError::from_external_error(Box::new(e)))