
Parallelize Serialization of Columns within Parquet RowGroups #7655

Merged: 5 commits, Oct 25, 2023

Conversation

@devinjdangelo (Contributor) commented Sep 25, 2023

This PR is a draft as it depends on apache/arrow-rs#4871

Which issue does this PR close?

Closes #7591
Closes #7589
Closes #7590
Closes #7632
Related to apache/arrow-rs#1718

Rationale for this change

The original parallel parquet writer serialized row groups independently. This has two major disadvantages:

  1. High memory usage, since each parallel RowGroup is buffered entirely in memory before being flushed to the ObjectStore.
  2. No easy way to respect the WriterProperties.max_row_group_size setting, since the number of RowGroups is determined by the parallelism.

This PR implements a new approach which parallelizes parquet serialization primarily by serializing the different columns of a single row group in parallel. Different row groups can still be serialized in parallel if RecordBatches are being produced fast enough and the configured buffer size is large enough.

What changes are included in this PR?

  1. New process to break an ArrowRowGroupWriter apart into its ArrowColumnWriter components and send each to a dedicated parallel task (see the sketch after this list).
  2. Respects the WriterProperties.max_row_group_size setting, closing a row group as soon as this threshold is reached.
  3. If RecordBatches are being produced fast enough, the next row group can begin serializing before the prior one has finished.
  4. User-configurable limits on the number of RowGroups and RecordBatches that are allowed to accumulate in memory before backpressure kicks in. These are user configurable since the optimal values depend on the system's mix of CPU, memory, and I/O resources.
  5. Refactors the parallel parquet code to improve readability.
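
To make the core idea concrete, here is a minimal, self-contained sketch of per-column parallel serialization using the low-level `ArrowColumnWriter` API from apache/arrow-rs#4871. It is not the PR's actual code: it uses plain threads, a flat schema, and a single row group for brevity, whereas the PR uses tokio tasks, bounded channels, and the configuration limits described below.

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers, ArrowLeafColumn};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

fn main() -> parquet::errors::Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
            Arc::new(Int64Array::from(vec![4, 5, 6])) as ArrayRef,
        ],
    )
    .unwrap();

    let props = Arc::new(WriterProperties::default());
    let parquet_schema = arrow_to_parquet_schema(&schema)?;

    // One ArrowColumnWriter per parquet leaf column; each gets its own worker.
    let col_writers = get_column_writers(&parquet_schema, &props, &schema)?;
    let mut workers: Vec<_> = col_writers
        .into_iter()
        .map(|mut writer| {
            let (tx, rx) = std::sync::mpsc::channel::<ArrowLeafColumn>();
            let handle = std::thread::spawn(move || {
                for leaf in rx {
                    writer.write(&leaf)?; // CPU-heavy encoding runs in parallel
                }
                writer.close() // yields an encoded column chunk
            });
            (tx, handle)
        })
        .collect();

    // Fan the batch's columns out to the per-column workers
    // (one leaf per column for this flat schema).
    for (worker, (array, field)) in workers
        .iter_mut()
        .zip(batch.columns().iter().zip(schema.fields()))
    {
        for leaf in compute_leaves(field, array)? {
            worker.0.send(leaf).unwrap();
        }
    }

    // Joining in column order matters: chunks must be appended to the
    // row group in schema order.
    let mut file_writer = SerializedFileWriter::new(
        Vec::<u8>::new(), // write to an in-memory buffer for this sketch
        parquet_schema.root_schema_ptr(),
        props,
    )?;
    let mut row_group = file_writer.next_row_group()?;
    for (tx, handle) in workers {
        drop(tx); // close the channel so the worker loop ends
        let chunk = handle.join().unwrap()?;
        chunk.append_to_row_group(&mut row_group)?;
    }
    row_group.close()?;
    file_writer.close()?;
    Ok(())
}
```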

Are these changes tested?

Yes, by existing tests.

Benchmarking Script

```rust
use datafusion::{dataframe::DataFrameWriteOptions, prelude::*};
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use peak_alloc::PeakAlloc;
use std::{sync::Arc, time::Instant};
use url::Url;

#[global_allocator]
static PEAK_ALLOC: PeakAlloc = PeakAlloc;

const FILENAME: &str =
    "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let mut runtimes = Vec::new();
    let mut max_memory = Vec::new();
    for parallelism in ["on", "off"] {
        for single_file_output in [true, false] {
            PEAK_ALLOC.reset_peak_usage();
            let config = SessionConfig::new().set(
                "datafusion.execution.parquet.allow_single_file_parallelism",
                (parallelism == "on").into(),
            );
            let ctx = SessionContext::with_config(config);
            let local = Arc::new(LocalFileSystem::new());
            let local_url = Url::parse("file://local").unwrap();
            ctx.runtime_env().register_object_store(&local_url, local);

            let read_options = ParquetReadOptions::default();

            // Union the input with itself to quadruple the data written
            let df = ctx.read_parquet(FILENAME, read_options.clone()).await?;
            let df = df.clone().union(df.clone())?.union(df.clone())?.union(df)?;

            let out_path = if single_file_output {
                format!("file://local/home/dev/arrow-datafusion/test_out/bench{parallelism}.parquet")
            } else {
                format!("file://local/home/dev/arrow-datafusion/test_out/{parallelism}/")
            };

            let start = Instant::now();
            df.clone()
                .write_parquet(
                    out_path.as_str(),
                    DataFrameWriteOptions::new().with_single_file_output(single_file_output),
                    None,
                )
                .await?;
            let elapsed = Instant::now() - start;
            println!("write as parquet with parallelism {parallelism} to disk took -> {elapsed:?}");
            runtimes.push(elapsed);

            let peak_mem = PEAK_ALLOC.peak_usage_as_mb();
            println!("Peak memory usage with parallelism {parallelism} is: {peak_mem}MB");
            max_memory.push(peak_mem);

            // Ensure a query over all rows produces a correct result
            let test_read = ctx.read_parquet(out_path, read_options).await?;
            test_read
                .aggregate(vec![], vec![avg(col("l_partkey")), count(col("l_partkey"))])?
                .show()
                .await?;
        }
    }

    println!("Runtimes: {runtimes:?}");
    println!("Peak memory: {max_memory:?}");

    Ok(())
}
```

Benchmarking Results

Single File Output = True

|                        | Single File Parallelism Off | Single File Parallelism On | Diff (%) |
| ---------------------- | --------------------------- | -------------------------- | -------- |
| Execution Time (s)     | 154.98                      | 22.65                      | -85.39%  |
| Peak Memory Usage (MB) | 449.7                       | 684.4                      | 52.19%   |

Single File Output = False

|                        | Single File Parallelism Off | Single File Parallelism On | Diff (%) |
| ---------------------- | --------------------------- | -------------------------- | -------- |
| Execution Time (s)     | 37.76                       | 25.22                      | -33.21%  |
| Peak Memory Usage (MB) | 1210.0                      | 1017.6                     | -15.90%  |

Are there any user-facing changes?

Faster parquet serialization

@devinjdangelo (Contributor, Author) commented:

@alamb @tustvold I updated this branch to build against the arrow-rs master branch while we wait for the next release. It currently hits a compile error, which I opened a PR to fix: apache/arrow-rs#4893.

Once that PR is merged, the checks on this branch should pass and if you want to try it locally it should work (so long as there are no breaking changes pushed to arrow-rs master).

@alamb (Contributor) commented Oct 5, 2023

Thank you @devinjdangelo

@devinjdangelo marked this pull request as ready for review October 23, 2023 21:04
@alamb (Contributor) commented Oct 23, 2023

I plan to review this tomorrow

@devinjdangelo (Contributor, Author) commented:

With the latest release of arrow-rs, this PR is now ready for review. I have merged in all of the conflicting write-related changes that happened while this PR was waiting, and updated the benchmarking/results as well (see the PR description).

cc @alamb

@tustvold previously approved these changes Oct 24, 2023

@tustvold (Contributor) left a comment:

Makes sense to me, just some minor comments

```diff
@@ -271,7 +271,7 @@ config_namespace! {
     /// for each output file being worked. Higher values can potentially
     /// give faster write performance at the cost of higher peak
     /// memory consumption
-    pub max_buffered_batches_per_output_file: usize, default = 2
+    pub max_buffered_batches_per_output_file: usize, default = 10
```
@tustvold (Contributor):

This seems very high

@devinjdangelo (Contributor, Author):

I lowered this one back to 2 with no noticeable impact on performance.

```rust
    /// up in memory in case the parallel writers cannot consume them fast
    /// enough. Lowering this number limits memory growth at the cost
    /// of potentially lower write speeds.
    pub maximum_buffered_record_batches_per_stream: usize, default = 200
```
@tustvold (Contributor):

This seems extraordinarily high; in other places we buffer up to 1.

@devinjdangelo (Contributor, Author) commented Oct 24, 2023:

This one does have a significant impact on performance if lowered substantially. I spent some time testing and tuning the exact values. Setting the max parallel row groups to 2 and maximum_buffered_record_batches_per_stream to 128 allows two row groups to serialize in parallel. If the buffer is set too low, backpressure kicks in well before a second row group can be spawned, and everything waits on a single row group's writers.

```
                 ┌─────────────────┐        ┌────────────────────────┐
 ExecPlan───────►│ RowGroup1 Queue ├───────►│Parallel Col Serializers│
   │             └─────────────────┘        └────────────────────────┘
   │
   │
   │
   │             ┌─────────────────┐        ┌────────────────────────┐
   └────────────►│RowGroup2 Queue  ├───────►│Parallel Col Serializers│
                 └─────────────────┘        └────────────────────────┘
```

Once max_rowgroup_rows have been sent to the RowGroup1 queue, a new queue is spawned with its own parallel writers.

RowGroup2 Queue won't be created until RowGroup1 Queue has received the desired number of rows. The goal is to have two row groups serializing in parallel if RecordBatches are being produced fast enough. For a streaming plan reading from disk, we probably never need more than 2 in parallel. If we are writing already in-memory data on a system with many cores, it is highly beneficial to boost these queue sizes even more so we could have an arbitrarily large number of row groups serializing in parallel.
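
To make the queueing concrete, here is a rough sketch of the demux loop described above, assuming tokio bounded channels. The names (`demux_row_groups`, `new_row_group_queue`, `max_buffered_batches`) are illustrative rather than the PR's actual identifiers, and splitting a batch exactly at the row-group boundary is omitted for brevity.

```rust
use arrow_array::RecordBatch;
use tokio::sync::mpsc::{channel, Receiver, Sender};

/// Illustrative demux: routes incoming batches into per-row-group queues,
/// starting a new queue (with its own parallel column writers) once the
/// current row group has received `max_rowgroup_rows` rows.
async fn demux_row_groups(
    mut input: Receiver<RecordBatch>,
    max_rowgroup_rows: usize,
    max_buffered_batches: usize,
) {
    let mut tx = new_row_group_queue(max_buffered_batches);
    let mut current_rows = 0usize;
    while let Some(batch) = input.recv().await {
        current_rows += batch.num_rows();
        // A bounded channel applies backpressure here: if the column
        // serializers fall behind, this send awaits instead of buffering
        // unbounded RecordBatches in memory.
        tx.send(batch).await.expect("row group task dropped");
        if current_rows >= max_rowgroup_rows {
            // Replacing the sender drops the old one, which closes that
            // queue and lets its row group finalize, while the next row
            // group serializes concurrently.
            tx = new_row_group_queue(max_buffered_batches);
            current_rows = 0;
        }
    }
}

/// Stand-in for spawning a row group's parallel column-writer tasks.
fn new_row_group_queue(capacity: usize) -> Sender<RecordBatch> {
    let (tx, mut rx) = channel::<RecordBatch>(capacity);
    tokio::spawn(async move {
        while let Some(_batch) = rx.recv().await {
            // ...fan columns out to parallel ArrowColumnWriter tasks...
        }
        // channel closed: join the column tasks and finalize the row group
    });
    tx
}
```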

@tustvold (Contributor) commented Oct 25, 2023:

Could we instead rely on input partitioning? Relying on being able to buffer an entire row group in memory as arrow data has caused major issues in the past. I don't feel comfortable merging this as is, as it will lead to a major regression in memory usage.

Edit: do we even really need row group parallelism? We already have parallelism at the file and column level; it seems a tad unnecessary, tbh, and comes with major drawbacks.

@devinjdangelo (Contributor, Author):

Lowering the buffer size to 2 would save a few hundred MB of memory, but at the cost of more than doubling the serialization time. The decision comes down to which default behavior is better. Imo, most users can spare the memory for the performance benefit, and those that can't can always lower the buffer size. We could instead default to a low buffer size (favoring minimum memory usage over execution time), and I could update the docs to suggest increasing the buffer for significant performance gains on systems with many cores. Here are the numbers I gathered using the script in the description:

Parallel Parquet Writer, Varying Row Group Buffer Sizes

|                        | Buffer Size=2 | Buffer Size=64 | Buffer Size=128 |
| ---------------------- | ------------- | -------------- | --------------- |
| Execution Time (s)     | 62.6          | 35.6           | 25.02           |
| Peak Memory Usage (MB) | 495.0         | 606.8          | 712.05          |

For comparison, the non-parallelized writer takes 155s with a peak memory usage of 449.7MB for the same task.

@tustvold (Contributor) commented Oct 25, 2023:

Is the memory usage of these buffered RecordBatch tracked with a MemoryReservation at all?

For context apache/arrow-rs#3871 was the issue from back when ArrowWriter did something similar, and this caused operational problems for us as writers could easily consume GB of memory when writing large files.

@devinjdangelo (Contributor, Author):

I haven't added anything special to track with a MemoryReservation. It is just a bounded channel size. Memory usage can't grow without bound, but it can grow up to however large 128 RecordBatches are in memory. So with a particularly wide table or an extra-large batch_size setting, I could see it climbing into GBs of memory. If we are concerned about that, we could use buffer_size=2 as the default and leave it up to users whether the memory/performance trade-off is worth increasing the buffer size.

It is also true that the numbers above are gathered from a fairly extreme case of writing ~250 million rows to a single parquet file; you could instead write 2 or more files in parallel to close the performance gap. For a more reasonably sized write of ~50 million rows and 1.5GB, the gap is smaller, but it is still there:

Parallel Parquet Writer, Varying Row Group Buffer Sizes

|                        | Buffer Size=2 | Buffer Size=64 | Buffer Size=128 |
| ---------------------- | ------------- | -------------- | --------------- |
| Execution Time (s)     | 19.76         | 13.41          | 13.68           |
| Peak Memory Usage (MB) | 272.2         | 277.2          | 281.8           |

@devinjdangelo (Contributor, Author):

Ok, I pushed a commit to change the buffer size back down to 2. I think this is a good default in most cases. I called out in the docs that if you have memory to spare, and in particular if you are writing out cached in-memory data (like a cached DataFrame), then you will likely benefit significantly from boosting the buffer sizes.

This optimization could perhaps be automatic at some point if we could automatically set the buffer size based on a known memory budget.
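
For illustration, opting into larger buffers would look something like this sketch. The option keys follow the settings discussed in this thread, and the u64-to-ScalarValue conversion is assumed to behave like the boolean `set` calls in the benchmark script:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Defaults favor low memory usage; on a many-core machine writing
    // already-in-memory data, larger buffers trade memory for speed.
    let config = SessionConfig::new()
        .set(
            "datafusion.execution.parquet.maximum_parallel_row_group_writers",
            2u64.into(),
        )
        .set(
            "datafusion.execution.parquet.maximum_buffered_record_batches_per_stream",
            128u64.into(),
        );
    let _ctx = SessionContext::with_config(config);
}
```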

@tustvold (Contributor):

Thanks, I think once you start buffering non-trivial numbers of batches, and therefore non-trivial amounts of data in memory, it is important that this is accounted for in the MemoryPool. Ideally we would account everywhere, but for practical reasons we just really need to get the places where non-trivial amounts can build up, so as to prevent OOMs.
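
For reference, accounting for the buffered batches would look roughly like the following sketch against DataFusion's MemoryConsumer/MemoryReservation API; the `TrackedBatchBuffer` type is hypothetical, invented for illustration:

```rust
use std::sync::Arc;
use arrow_array::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

/// Illustrative buffer that reports its RecordBatch memory to the pool,
/// so the query can fail cleanly instead of OOMing the process.
struct TrackedBatchBuffer {
    batches: Vec<RecordBatch>,
    reservation: MemoryReservation,
}

impl TrackedBatchBuffer {
    fn new(pool: &Arc<dyn MemoryPool>) -> Self {
        Self {
            batches: vec![],
            reservation: MemoryConsumer::new("ParquetRowGroupBuffer").register(pool),
        }
    }

    fn push(&mut self, batch: RecordBatch) -> Result<()> {
        // try_grow errors (rather than allocating) once the pool is exhausted
        self.reservation.try_grow(batch.get_array_memory_size())?;
        self.batches.push(batch);
        Ok(())
    }
}
// Dropping the buffer releases its reservation back to the pool.
```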

```rust
    /// parallel in case of limited memory or slow I/O speed causing
    /// OOM errors. Lowering this number limits memory growth at the cost
    /// of potentially slower write speeds.
    pub maximum_parallel_row_group_writers: usize, default = 16
```
@tustvold (Contributor):

Am I correct in thinking this is still bounded by the input parallelism? Is it worth noting this?

@devinjdangelo (Contributor, Author):

It can be limited based on the queue sizes as described in my other comment.

```rust
/// Spawns a tokio task which joins the parallel column writer tasks,
/// and finalizes the row group.
fn spawn_rg_join_and_finalize_task(
    column_writer_handles: Vec<JoinHandle<Result<ArrowColumnWriter>>>,
```
@tustvold (Contributor):

Should we be using a JoinSet here so that if a task fails it aborts them all?

@devinjdangelo (Contributor, Author):

I think that the order they are joined/appended matters for the SerializedRowGroupWriter, but I would have to double check.
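
For context, the trade-off here is that `JoinSet::join_next` yields results in completion order, while the row group needs column chunks appended in schema order. A hypothetical sketch of reconciling the two by tagging each task with its column index:

```rust
use tokio::task::JoinSet;

// Hypothetical: each task returns (column_index, chunk) so results can be
// restored to schema order even though JoinSet yields completion order.
async fn join_in_schema_order<T: Send + 'static>(mut set: JoinSet<(usize, T)>) -> Vec<T> {
    let mut results: Vec<(usize, T)> = Vec::with_capacity(set.len());
    while let Some(res) = set.join_next().await {
        results.push(res.expect("column writer task panicked"));
    }
    // A JoinSet aborts all remaining tasks if dropped early on error;
    // here we simply re-sort into schema order once all tasks finish.
    results.sort_by_key(|(idx, _)| *idx);
    results.into_iter().map(|(_, chunk)| chunk).collect()
}
```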


```rust
// Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
drop(col_array_channels);
let finalize_rg_task =
```
@tustvold (Contributor):

Should this check that current_rg_rows != 0?

@devinjdangelo (Contributor, Author):

Yes good call, I updated this.

@tustvold self-requested a review October 25, 2023 03:37
@tustvold (Contributor) left a comment:

Thank you

@alamb (Contributor) commented Oct 25, 2023

Thank you @devinjdangelo and @tustvold

@alamb merged commit 148f890 into apache:main Oct 25, 2023
23 checks passed
@andygrove added the `enhancement` (New feature or request) label Nov 5, 2023
Dandandan added a commit to coralogix/arrow-datafusion that referenced this pull request Nov 9, 2023