Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
description="Authenticate using the BasicAuth protobuf."),
Scenario(
"middleware",
description="Ensure headers are propagated via middleware."),
description="Ensure headers are propagated via middleware.",
skip="Rust" # TODO(ARROW-10961): tonic upgrade needed
),
]

runner = IntegrationRunner(json_files, flight_scenarios, testers, **kwargs)
Expand Down
86 changes: 48 additions & 38 deletions dev/archery/archery/integration/tester_rust.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.

import contextlib
import os
import subprocess

from .tester import Tester
from .util import run_cmd, ARROW_ROOT_DEFAULT, log
Expand All @@ -24,8 +26,8 @@
class RustTester(Tester):
PRODUCER = True
CONSUMER = True
# FLIGHT_SERVER = True
# FLIGHT_CLIENT = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True

EXE_PATH = os.path.join(ARROW_ROOT_DEFAULT, 'rust/target/debug')

Expand All @@ -34,11 +36,11 @@ class RustTester(Tester):
STREAM_TO_FILE = os.path.join(EXE_PATH, 'arrow-stream-to-file')
FILE_TO_STREAM = os.path.join(EXE_PATH, 'arrow-file-to-stream')

# FLIGHT_SERVER_CMD = [
# os.path.join(EXE_PATH, 'flight-test-integration-server')]
# FLIGHT_CLIENT_CMD = [
# os.path.join(EXE_PATH, 'flight-test-integration-client'),
# "-host", "localhost"]
FLIGHT_SERVER_CMD = [
os.path.join(EXE_PATH, 'flight-test-integration-server')]
FLIGHT_CLIENT_CMD = [
os.path.join(EXE_PATH, 'flight-test-integration-client'),
"--host", "localhost"]

name = 'Rust'

Expand Down Expand Up @@ -72,34 +74,42 @@ def file_to_stream(self, file_path, stream_path):
cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
self.run_shell_command(cmd)

# @contextlib.contextmanager
# def flight_server(self):
# cmd = self.FLIGHT_SERVER_CMD + ['-port=0']
# if self.debug:
# log(' '.join(cmd))
# server = subprocess.Popen(cmd,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE)
# try:
# output = server.stdout.readline().decode()
# if not output.startswith("Server listening on localhost:"):
# server.kill()
# out, err = server.communicate()
# raise RuntimeError(
# "Flight-C++ server did not start properly, "
# "stdout:\n{}\n\nstderr:\n{}\n"
# .format(output + out.decode(), err.decode()))
# port = int(output.split(":")[1])
# yield port
# finally:
# server.kill()
# server.wait(5)

# def flight_request(self, port, json_path):
# cmd = self.FLIGHT_CLIENT_CMD + [
# '-port=' + str(port),
# '-path=' + json_path,
# ]
# if self.debug:
# log(' '.join(cmd))
# run_cmd(cmd)
@contextlib.contextmanager
def flight_server(self, scenario_name=None):
    """Run the Rust Flight integration server for the duration of a `with`.

    The server binary is launched with ``--port=0`` (presumably asking for
    an ephemeral port -- confirm against the server's CLI) and, when
    ``scenario_name`` is truthy, ``--scenario <scenario_name>``.  The port
    actually in use is parsed from the first line the server prints on
    stdout and is the value yielded to the caller.

    Raises RuntimeError when that first line does not start with
    "Server listening on localhost:"; the captured stdout and stderr are
    included in the message.  The process is always killed and reaped
    (5 second wait) on exit.
    """
    argv = [*self.FLIGHT_SERVER_CMD, '--port=0']
    if scenario_name:
        argv += ["--scenario", scenario_name]
    if self.debug:
        log(' '.join(argv))

    proc = subprocess.Popen(argv,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    try:
        banner = proc.stdout.readline().decode()
        if not banner.startswith("Server listening on localhost:"):
            # Startup failed: stop the process and collect whatever it
            # wrote so the error is diagnosable.
            proc.kill()
            rest_out, rest_err = proc.communicate()
            raise RuntimeError(
                "Flight-Rust server did not start properly, "
                "stdout:\n{}\n\nstderr:\n{}\n"
                .format(banner + rest_out.decode(), rest_err.decode()))
        # The banner has the form "...localhost:<port>"; take what follows
        # the first colon.
        yield int(banner.split(":")[1])
    finally:
        proc.kill()
        proc.wait(5)

def flight_request(self, port, json_path=None, scenario_name=None):
    """Run the Rust Flight integration client against a server.

    port          -- port passed to the client as ``--port=<port>``.
    json_path     -- if truthy, passed as ``--path <json_path>``.
    scenario_name -- used only when ``json_path`` is falsy; passed as
                     ``--scenario <scenario_name>``.

    Raises TypeError when neither ``json_path`` nor ``scenario_name`` is
    given.  The assembled command is logged when ``self.debug`` is set and
    then executed through ``run_cmd``.
    """
    argv = [*self.FLIGHT_CLIENT_CMD, '--port=' + str(port)]

    # json_path takes precedence over scenario_name when both are supplied.
    if json_path:
        argv += ['--path', json_path]
    elif scenario_name:
        argv += ['--scenario', scenario_name]
    else:
        raise TypeError("Must provide one of json_path or scenario_name")

    if self.debug:
        log(' '.join(argv))
    run_cmd(argv)
107 changes: 61 additions & 46 deletions rust/arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,48 @@ use std::convert::TryFrom;

use crate::{FlightData, SchemaResult};

use arrow::array::ArrayRef;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions};
use arrow::ipc::{convert, reader, writer, writer::EncodedData, writer::IpcWriteOptions};
use arrow::record_batch::RecordBatch;

/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries
/// and values
/// and a `FlightData` representing the bytes of the batch's values
pub fn flight_data_from_arrow_batch(
batch: &RecordBatch,
options: &IpcWriteOptions,
) -> Vec<FlightData> {
) -> (Vec<FlightData>, FlightData) {
let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker = writer::DictionaryTracker::new(false);

let (encoded_dictionaries, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, &options)
.expect("DictionaryTracker configured above to not error on replacement");

encoded_dictionaries
.into_iter()
.chain(std::iter::once(encoded_batch))
.map(|data| FlightData {
flight_descriptor: None,
app_metadata: vec![],
let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
let flight_batch = encoded_batch.into();

(flight_dictionaries, flight_batch)
}

impl From<EncodedData> for FlightData {
fn from(data: EncodedData) -> Self {
FlightData {
data_header: data.ipc_message,
data_body: data.arrow_data,
})
.collect()
..Default::default()
}
}
}

/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
pub fn flight_schema_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> SchemaResult {
let data_gen = writer::IpcDataGenerator::default();
let schema_bytes = data_gen.schema_to_bytes(schema, &options);

SchemaResult {
schema: schema_bytes.ipc_message,
schema: flight_schema_as_flatbuffer(schema, options),
}
}

Expand All @@ -69,16 +71,41 @@ pub fn flight_data_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> FlightData {
let data_gen = writer::IpcDataGenerator::default();
let schema = data_gen.schema_to_bytes(schema, &options);
let data_header = flight_schema_as_flatbuffer(schema, options);
FlightData {
flight_descriptor: None,
app_metadata: vec![],
data_header: schema.ipc_message,
data_body: vec![],
data_header,
..Default::default()
}
}

/// Serialize a `Schema` into the byte format expected in `FlightInfo.schema`.
///
/// The schema is first encoded as IPC message data and then written into an
/// in-memory buffer by `arrow::ipc::writer::write_message`. The lengths that
/// `write_message` reports are discarded; only the written bytes are returned.
///
/// # Errors
///
/// Propagates any error returned by `write_message`.
pub fn ipc_message_from_arrow_schema(
    arrow_schema: &Schema,
    options: &IpcWriteOptions,
) -> Result<Vec<u8>> {
    let encoded = flight_schema_as_encoded_data(arrow_schema, options);

    let mut buffer = Vec::new();
    arrow::ipc::writer::write_message(&mut buffer, encoded, options).map(|_| buffer)
}

/// Encode `arrow_schema` and return only the `ipc_message` bytes of the
/// resulting `EncodedData`; its `arrow_data` part is dropped.
fn flight_schema_as_flatbuffer(
    arrow_schema: &Schema,
    options: &IpcWriteOptions,
) -> Vec<u8> {
    flight_schema_as_encoded_data(arrow_schema, options).ipc_message
}

/// Encode `arrow_schema` into `EncodedData` using a default
/// `IpcDataGenerator` and the supplied write options.
fn flight_schema_as_encoded_data(
    arrow_schema: &Schema,
    options: &IpcWriteOptions,
) -> EncodedData {
    writer::IpcDataGenerator::default().schema_to_bytes(arrow_schema, options)
}

/// Try convert `FlightData` into an Arrow Schema
///
/// Returns an error if the `FlightData` header is not a valid IPC schema
Expand Down Expand Up @@ -113,21 +140,12 @@ impl TryFrom<&SchemaResult> for Schema {
pub fn flight_data_to_arrow_batch(
data: &FlightData,
schema: SchemaRef,
) -> Option<Result<RecordBatch>> {
dictionaries_by_field: &[Option<ArrayRef>],
) -> Result<RecordBatch> {
// check that the data_header is a record batch message
let res = arrow::ipc::root_as_message(&data.data_header[..]);

// Catch error.
if let Err(err) = res {
return Some(Err(ArrowError::ParseError(format!(
"Unable to get root as message: {:?}",
err
))));
}

let message = res.unwrap();

let dictionaries_by_field = Vec::new();
let message = arrow::ipc::root_as_message(&data.data_header[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {:?}", err))
})?;

message
.header_as_record_batch()
Expand All @@ -136,17 +154,14 @@ pub fn flight_data_to_arrow_batch(
"Unable to convert flight data header to a record batch".to_string(),
)
})
.map_or_else(
|err| Some(Err(err)),
|batch| {
Some(reader::read_record_batch(
&data.data_body,
batch,
schema,
&dictionaries_by_field,
))
},
)
.map(|batch| {
reader::read_record_batch(
&data.data_body,
batch,
schema,
&dictionaries_by_field,
)
})?
}

// TODO: add more explicit conversion that exposes flight descriptor and metadata options
2 changes: 1 addition & 1 deletion rust/arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ pub fn read_record_batch(

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_field` with the resulting dictionary
fn read_dictionary(
pub fn read_dictionary(
buf: &[u8],
batch: ipc::DictionaryBatch,
schema: &Schema,
Expand Down
9 changes: 4 additions & 5 deletions rust/arrow/src/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,9 @@ pub struct EncodedData {
/// Arrow buffers to be written, should be an empty vec for schema messages
pub arrow_data: Vec<u8>,
}

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
fn write_message<W: Write>(
mut writer: &mut BufWriter<W>,
pub fn write_message<W: Write>(
mut writer: W,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize)> {
Expand Down Expand Up @@ -602,7 +601,7 @@ fn write_message<W: Write>(
Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(writer: &mut BufWriter<W>, data: &[u8]) -> Result<usize> {
fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
let len = data.len() as u32;
let pad_len = pad_to_8(len) as u32;
let total_len = len + pad_len;
Expand All @@ -620,7 +619,7 @@ fn write_body_buffers<W: Write>(writer: &mut BufWriter<W>, data: &[u8]) -> Resul
/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
fn write_continuation<W: Write>(
writer: &mut BufWriter<W>,
mut writer: W,
write_options: &IpcWriteOptions,
total_len: i32,
) -> Result<usize> {
Expand Down
3 changes: 2 additions & 1 deletion rust/benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ clone the repository and build the source code.
git clone git@github.com:databricks/tpch-dbgen.git
cd tpch-dbgen
make
export TPCH_DATA=$(pwd)
```

Data can now be generated with the following command. Note that `-s 1` means use Scale Factor 1 or ~1 GB of
Expand All @@ -63,7 +64,7 @@ This utility does not yet provide support for changing the number of partitions
option is to use the following Docker image to perform the conversion from `tbl` files to CSV or Parquet.

```bash
docker run -it ballistacompute/spark-benchmarks:0.4.0-SNAPSHOT
docker run -it ballistacompute/spark-benchmarks:0.4.0-SNAPSHOT
-h, --help Show help message

Subcommand: convert-tpch
Expand Down
Loading