Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure IPC stream messages are contiguous #6321

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion arrow-integration-testing/src/bin/arrow-json-integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
// under the License.

use arrow::error::{ArrowError, Result};
use arrow::ipc::reader::FileReader;
use arrow::ipc::reader::{FileReader, StreamReader};
use arrow::ipc::writer::FileWriter;
use arrow_integration_test::*;
use arrow_integration_testing::{canonicalize_schema, open_json_file};
use clap::Parser;
use std::fs::File;
use std::io::{Seek, SeekFrom};

#[derive(clap::ValueEnum, Debug, Clone)]
#[clap(rename_all = "SCREAMING_SNAKE_CASE")]
Expand Down Expand Up @@ -179,5 +180,23 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
));
}

let mut arrow_file = File::open(arrow_name)?;

arrow_file.seek(SeekFrom::Start(8))?;
let arrow_stream_reader = StreamReader::try_new(arrow_file, None)?;

if canonicalize_schema(&json_file.schema) != canonicalize_schema(&arrow_stream_reader.schema())
{
return Err(ArrowError::ComputeError(format!(
"Schemas do not match. JSON: {:?}. Embedded stream: {:?}",
json_file.schema,
arrow_stream_reader.schema()
)));
}

for batch in arrow_stream_reader {
let _ = batch?;
}

Ok(())
}
126 changes: 85 additions & 41 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::CONTINUATION_MARKER;
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
/// Write padding after memory buffers to this multiple of bytes.
/// Write padding to ensure that each data buffer is aligned to this multiple of bytes.
/// Must be 8, 16, 32, or 64 - defaults to 64.
alignment: u8,
/// The legacy format is for releases before 0.15.0, and uses metadata V4
Expand Down Expand Up @@ -824,12 +824,12 @@ impl DictionaryTracker {
pub struct FileWriter<W> {
/// The object to write to
writer: W,
/// The number of bytes written
written_len: usize,
/// IPC write options
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
schema: SchemaRef,
/// The number of bytes between each block of bytes, as an offset for random access
block_offsets: usize,
Comment on lines -831 to -832
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand what this was doing so I'm not sure I understand what it means that it is going away.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the former mechanism for tracking absolute position in the output file. It was named according to the only thing it was used for: setting Block::offsets. Since it's now used to track how much padding should be written after flatbuffers Messages and to make its expected value more obvious, I renamed it.

/// Dictionary blocks that will be written as part of the IPC footer
dictionary_blocks: Vec<crate::Block>,
/// Record blocks that will be written as part of the IPC footer
Expand Down Expand Up @@ -879,20 +879,31 @@ impl<W: Write> FileWriter<W> {
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
// write magic to header aligned on alignment boundary
let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
let header_size = super::ARROW_MAGIC.len() + pad_len;

let mut written_len = 0;

// write magic and padding
writer.write_all(&super::ARROW_MAGIC)?;
written_len += super::ARROW_MAGIC.len();
let pad_len = pad_to_alignment(8, written_len);
writer.write_all(&PADDING[..pad_len])?;
// write the schema, set the written bytes to the schema + header
written_len += pad_len;

// write the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;

let (meta, _data) =
write_message_at_offset(&mut writer, written_len, encoded_message, &write_options)?;

// written bytes = padded_magic + schema
written_len += meta;

let preserve_dict_id = write_options.preserve_dict_id;
Ok(Self {
writer,
written_len,
write_options,
schema: Arc::new(schema.clone()),
block_offsets: meta + data + header_size,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
Expand Down Expand Up @@ -924,23 +935,32 @@ impl<W: Write> FileWriter<W> {
)?;

for encoded_dictionary in encoded_dictionaries {
let (meta, data) =
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let (meta, data) = write_message_at_offset(
&mut self.writer,
self.written_len,
encoded_dictionary,
&self.write_options,
)?;

let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
let block = crate::Block::new(self.written_len as i64, meta as i32, data as i64);
self.dictionary_blocks.push(block);
self.block_offsets += meta + data;
self.written_len += meta + data;
}

let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
let (meta, data) = write_message_at_offset(
&mut self.writer,
self.written_len,
encoded_message,
&self.write_options,
)?;
// add a record block for the footer
let block = crate::Block::new(
self.block_offsets as i64,
self.written_len as i64,
meta as i32, // TODO: is this still applicable?
data as i64,
);
self.record_blocks.push(block);
self.block_offsets += meta + data;
self.written_len += meta + data;
Ok(())
}

Expand All @@ -953,7 +973,7 @@ impl<W: Write> FileWriter<W> {
}

// write EOS
write_continuation(&mut self.writer, &self.write_options, 0)?;
self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?;

let mut fbb = FlatBufferBuilder::new();
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
Expand Down Expand Up @@ -1041,6 +1061,8 @@ impl<W: Write> RecordBatchWriter for FileWriter<W> {
pub struct StreamWriter<W> {
/// The object to write to
writer: W,
/// The number of bytes written
written_len: usize,
/// IPC write options
write_options: IpcWriteOptions,
/// Whether the writer footer has been written, and the writer is finished
Expand Down Expand Up @@ -1084,12 +1106,15 @@ impl<W: Write> StreamWriter<W> {
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();

// write the schema, set the written bytes to the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
write_message(&mut writer, encoded_message, &write_options)?;
let (meta, _data) = write_message(&mut writer, encoded_message, &write_options)?;

let preserve_dict_id = write_options.preserve_dict_id;
Ok(Self {
writer,
written_len: meta,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
Expand All @@ -1114,10 +1139,22 @@ impl<W: Write> StreamWriter<W> {
.expect("StreamWriter is configured to not error on dictionary replacement");

for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let (meta, data) = write_message_at_offset(
&mut self.writer,
self.written_len,
encoded_dictionary,
&self.write_options,
)?;
self.written_len += meta + data;
}

write_message(&mut self.writer, encoded_message, &self.write_options)?;
let (meta, data) = write_message_at_offset(
&mut self.writer,
self.written_len,
encoded_message,
&self.write_options,
)?;
self.written_len += meta + data;
Ok(())
}

Expand All @@ -1129,7 +1166,7 @@ impl<W: Write> StreamWriter<W> {
));
}

write_continuation(&mut self.writer, &self.write_options, 0)?;
self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?;

self.finished = true;

Expand Down Expand Up @@ -1221,49 +1258,56 @@ pub struct EncodedData {
}
/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(
writer: W,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
write_message_at_offset(writer, 0, encoded, write_options)
}

fn write_message_at_offset<W: Write>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit: this function name confuses me slightly. If I'm writing to a "file-like object" (e.g. Write) then there is no underlying capability to write at an offset. "Skip offset bytes" doesn't make sense in a write context (as opposed to a read context).

I think...instead...you are maybe using the offset to determine how much padding to add at the end?

Maybe write_remaining_message with offset replaced by already_written?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about write_message_positioned_absolutely(writer, position, encoded, options)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, since write_message doesn't seem widely used I could just add the already_written_len argument there rather than adding a new fn

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@westonpace how about now?

mut writer: W,
offset: usize,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
let arrow_data_len = encoded.arrow_data.len();
if arrow_data_len % usize::from(write_options.alignment) != 0 {
if offset % 8 != 0 {
return Err(ArrowError::MemoryError(
"Arrow data not aligned".to_string(),
"Writing an IPC Message unaligned to 8 bytes".to_string(),
));
}

let a = usize::from(write_options.alignment - 1);
let buffer = encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = if write_options.write_legacy_ipc_format {
let continuation_size = if write_options.write_legacy_ipc_format {
4
} else {
8
};
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
let padding_bytes = aligned_size - flatbuf_size - prefix_size;
let flatbuf_size = encoded.ipc_message.len();
assert_ne!(flatbuf_size, 0);

let padding_size = pad_to_alignment(
write_options.alignment,
offset + continuation_size + flatbuf_size,
);
let padded_size = continuation_size + flatbuf_size + padding_size;
assert_eq!((offset + padded_size) % write_options.alignment as usize, 0);

// write continuation, flatbuf, and padding
write_continuation(
&mut writer,
write_options,
(aligned_size - prefix_size) as i32,
(padded_size - continuation_size) as i32,
)?;

// write the flatbuf
if flatbuf_size > 0 {
writer.write_all(&buffer)?;
}
// write padding
writer.write_all(&PADDING[..padding_bytes])?;
writer.write_all(&encoded.ipc_message)?;
writer.write_all(&PADDING[..padding_size])?;

// write arrow data
let body_len = if arrow_data_len > 0 {
let body_len = if !encoded.arrow_data.is_empty() {
write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
} else {
0
};

Ok((aligned_size, body_len))
Ok((padded_size, body_len))
}

fn write_body_buffers<W: Write>(
Expand Down
Loading