
Conversation

@jecsand838
Contributor

Which issue does this PR close?

Rationale for this change

Decoding Avro single-object encoded streams was brittle when data arrived in partial chunks (e.g., from async or networked sources). The old implementation relied on ad‑hoc prefix handling and assumed a full record would be available, producing hard errors for otherwise normal “incomplete buffer” situations. Additionally, the Avro OCF (Object Container File) path iterated record‑by‑record through a shared row decoder, adding overhead.

This PR introduces a small state machine for single‑object decoding and a block‑aware path for OCF, making streaming more robust and OCF decoding more efficient while preserving the public API surface.

What changes are included in this PR?

Single‑object decoding (streaming)

  • Replace ad‑hoc prefix parsing (expect_prefix, handle_prefix, handle_fingerprint) with an explicit state machine:
    • New enum DecoderState { Magic, Fingerprint, Record, SchemaChange, Finished }.
    • Decoder now tracks state, bytes_remaining, and a fingerprint_buf to incrementally assemble the fingerprint.
  • New helper is_incomplete_data(&ArrowError) -> bool to treat “Unexpected EOF”, “bad varint”, and “offset overflow” as incomplete input instead of fatal errors.
  • Reworked Decoder::decode(&[u8]) -> Result<usize, ArrowError>:
    • Consumes data according to the state machine.
    • Cleanly returns when more bytes are needed (no spurious errors for partial chunks).
    • Defers schema switching until after flushing currently decoded rows.
  • Updated Decoder::flush() to emit a batch only when rows are ready and to transition the state correctly (including a staged SchemaChange).
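The state machine described above can be sketched as a tiny stand-alone example. Everything below is a hypothetical simplification for illustration: `MiniDecoder` is not the actual arrow-avro `Decoder`, only the `Magic`/`Fingerprint`/`Record` states are modeled, and body decoding plus the `SchemaChange`/`Finished` states are elided. The key property is that `decode` returns how many bytes it consumed, and returning early simply means "feed me more later":

```rust
#[derive(Debug, PartialEq)]
enum DecoderState {
    Magic,       // waiting for the 2-byte single-object marker 0xC3 0x01
    Fingerprint, // incrementally assembling the 8-byte schema fingerprint
    Record,      // ready to decode the record body (elided here)
}

struct MiniDecoder {
    state: DecoderState,
    fingerprint_buf: Vec<u8>,
}

impl MiniDecoder {
    fn new() -> Self {
        Self {
            state: DecoderState::Magic,
            fingerprint_buf: Vec::new(),
        }
    }

    /// Consume as much of `data` as the current state allows; the caller
    /// retains any unconsumed tail and re-presents it with the next chunk.
    fn decode(&mut self, data: &[u8]) -> usize {
        let mut consumed = 0;
        loop {
            match self.state {
                DecoderState::Magic => {
                    if data.len() - consumed < 2 {
                        return consumed; // not enough bytes yet: no error
                    }
                    // the real decoder would return a ParseError on a bad marker
                    assert_eq!(&data[consumed..consumed + 2], &[0xC3, 0x01]);
                    consumed += 2;
                    self.state = DecoderState::Fingerprint;
                }
                DecoderState::Fingerprint => {
                    let need = 8 - self.fingerprint_buf.len();
                    let take = need.min(data.len() - consumed);
                    self.fingerprint_buf
                        .extend_from_slice(&data[consumed..consumed + take]);
                    consumed += take;
                    if self.fingerprint_buf.len() < 8 {
                        return consumed; // partial fingerprint: resume later
                    }
                    self.state = DecoderState::Record;
                }
                DecoderState::Record => return consumed,
            }
        }
    }
}
```

Because unconsumed bytes stay with the caller, a chunk that ends mid-fingerprint is handled by simply calling `decode` again once more data arrives, with no "incomplete buffer" error in between.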

OCF (Object Container File) decoding

  • Add block‑aware decoding methods on Decoder used by Reader:
    • decode_block(&[u8], count: usize) -> Result<(consumed, records_decoded), ArrowError>
    • flush_block() -> Result<Option<RecordBatch>, ArrowError>
  • Reader now tracks block_count and decodes up to the number of records in the current block, reducing per‑row overhead and improving throughput.
  • ReaderBuilder::build initializes the new block_count path.
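For context on why a block-aware path can skip per-row bookkeeping: per the Avro spec, each OCF block is framed as two zigzag-varint longs (record count, then payload size), followed by the payload and a 16-byte sync marker, so the record count consumed by `decode_block` is available up front. A hypothetical header parser (not the arrow-avro API) illustrating that framing:

```rust
// Decode one zigzag-encoded varint long, returning (value, bytes_used).
fn read_zigzag_varint(data: &[u8]) -> Option<(i64, usize)> {
    let mut value: u64 = 0;
    for (i, &b) in data.iter().enumerate().take(10) {
        value |= ((b & 0x7F) as u64) << (7 * i);
        if b & 0x80 == 0 {
            // zigzag decode: (n >> 1) ^ -(n & 1)
            let decoded = ((value >> 1) as i64) ^ -((value & 1) as i64);
            return Some((decoded, i + 1));
        }
    }
    None // continuation bit still set: incomplete input, not a fatal error
}

// Parse an OCF block header: (record_count, payload_size, header_bytes).
fn read_block_header(data: &[u8]) -> Option<(i64, i64, usize)> {
    let (count, n1) = read_zigzag_varint(data)?;
    let (size, n2) = read_zigzag_varint(&data[n1..])?;
    Some((count, size, n1 + n2))
}
```

With the count known in advance, the `Reader` can decode exactly that many records against the row decoder in one pass, rather than probing record-by-record.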

API / struct adjustments

  • Remove internal expect_prefix flag from Decoder; behavior is driven by the state machine.
  • ReaderBuilder::make_decoder_with_parts updated accordingly (no behavior change to public builder methods).
  • No public API signature changes for Reader, Decoder, or ReaderBuilder.

Tests

  • Add targeted streaming tests:
    • test_two_messages_same_schema
    • test_two_messages_schema_switch
    • test_split_message_across_chunks
  • Update prefix‑handling tests to validate state transitions (Magic → Fingerprint, etc.) and new error messages.
  • Retain and exercise existing suites (types, lists, nested structures, decimals, enums, strict mode) with minimal adjustments.

Are these changes tested?

Yes.

  • New unit tests cover:
    • Multi‑message streams with/without schema switches
    • Messages split across chunk boundaries
    • Incremental prefix/fingerprint parsing
  • Existing tests continue to cover OCF reading, compression, complex/nested types, strict mode, etc.
  • The new OCF path is exercised by the unchanged OCF tests since Reader now uses decode_block/flush_block.

Are there any user-facing changes?

N/A

@github-actions github-actions bot added the arrow Changes to the arrow crate label Aug 9, 2025
@jecsand838 jecsand838 changed the title Refactor Avro Decoder to support partial decoding and improve decod… Refactor arrow-avro Decoder to support partial decoding Aug 9, 2025
@jecsand838
Contributor Author

jecsand838 commented Aug 9, 2025

@scovich @alamb Here's the follow-up PR with the partial decoding enhancements along with the different paths for file and single object decoding.

@jecsand838 jecsand838 force-pushed the avro-reader-chunking branch 2 times, most recently from c3cd755 to de7fa16 Compare August 9, 2025 22:21
@jecsand838 jecsand838 force-pushed the avro-reader-chunking branch from de7fa16 to adfcb1c Compare August 9, 2025 22:23
Contributor

@scovich scovich left a comment


Overall looks nice. Some possible simplifications, but my main question/concern is about zero-length records causing zero bytes consumed? The code seems to keep going back and forth on whether it's allowed/possible?

Ok(n) if n > 0 => {
    self.remaining_capacity -= 1;
    total_consumed += n;
    self.awaiting_body = false;

Contributor

Is there always a fingerprint after each record? Or just a chance to see a fingerprint?

Contributor Author

There's always a magic + fingerprint prefix at the start of each single object encoded record.
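Concretely, per the Avro specification the single-object prefix is the two-byte marker `0xC3 0x01` followed by the 8-byte little-endian CRC-64-AVRO schema fingerprint, then the Avro-binary record body. A hypothetical framer (not part of arrow-avro) showing the layout:

```rust
// Build a single-object-encoded message: marker + fingerprint + body.
fn frame_single_object(fingerprint: u64, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(10 + body.len());
    out.extend_from_slice(&[0xC3, 0x01]);              // magic marker
    out.extend_from_slice(&fingerprint.to_le_bytes()); // 8-byte fingerprint
    out.extend_from_slice(body);                       // Avro-binary record
    out
}
```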

Comment on lines 198 to 199
return Err(ArrowError::ParseError(
    "Record decoder consumed 0 bytes".into(),

Contributor

I thought zero-byte records were legal, and we're supposed to keep looping until the output batch is full?

Contributor Author

I didn't think they were legal for single object encodings, but sure enough they are. So I'll remove this. Should have re-read the specs lol.

// Decode up to `count` records (or the remaining batch capacity, whichever is smaller) from `data` (an OCF block payload).
//
// Returns the number of bytes consumed from `data` along with the number of records decoded.
fn decode_block(&mut self, data: &[u8], count: usize) -> Result<(usize, usize), ArrowError> {

Contributor

Two same-type return values with very different meanings... is it worth defining a struct for it so they have names? Or are there few enough (and always internal) callers to keep track of it?

Contributor Author

That occurred to me as well. I decided to leave it that way because the only caller is Reader::read and decode_block is not public.

As an aside, I was unsure of the value a public decode_block method would offer, since block encodings only exist in Object Container Files, which the Reader handles. If there's demand in the future for decoding blocks outside of the Reader, we'd probably want to refactor the code to support something like Decoder::decode_block(block: Block, codec: Option<CompressionCodec>) -> Result<DecodeRes, ArrowError>, as you pointed out. I was just concerned this would be a premature optimization.

@jecsand838
Contributor Author

jecsand838 commented Aug 10, 2025

Overall looks nice. Some possible simplifications, but my main question/concern is about zero-length records causing zero bytes consumed? The code seems to keep going back and forth on whether it's allowed/possible?

Ty!

The only reason I added that back in was that I didn't think zero-byte encodings were legal for single-object encodings, but it seems I was wrong and they are. So I'll push up a change to account for that.

jecsand838 and others added 6 commits August 10, 2025 14:04
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
@jecsand838 jecsand838 force-pushed the avro-reader-chunking branch from 40c7e34 to 664d344 Compare August 10, 2025 19:48
@jecsand838
Contributor Author

@scovich I went ahead and pushed up changes that remove the zero byte error on single object encodings and improve the Decoder::decode method's readability.

Let me know what you think.

@jecsand838 jecsand838 requested a review from scovich August 10, 2025 19:53

Contributor

@scovich scovich left a comment

Code is good now, except I'm worried about robustness in the face of invalid input bytes.

Comment on lines 132 to 138
fn is_incomplete_data(err: &ArrowError) -> bool {
    matches!(
        err,
        ArrowError::ParseError(msg)
            if msg.contains("Unexpected EOF")
                || msg.contains("bad varint")
                || msg.contains("offset overflow")

Contributor

Hmm, after thinking more about this over the weekend --

Trying to interpret/suppress these errors will almost certainly make the decoder brittle in the face of malformed input bytes that legitimately trigger these errors. For example, we could put the decoder in an infinite loop where it keeps trying to fetch more and more bytes in hopes of eliminating the error, when the error is fully contained in the existing buffer.
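A toy model of this failure mode (hypothetical names, not the arrow-avro code): when a genuinely malformed prefix produces the same message as real truncation, a fetch-more loop never makes progress.

```rust
// Hypothetical decode stub: a malformed 0xFF prefix is (wrongly) reported
// with the same "Unexpected EOF" message as genuine truncation.
fn decode(buf: &[u8]) -> Result<usize, String> {
    match buf.first() {
        None => Err("Unexpected EOF".into()),        // truly incomplete
        Some(&0xFF) => Err("Unexpected EOF".into()), // malformed, same message
        Some(_) => Ok(buf.len()),
    }
}

fn is_incomplete_data(msg: &str) -> bool {
    msg.contains("Unexpected EOF")
}

// Bounded driver standing in for the real read loop: on "incomplete" it
// fetches one more byte. With a malformed prefix it never makes progress,
// which in an unbounded loop would spin forever.
fn attempts_before_progress(mut buf: Vec<u8>, max_rounds: usize) -> usize {
    for round in 0..max_rounds {
        match decode(&buf) {
            Ok(_) => return round,
            Err(msg) if is_incomplete_data(&msg) => buf.push(0x00),
            Err(_) => return round,
        }
    }
    max_rounds
}
```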

Contributor Author

@jecsand838 jecsand838 Aug 11, 2025

That's fair. I planned to clean this up in the Schema Resolution RecordDecoder PR that's coming after #8047 gets merged in.

Contributor Author

I ended up modifying is_incomplete_data to this:

fn is_incomplete_data(err: &ArrowError) -> bool {
    matches!(
        err,
        ArrowError::ParseError(msg)
            if msg.contains("Unexpected EOF")
    )
}

I double checked the arrow-avro/src/reader/cursor.rs file and we should only get the Unexpected EOF error if there's too few bytes.

I'll also improve the logic in arrow-avro/src/reader/cursor.rs to support a more deliberate and less rigid implementation in a future PR. I left a comment in the code calling this out.

Let me know what you think of this approach.

}
let batch = self.active_decoder.flush()?;
self.remaining_capacity = self.batch_size;
self.apply_pending_schema();

Contributor

flush and flush_block are identical except this call to self.apply_pending_schema?
Is there a way to deduplicate the code? Maybe a flush_internal that takes a boolean argument (which the compiler would aggressively inline away as if it were a generic parameter)?

Or just call self.apply_pending_schema unconditionally, knowing it should be a no-op during block decoding because self.pending_schema is always None?

Contributor Author

That's a good call out. I'll create a helper for:

        let batch = self.active_decoder.flush()?;
        self.remaining_capacity = self.batch_size;

I want to keep the schema change logic completely decoupled from the block decode/flush path for now, both to avoid confusion for future contributors and to set us up for any future Decoder decomposition efforts.

Contributor Author

I just pushed up changes which include a new flush_and_reset method:

    fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        if self.batch_is_empty() {
            return Ok(None);
        }
        let batch = self.active_decoder.flush()?;
        self.remaining_capacity = self.batch_size;
        Ok(Some(batch))
    }

It abstracted out most of the flush logic and flush_block is now just a public wrapper for that new method.

jecsand838 and others added 2 commits August 11, 2025 12:22
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
@jecsand838 jecsand838 force-pushed the avro-reader-chunking branch from 7096c0e to 9456a11 Compare August 11, 2025 20:22
@jecsand838 jecsand838 force-pushed the avro-reader-chunking branch from 9456a11 to 14303c3 Compare August 11, 2025 20:25
@jecsand838
Contributor Author

Code is good now, except I'm worried about robustness in the face of invalid input bytes.

Ty for the fast follow-up review. Let me know what you think of these latest changes!

@jecsand838 jecsand838 requested a review from scovich August 11, 2025 20:31

Contributor

@scovich scovich left a comment

LGTM!

}

/// Returns true if the decoder has not decoded any batches yet.
pub fn batch_is_empty(&self) -> bool {

Contributor

nit: Not sure it needs to be pub? But I guess it's plenty well-defined (batch_is_empty is true if flush would return None), so no harm leaving it public?

Contributor Author

I didn't see any harm in leaving it pub. Figured it may be useful for someone, plus batch_is_full was already pub, so I was trying to keep it consistent.

Contributor

Ah, I didn't realize we already had a pub batch_is_full method. Makes sense!

@alamb
Contributor

alamb commented Aug 12, 2025

Thanks for the review and code @jecsand838 and @scovich -- let's keep the train moving

@alamb alamb merged commit 97c0e7c into apache:main Aug 12, 2025
23 checks passed