From c4bc2fd198d28c2f5f3a75d393c661bbc1e10d5d Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 23 Sep 2024 12:23:38 -0700 Subject: [PATCH 01/13] Add flag to always coerce utf8 to binary --- daft/daft/__init__.pyi | 1 + daft/table/table.py | 2 ++ src/daft-micropartition/src/micropartition.rs | 9 ++--- src/daft-parquet/src/python.rs | 16 +++++---- src/daft-parquet/src/read.rs | 36 +++++++++++++++++-- src/daft-scan/src/glob.rs | 7 ++-- 6 files changed, 56 insertions(+), 15 deletions(-) diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 98f91fadb7..30307bd000 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -870,6 +870,7 @@ def read_parquet_into_pyarrow( io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: PyTimeUnit | None = None, + coerce_string_to_binary: bool | None = None, file_timeout_ms: int | None = None, ): ... def read_parquet_into_pyarrow_bulk( diff --git a/daft/table/table.py b/daft/table/table.py index 707ea6ec98..8593a2c9f9 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -524,6 +524,7 @@ def read_parquet_into_pyarrow( io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), + coerce_string_to_binary: bool = False, file_timeout_ms: int | None = 900_000, # 15 minutes ) -> pa.Table: fields, metadata, columns, num_rows_read = _read_parquet_into_pyarrow( @@ -535,6 +536,7 @@ def read_parquet_into_pyarrow( io_config=io_config, multithreaded_io=multithreaded_io, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, + coerce_string_to_binary=coerce_string_to_binary, file_timeout_ms=file_timeout_ms, ) schema = pa.schema(fields, metadata=metadata) diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index cc8439583c..f732327a8e 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -596,9 +596,9 @@ impl MicroPartition { ( _, _, - FileFormatConfig::Parquet(ParquetSourceConfig { + &FileFormatConfig::Parquet(ParquetSourceConfig { coerce_int96_timestamp_unit, - field_id_mapping, + ref field_id_mapping, chunk_size, .. 
}), @@ -646,12 +646,13 @@ impl MicroPartition { if scan_task.sources.len() == 1 { 1 } else { 128 }, // Hardcoded for to 128 bulk reads cfg.multithreaded_io, &ParquetSchemaInferenceOptions { - coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit, + coerce_int96_timestamp_unit, + ..Default::default() }, Some(schema.clone()), field_id_mapping.clone(), parquet_metadata, - *chunk_size, + chunk_size, ) .context(DaftCoreComputeSnafu) } diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 930eb7e91b..83a2c2b044 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -10,7 +10,9 @@ pub mod pylib { use daft_table::python::PyTable; use pyo3::{pyfunction, types::PyModule, Bound, PyResult, Python}; - use crate::read::{ArrowChunk, ParquetSchemaInferenceOptions}; + use crate::read::{ + ArrowChunk, ParquetSchemaInferenceOptions, ParquetSchemaInferenceOptionsBuilder, + }; #[allow(clippy::too_many_arguments)] #[pyfunction] pub fn read_parquet( @@ -97,16 +99,19 @@ pub mod pylib { io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, + coerce_string_to_binary: Option, file_timeout_ms: Option, ) -> PyResult { - let read_parquet_result = py.allow_threads(|| { + let (schema, all_arrays, num_rows) = py.allow_threads(|| { let io_client = get_io_client( multithreaded_io.unwrap_or(true), io_config.unwrap_or_default().config.into(), )?; - let schema_infer_options = ParquetSchemaInferenceOptions::new( - coerce_int96_timestamp_unit.map(|tu| tu.timeunit), - ); + let schema_infer_options = ParquetSchemaInferenceOptionsBuilder { + coerce_int96_timestamp_unit, + coerce_string_to_binary, + } + .build(); crate::read::read_parquet_into_pyarrow( uri, @@ -121,7 +126,6 @@ pub mod pylib { file_timeout_ms, ) })?; - let (schema, all_arrays, num_rows) = read_parquet_result; let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; convert_pyarrow_parquet_read_result_into_py(py, schema, all_arrays, num_rows, &pyarrow) } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 38974e2b01..0404c14ae1 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -7,6 +7,8 @@ use std::{ use arrow2::{bitmap::Bitmap, io::parquet::read::schema::infer_schema_with_options}; use common_error::DaftResult; use daft_core::prelude::*; +#[cfg(feature = "python")] +use daft_core::python::PyTimeUnit; use daft_dsl::{optimization::get_required_columns, ExprRef}; use daft_io::{get_runtime, parse_url, IOClient, IOStatsRef, SourceType}; use daft_table::Table; @@ -22,18 +24,47 @@ use snafu::ResultExt; use crate::{file::ParquetReaderBuilder, JoinSnafu}; +#[cfg(feature = "python")] +#[derive(Default, Clone)] +pub struct ParquetSchemaInferenceOptionsBuilder { + pub coerce_int96_timestamp_unit: Option, + pub coerce_string_to_binary: Option, +} + +#[cfg(feature = "python")] +impl ParquetSchemaInferenceOptionsBuilder { + pub fn build(self) -> ParquetSchemaInferenceOptions { + self.into() + } +} + #[derive(Clone, Copy, Serialize, Deserialize)] pub struct ParquetSchemaInferenceOptions { pub coerce_int96_timestamp_unit: TimeUnit, + pub coerce_string_to_binary: bool, +} + +#[cfg(feature = "python")] +impl From for ParquetSchemaInferenceOptions { + fn from(builder: ParquetSchemaInferenceOptionsBuilder) -> Self { + let coerce_int96_timestamp_unit = builder + .coerce_int96_timestamp_unit + .map_or(TimeUnit::Nanoseconds, |time_unit| time_unit.into()); + let coerce_string_to_binary = builder.coerce_string_to_binary.unwrap_or(false); + 
Self { + coerce_int96_timestamp_unit, + coerce_string_to_binary, + } + } } impl ParquetSchemaInferenceOptions { pub fn new(coerce_int96_timestamp_unit: Option) -> Self { - let default: ParquetSchemaInferenceOptions = Default::default(); let coerce_int96_timestamp_unit = - coerce_int96_timestamp_unit.unwrap_or(default.coerce_int96_timestamp_unit); + coerce_int96_timestamp_unit.unwrap_or(TimeUnit::Nanoseconds); ParquetSchemaInferenceOptions { coerce_int96_timestamp_unit, + ..Default::default() } } } @@ -42,6 +73,7 @@ impl Default for ParquetSchemaInferenceOptions { fn default() -> Self { ParquetSchemaInferenceOptions { coerce_int96_timestamp_unit: TimeUnit::Nanoseconds, + coerce_string_to_binary: false, } } } diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 5383235c67..90621e510d 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -166,9 +166,9 @@ impl GlobScanOperator { let schema = match infer_schema { true => { let inferred_schema = match file_format_config.as_ref() { - FileFormatConfig::Parquet(ParquetSourceConfig { + &FileFormatConfig::Parquet(ParquetSourceConfig { coerce_int96_timestamp_unit, - field_id_mapping, + ref field_id_mapping, .. }) => { let io_stats = IOStatsContext::new(format!( @@ -180,7 +180,8 @@ impl GlobScanOperator { io_client.clone(), Some(io_stats), ParquetSchemaInferenceOptions { - coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit, + coerce_int96_timestamp_unit, + ..Default::default() }, field_id_mapping.clone(), )?; From 01ca502204bfe3e8505f77a7fd476681b92770a0 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 23 Sep 2024 14:50:51 -0700 Subject: [PATCH 02/13] Add logic to modify datatype based on string-to-binary-coercion flag --- .../src/io/parquet/read/schema/convert.rs | 34 +++++++++++++------ src/arrow2/src/io/parquet/read/schema/mod.rs | 25 +++++++++----- src/daft-micropartition/src/micropartition.rs | 4 +-- src/daft-parquet/src/file.rs | 11 +++--- src/daft-parquet/src/read.rs | 6 ++-- src/daft-parquet/src/stream_reader.rs | 4 +-- 6 files changed, 53 insertions(+), 31 deletions(-) diff --git a/src/arrow2/src/io/parquet/read/schema/convert.rs b/src/arrow2/src/io/parquet/read/schema/convert.rs index cf9838a2a2..5e144a257a 100644 --- a/src/arrow2/src/io/parquet/read/schema/convert.rs +++ b/src/arrow2/src/io/parquet/read/schema/convert.rs @@ -7,24 +7,27 @@ use parquet2::schema::{ Repetition, }; -use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; -use crate::io::parquet::read::schema::SchemaInferenceOptions; +use crate::{ + datatypes::{DataType, Field, IntervalUnit, TimeUnit}, + io::parquet::read::schema::SchemaInferenceOptions, +}; /// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain /// any physical column. 
#[allow(dead_code)] pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec { - parquet_to_arrow_schema_with_options(fields, &None) + parquet_to_arrow_schema_with_options(fields, None) } /// Like [`parquet_to_arrow_schema`] but with configurable options which affect the behavior of schema inference pub fn parquet_to_arrow_schema_with_options( fields: &[ParquetType], - options: &Option, + options: Option, ) -> Vec { + let options = options.unwrap_or_default(); fields .iter() - .filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default()))) + .filter_map(|f| to_field(f, &options)) .collect::>() } @@ -145,9 +148,16 @@ fn from_int64( fn from_byte_array( logical_type: &Option, converted_type: &Option, + options: &SchemaInferenceOptions, ) -> DataType { match (logical_type, converted_type) { - (Some(PrimitiveLogicalType::String), _) => DataType::Utf8, + (Some(PrimitiveLogicalType::String), _) => { + if options.string_coerce_to_binary { + DataType::Binary + } else { + DataType::Utf8 + } + } (Some(PrimitiveLogicalType::Json), _) => DataType::Binary, (Some(PrimitiveLogicalType::Bson), _) => DataType::Binary, (Some(PrimitiveLogicalType::Enum), _) => DataType::Binary, @@ -219,9 +229,11 @@ fn to_primitive_type_inner( PhysicalType::Int96 => DataType::Timestamp(options.int96_coerce_to_timeunit, None), PhysicalType::Float => DataType::Float32, PhysicalType::Double => DataType::Float64, - PhysicalType::ByteArray => { - from_byte_array(&primitive_type.logical_type, &primitive_type.converted_type) - } + PhysicalType::ByteArray => from_byte_array( + &primitive_type.logical_type, + &primitive_type.converted_type, + options, + ), PhysicalType::FixedLenByteArray(length) => from_fixed_len_byte_array( length, primitive_type.logical_type, @@ -440,7 +452,6 @@ mod tests { use parquet2::metadata::SchemaDescriptor; use super::*; - use crate::error::Result; #[test] @@ -1123,8 +1134,9 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; let fields = parquet_to_arrow_schema_with_options( parquet_schema.fields(), - &Some(SchemaInferenceOptions { + Some(SchemaInferenceOptions { int96_coerce_to_timeunit: tu, + ..Default::default() }), ); assert_eq!(arrow_fields, fields); diff --git a/src/arrow2/src/io/parquet/read/schema/mod.rs b/src/arrow2/src/io/parquet/read/schema/mod.rs index 293473c233..5b418b0dfc 100644 --- a/src/arrow2/src/io/parquet/read/schema/mod.rs +++ b/src/arrow2/src/io/parquet/read/schema/mod.rs @@ -1,21 +1,25 @@ //! APIs to handle Parquet <-> Arrow schemas. -use crate::datatypes::{Schema, TimeUnit}; -use crate::error::Result; +use crate::{ + datatypes::{Schema, TimeUnit}, + error::Result, +}; mod convert; mod metadata; pub use convert::parquet_to_arrow_schema_with_options; -pub use metadata::{apply_schema_to_fields, read_schema_from_metadata}; -pub use parquet2::metadata::{FileMetaData, KeyValue, SchemaDescriptor}; -pub use parquet2::schema::types::ParquetType; - pub(crate) use convert::*; +pub use metadata::{apply_schema_to_fields, read_schema_from_metadata}; +pub use parquet2::{ + metadata::{FileMetaData, KeyValue, SchemaDescriptor}, + schema::types::ParquetType, +}; use self::metadata::parse_key_value_metadata; /// Options when inferring schemas from Parquet +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct SchemaInferenceOptions { /// When inferring schemas from the Parquet INT96 timestamp type, this is the corresponding TimeUnit /// in the inferred Arrow Timestamp type. @@ -25,12 +29,17 @@ pub struct SchemaInferenceOptions { /// (e.g. 
TimeUnit::Milliseconds) will result in loss of precision, but support a larger range of dates /// without overflowing when parsing the data. pub int96_coerce_to_timeunit: TimeUnit, + + /// If `true`, when inferring schemas from the Parquet String (UTF8 encoded) type, always coerce to a binary type. + /// This will avoid any UTF8 validation that would have originally been performed. + pub string_coerce_to_binary: bool, } impl Default for SchemaInferenceOptions { fn default() -> Self { SchemaInferenceOptions { int96_coerce_to_timeunit: TimeUnit::Nanosecond, + string_coerce_to_binary: false, } } } @@ -42,13 +51,13 @@ impl Default for SchemaInferenceOptions { /// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded, /// indicating that that the file's arrow metadata was incorrectly written. pub fn infer_schema(file_metadata: &FileMetaData) -> Result { - infer_schema_with_options(file_metadata, &None) + infer_schema_with_options(file_metadata, None) } /// Like [`infer_schema`] but with configurable options which affects the behavior of inference pub fn infer_schema_with_options( file_metadata: &FileMetaData, - options: &Option, + options: Option, ) -> Result { let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata()); let fields = parquet_to_arrow_schema_with_options(file_metadata.schema().fields(), options); diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index f732327a8e..c3059626fa 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -1163,7 +1163,7 @@ pub(crate) fn read_parquet_into_micropartition>( let schemas = metadata .iter() .map(|m| { - let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?; + let schema = infer_schema_with_options(m, Some((*schema_infer_options).into()))?; let daft_schema = daft_core::prelude::Schema::try_from(&schema)?; DaftResult::Ok(Arc::new(daft_schema)) }) @@ -1187,7 +1187,7 @@ pub(crate) fn read_parquet_into_micropartition>( let schemas = metadata .iter() .map(|m| { - let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?; + let schema = infer_schema_with_options(m, Some((*schema_infer_options).into()))?; let daft_schema = daft_core::prelude::Schema::try_from(&schema)?; DaftResult::Ok(Arc::new(daft_schema)) }) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 3b84579b6d..8ba9f4ed25 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -259,11 +259,12 @@ impl ParquetReaderBuilder { } pub fn build(self) -> super::Result { - let mut arrow_schema = - infer_schema_with_options(&self.metadata, &Some(self.schema_inference_options.into())) - .context(UnableToParseSchemaFromMetadataSnafu:: { - path: self.uri.clone(), - })?; + let options = self.schema_inference_options.into(); + let mut arrow_schema = infer_schema_with_options(&self.metadata, Some(options)).context( + UnableToParseSchemaFromMetadataSnafu { + path: self.uri.clone(), + }, + )?; if let Some(names_to_keep) = self.selected_columns { arrow_schema diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 0404c14ae1..2487794f3f 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -84,6 +84,7 @@ impl From fn from(value: ParquetSchemaInferenceOptions) -> Self { arrow2::io::parquet::read::schema::SchemaInferenceOptions { int96_coerce_to_timeunit: 
value.coerce_int96_timestamp_unit.to_arrow(), + string_coerce_to_binary: value.coerce_string_to_binary, } } } @@ -502,7 +503,7 @@ async fn read_parquet_single_into_arrow( schema_infer_options: ParquetSchemaInferenceOptions, field_id_mapping: Option>>, metadata: Option>, -) -> DaftResult<(arrow2::datatypes::SchemaRef, Vec, usize)> { +) -> DaftResult { let field_id_mapping_provided = field_id_mapping.is_some(); let (source_type, fixed_uri) = parse_url(uri)?; let (metadata, schema, all_arrays, num_rows_read) = if matches!(source_type, SourceType::File) { @@ -921,8 +922,7 @@ pub fn read_parquet_schema( let builder = builder.set_infer_schema_options(schema_inference_options); let metadata = builder.metadata; - let arrow_schema = - infer_schema_with_options(&metadata, &Some(schema_inference_options.into()))?; + let arrow_schema = infer_schema_with_options(&metadata, Some(schema_inference_options.into()))?; let schema = Schema::try_from(&arrow_schema)?; Ok((schema, metadata)) } diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index dd88834aaa..178141c64d 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -229,7 +229,7 @@ pub(crate) fn local_parquet_read_into_column_iters( })?, }; - let schema = infer_schema_with_options(&metadata, &Some(schema_infer_options.into())) + let schema = infer_schema_with_options(&metadata, Some(schema_infer_options.into())) .with_context(|_| super::UnableToParseSchemaFromMetadataSnafu { path: uri.to_string(), })?; @@ -325,7 +325,7 @@ pub(crate) fn local_parquet_read_into_arrow( }; // and infer a [`Schema`] from the `metadata`. - let schema = infer_schema_with_options(&metadata, &Some(schema_infer_options.into())) + let schema = infer_schema_with_options(&metadata, Some(schema_infer_options.into())) .with_context(|_| super::UnableToParseSchemaFromMetadataSnafu { path: uri.to_string(), })?; From 012e2d16ff5553d6cba23aa41dc95c9f5af5b99d Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 23 Sep 2024 18:18:57 -0700 Subject: [PATCH 03/13] Add test for invalid UTF-8 coercion to binary when flag is set --- Cargo.lock | 7 ++ Cargo.toml | 1 + src/daft-parquet/Cargo.toml | 1 + src/daft-parquet/src/read.rs | 83 ++++++++++++++++-- .../assets/parquet-data/invalid_utf8.parquet | Bin 0 -> 517 bytes 5 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 tests/assets/parquet-data/invalid_utf8.parquet diff --git a/Cargo.lock b/Cargo.lock index 90e4745ba8..2ef41c2d6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2029,6 +2029,7 @@ dependencies = [ "itertools 0.11.0", "log", "parquet2", + "path_macro", "pyo3", "rayon", "serde", @@ -3920,6 +3921,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "path_macro" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6e819bbd49d5939f682638fa54826bf1650abddcd65d000923de8ad63cc7d15" + [[package]] name = "pem" version = "3.0.4" diff --git a/Cargo.toml b/Cargo.toml index 55403d40bb..c71052df70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,6 +161,7 @@ jaq-std = "1.2.0" num-derive = "0.3.3" num-traits = "0.2" once_cell = "1.19.0" +path_macro = "1.0.0" pretty_assertions = "1.4.0" rand = "^0.8" rayon = "1.10.0" diff --git a/src/daft-parquet/Cargo.toml b/src/daft-parquet/Cargo.toml index 3e1a4876b8..1ec75b3cb1 100644 --- a/src/daft-parquet/Cargo.toml +++ 
b/src/daft-parquet/Cargo.toml @@ -26,6 +26,7 @@ tokio-util = {workspace = true} [dev-dependencies] bincode = {workspace = true} +path_macro = {workspace = true} [features] python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python", "daft-dsl/python", "common-arrow-ffi/python"] diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 2487794f3f..f2965cf6a4 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -1051,20 +1051,26 @@ pub fn read_parquet_statistics( #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{path::PathBuf, sync::Arc}; - use common_error::DaftResult; + use common_error::{DaftError, DaftResult}; + use daft_core::prelude::DataType; use daft_io::{IOClient, IOConfig}; use futures::StreamExt; - use parquet2::metadata::FileMetaData; + use parquet2::{ + metadata::FileMetaData, + schema::types::{ParquetType, PrimitiveConvertedType, PrimitiveLogicalType}, + }; - use super::{read_parquet, read_parquet_metadata, stream_parquet}; + use super::{ + read_parquet, read_parquet_metadata, stream_parquet, ParquetSchemaInferenceOptions, + }; const PARQUET_FILE: &str = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"; const PARQUET_FILE_LOCAL: &str = "tests/assets/parquet-data/mvp.parquet"; fn get_local_parquet_path() -> String { - let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); d.push("../../"); // CARGO_MANIFEST_DIR is at src/daft-parquet d.push(PARQUET_FILE_LOCAL); d.to_str().unwrap().to_string() @@ -1148,4 +1154,71 @@ mod tests { Ok(()) })? } + + #[test] + fn test_invalid_utf8_parquet_reading() { + let parquet: Arc = path_macro::path!( + env!("CARGO_MANIFEST_DIR") + / ".." + / ".." + / "tests" + / "assets" + / "parquet-data" + / "invalid_utf8.parquet" + ) + .to_str() + .unwrap() + .into(); + let io_config = IOConfig::default(); + let io_client = Arc::new(IOClient::new(io_config.into()).unwrap()); + let runtime_handle = daft_io::get_runtime(true).unwrap(); + let file_metadata = runtime_handle + .block_on_io_pool({ + let parquet = parquet.clone(); + let io_client = io_client.clone(); + async move { read_parquet_metadata(&parquet, io_client, None, None).await } + }) + .flatten() + .unwrap(); + let primitive_type = match file_metadata.schema_descr.fields() { + [parquet_type] => match parquet_type { + ParquetType::PrimitiveType(primitive_type) => primitive_type, + ParquetType::GroupType { .. 
} => { + panic!("Parquet type should be primitive type, not group type") + } + }, + _ => panic!("This test parquet file should have only 1 field"), + }; + assert_eq!( + primitive_type.logical_type, + Some(PrimitiveLogicalType::String) + ); + assert_eq!( + primitive_type.converted_type, + Some(PrimitiveConvertedType::Utf8) + ); + let table = read_parquet( + &parquet, + None, + None, + None, + None, + None, + io_client, + None, + true, + ParquetSchemaInferenceOptions { + coerce_string_to_binary: true, + ..Default::default() + }, + None, + ) + .unwrap(); + let fields = table.schema.fields.values().collect::>(); + let field = match fields.as_slice() { + &[field] => field, + _ => panic!("There should only be one field in the schema"), + }; + assert_eq!(field.dtype, DataType::Binary); + } } diff --git a/tests/assets/parquet-data/invalid_utf8.parquet b/tests/assets/parquet-data/invalid_utf8.parquet new file mode 100644 index 0000000000000000000000000000000000000000..56e9a9595e191d6dd8e2151e3222cc68adc75d24 GIT binary patch literal 517 zcmb_a&r1S96n;C(W*{QcEITlVy|{ExQ%r-P!&}jctW=1?ORSr$OY6F=zo3(J>~HCh z?9EDtI`fStDkExtW$(Q;sz_emFd0S6cRWFXKZjvmv)@GV4gGFRD S&T92qT}btW3k_gEAK)7hhHwf1 literal 0 HcmV?d00001 From 96cd554a93f1bfcedd469849b6fb993fe94a33c6 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 23 Sep 2024 18:36:04 -0700 Subject: [PATCH 04/13] Edit python APIs for coercing utf8s to binary --- daft/daft/__init__.pyi | 2 +- daft/table/table.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 30307bd000..4c876808f1 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -870,7 +870,7 @@ def read_parquet_into_pyarrow( io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: PyTimeUnit | None = None, - coerce_string_to_binary: bool | None = None, + string_encoding: str | None = "utf-8", file_timeout_ms: int | None = None, ): ... def read_parquet_into_pyarrow_bulk( diff --git a/daft/table/table.py b/daft/table/table.py index 8593a2c9f9..c59ce3f2ed 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -524,7 +524,7 @@ def read_parquet_into_pyarrow( io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), - coerce_string_to_binary: bool = False, + string_encoding: str | None = "utf-8", file_timeout_ms: int | None = 900_000, # 15 minutes ) -> pa.Table: fields, metadata, columns, num_rows_read = _read_parquet_into_pyarrow( @@ -536,7 +536,7 @@ def read_parquet_into_pyarrow( io_config=io_config, multithreaded_io=multithreaded_io, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, - coerce_string_to_binary=coerce_string_to_binary, + string_encoding=string_encoding, file_timeout_ms=file_timeout_ms, ) schema = pa.schema(fields, metadata=metadata) From 1100a0a4f98071ac5e49e73e08ba792bcf617e3f Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 12:09:07 -0700 Subject: [PATCH 05/13] Change `coerce_string_to_binary` to `string_encoding` - more extensible! 
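Using an enum instead of a boolean flag leaves room for additional encodings
later: `StringEncoding::Utf8` keeps the existing behavior (Parquet String
columns infer as `DataType::Utf8`), while `StringEncoding::Raw` skips UTF-8
validation and infers those columns as `DataType::Binary`. At this point in
the series the Python-side parameter is `string_encoding: str | None`, with
`None` selecting `Raw`. A minimal sketch of the intended use (the file path
is hypothetical):

    from daft.table import read_parquet_into_pyarrow

    # None -> StringEncoding::Raw: UTF-8 validation is skipped and Parquet
    # String columns are surfaced as pa.binary() instead of pa.string().
    table = read_parquet_into_pyarrow(path="data.parquet", string_encoding=None)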
--- .../src/io/parquet/read/schema/convert.rs | 12 ++-- src/arrow2/src/io/parquet/read/schema/mod.rs | 35 ++++++++-- src/daft-parquet/src/lib.rs | 3 + src/daft-parquet/src/python.rs | 6 +- src/daft-parquet/src/read.rs | 65 ++++++++++++------- 5 files changed, 82 insertions(+), 39 deletions(-) diff --git a/src/arrow2/src/io/parquet/read/schema/convert.rs b/src/arrow2/src/io/parquet/read/schema/convert.rs index 5e144a257a..1fdeccc7a1 100644 --- a/src/arrow2/src/io/parquet/read/schema/convert.rs +++ b/src/arrow2/src/io/parquet/read/schema/convert.rs @@ -7,6 +7,7 @@ use parquet2::schema::{ Repetition, }; +use super::StringEncoding; use crate::{ datatypes::{DataType, Field, IntervalUnit, TimeUnit}, io::parquet::read::schema::SchemaInferenceOptions, @@ -151,13 +152,10 @@ fn from_byte_array( options: &SchemaInferenceOptions, ) -> DataType { match (logical_type, converted_type) { - (Some(PrimitiveLogicalType::String), _) => { - if options.string_coerce_to_binary { - DataType::Binary - } else { - DataType::Utf8 - } - } + (Some(PrimitiveLogicalType::String), _) => match options.string_encoding { + StringEncoding::Utf8 => DataType::Utf8, + StringEncoding::Raw => DataType::Binary, + }, (Some(PrimitiveLogicalType::Json), _) => DataType::Binary, (Some(PrimitiveLogicalType::Bson), _) => DataType::Binary, (Some(PrimitiveLogicalType::Enum), _) => DataType::Binary, diff --git a/src/arrow2/src/io/parquet/read/schema/mod.rs b/src/arrow2/src/io/parquet/read/schema/mod.rs index 5b418b0dfc..8df857db10 100644 --- a/src/arrow2/src/io/parquet/read/schema/mod.rs +++ b/src/arrow2/src/io/parquet/read/schema/mod.rs @@ -15,9 +15,37 @@ pub use parquet2::{ metadata::{FileMetaData, KeyValue, SchemaDescriptor}, schema::types::ParquetType, }; +use serde::{Deserialize, Serialize}; use self::metadata::parse_key_value_metadata; +/// String encoding options. +/// +/// Each variant of this enum maps to a different interpretation of the underlying binary data: +/// 1. `StringEncoding::Utf8` assumes the underlying binary data is UTF-8 encoded. +/// 2. `StringEncoding::Raw` makes no assumptions about the encoding of the underlying binary data. This variant will change the logical type of the column to `DataType::Binary` in the final schema. +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum StringEncoding { + Raw, + #[default] + Utf8, +} + +impl> TryFrom> for StringEncoding { + type Error = crate::error::Error; + + fn try_from(value: Option) -> Result { + match value.as_ref().map(AsRef::as_ref) { + Some("utf8") => Ok(Self::Utf8), + Some(encoding) => Err(crate::error::Error::InvalidArgumentError(format!( + "Unrecognized encoding: {}", + encoding + ))), + None => Ok(Self::Raw), + } + } +} + /// Options when inferring schemas from Parquet #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct SchemaInferenceOptions { @@ -30,16 +58,15 @@ pub struct SchemaInferenceOptions { /// without overflowing when parsing the data. pub int96_coerce_to_timeunit: TimeUnit, - /// If `true`, when inferring schemas from the Parquet String (UTF8 encoded) type, always coerce to a binary type. - /// This will avoid any UTF8 validation that would have originally been performed. - pub string_coerce_to_binary: bool, + /// The string encoding to assume when inferring the schema from Parquet data. 
+ pub string_encoding: StringEncoding, } impl Default for SchemaInferenceOptions { fn default() -> Self { SchemaInferenceOptions { int96_coerce_to_timeunit: TimeUnit::Nanosecond, - string_coerce_to_binary: false, + string_encoding: StringEncoding::default(), } } } diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs index d1057e95f7..e907fec22e 100644 --- a/src/daft-parquet/src/lib.rs +++ b/src/daft-parquet/src/lib.rs @@ -19,6 +19,9 @@ pub use python::register_modules; #[derive(Debug, Snafu)] pub enum Error { + #[snafu(display("{source}"))] + Arrow2Error { source: arrow2::error::Error }, + #[snafu(display("{source}"))] DaftIOError { source: daft_io::Error }, diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 83a2c2b044..7886f7c857 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -99,7 +99,7 @@ pub mod pylib { io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, - coerce_string_to_binary: Option, + string_encoding: Option, file_timeout_ms: Option, ) -> PyResult { let (schema, all_arrays, num_rows) = py.allow_threads(|| { @@ -109,9 +109,9 @@ pub mod pylib { )?; let schema_infer_options = ParquetSchemaInferenceOptionsBuilder { coerce_int96_timestamp_unit, - coerce_string_to_binary, + string_encoding, } - .build(); + .build()?; crate::read::read_parquet_into_pyarrow( uri, diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index f2965cf6a4..e7c8610207 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -4,7 +4,12 @@ use std::{ time::Duration, }; -use arrow2::{bitmap::Bitmap, io::parquet::read::schema::infer_schema_with_options}; +use arrow2::{ + bitmap::Bitmap, + io::parquet::read::schema::{ + infer_schema_with_options, SchemaInferenceOptions, StringEncoding, + }, +}; use common_error::DaftResult; use daft_core::prelude::*; #[cfg(feature = "python")] @@ -25,39 +30,50 @@ use snafu::ResultExt; use crate::{file::ParquetReaderBuilder, JoinSnafu}; #[cfg(feature = "python")] -#[derive(Default, Clone)] +#[derive(Clone)] pub struct ParquetSchemaInferenceOptionsBuilder { pub coerce_int96_timestamp_unit: Option, - pub coerce_string_to_binary: Option, + pub string_encoding: Option, } #[cfg(feature = "python")] impl ParquetSchemaInferenceOptionsBuilder { - pub fn build(self) -> ParquetSchemaInferenceOptions { - self.into() + pub fn build(self) -> crate::Result { + self.try_into() } } -#[derive(Clone, Copy, Serialize, Deserialize)] -pub struct ParquetSchemaInferenceOptions { - pub coerce_int96_timestamp_unit: TimeUnit, - pub coerce_string_to_binary: bool, +#[cfg(feature = "python")] +impl TryFrom for ParquetSchemaInferenceOptions { + type Error = crate::Error; + + fn try_from(value: ParquetSchemaInferenceOptionsBuilder) -> crate::Result { + Ok(ParquetSchemaInferenceOptions { + coerce_int96_timestamp_unit: value + .coerce_int96_timestamp_unit + .map_or(TimeUnit::Nanoseconds, |py_timeunit| py_timeunit.into()), + string_encoding: StringEncoding::try_from(value.string_encoding) + .context(crate::Arrow2Snafu)?, + }) + } } #[cfg(feature = "python")] -impl From for ParquetSchemaInferenceOptions { - fn from(builder: ParquetSchemaInferenceOptionsBuilder) -> Self { - let coerce_int96_timestamp_unit = builder - .coerce_int96_timestamp_unit - .map_or(TimeUnit::Nanoseconds, |time_unit| time_unit.into()); - let coerce_string_to_binary = builder.coerce_string_to_binary.unwrap_or(false); +impl Default for ParquetSchemaInferenceOptionsBuilder { + fn default() -> 
Self { Self { - coerce_int96_timestamp_unit, - coerce_string_to_binary, + coerce_int96_timestamp_unit: Some(PyTimeUnit::nanoseconds().unwrap()), + string_encoding: Some("utf8".into()), } } } +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct ParquetSchemaInferenceOptions { + pub coerce_int96_timestamp_unit: TimeUnit, + pub string_encoding: StringEncoding, +} + impl ParquetSchemaInferenceOptions { pub fn new(coerce_int96_timestamp_unit: Option) -> Self { let coerce_int96_timestamp_unit = @@ -73,18 +89,16 @@ impl Default for ParquetSchemaInferenceOptions { fn default() -> Self { ParquetSchemaInferenceOptions { coerce_int96_timestamp_unit: TimeUnit::Nanoseconds, - coerce_string_to_binary: false, + string_encoding: StringEncoding::Utf8, } } } -impl From - for arrow2::io::parquet::read::schema::SchemaInferenceOptions -{ +impl From for SchemaInferenceOptions { fn from(value: ParquetSchemaInferenceOptions) -> Self { - arrow2::io::parquet::read::schema::SchemaInferenceOptions { + SchemaInferenceOptions { int96_coerce_to_timeunit: value.coerce_int96_timestamp_unit.to_arrow(), - string_coerce_to_binary: value.coerce_string_to_binary, + string_encoding: value.string_encoding, } } } @@ -1053,7 +1067,8 @@ pub fn read_parquet_statistics( mod tests { use std::{path::PathBuf, sync::Arc}; - use common_error::{DaftError, DaftResult}; + use arrow2::io::parquet::read::schema::StringEncoding; + use common_error::DaftResult; use daft_core::prelude::DataType; use daft_io::{IOClient, IOConfig}; use futures::StreamExt; @@ -1208,7 +1223,7 @@ mod tests { None, true, ParquetSchemaInferenceOptions { - coerce_string_to_binary: true, + string_encoding: StringEncoding::Raw, ..Default::default() }, None, From ca3d4b79252a9d19c521947d0c2e4607ef38fe6e Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 12:22:15 -0700 Subject: [PATCH 06/13] Update conversion logic --- src/daft-parquet/src/read.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index e7c8610207..0d81e8528e 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -51,8 +51,10 @@ impl TryFrom for ParquetSchemaInferenceOpt Ok(ParquetSchemaInferenceOptions { coerce_int96_timestamp_unit: value .coerce_int96_timestamp_unit - .map_or(TimeUnit::Nanoseconds, |py_timeunit| py_timeunit.into()), - string_encoding: StringEncoding::try_from(value.string_encoding) + .map_or(TimeUnit::Nanoseconds, From::from), + string_encoding: value + .string_encoding + .try_into() .context(crate::Arrow2Snafu)?, }) } From 43965fe8949bab9e3a4d5f996fb1116f2f8efc1a Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 13:28:48 -0700 Subject: [PATCH 07/13] Change "utf8" to "utf-8" instead --- src/arrow2/src/io/parquet/read/schema/mod.rs | 2 +- src/daft-parquet/src/read.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/arrow2/src/io/parquet/read/schema/mod.rs b/src/arrow2/src/io/parquet/read/schema/mod.rs index 8df857db10..5240ea3950 100644 --- a/src/arrow2/src/io/parquet/read/schema/mod.rs +++ b/src/arrow2/src/io/parquet/read/schema/mod.rs @@ -36,7 +36,7 @@ impl> TryFrom> for StringEncoding { fn try_from(value: Option) -> Result { match value.as_ref().map(AsRef::as_ref) { - Some("utf8") => Ok(Self::Utf8), + Some("utf-8") => Ok(Self::Utf8), Some(encoding) => Err(crate::error::Error::InvalidArgumentError(format!( "Unrecognized encoding: {}", encoding diff --git a/src/daft-parquet/src/read.rs 
b/src/daft-parquet/src/read.rs index 0d81e8528e..dba04a6ae4 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -65,7 +65,7 @@ impl Default for ParquetSchemaInferenceOptionsBuilder { fn default() -> Self { Self { coerce_int96_timestamp_unit: Some(PyTimeUnit::nanoseconds().unwrap()), - string_encoding: Some("utf8".into()), + string_encoding: Some("utf-8".into()), } } } From 75a2a5f5a988a7a204197e0c6d2ceca7cc9564dc Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 14:28:40 -0700 Subject: [PATCH 08/13] Add python test for testing new string-encoding option --- tests/table/table_io/test_parquet.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index e568a043f6..48266715a4 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -5,6 +5,7 @@ import os import pathlib import tempfile +from pathlib import Path import pyarrow as pa import pyarrow.parquet as papq @@ -13,6 +14,7 @@ import daft from daft.daft import NativeStorageConfig, PythonStorageConfig, StorageConfig from daft.datatype import DataType, TimeUnit +from daft.exceptions import DaftCoreException from daft.logical.schema import Schema from daft.runners.partitioning import TableParseParquetOptions, TableReadOptions from daft.table import ( @@ -397,3 +399,25 @@ def test_read_parquet_file_missing_column_partial_read_with_pyarrow_bulk(tmpdir) read_back = read_parquet_into_pyarrow_bulk([file_path.as_posix()], columns=["x", "MISSING"]) assert len(read_back) == 1 assert tab.drop("y") == read_back[0] # only read "x" + + +@pytest.mark.parametrize( + "parquet_path", [Path(__file__).parents[2] / "assets" / "parquet-data" / "invalid_utf8.parquet"] +) +def test_parquet_read_string_utf8_into_binary(parquet_path: Path): + import pyarrow as pa + + assert parquet_path.exists() + + try: + read_parquet_into_pyarrow(path=parquet_path.as_posix()) + except DaftCoreException: + # should throw an exception without passing the "string_encoding=None" parameter + pass + + read_back = read_parquet_into_pyarrow(path=parquet_path.as_posix(), string_encoding=None) + + schema = read_back.schema + assert len(schema) == 1 + assert schema[0].name == "invalid_string" + assert schema[0].type == pa.binary() From a73000cf4fdac78487dd9d3ed52f8fcf9cb439db Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 14:53:44 -0700 Subject: [PATCH 09/13] Instead of `str | None = "utf-8"` for string-encoding option, use `str = "utf-8"` - users can specify `"raw"` instead --- src/arrow2/src/io/parquet/read/schema/mod.rs | 20 +++++++++------- src/daft-parquet/src/python.rs | 2 +- src/daft-parquet/src/read.rs | 25 +++++++------------- 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/src/arrow2/src/io/parquet/read/schema/mod.rs b/src/arrow2/src/io/parquet/read/schema/mod.rs index 5240ea3950..adb27b2fd9 100644 --- a/src/arrow2/src/io/parquet/read/schema/mod.rs +++ b/src/arrow2/src/io/parquet/read/schema/mod.rs @@ -1,8 +1,10 @@ //! APIs to handle Parquet <-> Arrow schemas. 
+use std::str::FromStr; + use crate::{ datatypes::{Schema, TimeUnit}, - error::Result, + error::{Error, Result}, }; mod convert; @@ -31,17 +33,17 @@ pub enum StringEncoding { Utf8, } -impl> TryFrom> for StringEncoding { - type Error = crate::error::Error; +impl FromStr for StringEncoding { + type Err = Error; - fn try_from(value: Option) -> Result { - match value.as_ref().map(AsRef::as_ref) { - Some("utf-8") => Ok(Self::Utf8), - Some(encoding) => Err(crate::error::Error::InvalidArgumentError(format!( + fn from_str(value: &str) -> Result { + match value { + "utf-8" => Ok(Self::Utf8), + "raw" => Ok(Self::Raw), + encoding => Err(Error::InvalidArgumentError(format!( "Unrecognized encoding: {}", - encoding + encoding, ))), - None => Ok(Self::Raw), } } } diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 7886f7c857..2d965053c2 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -92,6 +92,7 @@ pub mod pylib { pub fn read_parquet_into_pyarrow( py: Python, uri: &str, + string_encoding: String, columns: Option>, start_offset: Option, num_rows: Option, @@ -99,7 +100,6 @@ pub mod pylib { io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, - string_encoding: Option, file_timeout_ms: Option, ) -> PyResult { let (schema, all_arrays, num_rows) = py.allow_threads(|| { diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index dba04a6ae4..eed16ae5b9 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -33,7 +33,7 @@ use crate::{file::ParquetReaderBuilder, JoinSnafu}; #[derive(Clone)] pub struct ParquetSchemaInferenceOptionsBuilder { pub coerce_int96_timestamp_unit: Option, - pub string_encoding: Option, + pub string_encoding: String, } #[cfg(feature = "python")] @@ -52,10 +52,7 @@ impl TryFrom for ParquetSchemaInferenceOpt coerce_int96_timestamp_unit: value .coerce_int96_timestamp_unit .map_or(TimeUnit::Nanoseconds, From::from), - string_encoding: value - .string_encoding - .try_into() - .context(crate::Arrow2Snafu)?, + string_encoding: value.string_encoding.parse().context(crate::Arrow2Snafu)?, }) } } @@ -65,7 +62,7 @@ impl Default for ParquetSchemaInferenceOptionsBuilder { fn default() -> Self { Self { coerce_int96_timestamp_unit: Some(PyTimeUnit::nanoseconds().unwrap()), - string_encoding: Some("utf-8".into()), + string_encoding: "utf-8".into(), } } } @@ -1069,9 +1066,8 @@ pub fn read_parquet_statistics( mod tests { use std::{path::PathBuf, sync::Arc}; - use arrow2::io::parquet::read::schema::StringEncoding; + use arrow2::{datatypes::DataType, io::parquet::read::schema::StringEncoding}; use common_error::DaftResult; - use daft_core::prelude::DataType; use daft_io::{IOClient, IOConfig}; use futures::StreamExt; use parquet2::{ @@ -1079,9 +1075,7 @@ mod tests { schema::types::{ParquetType, PrimitiveConvertedType, PrimitiveLogicalType}, }; - use super::{ - read_parquet, read_parquet_metadata, stream_parquet, ParquetSchemaInferenceOptions, - }; + use super::*; const PARQUET_FILE: &str = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"; const PARQUET_FILE_LOCAL: &str = "tests/assets/parquet-data/mvp.parquet"; @@ -1214,13 +1208,12 @@ mod tests { primitive_type.converted_type, Some(PrimitiveConvertedType::Utf8) ); - let table = read_parquet( + let (schema, _, _) = read_parquet_into_pyarrow( &parquet, None, None, None, None, - None, io_client, None, true, @@ -1231,11 +1224,9 @@ mod tests { None, ) .unwrap(); - let fields = 
table.schema.fields.values().collect::>(); - let field = match fields.as_slice() { - &[field] => field, + match schema.fields.as_slice() { + [field] => assert_eq!(field.data_type, DataType::Binary), _ => panic!("There should only be one field in the schema"), }; - assert_eq!(field.dtype, DataType::Binary); } } From e846a2cb2b486afaed0218a3e77980bc5abf733c Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 14:58:13 -0700 Subject: [PATCH 10/13] Update python bindings --- daft/daft/__init__.pyi | 4 ++-- daft/table/table.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 4c876808f1..219cce3d23 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1,7 +1,7 @@ import builtins import datetime from enum import Enum -from typing import TYPE_CHECKING, Any, Callable, Iterator +from typing import TYPE_CHECKING, Any, Callable, Iterator, Literal from daft.dataframe.display import MermaidOptions from daft.execution import physical_plan @@ -870,7 +870,7 @@ def read_parquet_into_pyarrow( io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: PyTimeUnit | None = None, - string_encoding: str | None = "utf-8", + string_encoding: Literal["utf-8"] | Literal["raw"] = "utf-8", file_timeout_ms: int | None = None, ): ... def read_parquet_into_pyarrow_bulk( diff --git a/daft/table/table.py b/daft/table/table.py index c59ce3f2ed..b6dfe2f6d2 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal from daft.arrow_utils import ensure_table from daft.daft import ( @@ -524,7 +524,7 @@ def read_parquet_into_pyarrow( io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), - string_encoding: str | None = "utf-8", + string_encoding: Literal["utf-8"] | Literal["raw"] = "utf-8", file_timeout_ms: int | None = 900_000, # 15 minutes ) -> pa.Table: fields, metadata, columns, num_rows_read = _read_parquet_into_pyarrow( From d04e67223701e5ffd7d3b5fbf8531f6d46d69f05 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 14:59:47 -0700 Subject: [PATCH 11/13] Edit comment --- tests/table/table_io/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 48266715a4..9a26ee1380 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -412,7 +412,7 @@ def test_parquet_read_string_utf8_into_binary(parquet_path: Path): try: read_parquet_into_pyarrow(path=parquet_path.as_posix()) except DaftCoreException: - # should throw an exception without passing the "string_encoding=None" parameter + # should throw an exception when `string_encoding` is not specified and is instead defaulted to `"utf-8"` pass read_back = read_parquet_into_pyarrow(path=parquet_path.as_posix(), string_encoding=None) From 37dfa936424fe30f29b99c2f4abbd1e4b8925cf2 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 15:27:32 -0700 Subject: [PATCH 12/13] Change from None to `"raw"` --- tests/table/table_io/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 9a26ee1380..25df086cd3 100644 --- 
a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -415,7 +415,7 @@ def test_parquet_read_string_utf8_into_binary(parquet_path: Path): # should throw an exception when `string_encoding` is not specified and is instead defaulted to `"utf-8"` pass - read_back = read_parquet_into_pyarrow(path=parquet_path.as_posix(), string_encoding=None) + read_back = read_parquet_into_pyarrow(path=parquet_path.as_posix(), string_encoding="raw") schema = read_back.schema assert len(schema) == 1 From 58c07304d2386335a7ec134dccd26b1f349e8a4e Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 24 Sep 2024 15:34:27 -0700 Subject: [PATCH 13/13] Address pytest changes that were requested --- tests/table/table_io/test_parquet.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 25df086cd3..3a17edd4ec 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -409,15 +409,12 @@ def test_parquet_read_string_utf8_into_binary(parquet_path: Path): assert parquet_path.exists() - try: + with pytest.raises(DaftCoreException, match="invalid utf-8 sequence"): read_parquet_into_pyarrow(path=parquet_path.as_posix()) - except DaftCoreException: - # should throw an exception when `string_encoding` is not specified and is instead defaulted to `"utf-8"` - pass read_back = read_parquet_into_pyarrow(path=parquet_path.as_posix(), string_encoding="raw") - schema = read_back.schema assert len(schema) == 1 assert schema[0].name == "invalid_string" assert schema[0].type == pa.binary() + assert read_back["invalid_string"][0].as_py() == b"\x80\x80\x80"
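Net user-facing behavior after the series, as exercised by the final test (a
minimal sketch; the file path is hypothetical):

    from daft.table import read_parquet_into_pyarrow

    # Default string_encoding="utf-8": Parquet String columns are validated
    # and decoded as pa.string(); a file containing invalid UTF-8 raises a
    # DaftCoreException ("invalid utf-8 sequence").
    table = read_parquet_into_pyarrow(path="invalid_utf8.parquet")

    # string_encoding="raw": validation is skipped and the same column is
    # surfaced as pa.binary(), returning the raw bytes unchanged.
    table = read_parquet_into_pyarrow(path="invalid_utf8.parquet", string_encoding="raw")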