diff --git a/.github/workflows/labeler/labeler-config.yml b/.github/workflows/labeler/labeler-config.yml
index e408130725215..38d88059dab70 100644
--- a/.github/workflows/labeler/labeler-config.yml
+++ b/.github/workflows/labeler/labeler-config.yml
@@ -58,7 +58,7 @@ execution:
 
 datasource:
   - changed-files:
-      - any-glob-to-any-file: ['datafusion/datasource/**/*', 'datafusion/datasource-avro/**/*', 'datafusion/datasource-csv/**/*', 'datafusion/datasource-json/**/*', 'datafusion/datasource-parquet/**/*']
+      - any-glob-to-any-file: ['datafusion/datasource/**/*', 'datafusion/datasource-avro/**/*', 'datafusion/datasource-arrow/**/*', 'datafusion/datasource-csv/**/*', 'datafusion/datasource-json/**/*', 'datafusion/datasource-parquet/**/*']
 
 functions:
   - changed-files:
diff --git a/Cargo.lock b/Cargo.lock
index 00bd64f21eb11..f81f54b7e5ad6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1789,7 +1789,6 @@ name = "datafusion"
 version = "50.2.0"
 dependencies = [
  "arrow",
- "arrow-ipc",
  "arrow-schema",
  "async-trait",
  "bytes",
@@ -1803,6 +1802,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-common-runtime",
  "datafusion-datasource",
+ "datafusion-datasource-arrow",
  "datafusion-datasource-avro",
  "datafusion-datasource-csv",
  "datafusion-datasource-json",
@@ -2026,6 +2026,29 @@ dependencies = [
  "zstd",
 ]
 
+[[package]]
+name = "datafusion-datasource-arrow"
+version = "50.2.0"
+dependencies = [
+ "arrow",
+ "arrow-ipc",
+ "async-trait",
+ "bytes",
+ "chrono",
+ "datafusion-common",
+ "datafusion-common-runtime",
+ "datafusion-datasource",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-expr-common",
+ "datafusion-physical-plan",
+ "datafusion-session",
+ "futures",
+ "itertools 0.14.0",
+ "object_store",
+ "tokio",
+]
+
 [[package]]
 name = "datafusion-datasource-avro"
 version = "50.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index dd0b20de528af..79c14d6cca799 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ members = [
     "datafusion/catalog",
     "datafusion/catalog-listing",
     "datafusion/datasource",
+    "datafusion/datasource-arrow",
     "datafusion/datasource-avro",
     "datafusion/datasource-csv",
     "datafusion/datasource-json",
@@ -116,6 +117,7 @@ datafusion-catalog-listing = { path = "datafusion/catalog-listing", version = "50.2.0" }
 datafusion-common = { path = "datafusion/common", version = "50.2.0", default-features = false }
 datafusion-common-runtime = { path = "datafusion/common-runtime", version = "50.2.0" }
 datafusion-datasource = { path = "datafusion/datasource", version = "50.2.0", default-features = false }
+datafusion-datasource-arrow = { path = "datafusion/datasource-arrow", version = "50.2.0", default-features = false }
 datafusion-datasource-avro = { path = "datafusion/datasource-avro", version = "50.2.0", default-features = false }
 datafusion-datasource-csv = { path = "datafusion/datasource-csv", version = "50.2.0", default-features = false }
 datafusion-datasource-json = { path = "datafusion/datasource-json", version = "50.2.0", default-features = false }
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index d3bc4546588de..a5a715cea94f1 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -47,7 +47,7 @@ compression = [
     "bzip2",
     "flate2",
     "zstd",
-    "arrow-ipc/zstd",
+    "datafusion-datasource-arrow/compression",
     "datafusion-datasource/compression",
 ]
 crypto_expressions = ["datafusion-functions/crypto_expressions"]
@@ -109,7 +109,6 @@ extended_tests = []
 
 [dependencies]
 arrow = { workspace = true }
-arrow-ipc = { workspace = true }
 arrow-schema = { workspace = true, features = ["canonical_extension_types"] }
 async-trait = { workspace = true }
 bytes = { workspace = true }
@@ -120,6 +119,7 @@ datafusion-catalog-listing = { workspace = true }
 datafusion-common = { workspace = true, features = ["object_store"] }
 datafusion-common-runtime = { workspace = true }
 datafusion-datasource = { workspace = true }
+datafusion-datasource-arrow = { workspace = true }
 datafusion-datasource-avro = { workspace = true, optional = true }
 datafusion-datasource-csv = { workspace = true }
 datafusion-datasource-json = { workspace = true }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 25bc166d657a5..8701f96eb3b84 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -15,510 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`ArrowFormat`]: Apache Arrow [`FileFormat`] abstractions
-//!
-//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
-
-use std::any::Any;
-use std::borrow::Cow;
-use std::collections::HashMap;
-use std::fmt::{self, Debug};
-use std::sync::Arc;
-
-use super::file_compression_type::FileCompressionType;
-use super::write::demux::DemuxedStreamReceiver;
-use super::write::SharedBuffer;
-use super::FileFormatFactory;
-use crate::datasource::file_format::write::get_writer_schema;
-use crate::datasource::file_format::FileFormat;
-use crate::datasource::physical_plan::{ArrowSource, FileSink, FileSinkConfig};
-use crate::error::Result;
-use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
-
-use arrow::datatypes::{Schema, SchemaRef};
-use arrow::error::ArrowError;
-use arrow::ipc::convert::fb_to_schema;
-use arrow::ipc::reader::FileReader;
-use arrow::ipc::writer::IpcWriteOptions;
-use arrow::ipc::{root_as_message, CompressionType};
-use datafusion_catalog::Session;
-use datafusion_common::parsers::CompressionTypeVariant;
-use datafusion_common::{
-    internal_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics,
-    DEFAULT_ARROW_EXTENSION,
-};
-use datafusion_common_runtime::{JoinSet, SpawnedTask};
-use datafusion_datasource::display::FileGroupDisplay;
-use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
-use datafusion_datasource::sink::{DataSink, DataSinkExec};
-use datafusion_datasource::write::ObjectWriterBuilder;
-use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use datafusion_datasource::source::DataSourceExec;
-use futures::stream::BoxStream;
-use futures::StreamExt;
-use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
-use tokio::io::AsyncWriteExt;
-
-/// Initial writing buffer size. Note this is just a size hint for efficiency. It
-/// will grow beyond the set value if needed.
-const INITIAL_BUFFER_BYTES: usize = 1048576;
-
-/// If the buffered Arrow data exceeds this size, it is flushed to object store
-const BUFFER_FLUSH_BYTES: usize = 1024000;
-
-#[derive(Default, Debug)]
-/// Factory struct used to create [ArrowFormat]
-pub struct ArrowFormatFactory;
-
-impl ArrowFormatFactory {
-    /// Creates an instance of [ArrowFormatFactory]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl FileFormatFactory for ArrowFormatFactory {
-    fn create(
-        &self,
-        _state: &dyn Session,
-        _format_options: &HashMap<String, String>,
-    ) -> Result<Arc<dyn FileFormat>> {
-        Ok(Arc::new(ArrowFormat))
-    }
-
-    fn default(&self) -> Arc<dyn FileFormat> {
-        Arc::new(ArrowFormat)
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-}
-
-impl GetExt for ArrowFormatFactory {
-    fn get_ext(&self) -> String {
-        // Removes the dot, i.e. ".parquet" -> "parquet"
-        DEFAULT_ARROW_EXTENSION[1..].to_string()
-    }
-}
-
-/// Arrow `FileFormat` implementation.
-#[derive(Default, Debug)]
-pub struct ArrowFormat;
-
-#[async_trait]
-impl FileFormat for ArrowFormat {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn get_ext(&self) -> String {
-        ArrowFormatFactory::new().get_ext()
-    }
-
-    fn get_ext_with_compression(
-        &self,
-        file_compression_type: &FileCompressionType,
-    ) -> Result<String> {
-        let ext = self.get_ext();
-        match file_compression_type.get_variant() {
-            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
-            _ => Err(internal_datafusion_err!(
-                "Arrow FileFormat does not support compression."
-            )),
-        }
-    }
-
-    fn compression_type(&self) -> Option<FileCompressionType> {
-        None
-    }
-
-    async fn infer_schema(
-        &self,
-        _state: &dyn Session,
-        store: &Arc<dyn ObjectStore>,
-        objects: &[ObjectMeta],
-    ) -> Result<SchemaRef> {
-        let mut schemas = vec![];
-        for object in objects {
-            let r = store.as_ref().get(&object.location).await?;
-            let schema = match r.payload {
-                #[cfg(not(target_arch = "wasm32"))]
-                GetResultPayload::File(mut file, _) => {
-                    let reader = FileReader::try_new(&mut file, None)?;
-                    reader.schema()
-                }
-                GetResultPayload::Stream(stream) => {
-                    infer_schema_from_file_stream(stream).await?
-                }
-            };
-            schemas.push(schema.as_ref().clone());
-        }
-        let merged_schema = Schema::try_merge(schemas)?;
-        Ok(Arc::new(merged_schema))
-    }
-
-    async fn infer_stats(
-        &self,
-        _state: &dyn Session,
-        _store: &Arc<dyn ObjectStore>,
-        table_schema: SchemaRef,
-        _object: &ObjectMeta,
-    ) -> Result<Statistics> {
-        Ok(Statistics::new_unknown(&table_schema))
-    }
-
-    async fn create_physical_plan(
-        &self,
-        _state: &dyn Session,
-        conf: FileScanConfig,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let source = Arc::new(ArrowSource::default());
-        let config = FileScanConfigBuilder::from(conf)
-            .with_source(source)
-            .build();
-
-        Ok(DataSourceExec::from_data_source(config))
-    }
-
-    async fn create_writer_physical_plan(
-        &self,
-        input: Arc<dyn ExecutionPlan>,
-        _state: &dyn Session,
-        conf: FileSinkConfig,
-        order_requirements: Option<LexRequirement>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.insert_op != InsertOp::Append {
-            return not_impl_err!("Overwrites are not implemented yet for Arrow format");
-        }
-
-        let sink = Arc::new(ArrowFileSink::new(conf));
-
-        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
-    }
-
-    fn file_source(&self) -> Arc<dyn FileSource> {
-        Arc::new(ArrowSource::default())
-    }
-}
-
-/// Implements [`FileSink`] for writing to arrow_ipc files
-struct ArrowFileSink {
-    config: FileSinkConfig,
-}
-
-impl ArrowFileSink {
-    fn new(config: FileSinkConfig) -> Self {
-        Self { config }
-    }
-}
-
-#[async_trait]
-impl FileSink for ArrowFileSink {
-    fn config(&self) -> &FileSinkConfig {
-        &self.config
-    }
-
-    async fn spawn_writer_tasks_and_join(
-        &self,
-        context: &Arc<TaskContext>,
-        demux_task: SpawnedTask<Result<()>>,
-        mut file_stream_rx: DemuxedStreamReceiver,
-        object_store: Arc<dyn ObjectStore>,
-    ) -> Result<u64> {
-        let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
-            JoinSet::new();
-
-        let ipc_options =
-            IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
-                .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
-        while let Some((path, mut rx)) = file_stream_rx.recv().await {
-            let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
-            let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
-                shared_buffer.clone(),
-                &get_writer_schema(&self.config),
-                ipc_options.clone(),
-            )?;
-            let mut object_store_writer = ObjectWriterBuilder::new(
-                FileCompressionType::UNCOMPRESSED,
-                &path,
-                Arc::clone(&object_store),
-            )
-            .with_buffer_size(Some(
-                context
-                    .session_config()
-                    .options()
-                    .execution
-                    .objectstore_writer_buffer_size,
-            ))
-            .build()?;
-            file_write_tasks.spawn(async move {
-                let mut row_count = 0;
-                while let Some(batch) = rx.recv().await {
-                    row_count += batch.num_rows();
-                    arrow_writer.write(&batch)?;
-                    let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
-                    if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
-                        object_store_writer
-                            .write_all(buff_to_flush.as_slice())
-                            .await?;
-                        buff_to_flush.clear();
-                    }
-                }
-                arrow_writer.finish()?;
-                let final_buff = shared_buffer.buffer.try_lock().unwrap();
-
-                object_store_writer.write_all(final_buff.as_slice()).await?;
-                object_store_writer.shutdown().await?;
-                Ok(row_count)
-            });
-        }
-
-        let mut row_count = 0;
-        while let Some(result) = file_write_tasks.join_next().await {
-            match result {
-                Ok(r) => {
-                    row_count += r?;
-                }
-                Err(e) => {
-                    if e.is_panic() {
-                        std::panic::resume_unwind(e.into_panic());
-                    } else {
-                        unreachable!();
-                    }
-                }
-            }
-        }
-
-        demux_task
-            .join_unwind()
-            .await
-            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
-        Ok(row_count as u64)
-    }
-}
-
-impl Debug for ArrowFileSink {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("ArrowFileSink").finish()
-    }
-}
-
-impl DisplayAs for ArrowFileSink {
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "ArrowFileSink(file_groups=",)?;
-                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
-                write!(f, ")")
-            }
-            DisplayFormatType::TreeRender => {
-                writeln!(f, "format: arrow")?;
-                write!(f, "file={}", &self.config.original_url)
-            }
-        }
-    }
-}
-
-#[async_trait]
-impl DataSink for ArrowFileSink {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> &SchemaRef {
-        self.config.output_schema()
-    }
-
-    async fn write_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        FileSink::write_all(self, data, context).await
-    }
-}
-
-const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
-const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
-
-/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
-/// See <https://github.com/apache/arrow-rs/issues/5021>
-async fn infer_schema_from_file_stream(
-    mut stream: BoxStream<'static, object_store::Result<Bytes>>,
-) -> Result<SchemaRef> {
-    // Expected format:
-    // <magic number "ARROW1"> - 6 bytes
-    // <empty padding bytes [to 8 byte boundary]> - 2 bytes
-    // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
-    // <metadata_size: int32> - 4 bytes
-    // <metadata_flatbuffer: bytes>
-    // <rest of file bytes>
-
-    // So in first read we need at least all known sized sections,
-    // which is 6 + 2 + 4 + 4 = 16 bytes.
-    let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
-
-    // Files should start with these magic bytes
-    if bytes[0..6] != ARROW_MAGIC {
-        return Err(ArrowError::ParseError(
-            "Arrow file does not contain correct header".to_string(),
-        ))?;
-    }
-
-    // Since continuation marker bytes added in later versions
-    let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
-        (&bytes[12..16], 16)
-    } else {
-        (&bytes[8..12], 12)
-    };
-
-    let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
-    let meta_len = i32::from_le_bytes(meta_len);
-
-    // Read bytes for Schema message
-    let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
-        // Need to read more bytes to decode Message
-        let mut block_data = Vec::with_capacity(meta_len as usize);
-        // In case we had some spare bytes in our initial read chunk
-        block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
-        let size_to_read = meta_len as usize - block_data.len();
-        let block_data =
-            collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?;
-        Cow::Owned(block_data)
-    } else {
-        // Already have the bytes we need
-        let end_index = meta_len as usize + rest_of_bytes_start_index;
-        let block_data = &bytes[rest_of_bytes_start_index..end_index];
-        Cow::Borrowed(block_data)
-    };
-
-    // Decode Schema message
-    let message = root_as_message(&block_data).map_err(|err| {
-        ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}"))
-    })?;
-    let ipc_schema = message.header_as_schema().ok_or_else(|| {
-        ArrowError::IpcError("Unable to read IPC message as schema".to_string())
-    })?;
-    let schema = fb_to_schema(ipc_schema);
-
-    Ok(Arc::new(schema))
-}
-
-async fn collect_at_least_n_bytes(
-    stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
-    n: usize,
-    extend_from: Option<Vec<u8>>,
-) -> Result<Vec<u8>> {
-    let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
-    // If extending existing buffer then ensure we read n additional bytes
-    let n = n + buf.len();
-    while let Some(bytes) = stream.next().await.transpose()? {
-        buf.extend_from_slice(&bytes);
-        if buf.len() >= n {
-            break;
-        }
-    }
-    if buf.len() < n {
-        return Err(ArrowError::ParseError(
-            "Unexpected end of byte stream for Arrow IPC file".to_string(),
-        ))?;
-    }
-    Ok(buf)
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::execution::context::SessionContext;
-
-    use chrono::DateTime;
-    use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path};
-
-    #[tokio::test]
-    async fn test_infer_schema_stream() -> Result<()> {
-        let mut bytes = std::fs::read("tests/data/example.arrow")?;
-        bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
-        let location = Path::parse("example.arrow")?;
-        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
-        in_memory_store.put(&location, bytes.into()).await?;
-
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let object_meta = ObjectMeta {
-            location,
-            last_modified: DateTime::default(),
-            size: u64::MAX,
-            e_tag: None,
-            version: None,
-        };
-
-        let arrow_format = ArrowFormat {};
-        let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
-
-        // Test chunk sizes where too small so we keep having to read more bytes
-        // And when large enough that first read contains all we need
-        for chunk_size in [7, 3000] {
-            let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
-            let inferred_schema = arrow_format
-                .infer_schema(
-                    &state,
-                    &(store.clone() as Arc<dyn ObjectStore>),
-                    std::slice::from_ref(&object_meta),
-                )
-                .await?;
-            let actual_fields = inferred_schema
-                .fields()
-                .iter()
-                .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
-                .collect::<Vec<_>>();
-            assert_eq!(expected, actual_fields);
-        }
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_infer_schema_short_stream() -> Result<()> {
-        let mut bytes = std::fs::read("tests/data/example.arrow")?;
-        bytes.truncate(20); // should cause error that file shorter than expected
-        let location = Path::parse("example.arrow")?;
-        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
-        in_memory_store.put(&location, bytes.into()).await?;
-
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let object_meta = ObjectMeta {
-            location,
-            last_modified: DateTime::default(),
-            size: u64::MAX,
-            e_tag: None,
-            version: None,
-        };
-
-        let arrow_format = ArrowFormat {};
-
-        let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
-        let err = arrow_format
-            .infer_schema(
-                &state,
-                &(store.clone() as Arc<dyn ObjectStore>),
-                std::slice::from_ref(&object_meta),
-            )
-            .await;
-
-        assert!(err.is_err());
-        assert_eq!(
-            "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
-            err.unwrap_err().to_string().lines().next().unwrap()
-        );
-
-        Ok(())
-    }
-}
+//! Re-exports the [`datafusion_datasource_arrow::file_format`] module, and contains tests for it.
+pub use datafusion_datasource_arrow::file_format::*;
diff --git a/datafusion/core/src/datasource/physical_plan/arrow.rs b/datafusion/core/src/datasource/physical_plan/arrow.rs
new file mode 100644
index 0000000000000..392eaa8c4be49
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/arrow.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Re-exports the [`datafusion_datasource_arrow::source`] module, containing an [Arrow]-based [`FileSource`].
+//!
+//! [Arrow]: https://arrow.apache.org/docs/python/ipc.html
+//! [`FileSource`]: datafusion_datasource::file::FileSource
+
+pub use datafusion_datasource_arrow::source::*;
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 3a9dedaa028f2..1ac292e260fdf 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -17,7 +17,7 @@
 
 //! Execution plans that read file formats
 
-mod arrow_file;
+pub mod arrow;
 pub mod csv;
 pub mod json;
 
@@ -35,10 +35,9 @@ pub use datafusion_datasource_parquet::source::ParquetSource;
 #[cfg(feature = "parquet")]
 pub use datafusion_datasource_parquet::{ParquetFileMetrics, ParquetFileReaderFactory};
 
-pub use arrow_file::ArrowSource;
-
 pub use json::{JsonOpener, JsonSource};
 
+pub use arrow::{ArrowOpener, ArrowSource};
 pub use csv::{CsvOpener, CsvSource};
 pub use datafusion_datasource::file::FileSource;
 pub use datafusion_datasource::file_groups::FileGroup;
diff --git a/datafusion/datasource-arrow/Cargo.toml b/datafusion/datasource-arrow/Cargo.toml
new file mode 100644
index 0000000000000..b3d1e3f2accc9
--- /dev/null
+++ b/datafusion/datasource-arrow/Cargo.toml
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-datasource-arrow"
+description = "datafusion-datasource-arrow"
+readme = "README.md"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+[package.metadata.docs.rs]
+all-features = true
+
+[dependencies]
+arrow = { workspace = true }
+arrow-ipc = { workspace = true }
+async-trait = { workspace = true }
+bytes = { workspace = true }
+datafusion-common = { workspace = true, features = ["object_store"] }
+datafusion-common-runtime = { workspace = true }
+datafusion-datasource = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
+futures = { workspace = true }
+itertools = { workspace = true }
+object_store = { workspace = true }
+tokio = { workspace = true }
+
+[dev-dependencies]
+chrono = { workspace = true }
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_datasource_arrow"
+path = "src/mod.rs"
+
+[features]
+compression = [
+    "arrow-ipc/zstd",
+]
diff --git a/datafusion/datasource-arrow/LICENSE.txt b/datafusion/datasource-arrow/LICENSE.txt
new file mode 100644
index 0000000000000..d74c6b599d2ae
--- /dev/null
+++ b/datafusion/datasource-arrow/LICENSE.txt
@@ -0,0 +1,212 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+This project includes code from Apache Aurora.
+
+* dev/release/{release,changelog,release-candidate} are based on the scripts from
+  Apache Aurora
+
+Copyright: 2016 The Apache Software Foundation.
+Home page: https://aurora.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/datafusion/datasource-arrow/NOTICE.txt b/datafusion/datasource-arrow/NOTICE.txt
new file mode 100644
index 0000000000000..7f3c80d606c07
--- /dev/null
+++ b/datafusion/datasource-arrow/NOTICE.txt
@@ -0,0 +1,5 @@
+Apache DataFusion
+Copyright 2019-2025 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
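With the crate boundaries above in place, the compatibility story is worth spelling out: the `datafusion` facade re-exports everything the new crate defines, so both import paths below name the same types. A minimal sketch, not part of the PR itself (the `main` body is illustrative; it only exercises types and impls visible in this diff):

```rust
// Both paths resolve to the same items after this change: the `datafusion`
// facade's `physical_plan::arrow` module is a re-export of
// `datafusion_datasource_arrow::source`.
use datafusion::datasource::physical_plan::ArrowSource;
use datafusion_common::GetExt;
use datafusion_datasource_arrow::file_format::{ArrowFormat, ArrowFormatFactory};

fn main() {
    // `GetExt` strips the leading dot from DEFAULT_ARROW_EXTENSION (".arrow"),
    // per the factory's `get_ext` impl in file_format.rs below.
    assert_eq!(ArrowFormatFactory::new().get_ext(), "arrow");

    // Both construct without a session; `Default` is derived in the diff.
    let _format = ArrowFormat::default();
    let _source = ArrowSource::default();
}
```
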
diff --git a/datafusion/datasource-arrow/README.md b/datafusion/datasource-arrow/README.md
new file mode 100644
index 0000000000000..9901b52105dd4
--- /dev/null
+++ b/datafusion/datasource-arrow/README.md
@@ -0,0 +1,34 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Apache DataFusion Arrow DataSource
+
+[Apache DataFusion] is an extensible query execution framework, written in Rust, that uses [Apache Arrow] as its in-memory format.
+
+This crate is a submodule of DataFusion that defines an Arrow-based file source.
+It works with files following the [Arrow IPC format].
+
+Most projects should use the [`datafusion`] crate directly, which re-exports
+this module. If you are already using the [`datafusion`] crate, there is no
+reason to use this crate directly in your project as well.
+
+[apache arrow]: https://arrow.apache.org/
+[apache datafusion]: https://datafusion.apache.org/
+[`datafusion`]: https://crates.io/crates/datafusion
+[arrow ipc format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs
new file mode 100644
index 0000000000000..3b85640804219
--- /dev/null
+++ b/datafusion/datasource-arrow/src/file_format.rs
@@ -0,0 +1,603 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ArrowFormat`]: Apache Arrow [`FileFormat`] abstractions
+//!
+//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
+
+use std::any::Any;
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::fmt::{self, Debug};
+use std::sync::Arc;
+
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::ArrowError;
+use arrow::ipc::convert::fb_to_schema;
+use arrow::ipc::reader::FileReader;
+use arrow::ipc::writer::IpcWriteOptions;
+use arrow::ipc::{root_as_message, CompressionType};
+use datafusion_common::error::Result;
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::{
+    internal_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics,
+    DEFAULT_ARROW_EXTENSION,
+};
+use datafusion_common_runtime::{JoinSet, SpawnedTask};
+use datafusion_datasource::display::FileGroupDisplay;
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
+use datafusion_datasource::sink::{DataSink, DataSinkExec};
+use datafusion_datasource::write::{
+    get_writer_schema, ObjectWriterBuilder, SharedBuffer,
+};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::dml::InsertOp;
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
+
+use crate::source::ArrowSource;
+use async_trait::async_trait;
+use bytes::Bytes;
+use datafusion_datasource::file_compression_type::FileCompressionType;
+use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
+use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::write::demux::DemuxedStreamReceiver;
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion_session::Session;
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
+use tokio::io::AsyncWriteExt;
+
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// If the buffered Arrow data exceeds this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
+#[derive(Default, Debug)]
+/// Factory struct used to create [ArrowFormat]
+pub struct ArrowFormatFactory;
+
+impl ArrowFormatFactory {
+    /// Creates an instance of [ArrowFormatFactory]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl FileFormatFactory for ArrowFormatFactory {
+    fn create(
+        &self,
+        _state: &dyn Session,
+        _format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>> {
+        Ok(Arc::new(ArrowFormat))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(ArrowFormat)
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl GetExt for ArrowFormatFactory {
+    fn get_ext(&self) -> String {
+        // Removes the dot, i.e. ".parquet" -> "parquet"
+        DEFAULT_ARROW_EXTENSION[1..].to_string()
+    }
+}
+
+/// Arrow `FileFormat` implementation.
+#[derive(Default, Debug)]
+pub struct ArrowFormat;
+
+#[async_trait]
+impl FileFormat for ArrowFormat {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_ext(&self) -> String {
+        ArrowFormatFactory::new().get_ext()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        file_compression_type: &FileCompressionType,
+    ) -> Result<String> {
+        let ext = self.get_ext();
+        match file_compression_type.get_variant() {
+            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
+            _ => Err(internal_datafusion_err!(
+                "Arrow FileFormat does not support compression."
+            )),
+        }
+    }
+
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        None
+    }
+
+    async fn infer_schema(
+        &self,
+        _state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        objects: &[ObjectMeta],
+    ) -> Result<SchemaRef> {
+        let mut schemas = vec![];
+        for object in objects {
+            let r = store.as_ref().get(&object.location).await?;
+            let schema = match r.payload {
+                #[cfg(not(target_arch = "wasm32"))]
+                GetResultPayload::File(mut file, _) => {
+                    let reader = FileReader::try_new(&mut file, None)?;
+                    reader.schema()
+                }
+                GetResultPayload::Stream(stream) => {
+                    infer_schema_from_file_stream(stream).await?
+                }
+            };
+            schemas.push(schema.as_ref().clone());
+        }
+        let merged_schema = Schema::try_merge(schemas)?;
+        Ok(Arc::new(merged_schema))
+    }
+
+    async fn infer_stats(
+        &self,
+        _state: &dyn Session,
+        _store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        _object: &ObjectMeta,
+    ) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(&table_schema))
+    }
+
+    async fn create_physical_plan(
+        &self,
+        _state: &dyn Session,
+        conf: FileScanConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let source = Arc::new(ArrowSource::default());
+        let config = FileScanConfigBuilder::from(conf)
+            .with_source(source)
+            .build();
+
+        Ok(DataSourceExec::from_data_source(config))
+    }
+
+    async fn create_writer_physical_plan(
+        &self,
+        input: Arc<dyn ExecutionPlan>,
+        _state: &dyn Session,
+        conf: FileSinkConfig,
+        order_requirements: Option<LexRequirement>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if conf.insert_op != InsertOp::Append {
+            return not_impl_err!("Overwrites are not implemented yet for Arrow format");
+        }
+
+        let sink = Arc::new(ArrowFileSink::new(conf));
+
+        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
+    }
+
+    fn file_source(&self) -> Arc<dyn FileSource> {
+        Arc::new(ArrowSource::default())
+    }
+}
+
+/// Implements [`FileSink`] for writing to arrow_ipc files
+struct ArrowFileSink {
+    config: FileSinkConfig,
+}
+
+impl ArrowFileSink {
+    fn new(config: FileSinkConfig) -> Self {
+        Self { config }
+    }
+}
+
+#[async_trait]
+impl FileSink for ArrowFileSink {
+    fn config(&self) -> &FileSinkConfig {
+        &self.config
+    }
+
+    async fn spawn_writer_tasks_and_join(
+        &self,
+        context: &Arc<TaskContext>,
+        demux_task: SpawnedTask<Result<()>>,
+        mut file_stream_rx: DemuxedStreamReceiver,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Result<u64> {
+        let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
+            JoinSet::new();
+
+        let ipc_options =
+            IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
+                .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+        while let Some((path, mut rx)) = file_stream_rx.recv().await {
+            let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
+            let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
+                shared_buffer.clone(),
+                &get_writer_schema(&self.config),
+                ipc_options.clone(),
+            )?;
+            let mut object_store_writer = ObjectWriterBuilder::new(
+                FileCompressionType::UNCOMPRESSED,
+                &path,
+                Arc::clone(&object_store),
+            )
+            .with_buffer_size(Some(
+                context
+                    .session_config()
+                    .options()
+                    .execution
+                    .objectstore_writer_buffer_size,
+            ))
+            .build()?;
+            file_write_tasks.spawn(async move {
+                let mut row_count = 0;
+                while let Some(batch) = rx.recv().await {
+                    row_count += batch.num_rows();
+                    arrow_writer.write(&batch)?;
+                    let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
+                    if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
+                        object_store_writer
+                            .write_all(buff_to_flush.as_slice())
+                            .await?;
+                        buff_to_flush.clear();
+                    }
+                }
+                arrow_writer.finish()?;
+                let final_buff = shared_buffer.buffer.try_lock().unwrap();
+
+                object_store_writer.write_all(final_buff.as_slice()).await?;
+                object_store_writer.shutdown().await?;
+                Ok(row_count)
+            });
+        }
+
+        let mut row_count = 0;
+        while let Some(result) = file_write_tasks.join_next().await {
+            match result {
+                Ok(r) => {
+                    row_count += r?;
+                }
+                Err(e) => {
+                    if e.is_panic() {
+                        std::panic::resume_unwind(e.into_panic());
+                    } else {
+                        unreachable!();
+                    }
+                }
+            }
+        }
+
+        demux_task
+            .join_unwind()
+            .await
+            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
+        Ok(row_count as u64)
+    }
+}
+
+impl Debug for ArrowFileSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ArrowFileSink").finish()
+    }
+}
+
+impl DisplayAs for ArrowFileSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ArrowFileSink(file_groups=",)?;
+                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
+                write!(f, ")")
+            }
+            DisplayFormatType::TreeRender => {
+                writeln!(f, "format: arrow")?;
+                write!(f, "file={}", &self.config.original_url)
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl DataSink for ArrowFileSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> &SchemaRef {
+        self.config.output_schema()
+    }
+
+    async fn write_all(
+        &self,
+        data: SendableRecordBatchStream,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        FileSink::write_all(self, data, context).await
+    }
+}
+
+const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
+const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+
+/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
+/// See <https://github.com/apache/arrow-rs/issues/5021>
+async fn infer_schema_from_file_stream(
+    mut stream: BoxStream<'static, object_store::Result<Bytes>>,
+) -> Result<SchemaRef> {
+    // Expected format:
+    // <magic number "ARROW1"> - 6 bytes
+    // <empty padding bytes [to 8 byte boundary]> - 2 bytes
+    // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
+    // <metadata_size: int32> - 4 bytes
+    // <metadata_flatbuffer: bytes>
+    // <rest of file bytes>
+
+    // So in first read we need at least all known sized sections,
+    // which is 6 + 2 + 4 + 4 = 16 bytes.
+    let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
+
+    // Files should start with these magic bytes
+    if bytes[0..6] != ARROW_MAGIC {
+        return Err(ArrowError::ParseError(
+            "Arrow file does not contain correct header".to_string(),
+        ))?;
+    }
+
+    // Since continuation marker bytes added in later versions
+    let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
+        (&bytes[12..16], 16)
+    } else {
+        (&bytes[8..12], 12)
+    };
+
+    let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
+    let meta_len = i32::from_le_bytes(meta_len);
+
+    // Read bytes for Schema message
+    let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
+        // Need to read more bytes to decode Message
+        let mut block_data = Vec::with_capacity(meta_len as usize);
+        // In case we had some spare bytes in our initial read chunk
+        block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
+        let size_to_read = meta_len as usize - block_data.len();
+        let block_data =
+            collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?;
+        Cow::Owned(block_data)
+    } else {
+        // Already have the bytes we need
+        let end_index = meta_len as usize + rest_of_bytes_start_index;
+        let block_data = &bytes[rest_of_bytes_start_index..end_index];
+        Cow::Borrowed(block_data)
+    };
+
+    // Decode Schema message
+    let message = root_as_message(&block_data).map_err(|err| {
+        ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}"))
+    })?;
+    let ipc_schema = message.header_as_schema().ok_or_else(|| {
+        ArrowError::IpcError("Unable to read IPC message as schema".to_string())
+    })?;
+    let schema = fb_to_schema(ipc_schema);
+
+    Ok(Arc::new(schema))
+}
+
+async fn collect_at_least_n_bytes(
+    stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
+    n: usize,
+    extend_from: Option<Vec<u8>>,
+) -> Result<Vec<u8>> {
+    let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
+    // If extending existing buffer then ensure we read n additional bytes
+    let n = n + buf.len();
+    while let Some(bytes) = stream.next().await.transpose()? {
+        buf.extend_from_slice(&bytes);
+        if buf.len() >= n {
+            break;
+        }
+    }
+    if buf.len() < n {
+        return Err(ArrowError::ParseError(
+            "Unexpected end of byte stream for Arrow IPC file".to_string(),
+        ))?;
+    }
+    Ok(buf)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use chrono::DateTime;
+    use datafusion_common::config::TableOptions;
+    use datafusion_common::DFSchema;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_expr::execution_props::ExecutionProps;
+    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+    use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path};
+
+    struct MockSession {
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    }
+
+    impl MockSession {
+        fn new() -> Self {
+            Self {
+                config: SessionConfig::new(),
+                runtime_env: Arc::new(RuntimeEnv::default()),
+            }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl Session for MockSession {
+        fn session_id(&self) -> &str {
+            unimplemented!()
+        }
+
+        fn config(&self) -> &SessionConfig {
+            &self.config
+        }
+
+        async fn create_physical_plan(
+            &self,
+            _logical_plan: &LogicalPlan,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            unimplemented!()
+        }
+
+        fn create_physical_expr(
+            &self,
+            _expr: Expr,
+            _df_schema: &DFSchema,
+        ) -> Result<Arc<dyn PhysicalExpr>> {
+            unimplemented!()
+        }
+
+        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
+            unimplemented!()
+        }
+
+        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
+            unimplemented!()
+        }
+
+        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
+            unimplemented!()
+        }
+
+        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+            &self.runtime_env
+        }
+
+        fn execution_props(&self) -> &ExecutionProps {
+            unimplemented!()
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            unimplemented!()
+        }
+
+        fn table_options(&self) -> &TableOptions {
+            unimplemented!()
+        }
+
+        fn table_options_mut(&mut self) -> &mut TableOptions {
+            unimplemented!()
+        }
+
+        fn task_ctx(&self) -> Arc<TaskContext> {
+            unimplemented!()
+        }
+    }
+
+    #[tokio::test]
+    async fn test_infer_schema_stream() -> Result<()> {
+        let mut bytes = std::fs::read("tests/data/example.arrow")?;
+        bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
+        let location = Path::parse("example.arrow")?;
+        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        in_memory_store.put(&location, bytes.into()).await?;
+
+        let state = MockSession::new();
+        let object_meta = ObjectMeta {
+            location,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
+
+        let arrow_format = ArrowFormat {};
+        let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
+
+        // Test chunk sizes where too small so we keep having to read more bytes
+        // And when large enough that first read contains all we need
+        for chunk_size in [7, 3000] {
+            let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
+            let inferred_schema = arrow_format
+                .infer_schema(
+                    &state,
+                    &(store.clone() as Arc<dyn ObjectStore>),
+                    std::slice::from_ref(&object_meta),
+                )
+                .await?;
+            let actual_fields = inferred_schema
+                .fields()
+                .iter()
+                .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+                .collect::<Vec<_>>();
+            assert_eq!(expected, actual_fields);
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_infer_schema_short_stream() -> Result<()> {
+        let mut bytes = std::fs::read("tests/data/example.arrow")?;
+        bytes.truncate(20); // should cause error that file shorter than expected
+        let location = Path::parse("example.arrow")?;
+        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        in_memory_store.put(&location, bytes.into()).await?;
+
+        let state = MockSession::new();
+        let object_meta = ObjectMeta {
+            location,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
+
+        let arrow_format = ArrowFormat {};
+
+        let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
+        let err = arrow_format
+            .infer_schema(
+                &state,
+                &(store.clone() as Arc<dyn ObjectStore>),
+                std::slice::from_ref(&object_meta),
+            )
+            .await;
+
+        assert!(err.is_err());
+        assert_eq!(
+            "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
+            err.unwrap_err().to_string().lines().next().unwrap()
+        );
+
+        Ok(())
+    }
+}
diff --git a/datafusion/datasource-arrow/src/mod.rs b/datafusion/datasource-arrow/src/mod.rs
new file mode 100644
index 0000000000000..18bb8792c3ffe
--- /dev/null
+++ b/datafusion/datasource-arrow/src/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Make sure fast / cheap clones on Arc are explicit:
+// https://github.com/apache/datafusion/issues/11143
+#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
+
+pub mod file_format;
+pub mod source;
+
+pub use file_format::*;
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/datasource-arrow/src/source.rs
similarity index 98%
rename from datafusion/core/src/datasource/physical_plan/arrow_file.rs
rename to datafusion/datasource-arrow/src/source.rs
index b37dc499d4035..f43f11880182b 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -18,20 +18,21 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::datasource::physical_plan::{FileOpenFuture, FileOpener};
-use crate::error::Result;
 use datafusion_datasource::as_file_source;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 
 use arrow::buffer::Buffer;
 use arrow::datatypes::SchemaRef;
 use arrow_ipc::reader::FileDecoder;
+use datafusion_common::error::Result;
 use datafusion_common::{exec_datafusion_err, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::PartitionedFile;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_datasource::file_stream::FileOpenFuture;
+use datafusion_datasource::file_stream::FileOpener;
 use futures::StreamExt;
 use itertools::Itertools;
 use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
diff --git a/datafusion/core/tests/data/example.arrow b/datafusion/datasource-arrow/tests/data/example.arrow
similarity index 100%
rename from datafusion/core/tests/data/example.arrow
rename to datafusion/datasource-arrow/tests/data/example.arrow
diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt
index 62453ec4bf3e6..b3975e0c3f471 100644
--- a/datafusion/sqllogictest/test_files/arrow_files.slt
+++ b/datafusion/sqllogictest/test_files/arrow_files.slt
@@ -29,7 +29,7 @@ statement ok
 CREATE EXTERNAL TABLE arrow_simple
 STORED AS ARROW
-LOCATION '../core/tests/data/example.arrow';
+LOCATION '../datasource-arrow/tests/data/example.arrow';
 
 
 # physical plan
@@ -37,7 +37,7 @@ query TT
 EXPLAIN SELECT * FROM arrow_simple
 ----
 logical_plan TableScan: arrow_simple projection=[f0, f1, f2]
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0, f1, f2], file_type=arrow
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/datasource-arrow/tests/data/example.arrow]]}, projection=[f0, f1, f2], file_type=arrow
 
 # correct content
 query ITB
@@ -50,8 +50,8 @@ SELECT * FROM arrow_simple
 
 # Ensure that local files can not be read by default (a potential security issue)
 # (url table is only supported when DynamicFileCatalog is enabled)
-statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/example.arrow' not found
-SELECT * FROM '../core/tests/data/example.arrow';
+statement error DataFusion error: Error during planning: table 'datafusion.public.../datasource-arrow/tests/data/example.arrow' not found
+SELECT * FROM '../datasource-arrow/tests/data/example.arrow';
 
 # ARROW partitioned table
 statement ok
diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt
index 03ef08e1a5f83..bc6cbfab0caed 100644
--- a/datafusion/sqllogictest/test_files/ddl.slt
+++ b/datafusion/sqllogictest/test_files/ddl.slt
@@ -312,7 +312,7 @@ DROP TABLE aggregate_simple
 
 # Arrow format
 statement ok
-CREATE external table arrow_simple STORED as ARROW LOCATION '../core/tests/data/example.arrow';
+CREATE external table arrow_simple STORED as ARROW LOCATION '../datasource-arrow/tests/data/example.arrow';
 
 query ITB rowsort
 SELECT * FROM arrow_simple order by f1 LIMIT 1
@@ -796,7 +796,7 @@ logical_plan
 02)--Values: (Int64(1), Int64(2), Int64(3))
 
 query TT
-explain CREATE EXTERNAL TEMPORARY TABLE tty STORED as ARROW LOCATION '../core/tests/data/example.arrow';
+explain CREATE EXTERNAL TEMPORARY TABLE tty STORED as ARROW LOCATION '../datasource-arrow/tests/data/example.arrow';
 ----
 logical_plan CreateExternalTable: Bare { table: "tty" }
 
@@ -804,7 +804,7 @@ statement ok
 set datafusion.explain.logical_plan_only=false;
 
 statement error DataFusion error: This feature is not implemented: Temporary tables not supported
-CREATE EXTERNAL TEMPORARY TABLE tty STORED as ARROW LOCATION '../core/tests/data/example.arrow';
+CREATE EXTERNAL TEMPORARY TABLE tty STORED as ARROW LOCATION '../datasource-arrow/tests/data/example.arrow';
 
 statement error DataFusion error: This feature is not implemented: Temporary views not supported
 CREATE TEMPORARY VIEW y AS VALUES (1,2,3);
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt
index c536c8165c5a3..41718b3aebc27 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -244,7 +244,7 @@ DROP TABLE json_table;
 statement ok
 CREATE EXTERNAL TABLE arrow_table
 STORED AS ARROW
-LOCATION '../core/tests/data/example.arrow';
+LOCATION '../datasource-arrow/tests/data/example.arrow';
 
 # It would be great to see the file read as "4" groups with even sizes (offsets) eventually
@@ -253,7 +253,7 @@ query TT
 EXPLAIN SELECT * FROM arrow_table
 ----
 logical_plan TableScan: arrow_table projection=[f0, f1, f2]
-physical_plan DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:0..461], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:461..922], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:922..1383], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:1383..1842]]}, projection=[f0, f1, f2], file_type=arrow
+physical_plan DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/datasource-arrow/tests/data/example.arrow:0..461], [WORKSPACE_ROOT/datafusion/datasource-arrow/tests/data/example.arrow:461..922], [WORKSPACE_ROOT/datafusion/datasource-arrow/tests/data/example.arrow:922..1383], [WORKSPACE_ROOT/datafusion/datasource-arrow/tests/data/example.arrow:1383..1842]]}, projection=[f0, f1, f2], file_type=arrow
 
 # correct content
 query ITB
diff --git a/dev/release/README.md b/dev/release/README.md
index d70e256f73831..1b78f8d13be98 100644
--- a/dev/release/README.md
+++ b/dev/release/README.md
@@ -295,6 +295,7 @@ Verify that the Cargo.toml in the tarball contains the correct version
 (cd datafusion/catalog && cargo publish)
 (cd datafusion/catalog-listing && cargo publish)
 (cd datafusion/functions-table && cargo publish)
+(cd datafusion/datasource-arrow && cargo publish)
 (cd datafusion/datasource-csv && cargo publish)
 (cd datafusion/datasource-json && cargo publish)
 (cd datafusion/datasource-parquet && cargo publish)
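Taken together, the sqllogictest updates above double as a usage recipe for the relocated source. A minimal end-to-end sketch against the `datafusion` facade (assumes a `tokio` runtime and default crate features; the `LOCATION` path points at the fixture's new home relative to the workspace root, so adjust it for your layout):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Same DDL the sqllogictest files exercise: STORED AS ARROW dispatches
    // to ArrowFormat, now provided by datafusion-datasource-arrow.
    ctx.sql(
        "CREATE EXTERNAL TABLE arrow_simple \
         STORED AS ARROW \
         LOCATION 'datafusion/datasource-arrow/tests/data/example.arrow'",
    )
    .await?;

    // f0, f1, f2 are the columns the tests above expect (Int64, Utf8, Boolean).
    ctx.sql("SELECT f0, f1, f2 FROM arrow_simple LIMIT 1")
        .await?
        .show()
        .await?;

    Ok(())
}
```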