fix: ipc decode panic with invalid data #8931
Conversation
tisonkun left a comment:
Do you have an example test that shows this failing? Is there a way to exercise this unwrap with the public API?
alamb left a comment:
arrow-ipc/src/reader.rs (Outdated)

  fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
-     read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
+     read_buffer(
+         self.buffers.next().ok_or_else(|| {
I agree with @tustvold that we should have a test if possible.
Also, can we make the error more specific? It seems the actual problem is that there are fewer buffers than expected.
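As a sketch of what a more specific message could look like, a hypothetical helper (not part of the arrow-rs API) could report the expected vs. supplied buffer counts so the reader sees exactly how short the payload was:

```rust
// Hypothetical helper, not arrow-rs code: formats a buffer-count mismatch
// with both the count the metadata declared and the count actually supplied.
fn buffer_count_error(expected: usize, supplied: usize) -> String {
    format!(
        "Invalid IPC RecordBatch: metadata declared {expected} buffers \
         but only {supplied} were supplied"
    )
}

fn main() {
    // The case from this PR: an Int32 column implies 2 buffers, 1 arrived.
    let msg = buffer_count_error(2, 1);
    assert!(msg.contains("declared 2 buffers"));
    assert!(msg.contains("only 1"));
    println!("{msg}");
}
```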
It will happen if the metadata claims more buffers than are supplied.
For example, in our case, a user constructed a primitive integer array without a validity mask (for some unknown reason). Therefore only one buffer is encoded, while RecordBatchDecoder requires two here, so it panics during the second invocation of next_buffer.
arrow-rs/arrow-ipc/src/reader.rs
Line 240 in 1c90efe
let buffers = [self.next_buffer()?, self.next_buffer()?];
It's not convenient to construct this kind of corrupted input for testing; do you have any suggestions?
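The failure mode described above can be sketched in isolation with plain std Rust (the names below are hypothetical stand-ins, not the actual arrow-rs types): an iterator over the supplied buffers runs dry while the decoder, trusting the metadata, asks for one more.

```rust
// Hypothetical stand-in for ArrowError::IpcError, just for illustration.
#[derive(Debug)]
struct IpcError(String);

// Mimics the shape of RecordBatchDecoder::next_buffer: with `.unwrap()` a
// missing buffer panics; with `.ok_or_else(...)` it becomes a recoverable error.
fn next_buffer(buffers: &mut impl Iterator<Item = u64>) -> Result<u64, IpcError> {
    buffers
        .next()
        .ok_or_else(|| IpcError("Fewer buffers than expected in IPC RecordBatch".to_string()))
}

fn main() {
    // Metadata for a nullable Int32 column implies two buffers
    // (validity + data), but the writer encoded only one.
    let mut supplied = [8u64].into_iter();

    assert!(next_buffer(&mut supplied).is_ok()); // first call succeeds
    let err = next_buffer(&mut supplied).unwrap_err(); // second call errors, no panic
    assert!(err.0.contains("Fewer buffers than expected"));
    println!("decode failed gracefully: {}", err.0);
}
```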
@leiysky You can make use of #8931 (comment).
But it is a manually constructed test vector, so I'd expect to use it as a snapshot. If the internal code changes, the snapshot may become outdated and need updating.
I can't find another way to reliably construct such dedicated invalid data.
Here is a test that can verify the issue:

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 987188548..078b96297 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -569,7 +569,10 @@ impl<'a> RecordBatchDecoder<'a> {
}
fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
- read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
+ let next_buffer = self.buffers.next().ok_or_else(|| {
+ ArrowError::IpcError("Fewer buffers than expected in IPC RecordBatch".to_string())
+ })?;
+ read_buffer(next_buffer, self.data, self.compression)
}
fn skip_buffer(&mut self) {
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 4a849c116..1224a2c90 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -539,6 +539,7 @@ impl IpcDataGenerator {
arrow_data.extend_from_slice(&PADDING[..pad_len]);
// write data
+ buffers.pop();
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
diff --git a/arrow-ipc/tests/test_invalid_data.rs b/arrow-ipc/tests/test_invalid_data.rs
index e69de29bb..cceb4b278 100644
--- a/arrow-ipc/tests/test_invalid_data.rs
+++ b/arrow-ipc/tests/test_invalid_data.rs
@@ -0,0 +1,21 @@
+use arrow_array::record_batch;
+use arrow_ipc::reader::StreamReader;
+use arrow_ipc::writer::StreamWriter;
+
+#[test]
+fn test_invalid_data() {
+ let mut stream = vec![];
+
+ let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+ let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+ dbg!(&stream);
+
+ let stream = stream.as_slice();
+ let reader = StreamReader::try_new(stream, None).unwrap();
+
+ for batch in reader {
+ batch.unwrap();
+ }The test vector is: And thus the final test: use arrow_ipc::reader::StreamReader;
#[test]
fn test_invalid_data() {
let stream = [
255, 255, 255, 255, 120, 0, 0, 0, 16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 10, 0, 9, 0, 4, 0, 10,
0, 0, 0, 16, 0, 0, 0, 0, 1, 4, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0,
0, 20, 0, 0, 0, 16, 0, 20, 0, 16, 0, 14, 0, 15, 0, 4, 0, 0, 0, 8, 0, 16, 0, 0, 0, 24, 0, 0,
0, 32, 0, 0, 0, 0, 0, 1, 2, 28, 0, 0, 0, 8, 0, 12, 0, 4, 0, 11, 0, 8, 0, 0, 0, 32, 0, 0, 0,
0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 97, 0, 0, 0, 255, 255, 255, 255, 120, 0, 0, 0, 16, 0,
0, 0, 12, 0, 26, 0, 24, 0, 23, 0, 4, 0, 8, 0, 12, 0, 0, 0, 32, 0, 0, 0, 128, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 4, 0, 10, 0, 24, 0, 12, 0, 8, 0, 4, 0, 10, 0, 0, 0, 44, 0, 0,
0, 16, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0,
]
.as_slice();
let mut reader = StreamReader::try_new(stream, None).unwrap();
let batch = reader.next().unwrap();
let err = batch.unwrap_err().to_string();
assert!(
err.contains("Fewer buffers than expected in IPC RecordBatch"),
"unexpected error message: {err}",
);
}
This is a valid issue we encountered in the real world: a user's self-constructed Arrow IPC data was invalid (due to a bug in their software). Since Arrow IPC data can come from any source, we can't assume it is always valid or always produced by this library.
#[test]
fn test_missing_buffer_metadata_error() {
    use crate::r#gen::Message::*;
    use flatbuffers::FlatBufferBuilder;

    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));

    // create RecordBatch buffer metadata with invalid buffer count
    // Int32Array needs 2 buffers (validity + data) but we provide only 1
    let mut fbb = FlatBufferBuilder::new();
    let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
    let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
    let batch_offset = RecordBatch::create(
        &mut fbb,
        &RecordBatchArgs {
            length: 2,
            nodes: Some(nodes),
            buffers: Some(buffers),
            compression: None,
            variadicBufferCounts: None,
        },
    );
    fbb.finish_minimal(batch_offset);
    let batch_bytes = fbb.finished_data().to_vec();
    let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

    let data_buffer = Buffer::from(vec![0u8; 8]);
    let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
    let metadata = MetadataVersion::V5;

    let decoder = RecordBatchDecoder::try_new(
        &data_buffer,
        batch,
        schema.clone(),
        &dictionaries,
        &metadata,
    )
    .unwrap();

    let result = decoder.read_record_batch();

    match result {
        Err(ArrowError::IpcError(msg)) => {
            assert_eq!(msg, "Buffer count mismatched with metadata");
        }
        other => panic!("unexpected error: {other:?}"),
    }
}
@alamb PTAL
alamb left a comment:
Will panic if the input buffer is invalid.