Extend insert into to support Parquet backed tables (#7244)
* initial AsyncArrowWriter implementation

* clean up

* fix information_schema test

* refactor code to new write.rs mod

* remove config dependency on parquet crate

* fmt

* finish implementing session write configs and parsing

* fix ndv doc

* rebase resolve conflicts

* split up test_string_expressions into 2 tests to avoid stack overflow error

* add comments explaining test split
devinjdangelo committed Aug 13, 2023
1 parent 0062778 commit ed85abb
Showing 14 changed files with 1,072 additions and 424 deletions.
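
For context, a minimal sketch (not part of this diff) of the workflow the commit enables: registering a Parquet-backed listing table and appending rows to it with `INSERT INTO`. The table name, schema, and path below are illustrative, not taken from the commit.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register a Parquet-backed listing table (directory assumed to exist).
    ctx.sql(
        "CREATE EXTERNAL TABLE events (id BIGINT, name VARCHAR) \
         STORED AS PARQUET LOCATION '/tmp/events/'",
    )
    .await?
    .collect()
    .await?;

    // With this change, INSERT INTO a Parquet-backed table writes new
    // Parquet files through an async Arrow writer based sink.
    ctx.sql("INSERT INTO events VALUES (1, 'a'), (2, 'b')")
        .await?
        .collect()
        .await?;

    Ok(())
}
```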
64 changes: 63 additions & 1 deletion datafusion/common/src/config.rs
@@ -255,7 +255,7 @@ config_namespace! {
}

config_namespace! {
/// Options related to reading of parquet files
/// Options related to parquet files
pub struct ParquetOptions {
/// If true, reads the Parquet data page level metadata (the
/// Page Index), if present, to reduce the I/O and number of
@@ -286,6 +286,66 @@ config_namespace! {
/// will be reordered heuristically to minimize the cost of evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false

// The following map to parquet::file::properties::WriterProperties

/// Sets best effort maximum size of data page in bytes
pub data_pagesize_limit: usize, default = 1024 * 1024

/// Sets write_batch_size in rows
pub write_batch_size: usize, default = 1024

/// Sets parquet writer version
/// Valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".into()

/// Sets default parquet compression codec
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case sensitive.
pub compression: String, default = "snappy".into()

/// Sets if dictionary encoding is enabled
pub dictionary_enabled: bool, default = true

/// Sets best effort maximum dictionary page size, in bytes
pub dictionary_page_size_limit: usize, default = 1024 * 1024

/// Sets if statistics are enabled for any column
/// Valid values are: "none", "chunk", and "page"
/// These values are not case sensitive.
pub statistics_enabled: String, default = "page".into()

/// Sets max statistics size for any column
pub max_statistics_size: usize, default = 4096

/// Sets maximum number of rows in a row group
pub max_row_group_size: usize, default = 1024 * 1024

/// Sets "created by" property
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

/// Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = None

/// Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = usize::MAX

/// Sets default encoding for any column
/// Valid values are: plain, plain_dictionary, rle,
/// bit_packed, delta_binary_packed, delta_length_byte_array,
/// delta_byte_array, rle_dictionary, and byte_stream_split.
/// These values are not case sensitive.
pub encoding: String, default = "plain".into()

/// Sets if bloom filter is enabled for any column
pub bloom_filter_enabled: bool, default = false

/// Sets bloom filter false positive probability
pub bloom_filter_fpp: f64, default = 0.05

/// Sets bloom filter number of distinct values
pub bloom_filter_ndv: u64, default = 1_000_000_u64
}
}
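
As a rough illustration (not shown in this diff), these new writer options surface as `datafusion.execution.parquet.*` configuration keys and could be overridden per session, for example with SQL `SET` statements, before running an `INSERT INTO` against a Parquet-backed table; the specific values below are arbitrary.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Override a couple of the writer options introduced above; the key
    // names assume ParquetOptions is exposed under the
    // `datafusion.execution.parquet` config namespace.
    ctx.sql("SET datafusion.execution.parquet.compression = 'zstd(3)'")
        .await?
        .collect()
        .await?;
    ctx.sql("SET datafusion.execution.parquet.bloom_filter_enabled = true")
        .await?
        .collect()
        .await?;

    Ok(())
}
```

Subsequent Parquet writes in the same session would then pick up these WriterProperties-backed settings.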

@@ -745,6 +805,8 @@ macro_rules! config_field {
config_field!(String);
config_field!(bool);
config_field!(usize);
config_field!(f64);
config_field!(u64);
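
To illustrate why `f64` and `u64` implementations are added here: a macro in the style of `config_field!` typically generates a parse-from-string setter for each primitive option type, which the new `bloom_filter_fpp` (f64) and `bloom_filter_ndv` (u64) options need. The sketch below is standalone and hypothetical (the trait and macro names are not DataFusion's); it only shows the shape of what such a macro expands to.

```rust
// Hypothetical stand-in for a `config_field!`-style macro: it implements a
// simple "set this option from a string" trait for each primitive type.
trait SimpleConfigField {
    fn set_from_str(&mut self, value: &str) -> Result<(), String>;
}

macro_rules! simple_config_field {
    ($t:ty) => {
        impl SimpleConfigField for $t {
            fn set_from_str(&mut self, value: &str) -> Result<(), String> {
                *self = value
                    .parse::<$t>()
                    .map_err(|e| format!("invalid value {value:?}: {e}"))?;
                Ok(())
            }
        }
    };
}

// The commit extends the covered primitives with f64 and u64 so the new
// bloom filter options can round-trip through string configuration values.
simple_config_field!(bool);
simple_config_field!(usize);
simple_config_field!(f64);
simple_config_field!(u64);

fn main() {
    let mut fpp = 0.0_f64;
    fpp.set_from_str("0.05").unwrap();
    assert_eq!(fpp, 0.05);

    let mut ndv = 0_u64;
    ndv.set_from_str("1000000").unwrap();
    assert_eq!(ndv, 1_000_000);
}
```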

/// An implementation trait used to recursively walk configuration
trait Visit {
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -37,10 +37,11 @@ use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

use super::{create_writer, stateless_serialize_and_write_files, FileFormat};
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::FileWriterMode;
use crate::datasource::file_format::{BatchSerializer, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::write::{
create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode,
};
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -47,13 +47,12 @@ use crate::physical_plan::insert::InsertExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

use super::create_writer;
use super::stateless_serialize_and_write_files;
use super::BatchSerializer;
use super::FileFormat;
use super::FileScanConfig;
use super::FileWriterMode;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::write::{
create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode,
};
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::physical_plan::NdJsonExec;
(Diffs for the remaining 11 changed files are not shown.)
