
Conversation

@cetra3 (Contributor) commented Oct 3, 2025

This PR adjusts the way that dictionary values are handled. Before this PR, they were broken in the following ways:

  • The FlightDataEncoder will, by default, Hydrate dictionary values, hoisting their values rather than encoding dictionaries. This wasn't handled correctly in the network_ execution plans, which were expecting the schema not to change; adding some schema adapters resolved this.
  • The metrics data did not handle dictionaries very well either when constructing an empty record batch, as it needs the dictionary schema to be loaded. However, since we've already written the stream using FlightDataEncoder, the dictionary ids can end up all messed up.

Rather than faff about with all of that, I've adjusted it to be a "peek" style stream, whereby we wait until the last flight data message and add the metrics to that (see the sketch after the quoted helper below).

/// Creates a FlightData with the given app_metadata and empty RecordBatch using the provided schema.
/// We don't use [arrow_flight::encode::FlightDataEncoder] (and by extension, the [arrow_flight::encode::FlightDataEncoderBuilder])
/// since they skip messages with empty RecordBatch data.
pub fn empty_flight_data_with_app_metadata(
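
A minimal sketch of the "peek" idea described above (simplified: the real code in this PR uses stream::poll_fn and carries Result items, and attach_metadata_to_last is just an illustrative name):

use arrow_flight::FlightData;
use bytes::Bytes;
use futures::{stream, Stream, StreamExt};

/// Buffer one message at a time; once the inner stream finishes, the buffered
/// message is known to be the last one, so the metrics payload is attached to it.
fn attach_metadata_to_last<S>(inner: S, metrics: Bytes) -> impl Stream<Item = FlightData>
where
    S: Stream<Item = FlightData> + Unpin,
{
    let state = (inner.fuse(), None::<FlightData>, Some(metrics));
    stream::unfold(state, |(mut inner, mut buffered, mut metrics)| async move {
        loop {
            match inner.next().await {
                // Another message follows, so the buffered one was not the last:
                // emit it and buffer the new one instead.
                Some(next) => {
                    if let Some(prev) = buffered.replace(next) {
                        return Some((prev, (inner, buffered, metrics)));
                    }
                    // First message seen: keep it buffered and poll again.
                }
                // Inner stream finished: the buffered message is the last one.
                // Note that if the inner stream was empty, nothing is emitted.
                None => {
                    let mut last = buffered.take()?;
                    if let Some(m) = metrics.take() {
                        last.app_metadata = m;
                    }
                    return Some((last, (inner, buffered, metrics)));
                }
            }
        }
    })
}
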
Collaborator commented:

This is overall looking really good! I have one question purely for my understanding:

Before, we were appending one extra message to the Arrow Flight stream containing the app_metadata with the metrics and no actual body (an empty RecordBatch).

Now, we are intercepting the last message in the Arrow Flight stream and enriching it with the app_metadata.

I see that without these changes, your new test fails with the following error:

Protocol error: Failed to create empty batch FlightData: Ipc error: no dict id for field company

But I don't fully understand why, as I'd have expected the "Before" and "Now" approach to be equivalent.

@cetra3 (Contributor, author) replied:

So, based on my understanding, when encoding arrow flight data, there are 3 main types:

  • Schemas
  • Record Batches
  • Dictionaries

When the stream starts, the schema is encoded and sent. At that point (when we encode & send the schema), the dictionary tracker is loaded and populated based upon what dictionaries are present within the fields. Then we get to the record batches: if any of them have fields with dictionary types, we use the dictionary tracker to determine whether we have already sent the dictionary, and send it first if not, along with the record batch itself.
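
In arrow-ipc terms, that flow looks roughly like this (a sketch assuming a recent arrow-ipc; encode_all is just an illustrative name):

use arrow_array::RecordBatch;
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_schema::ArrowError;

fn encode_all(batch: &RecordBatch) -> Result<(), ArrowError> {
    let data_gen = IpcDataGenerator::default();
    let options = IpcWriteOptions::default();
    let mut tracker = DictionaryTracker::new(false);
    let schema = batch.schema();

    // Encoding the schema is what populates the dictionary tracker with an
    // id for every dictionary-typed field.
    let _schema_msg =
        data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);

    // Encoding a batch then consults the tracker: dictionaries not yet sent
    // come back in `_dict_msgs` and go on the wire before the batch itself.
    // Skipping the schema step is what produces the "no dict id" failure.
    let (_dict_msgs, _batch_msg) = data_gen.encoded_batch(batch, &mut tracker, &options)?;
    Ok(())
}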

In the old code, we weren't encoding the schema along with the empty record batch, so the dictionary tracker was never populated with the dictionaries. This meant that when it came time to encode the record batch (and possibly a dictionary), the encoder checked the schema of the record batch, determined that a field had a dictionary, and consulted the tracker as to what to do.

But because we hadn't encoded the schema, the dictionary tracker was in a bad state, and so you get the no dict id for field error. I tried side-stepping this problem by encoding the schema first, to sort out the tracker, and throwing away the result. However, to make matters worse, the arrow flight encoder removes all dictionaries by default, hydrating them to their underlying values. This meant that we essentially had two different schemas for the one stream.
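
The hydration default is visible on the encoder builder (the DictionaryHandling options are the actual arrow-flight API; the surrounding setup is illustrative):

use std::sync::Arc;

use arrow_array::{types::Int32Type, ArrayRef, DictionaryArray, RecordBatch};
use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
use arrow_schema::ArrowError;
use futures::stream;

fn build_encoders() -> Result<(), ArrowError> {
    // One batch with a dictionary-encoded "company" column.
    let company: DictionaryArray<Int32Type> = ["acme", "acme", "globex"].into_iter().collect();
    let batch = RecordBatch::try_from_iter([("company", Arc::new(company) as ArrayRef)])?;

    // Default behavior is DictionaryHandling::Hydrate: the encoded stream's
    // schema advertises a plain Utf8 "company" column rather than
    // Dictionary(Int32, Utf8), so the receiver sees a different schema.
    let _hydrated = FlightDataEncoderBuilder::new().build(stream::iter([Ok(batch.clone())]));

    // Resend keeps the dictionary encoding, sending dictionary batches on the
    // wire instead of hoisting the values into the batch.
    let _resent = FlightDataEncoderBuilder::new()
        .with_dictionary_handling(DictionaryHandling::Resend)
        .build(stream::iter([Ok(batch)]));

    Ok(())
}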

You could fix that by asking the flight data encoder for the schema it is using and encoding the empty batch with that, but it would break again if we ever changed whether dictionaries are hydrated. So I figured the simpler fix is what's proposed here: don't bother writing out an empty record batch at all, just append the metadata to the last value.

Collaborator commented:

👋🏽 Thanks for the contribution! Sending the metadata with the last message is definitely more efficient. I'm reviewing this PR now.

@gabotechs (Collaborator) left a review:

Nice! Thanks for this PR.

The "peek" style stream, besides just solving the issue, looks like a big simplification over the previous TrailingFlightDataStream, and I think it unlocks further simplifications for removing the CallbackStream in a future PR.

We seem to be losing all the TrailingFlightDataStream unit tests, but because this approach is simpler and there's not that much to test, I'd say that's acceptable.

Leaving a +1, but I'll wait for @jayshrivastava to take a look at this one as the original author of trailing_flight_data_stream.rs.

Comment on lines 201 to 202
let stream =
stream::poll_fn(
Collaborator commented:

I think this logic could be factored out into a map_last_stream method that is not specific to Arrow Flight or anything: just a function that takes a stream and a FnOnce closure and returns a stream whose last element has the closure applied to it.

If we had this generic stream mapping, it would allow us to remove the callback_stream.rs file, which does something similar but with a callback that does not mutate the last element.

That might be too much of a refactor for this single PR though... it's probably better done in a new one.

Collaborator commented:

Actually, here's a suggestion about how we can use what this PR brings to remove the old callback_stream.rs:

pydantic#1

Let me know if that makes sense.


Commit: Remove `CallbackStream` in favor of `map_last_stream`

let metrics_collection_capture = self_ready.metrics_collection.clone();
let adapter = DefaultSchemaAdapterFactory::from_schema(self.schema());
let (mapper, _indices) = adapter.map_schema(&self.schema())?;
Collaborator commented:

This looks like a 1:1 schema mapping. What does it do? Is it just a way to assert that the schema hasn't changed? I think adding a test which shows why this is necessary would be good.

@cetra3 (Contributor, author) replied:

The schema does change: the Arrow Flight encoder hydrates dictionary values as real values, and so the schema of the incoming RecordBatch is different. We use the mapper here to map back to what the execution plan expects.
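
Based on the snippet above, the usage amounts to something like this (map_back is an illustrative name); the key point is that map_batch casts each incoming column back to the type the plan's schema declares:

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::common::Result;
use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;

// `plan_schema` is what the ExecutionPlan advertises (dictionary-encoded
// fields); incoming batches arrive hydrated from the flight decoder.
fn map_back(plan_schema: SchemaRef, incoming: RecordBatch) -> Result<RecordBatch> {
    let adapter = DefaultSchemaAdapterFactory::from_schema(plan_schema.clone());
    let (mapper, _indices) = adapter.map_schema(&plan_schema)?;
    // map_batch casts columns to the declared types, e.g. a hydrated Utf8
    // column back to Dictionary(Int32, Utf8).
    mapper.map_batch(incoming)
}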

Collaborator commented:

I noticed that tests still pass without this line.

IIUC, the root problem was on the server - we were sending an empty flight data message to the client without sending the schema / dictionary message first. You've fixed this problem.

I don't see an issue on the client that this solves. The flight decoder in the client should be able to handle any message sent by the encoder on the server.

The metrics collector on the client passes through flight data unchanged, minus clearing the app_metadata.

Collaborator commented:

I would prefer to either have a test which shows why this is needed or remove the lines. Lmk if you think otherwise though!

Once again, I appreciate the contribution 🙏🏽 - the old empty flight data code was sketchy for sure.

@cetra3 (Contributor, author) commented Oct 8, 2025:

test_metrics_collection_e2e_4 fails with this removed from both of the network plans.

Collaborator commented:

Ah sorry, I commented on one but not the other. This LGTM.

@cetra3 (Contributor, author) replied:

I've added an assert here to make sure the schema matches: a141a3b

use futures::stream;

#[tokio::test]
async fn test_map_last_stream_empty_stream() {
Collaborator commented:

I don't think this works in all cases. Say that the last partition for a task is empty. This behavior means we won't send any metrics for any partitions of the task (because we only send metrics for the entire task after the last partition is done).

It also means we may lose metrics from child tasks because this task may have collected them.

Unfortunately we don't have a test for this case. We would certainly benefit from having that.

@cetra3 (Contributor, author) commented Oct 6, 2025:

The Arrow Flight encoder will always send something as far as I can tell: even if there are no record batches returned, you will still receive the encoded schema.
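
A quick way to check that claim (a sketch; it assumes the schema is supplied up front via with_schema, so the encoder has something to emit even with no batches):

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use arrow_schema::{DataType, Field, Schema};
use futures::{stream, StreamExt};

#[tokio::test]
async fn encoder_emits_schema_for_empty_input() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    // No record batches at all; only the schema is known up front.
    let input = stream::iter(Vec::<Result<RecordBatch, FlightError>>::new());
    let frames: Vec<_> = FlightDataEncoderBuilder::new()
        .with_schema(schema)
        .build(input)
        .collect()
        .await;
    // Even with no batches, the encoder still emits the schema message, so
    // there is always a "last" element to attach the metrics to.
    assert!(!frames.is_empty());
}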

Collaborator commented:

Ah okay. This makes sense.

@gabotechs (Collaborator) commented:

Thanks again @cetra3!

@gabotechs merged commit 5db1101 into datafusion-contrib:main on Oct 9, 2025. 4 checks passed.