Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
description="Authenticate using the BasicAuth protobuf."),
Scenario(
"middleware",
description="Ensure headers are propagated via middleware."),
description="Ensure headers are propagated via middleware.",
skip="Rust" # TODO(ARROW-10961): tonic upgrade needed
),
]

runner = IntegrationRunner(json_files, flight_scenarios, testers, **kwargs)
Expand Down
86 changes: 48 additions & 38 deletions dev/archery/archery/integration/tester_rust.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.

import contextlib
import os
import subprocess

from .tester import Tester
from .util import run_cmd, ARROW_ROOT_DEFAULT, log
Expand All @@ -24,8 +26,8 @@
class RustTester(Tester):
PRODUCER = True
CONSUMER = True
# FLIGHT_SERVER = True
# FLIGHT_CLIENT = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True

EXE_PATH = os.path.join(ARROW_ROOT_DEFAULT, 'rust/target/debug')

Expand All @@ -34,11 +36,11 @@ class RustTester(Tester):
STREAM_TO_FILE = os.path.join(EXE_PATH, 'arrow-stream-to-file')
FILE_TO_STREAM = os.path.join(EXE_PATH, 'arrow-file-to-stream')

# FLIGHT_SERVER_CMD = [
# os.path.join(EXE_PATH, 'flight-test-integration-server')]
# FLIGHT_CLIENT_CMD = [
# os.path.join(EXE_PATH, 'flight-test-integration-client'),
# "-host", "localhost"]
FLIGHT_SERVER_CMD = [
os.path.join(EXE_PATH, 'flight-test-integration-server')]
FLIGHT_CLIENT_CMD = [
os.path.join(EXE_PATH, 'flight-test-integration-client'),
"--host", "localhost"]

name = 'Rust'

Expand Down Expand Up @@ -72,34 +74,42 @@ def file_to_stream(self, file_path, stream_path):
cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
self.run_shell_command(cmd)

# @contextlib.contextmanager
# def flight_server(self):
# cmd = self.FLIGHT_SERVER_CMD + ['-port=0']
# if self.debug:
# log(' '.join(cmd))
# server = subprocess.Popen(cmd,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE)
# try:
# output = server.stdout.readline().decode()
# if not output.startswith("Server listening on localhost:"):
# server.kill()
# out, err = server.communicate()
# raise RuntimeError(
# "Flight-C++ server did not start properly, "
# "stdout:\n{}\n\nstderr:\n{}\n"
# .format(output + out.decode(), err.decode()))
# port = int(output.split(":")[1])
# yield port
# finally:
# server.kill()
# server.wait(5)

# def flight_request(self, port, json_path):
# cmd = self.FLIGHT_CLIENT_CMD + [
# '-port=' + str(port),
# '-path=' + json_path,
# ]
# if self.debug:
# log(' '.join(cmd))
# run_cmd(cmd)
@contextlib.contextmanager
def flight_server(self, scenario_name=None):
    """Run the Rust Flight integration server for the duration of a block.

    The server binary is started with ``--port=0`` and, when
    ``scenario_name`` is truthy, ``--scenario <scenario_name>``. The first
    line the server prints to stdout must start with
    ``"Server listening on localhost:"``; the text after the first colon
    is parsed as the port number and yielded to the caller.

    Raises RuntimeError (including the captured stdout/stderr) when the
    first output line is not the expected banner. The server process is
    always killed, and waited on for up to 5 seconds, when the context
    exits.
    """
    argv = list(self.FLIGHT_SERVER_CMD)
    argv.append('--port=0')
    if scenario_name:
        argv.extend(["--scenario", scenario_name])
    if self.debug:
        log(' '.join(argv))
    proc = subprocess.Popen(argv,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    try:
        banner = proc.stdout.readline().decode()
        if banner.startswith("Server listening on localhost:"):
            # The banner looks like "<prefix>:<port>"; take the piece after
            # the first colon.
            yield int(banner.split(":")[1])
        else:
            # Stop the process first so communicate() can drain both pipes.
            proc.kill()
            rest_out, rest_err = proc.communicate()
            raise RuntimeError(
                "Flight-Rust server did not start properly, "
                "stdout:\n{}\n\nstderr:\n{}\n"
                .format(banner + rest_out.decode(), rest_err.decode()))
    finally:
        proc.kill()
        proc.wait(5)

def flight_request(self, port, json_path=None, scenario_name=None):
    """Invoke the Rust Flight integration client against ``port``.

    Exactly one kind of target is passed on to the client: ``--path
    <json_path>`` when ``json_path`` is truthy, otherwise ``--scenario
    <scenario_name>`` when ``scenario_name`` is truthy.

    Raises TypeError when neither ``json_path`` nor ``scenario_name`` is
    provided.
    """
    cmd = [*self.FLIGHT_CLIENT_CMD, '--port=' + str(port)]

    # json_path takes precedence over scenario_name when both are given.
    if json_path:
        cmd += ['--path', json_path]
    elif scenario_name:
        cmd += ['--scenario', scenario_name]
    else:
        raise TypeError("Must provide one of json_path or scenario_name")

    if self.debug:
        log(' '.join(cmd))
    run_cmd(cmd)
5 changes: 3 additions & 2 deletions rust/arrow-flight/src/arrow.flight.protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,9 @@ pub mod flight_service_server {
#[async_trait]
pub trait FlightService: Send + Sync + 'static {
#[doc = "Server streaming response type for the Handshake method."]
type HandshakeStream: Stream<Item = Result<super::HandshakeResponse, tonic::Status>>
+ Send
type HandshakeStream: Stream<
Item = Result<super::HandshakeResponse, tonic::Status>,
> + Send
+ Sync
+ 'static;
#[doc = ""]
Expand Down
94 changes: 56 additions & 38 deletions rust/arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,48 @@ use std::convert::TryFrom;

use crate::{FlightData, SchemaResult};

use arrow::array::ArrayRef;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions};
use arrow::ipc::{convert, reader, writer, writer::EncodedData, writer::IpcWriteOptions};
use arrow::record_batch::RecordBatch;

/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries
/// and values
/// and a `FlightData` representing the bytes of the batch's values
pub fn flight_data_from_arrow_batch(
batch: &RecordBatch,
options: &IpcWriteOptions,
) -> Vec<FlightData> {
) -> (Vec<FlightData>, FlightData) {
let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker = writer::DictionaryTracker::new(false);

let (encoded_dictionaries, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, &options)
.expect("DictionaryTracker configured above to not error on replacement");

encoded_dictionaries
.into_iter()
.chain(std::iter::once(encoded_batch))
.map(|data| FlightData {
flight_descriptor: None,
app_metadata: vec![],
let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
let flight_batch = encoded_batch.into();

(flight_dictionaries, flight_batch)
}

impl From<EncodedData> for FlightData {
fn from(data: EncodedData) -> Self {
FlightData {
data_header: data.ipc_message,
data_body: data.arrow_data,
})
.collect()
..Default::default()
}
}
}

/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
pub fn flight_schema_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> SchemaResult {
let data_gen = writer::IpcDataGenerator::default();
let schema_bytes = data_gen.schema_to_bytes(schema, &options);

SchemaResult {
schema: schema_bytes.ipc_message,
schema: flight_schema_as_flatbuffer(schema, options),
}
}

Expand All @@ -69,16 +71,41 @@ pub fn flight_data_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> FlightData {
let data_gen = writer::IpcDataGenerator::default();
let schema = data_gen.schema_to_bytes(schema, &options);
let data_header = flight_schema_as_flatbuffer(schema, options);
FlightData {
flight_descriptor: None,
app_metadata: vec![],
data_header: schema.ipc_message,
data_body: vec![],
data_header,
..Default::default()
}
}

/// Serialize a `Schema` into the byte layout expected in `FlightInfo.schema`.
///
/// The schema is first encoded with `flight_schema_as_encoded_data` and the
/// result is then written into an in-memory buffer via
/// `arrow::ipc::writer::write_message`.
///
/// # Errors
///
/// Propagates any error returned by `write_message`.
pub fn ipc_message_from_arrow_schema(
    arrow_schema: &Schema,
    options: &IpcWriteOptions,
) -> Result<Vec<u8>> {
    let encoded = flight_schema_as_encoded_data(arrow_schema, options);

    let mut buffer: Vec<u8> = Vec::new();
    arrow::ipc::writer::write_message(&mut buffer, encoded, options)?;
    Ok(buffer)
}

/// Encode `arrow_schema` and return only the `ipc_message` bytes of the
/// resulting `EncodedData`.
fn flight_schema_as_flatbuffer(
    arrow_schema: &Schema,
    options: &IpcWriteOptions,
) -> Vec<u8> {
    flight_schema_as_encoded_data(arrow_schema, options).ipc_message
}

/// Encode `arrow_schema` into an `EncodedData` using a default
/// `IpcDataGenerator` and the supplied write `options`.
fn flight_schema_as_encoded_data(
    arrow_schema: &Schema,
    options: &IpcWriteOptions,
) -> EncodedData {
    writer::IpcDataGenerator::default().schema_to_bytes(arrow_schema, options)
}

/// Try convert `FlightData` into an Arrow Schema
///
/// Returns an error if the `FlightData` header is not a valid IPC schema
Expand Down Expand Up @@ -113,21 +140,12 @@ impl TryFrom<&SchemaResult> for Schema {
pub fn flight_data_to_arrow_batch(
data: &FlightData,
schema: SchemaRef,
) -> Option<Result<RecordBatch>> {
dictionaries_by_field: &[Option<ArrayRef>],
) -> Result<RecordBatch> {
// check that the data_header is a record batch message
let res = arrow::ipc::root_as_message(&data.data_header[..]);

// Catch error.
if let Err(err) = res {
return Some(Err(ArrowError::ParseError(format!(
"Unable to get root as message: {:?}",
err
))));
}

let message = res.unwrap();

let dictionaries_by_field = Vec::new();
let message = arrow::ipc::root_as_message(&data.data_header[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {:?}", err))
})?;

message
.header_as_record_batch()
Expand All @@ -137,14 +155,14 @@ pub fn flight_data_to_arrow_batch(
)
})
.map_or_else(
|err| Some(Err(err)),
|err| Err(err),
|batch| {
Some(reader::read_record_batch(
reader::read_record_batch(
&data.data_body,
batch,
schema,
&dictionaries_by_field,
))
)
},
)
}
Expand Down
2 changes: 1 addition & 1 deletion rust/arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ pub fn read_record_batch(

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_field` with the resulting dictionary
fn read_dictionary(
pub fn read_dictionary(
buf: &[u8],
batch: ipc::DictionaryBatch,
schema: &Schema,
Expand Down
9 changes: 4 additions & 5 deletions rust/arrow/src/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,9 @@ pub struct EncodedData {
/// Arrow buffers to be written, should be an empty vec for schema messages
pub arrow_data: Vec<u8>,
}

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
fn write_message<W: Write>(
mut writer: &mut BufWriter<W>,
pub fn write_message<W: Write>(
mut writer: W,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize)> {
Expand Down Expand Up @@ -602,7 +601,7 @@ fn write_message<W: Write>(
Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(writer: &mut BufWriter<W>, data: &[u8]) -> Result<usize> {
fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
let len = data.len() as u32;
let pad_len = pad_to_8(len) as u32;
let total_len = len + pad_len;
Expand All @@ -620,7 +619,7 @@ fn write_body_buffers<W: Write>(writer: &mut BufWriter<W>, data: &[u8]) -> Resul
/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
fn write_continuation<W: Write>(
writer: &mut BufWriter<W>,
mut writer: W,
write_options: &IpcWriteOptions,
total_len: i32,
) -> Result<usize> {
Expand Down
9 changes: 6 additions & 3 deletions rust/datafusion/examples/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// all the remaining stream messages should be dictionary and record batches
let mut results = vec![];
let dictionaries_by_field = vec![None; schema.fields().len()];
while let Some(flight_data) = stream.message().await? {
// the unwrap is infallible and thus safe
let record_batch =
flight_data_to_arrow_batch(&flight_data, schema.clone()).unwrap()?;
let record_batch = flight_data_to_arrow_batch(
&flight_data,
schema.clone(),
&dictionaries_by_field,
)?;
results.push(record_batch);
}

Expand Down
7 changes: 5 additions & 2 deletions rust/datafusion/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,14 @@ impl FlightService for FlightServiceImpl {
let mut batches: Vec<Result<FlightData, Status>> = results
.iter()
.flat_map(|batch| {
let flight_data =
let (flight_dictionaries, flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(
batch, &options,
);
flight_data.into_iter().map(Ok)
flight_dictionaries
.into_iter()
.chain(std::iter::once(flight_batch))
.map(Ok)
})
.collect();

Expand Down
20 changes: 7 additions & 13 deletions rust/integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,14 @@ edition = "2018"

[dependencies]
arrow = { path = "../arrow" }
arrow-flight = { path = "../arrow-flight" }
async-trait = "0.1.41"
clap = "2.33"
futures = "0.3"
hex = "0.4"
prost = "0.6"
serde = { version = "1.0", features = ["rc"] }
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
hex = "0.4"

[[bin]]
name = "arrow-file-to-stream"
path = "src/bin/arrow-file-to-stream.rs"

[[bin]]
name = "arrow-stream-to-file"
path = "src/bin/arrow-stream-to-file.rs"

[[bin]]
name = "arrow-json-integration-test"
path = "src/bin/arrow-json-integration-test.rs"
tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] }
tonic = "0.3"
Loading