Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 128 additions & 24 deletions arrow-json/src/reader/binary_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,70 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::{
BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder, GenericStringBuilder,
};
use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder};
use arrow_array::{Array, GenericStringArray, OffsetSizeTrait};
use arrow_data::ArrayData;
use arrow_schema::ArrowError;
use std::io::Write;
use std::marker::PhantomData;

use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};

/// Decode a hex-encoded string into bytes
fn decode_hex_string(hex_string: &str) -> Result<Vec<u8>, ArrowError> {
let mut decoded = Vec::with_capacity(hex_string.len() / 2);
for substr in hex_string.as_bytes().chunks(2) {
let str = std::str::from_utf8(substr).map_err(|e| {
ArrowError::JsonError(format!("invalid utf8 in hex encoded binary data: {e}"))
})?;
let byte = u8::from_str_radix(str, 16).map_err(|e| {
ArrowError::JsonError(format!("invalid hex encoding in binary data: {e}"))
})?;
decoded.push(byte);
#[inline]
fn decode_hex_digit(byte: u8) -> Option<u8> {
match byte {
b'0'..=b'9' => Some(byte - b'0'),
b'a'..=b'f' => Some(byte - b'a' + 10),
b'A'..=b'F' => Some(byte - b'A' + 10),
_ => None,
}
Ok(decoded)
}

fn invalid_hex_error_at(index: usize, byte: u8) -> ArrowError {
ArrowError::JsonError(format!(
"invalid hex encoding in binary data: invalid digit 0x{byte:02x} at position {index}"
))
}

fn decode_hex_to_writer<W: Write>(hex_string: &str, writer: &mut W) -> Result<(), ArrowError> {
let bytes = hex_string.as_bytes();
let mut iter = bytes.chunks_exact(2);
let mut buffer = [0u8; 64];
let mut buffered = 0;

for (pair_index, pair) in (&mut iter).enumerate() {
let base = pair_index * 2;
let high = decode_hex_digit(pair[0]).ok_or_else(|| invalid_hex_error_at(base, pair[0]))?;
let low =
decode_hex_digit(pair[1]).ok_or_else(|| invalid_hex_error_at(base + 1, pair[1]))?;
buffer[buffered] = (high << 4) | low;
buffered += 1;

if buffered == buffer.len() {
writer
.write_all(&buffer)
.map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
buffered = 0;
}
}

let remainder = iter.remainder();
if !remainder.is_empty() {
let index = (bytes.len() / 2) * 2;
let low = decode_hex_digit(remainder[0])
.ok_or_else(|| invalid_hex_error_at(index, remainder[0]))?;
buffer[buffered] = low;
buffered += 1;
}

if buffered > 0 {
writer
.write_all(&buffer[..buffered])
.map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
}

Ok(())
}

#[derive(Default)]
Expand All @@ -59,14 +99,14 @@ impl<O: OffsetSizeTrait> ArrayDecoder for BinaryArrayDecoder<O> {

let mut builder = GenericBinaryBuilder::<O>::with_capacity(pos.len(), data_capacity);

GenericStringBuilder::<O>::with_capacity(pos.len(), data_capacity);

for p in pos {
match tape.get(*p) {
TapeElement::String(idx) => {
let string = tape.get_string(idx);
let decoded = decode_hex_string(string)?;
builder.append_value(&decoded);
// Decode directly into the builder for performance. If decoding fails,
// the error is terminal and the builder is discarded by the caller.
decode_hex_to_writer(string, &mut builder)?;
builder.append_value(b"");
}
TapeElement::Null => builder.append_null(),
_ => unreachable!(),
Expand All @@ -91,13 +131,17 @@ impl FixedSizeBinaryArrayDecoder {
impl ArrayDecoder for FixedSizeBinaryArrayDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let mut builder = FixedSizeBinaryBuilder::with_capacity(pos.len(), self.len);
// Preallocate for the decoded byte width (FixedSizeBinary len), not the hex string length.
let mut scratch = Vec::with_capacity(self.len as usize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why self.len? Isn't that the length of the input? Also the scratch used below is different (no initial capacity)

Copy link
Member Author

@Weijun-H Weijun-H Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Here self.len comes from FixedSizeBinary and represents the decoded byte width, not the input hex string length. The input string length is expected to be 2 * len. For variable-width types we use Vec::new() and reserve per value instead.


for p in pos {
match tape.get(*p) {
TapeElement::String(idx) => {
let string = tape.get_string(idx);
let decoded = decode_hex_string(string)?;
builder.append_value(&decoded)?;
scratch.clear();
scratch.reserve(string.len().div_ceil(2));
decode_hex_to_writer(string, &mut scratch)?;
builder.append_value(&scratch)?;
}
TapeElement::Null => builder.append_null(),
_ => unreachable!(),
Expand All @@ -115,13 +159,16 @@ impl ArrayDecoder for BinaryViewDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let data_capacity = estimate_data_capacity(tape, pos)?;
let mut builder = BinaryViewBuilder::with_capacity(data_capacity);
let mut scratch = Vec::new();

for p in pos {
match tape.get(*p) {
TapeElement::String(idx) => {
let string = tape.get_string(idx);
let decoded = decode_hex_string(string)?;
builder.append_value(&decoded);
scratch.clear();
scratch.reserve(string.len().div_ceil(2));
decode_hex_to_writer(string, &mut scratch)?;
builder.append_value(&scratch);
}
TapeElement::Null => builder.append_null(),
_ => unreachable!(),
Expand All @@ -139,7 +186,7 @@ fn estimate_data_capacity(tape: &Tape<'_>, pos: &[u32]) -> Result<usize, ArrowEr
TapeElement::String(idx) => {
let string_len = tape.get_string(idx).len();
// two hex characters represent one byte
let decoded_len = string_len / 2;
let decoded_len = string_len.div_ceil(2);
data_capacity += decoded_len;
}
TapeElement::Null => {}
Expand All @@ -150,3 +197,60 @@ fn estimate_data_capacity(tape: &Tape<'_>, pos: &[u32]) -> Result<usize, ArrowEr
}
Ok(data_capacity)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::ReaderBuilder;
use arrow_schema::{DataType, Field};
use std::io::Cursor;

#[test]
fn test_decode_hex_to_writer_empty() {
let mut out = Vec::new();
decode_hex_to_writer("", &mut out).unwrap();
assert!(out.is_empty());
}

#[test]
fn test_decode_hex_to_writer_odd_length() {
let mut out = Vec::new();
decode_hex_to_writer("0f0", &mut out).unwrap();
assert_eq!(out, vec![0x0f, 0x00]);

out.clear();
decode_hex_to_writer("a", &mut out).unwrap();
assert_eq!(out, vec![0x0a]);
}

#[test]
fn test_decode_hex_to_writer_invalid() {
let mut out = Vec::new();
let err = decode_hex_to_writer("0f0g", &mut out).unwrap_err();
match err {
ArrowError::JsonError(msg) => {
assert!(msg.contains("invalid hex encoding in binary data"));
assert!(msg.contains("position 3"));
}
_ => panic!("expected JsonError"),
}
}

#[test]
fn test_binary_reader_invalid_hex_is_terminal() {
let field = Field::new("item", DataType::Binary, false);
let data = b"\"0f0g\"\n\"0f00\"\n";
let mut reader = ReaderBuilder::new_with_field(field)
.build(Cursor::new(data))
.unwrap();

let err = reader.next().unwrap().unwrap_err().to_string();
assert!(err.contains("invalid hex encoding in binary data"));

match reader.next() {
None => {}
Some(Err(_)) => {}
Some(Ok(_)) => panic!("expected terminal error after invalid hex"),
}
}
}
Loading