
Support proto serialization of InsertExec #7303

Closed
thinkharderdev opened this issue Aug 16, 2023 · 9 comments
Labels
enhancement New feature or request

Comments

@thinkharderdev
Contributor

thinkharderdev commented Aug 16, 2023

Is your feature request related to a problem or challenge?

Currently, plans that include an InsertExec cannot be serialized to protobuf (and hence cannot be used in Ballista).

Describe the solution you'd like

The easiest way to support this would be to modify PhysicalExtensionCodec to support serializing/deserializing a dyn DataSink, something like:

pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn try_decode_data_sink(&self, buf: &[u8]) -> Result<Arc<dyn DataSink>> {
        // Default impl for backcompat
        Err(DataFusionError::NotImplemented("PhysicalExtensionCodec::try_decode_data_sink not implemented".into()))
    }

    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    fn try_encode_data_sink(&self, sink: Arc<dyn DataSink>, buf: &mut Vec<u8>) -> Result<()> {
        // Default impl for backcompat
        Err(DataFusionError::NotImplemented("PhysicalExtensionCodec::try_encode_data_sink not implemented".into()))
    }
}

In this case, the "standard" implementations would be handled directly within the main serde logic, and if a given Arc<dyn DataSink> weren't one of the standard cases it would fall back to the extension codec.
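
For illustration, here is a rough sketch (not actual DataFusion code) of what a downstream codec might look like under this proposal. It assumes the trait above, a hypothetical MyDataSink that can round-trip itself through bytes via to_bytes/from_bytes, and that DataSink exposes an as_any() method for downcasting, as other DataFusion traits do:

#[derive(Debug)]
struct MyCodec;

impl PhysicalExtensionCodec for MyCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Err(DataFusionError::NotImplemented("no extension plans".into()))
    }

    fn try_encode(&self, _node: Arc<dyn ExecutionPlan>, _buf: &mut Vec<u8>) -> Result<()> {
        Err(DataFusionError::NotImplemented("no extension plans".into()))
    }

    fn try_encode_data_sink(&self, sink: Arc<dyn DataSink>, buf: &mut Vec<u8>) -> Result<()> {
        // Downcast to the one concrete sink this codec knows how to serialize.
        match sink.as_any().downcast_ref::<MyDataSink>() {
            Some(my_sink) => {
                buf.extend_from_slice(&my_sink.to_bytes());
                Ok(())
            }
            None => Err(DataFusionError::NotImplemented("unknown DataSink".into())),
        }
    }

    fn try_decode_data_sink(&self, buf: &[u8]) -> Result<Arc<dyn DataSink>> {
        Ok(Arc::new(MyDataSink::from_bytes(buf)?))
    }
}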

Alternatively we might push serialization to the DataSink trait itself:

#[async_trait]
pub trait DataSink: DisplayAs + Debug + Send + Sync {
    // TODO add desired input ordering
    // How does this sink want its input ordered?

    /// Writes the data to the sink, returns the number of values written
    ///
    /// This method will be called exactly once during each DML
    /// statement. Thus prior to return, the sink should do any commit
    /// or rollback required.
    async fn write_all(
        &self,
        data: Vec<SendableRecordBatchStream>,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;

    /// Encode `self` into the provided buffer
    fn encode(&self, buf: &mut Vec<u8>) -> Result<()>;

    /// Decode an instance of `Self` from a buffer
    /// (the `Sized` bound keeps `dyn DataSink` object-safe)
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;
}
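
One wrinkle with this shape: try_decode is an associated function returning Self, so it needs the Sized bound noted above, and the decoder must already know the concrete type it is decoding into. A minimal standalone illustration of why (all types here are stand-ins, not DataFusion APIs):

use std::sync::Arc;

trait Sink: std::fmt::Debug {
    fn encode(&self, buf: &mut Vec<u8>);
    // Without `Self: Sized`, `dyn Sink` would not be a valid type.
    fn try_decode(buf: &[u8]) -> Self
    where
        Self: Sized;
}

#[derive(Debug)]
struct JsonLikeSink(Vec<u8>);

impl Sink for JsonLikeSink {
    fn encode(&self, buf: &mut Vec<u8>) {
        buf.extend_from_slice(&self.0);
    }
    fn try_decode(buf: &[u8]) -> Self {
        JsonLikeSink(buf.to_vec())
    }
}

fn main() {
    let mut buf = Vec::new();
    JsonLikeSink(vec![1, 2, 3]).encode(&mut buf);
    // The caller names the concrete type up front; a proto decoder would
    // still need a type tag to pick it, since `<dyn Sink>::try_decode`
    // is not callable.
    let decoded: Arc<dyn Sink> = Arc::new(JsonLikeSink::try_decode(&buf));
    println!("{decoded:?}");
}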

Describe alternatives you've considered

Not doing anything.

Additional context

No response

@thinkharderdev thinkharderdev added the enhancement New feature or request label Aug 16, 2023
@Jefffrey
Contributor

I'm not sure it would be desirable to extend PhysicalExtensionCodec to include functions for handling DataSinks, as not all extension types deal with DataSinks. I'm also unsure about having the encoding methods in DataSink itself, as it might be better to handle proto serde within the datafusion-proto crate itself.

I have three solutions in mind.


Option 1

Leave as is, and require anyone desiring this behaviour to implement a PhysicalExtensionNode on their side with an accompanying PhysicalExtensionCodec implementation. This is probably the least desirable option considering FileSinkExec (aka InsertExec) is provided by DataFusion, so one might expect it to support proto SerDe.


Option 2

There are currently 3 implementations of DataSink provided by DataFusion:

  • ParquetSink
  • CsvSink
  • JsonSink

(I'm not counting MemSink, as I don't think we can support proto SerDe for it...)

So we could introduce three new PhysicalPlanNode types into proto:

message ParquetFileSinkExecNode {
  ...
}

message CsvFileSinkExecNode {
  ...
}

message JsonFileSinkExecNode {
  ...
}

These represent the FileSinkExecs supported by DataFusion out of the box; if a consumer has their own DataSink implementation for a FileSinkExec (I think that's possible?), they would need to rely on implementing it via PhysicalExtensionNode with an accompanying PhysicalExtensionCodec.
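
To make the shape of option 2 concrete, here is a toy sketch of the decoder dispatch it implies (all types are hypothetical stand-ins for the generated prost structs):

// Stand-ins for the generated message structs.
struct JsonFileSinkExecNode;
struct CsvFileSinkExecNode;
struct ParquetFileSinkExecNode;

enum PhysicalPlanType {
    JsonFileSinkExec(JsonFileSinkExecNode),
    CsvFileSinkExec(CsvFileSinkExecNode),
    ParquetFileSinkExec(ParquetFileSinkExecNode),
    // ...plus all the existing plan variants...
}

fn describe(node: &PhysicalPlanType) -> &'static str {
    // Each arm rebuilds both the FileSinkExec and its specific sink,
    // since the two are fused into a single message.
    match node {
        PhysicalPlanType::JsonFileSinkExec(_) => "FileSinkExec + JsonSink",
        PhysicalPlanType::CsvFileSinkExec(_) => "FileSinkExec + CsvSink",
        PhysicalPlanType::ParquetFileSinkExec(_) => "FileSinkExec + ParquetSink",
    }
}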


Option 3

Instead of smashing the FileSinkExec and DataSinks together as in option 2, we would separate them:

message FileSinkExec {
  PhysicalPlanNode input = 1;
  DataSink sink = 2;
  ...
}

message DataSink {
  oneof DataSinkImpl {
    ParquetSink parquet_sink = 1;
    CsvSink csv_sink = 2;
    JsonSink json_sink = 3;
    DataSinkExtension extension = 4;
  }
}

message ParquetSink {
  ...
}

message CsvSink {
  ...
}

message JsonSink {
  ...
}

message DataSinkExtension {
  bytes sink = 1;
}

This would more accurately model how it's represented in DataFusion, but has the disadvantage of requiring a new extension type, which presumably would need a new codec as well (similar to how PhysicalExtensionNode requires PhysicalExtensionCodec).
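
For comparison, a toy sketch of option 3's decoding flow, where the known sinks are handled inline and the extension variant falls back to a user-supplied codec (again, all names are hypothetical stand-ins):

// Stand-in for the decoded `DataSink` message's oneof.
enum DataSinkImpl {
    Json(Vec<u8>),      // a JsonSink message
    Extension(Vec<u8>), // opaque bytes from DataSinkExtension
}

// The new codec surface option 3 would need, analogous to
// PhysicalExtensionCodec but scoped to sinks.
trait DataSinkCodec {
    fn try_decode_sink(&self, buf: &[u8]) -> Result<String, String>;
}

fn decode_sink(sink: DataSinkImpl, codec: &dyn DataSinkCodec) -> Result<String, String> {
    match sink {
        // Built-in sinks are decoded by datafusion-proto itself...
        DataSinkImpl::Json(_) => Ok("JsonSink".to_string()),
        // ...and anything else is delegated to the user's codec.
        DataSinkImpl::Extension(bytes) => codec.try_decode_sink(&bytes),
    }
}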


I'm taking a stab at the third option (though maybe the second option would be better? 🤔), but would greatly appreciate any insight on the matter.

@thinkharderdev
Contributor Author

> I'm taking a stab at the third option (though maybe the second option would be better? 🤔), but would greatly appreciate any insight on the matter.

FWIW, option 2 is what we are doing now and it works easily enough, but option 3 sounds pretty good to me. That seems like it would basically just boil down to adding methods to PhysicalExtensionCodec though?

@Jefffrey
Contributor

> FWIW, option 2 is what we are doing now and it works easily enough, but option 3 sounds pretty good to me. That seems like it would basically just boil down to adding methods to PhysicalExtensionCodec though?

Yeah, I haven't given too much thought to the interop code yet; I've mainly been considering the proto side, so I'm unsure how the code would turn out.

If you've already been implementing option 2, how have you been handling encoding/decoding ParquetSink and CsvSink? They contain FileSinkConfigs, which contain FileTypeWriterOptions, which (if you keep going) inevitably contain parquet::file::properties::WriterProperties for Parquet and arrow::csv::WriterBuilder for CSV, both from arrow-rs. Do you just encode the options as a map of string to string?

@thinkharderdev
Contributor Author

> If you've already been implementing option 2, how have you been handling encoding/decoding ParquetSink and CsvSink?

We actually have our own DataSink implementation that does some specialized stuff, so that's why it was easy.

@Jefffrey
Contributor

Jefffrey commented Nov 3, 2023

> We actually have our own DataSink implementation that does some specialized stuff, so that's why it was easy.

I see. If we go ahead with option 2, you might still have to use your own implementation of PhysicalExtensionCodec, I guess. I'll go ahead with option 2, maybe with just one of the Parquet/CSV/JSON data sinks to limit scope, and see how it turns out.

@Jefffrey
Contributor

Jefffrey commented Nov 6, 2023

Decided to go with JSON first as it was the most trivial to implement. Parquet is difficult as it relies on the WriterProperties struct from the arrow-rs crate, and CSV similarly relies on WriterBuilder from arrow-rs, so these need extra serde work for those structs.

@alamb
Contributor

alamb commented Nov 7, 2023

One potential option for WriterProperties / the builder would be to port the parsing logic upstream to parquet-rs: apache/arrow-rs#4693

And then you could serialize the options as a string 🤔
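
As a rough illustration of the options-as-strings idea, a sketch that rebuilds a couple of representative settings (not the full option set) from serialized key/value pairs, using the real parquet builder API:

use std::collections::HashMap;
use parquet::file::properties::WriterProperties;

// Rebuild a subset of WriterProperties from serialized string options.
fn writer_props_from_map(opts: &HashMap<String, String>) -> WriterProperties {
    let mut builder = WriterProperties::builder();
    if let Some(v) = opts.get("max_row_group_size") {
        if let Ok(n) = v.parse::<usize>() {
            builder = builder.set_max_row_group_size(n);
        }
    }
    if let Some(v) = opts.get("created_by") {
        builder = builder.set_created_by(v.clone());
    }
    builder.build()
}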

@Jefffrey
Contributor

Issue looks to be completed by #8646

cc @alamb

@alamb
Contributor

alamb commented Dec 30, 2023

> Issue looks to be completed by #8646

I agree -- thanks for pointing this out @Jefffrey

@alamb alamb closed this as completed Dec 30, 2023