diff --git a/LICENSE.txt b/LICENSE.txt index 2a779b2ef93..d1c834b5df9 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -940,6 +940,33 @@ License: The MIT License (MIT) -------------------------------------------------------------------------------- +This project includes code from the cymove project: + +* python/pyarrow/includes/common.pxd includes code from the cymove project + +The MIT License (MIT) +Copyright (c) 2019 Omer Ozarslan + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +OR OTHER DEALINGS IN THE SOFTWARE. + +-------------------------------------------------------------------------------- + This project include code from CMake. * cpp/cmake_modules/FindGTest.cmake is based on code from CMake. diff --git a/ci/scripts/python_build.sh b/ci/scripts/python_build.sh index 22adf327044..5d93afa7ac2 100755 --- a/ci/scripts/python_build.sh +++ b/ci/scripts/python_build.sh @@ -32,6 +32,7 @@ export PYARROW_WITH_FLIGHT=${ARROW_FLIGHT:-OFF} export PYARROW_WITH_PLASMA=${ARROW_PLASMA:-OFF} export PYARROW_WITH_GANDIVA=${ARROW_GANDIVA:-OFF} export PYARROW_WITH_PARQUET=${ARROW_PARQUET:-OFF} +export PYARROW_WITH_DATASET=${ARROW_DATASET:-OFF} export LD_LIBRARY_PATH=${ARROW_HOME}/lib:${LD_LIBRARY_PATH} diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index a17845e1210..aa19ef328c6 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -97,7 +97,8 @@ class ARROW_DS_EXPORT DataSource { /// May be null, which indicates no information is available. const ExpressionPtr& partition_expression() const { return partition_expression_; } - virtual std::string type() const = 0; + /// \brief The name identifying the kind of data source + virtual std::string type_name() const = 0; virtual ~DataSource() = default; @@ -123,7 +124,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) override; - std::string type() const override { return "simple_data_source"; } + std::string type_name() const override { return "simple"; } private: DataFragmentVector fragments_; @@ -136,7 +137,7 @@ class ARROW_DS_EXPORT TreeDataSource : public DataSource { DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) override; - std::string type() const override { return "tree_data_source"; } + std::string type_name() const override { return "tree"; } private: DataSourceVector children_; @@ -149,7 +150,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { /// \brief Build a Dataset from uniform sources. 
// /// \param[in] sources one or more input data sources - /// \param[in] schema a known schema to conform to, may be nullptr + /// \param[in] schema a known schema to conform to static Result Make(DataSourceVector sources, std::shared_ptr schema); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 69e69e2d0fd..0fd91bbbe72 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -46,6 +46,7 @@ class Filter; /// be read like a file class ARROW_DS_EXPORT FileSource { public: + // NOTE(kszucs): it'd be better to separate the BufferSource from FileSource enum SourceType { PATH, BUFFER }; FileSource(std::string path, fs::FileSystem* filesystem, @@ -130,7 +131,8 @@ class ARROW_DS_EXPORT FileFormat { public: virtual ~FileFormat() = default; - virtual std::string name() const = 0; + /// \brief The name identifying the kind of file format + virtual std::string type_name() const = 0; /// \brief Indicate if the FileSource is supported/readable by this format. virtual Result IsSupported(const FileSource& source) const = 0; @@ -215,7 +217,7 @@ class ARROW_DS_EXPORT FileSystemDataSource : public DataSource { ExpressionVector partitions, ExpressionPtr source_partition, FileFormatPtr format); - std::string type() const override { return "filesystem_data_source"; } + std::string type_name() const override { return "filesystem"; } std::string ToString() const; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 2998bb96762..5d33bbf3de5 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -50,7 +50,7 @@ class ParquetScanTask : public ScanTask { column_projection_(std::move(column_projection)), reader_(reader) {} - Result Scan() { + Result Execute() { // The construction of parquet's RecordBatchReader is deferred here to // control the memory usage of consumers who materialize all ScanTasks // before dispatching them, e.g. for scheduling purposes. 
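A minimal sketch (not part of the patch) of how a caller drives a ScanTask after the Scan() to Execute() rename above; it mirrors the test updates further below and assumes a ScanTaskPtr obtained from a Scanner:

    #include "arrow/dataset/api.h"
    #include "arrow/result.h"

    // Consume a single ScanTask by iterating the record batches it yields.
    arrow::Status ConsumeTask(const arrow::dataset::ScanTaskPtr& task) {
      ARROW_ASSIGN_OR_RAISE(auto batch_it, task->Execute());  // previously task->Scan()
      for (auto maybe_batch : batch_it) {
        ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
        // ... use batch->num_rows(), batch->columns(), etc.
      }
      return arrow::Status::OK();
    }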
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 9a93171beae..38ba3b37af6 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -46,7 +46,7 @@ class ARROW_DS_EXPORT ParquetWriteOptions : public FileWriteOptions { /// \brief A FileFormat implementation that reads from Parquet files class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { public: - std::string name() const override { return "parquet"; } + std::string type_name() const override { return "parquet"; } Result IsSupported(const FileSource& source) const override; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 913ac00a94b..82bd4c22b2d 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -174,7 +174,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { for (auto maybe_task : scan_task_it) { ASSERT_OK_AND_ASSIGN(auto task, std::move(maybe_task)); - ASSERT_OK_AND_ASSIGN(auto rb_it, task->Scan()); + ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute()); for (auto maybe_batch : rb_it) { ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch)); row_count += batch->num_rows(); @@ -221,7 +221,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) { for (auto maybe_task : scan_task_it) { ASSERT_OK_AND_ASSIGN(auto task, std::move(maybe_task)); - ASSERT_OK_AND_ASSIGN(auto rb_it, task->Scan()); + ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute()); for (auto maybe_batch : rb_it) { ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch)); row_count += batch->num_rows(); @@ -262,7 +262,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) { for (auto maybe_task : scan_task_it) { ASSERT_OK_AND_ASSIGN(auto task, std::move(maybe_task)); - ASSERT_OK_AND_ASSIGN(auto rb_it, task->Scan()); + ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute()); for (auto maybe_batch : rb_it) { ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch)); row_count += batch->num_rows(); @@ -307,7 +307,7 @@ void CountRowsInScan(ScanTaskIterator& it, int64_t expected_rows, for (auto maybe_scan_task : it) { ASSERT_OK_AND_ASSIGN(auto scan_task, std::move(maybe_scan_task)); - ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Scan()); + ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Execute()); for (auto maybe_record_batch : rb_it) { ASSERT_OK_AND_ASSIGN(auto record_batch, std::move(maybe_record_batch)); actual_rows += record_batch->num_rows(); @@ -329,7 +329,7 @@ class TestParquetFileFormatPushDown : public TestParquetFileFormat { ASSERT_OK_AND_ASSIGN(auto it, fragment.Scan(ctx_)); for (auto maybe_scan_task : it) { ASSERT_OK_AND_ASSIGN(auto scan_task, std::move(maybe_scan_task)); - ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Scan()); + ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Execute()); for (auto maybe_record_batch : rb_it) { ASSERT_OK_AND_ASSIGN(auto record_batch, std::move(maybe_record_batch)); actual_rows += record_batch->num_rows(); diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index 918045f3fbf..46b3caa2368 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -184,6 +184,7 @@ class ARROW_DS_EXPORT Expression { /// returns a debug string representing this expression virtual std::string ToString() const = 0; + /// \brief Return the expression's type identifier ExpressionType::type type() const { return type_; } /// Copy this expression into a shared pointer. 
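A small sketch (not part of the patch, assuming only the declarations above) of the type_name() convention these headers converge on: short kind identifiers such as "simple", "tree", "filesystem" and "parquet", which the Python bindings added below use to choose a wrapper class:

    #include <string>
    #include "arrow/dataset/dataset.h"

    // type_name() replaces the old type()/name() accessors and returns a short
    // identifier, e.g. "filesystem" rather than "filesystem_data_source".
    std::string DescribeSource(const arrow::dataset::DataSource& source) {
      return "data source kind: " + source.type_name();
    }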
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 272e8f98a11..d50b798852b 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -54,17 +54,6 @@ Result PartitionScheme::Parse(const std::string& path) const { return and_(std::move(expressions)); } -class DefaultPartitionScheme : public PartitionScheme { - public: - DefaultPartitionScheme() : PartitionScheme(::arrow::schema({})) {} - - std::string name() const override { return "default_partition_scheme"; } - - Result Parse(const std::string& segment, int i) const override { - return scalar(true); - } -}; - PartitionSchemePtr PartitionScheme::Default() { return std::make_shared(); } diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 3f131e0db59..f7856789a0c 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -60,7 +60,7 @@ class ARROW_DS_EXPORT PartitionScheme { virtual ~PartitionScheme() = default; /// \brief The name identifying the kind of partition scheme - virtual std::string name() const = 0; + virtual std::string type_name() const = 0; /// \brief Parse a path segment into a partition expression /// @@ -108,6 +108,18 @@ class ARROW_DS_EXPORT PartitionSchemeDiscovery { std::shared_ptr schema_; }; +/// \brief Subclass for representing the default, always true scheme. +class DefaultPartitionScheme : public PartitionScheme { + public: + DefaultPartitionScheme() : PartitionScheme(::arrow::schema({})) {} + + std::string type_name() const override { return "default"; } + + Result Parse(const std::string& segment, int i) const override { + return scalar(true); + } +}; + /// \brief Subclass for looking up partition information from a dictionary /// mapping segments to expressions provided on construction. 
class ARROW_DS_EXPORT SegmentDictionaryPartitionScheme : public PartitionScheme { @@ -117,7 +129,7 @@ class ARROW_DS_EXPORT SegmentDictionaryPartitionScheme : public PartitionScheme std::vector> dictionaries) : PartitionScheme(std::move(schema)), dictionaries_(std::move(dictionaries)) {} - std::string name() const override { return "segment_dictionary_partition_scheme"; } + std::string type_name() const override { return "segment_dictionary"; } /// Return dictionaries_[i][segment] or scalar(true) Result Parse(const std::string& segment, int i) const override; @@ -160,7 +172,7 @@ class ARROW_DS_EXPORT SchemaPartitionScheme : public PartitionKeysScheme { explicit SchemaPartitionScheme(std::shared_ptr schema) : PartitionKeysScheme(std::move(schema)) {} - std::string name() const override { return "schema_partition_scheme"; } + std::string type_name() const override { return "schema"; } util::optional ParseKey(const std::string& segment, int i) const override; @@ -181,7 +193,7 @@ class ARROW_DS_EXPORT HivePartitionScheme : public PartitionKeysScheme { explicit HivePartitionScheme(std::shared_ptr schema) : PartitionKeysScheme(std::move(schema)) {} - std::string name() const override { return "hive_partition_scheme"; } + std::string type_name() const override { return "hive"; } util::optional ParseKey(const std::string& segment, int i) const override { return ParseKey(segment); @@ -198,12 +210,12 @@ class ARROW_DS_EXPORT FunctionPartitionScheme : public PartitionScheme { explicit FunctionPartitionScheme( std::shared_ptr schema, std::function(const std::string&, int)> impl, - std::string name = "function_partition_scheme") + std::string name = "function") : PartitionScheme(std::move(schema)), impl_(std::move(impl)), name_(std::move(name)) {} - std::string name() const override { return name_; } + std::string type_name() const override { return name_; } Result Parse(const std::string& segment, int i) const override { return impl_(segment, i); diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 1096a0d6f8d..b722a01ddc7 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -229,7 +229,7 @@ class RangePartitionScheme : public HivePartitionScheme { public: using HivePartitionScheme::HivePartitionScheme; - std::string name() const override { return "range_partition_scheme"; } + std::string type_name() const override { return "range"; } Result Parse(const std::string& segment, int i) const override { ExpressionVector ranges; diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 45b4627ace8..1d053af4290 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -46,7 +46,7 @@ ScanOptionsPtr ScanOptions::ReplaceSchema(std::shared_ptr schema) const return copy; } -Result SimpleScanTask::Scan() { +Result SimpleScanTask::Execute() { return MakeVectorIterator(record_batches_); } @@ -169,7 +169,7 @@ struct TableAggregator { struct ScanTaskPromise { Status operator()() { - ARROW_ASSIGN_OR_RAISE(auto it, task->Scan()); + ARROW_ASSIGN_OR_RAISE(auto it, task->Execute()); for (auto maybe_batch : it) { ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch)); aggregator.Append(std::move(batch)); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 824e3a622e3..aa6d9061f50 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -83,9 +83,9 @@ class ARROW_DS_EXPORT ScanOptions { class ARROW_DS_EXPORT 
ScanTask { public: /// \brief Iterate through sequence of materialized record batches - /// resulting from the Scan. Execution semantics encapsulated in the + /// resulting from the Scan. Execution semantics are encapsulated in the /// particular ScanTask implementation - virtual Result Scan() = 0; + virtual Result Execute() = 0; virtual ~ScanTask() = default; @@ -108,7 +108,7 @@ class ARROW_DS_EXPORT SimpleScanTask : public ScanTask { : ScanTask(std::move(options), std::move(context)), record_batches_(std::move(record_batches)) {} - Result Scan() override; + Result Execute() override; protected: std::vector> record_batches_; @@ -165,7 +165,7 @@ class ARROW_DS_EXPORT ScannerBuilder { /// \brief Set the subset of columns to materialize. /// - /// This subset wil be passed down to DataSources and corresponding DataFragments. + /// This subset will be passed down to DataSources and corresponding DataFragments. /// The goal is to avoid loading/copying/deserializing columns that will /// not be required further down the compute chain. /// @@ -181,7 +181,6 @@ class ARROW_DS_EXPORT ScannerBuilder { /// The predicate will be passed down to DataSources and corresponding /// DataFragments to exploit predicate pushdown if possible using /// partition information or DataFragment internal metadata, e.g. Parquet statistics. - /// statistics. /// /// \param[in] filter expression to filter rows with. /// diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 0ef2f1d3bd0..a2f9ee864d8 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -53,8 +53,8 @@ class FilterAndProjectScanTask : public ScanTask { explicit FilterAndProjectScanTask(ScanTaskPtr task) : ScanTask(task->options(), task->context()), task_(std::move(task)) {} - Result Scan() override { - ARROW_ASSIGN_OR_RAISE(auto it, task_->Scan()); + Result Execute() override { + ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute()); auto filter_it = FilterRecordBatch(std::move(it), *options_->evaluator, *options_->filter, context_->pool); return ProjectRecordBatch(std::move(filter_it), &task_->options()->projector, diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 08ac065f920..0c7dc1a8d46 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -87,7 +87,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragment. 
void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task, bool ensure_drained = true) { - ASSERT_OK_AND_ASSIGN(auto it, task->Scan()); + ASSERT_OK_AND_ASSIGN(auto it, task->Execute()); ARROW_EXPECT_OK(it.Visit([expected](std::shared_ptr rhs) -> Status { std::shared_ptr lhs; RETURN_NOT_OK(expected->ReadNext(&lhs)); @@ -179,7 +179,7 @@ class DummyFileFormat : public FileFormat { explicit DummyFileFormat(std::shared_ptr schema = NULLPTR) : schema_(std::move(schema)) {} - std::string name() const override { return "dummy"; } + std::string type_name() const override { return "dummy"; } Result IsSupported(const FileSource& source) const override { return true; } @@ -223,7 +223,7 @@ class JSONRecordBatchFileFormat : public FileFormat { explicit JSONRecordBatchFileFormat(SchemaResolver resolver) : resolver_(std::move(resolver)) {} - std::string name() const override { return "json_record_batch"; } + std::string type_name() const override { return "json_record_batch"; } /// \brief Return true if the given file extension Result IsSupported(const FileSource& source) const override { return true; } diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index c5fa8aefdb9..8f34a0e0830 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -155,6 +155,8 @@ class ARROW_EXPORT FileSystem { public: virtual ~FileSystem(); + virtual std::string type_name() const = 0; + /// Get statistics for the given target. /// /// Any symlink is automatically dereferenced, recursively. @@ -243,6 +245,8 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { std::shared_ptr base_fs); ~SubTreeFileSystem() override; + std::string type_name() const override { return "subtree"; } + /// \cond FALSE using FileSystem::GetTargetStats; /// \endcond @@ -289,6 +293,8 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { SlowFileSystem(std::shared_ptr base_fs, double average_latency, int32_t seed); + std::string type_name() const override { return "slow"; } + using FileSystem::GetTargetStats; Result GetTargetStats(const std::string& path) override; Result> GetTargetStats(const Selector& select) override; diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h index 304293a8f6a..3130ecd4465 100644 --- a/cpp/src/arrow/filesystem/hdfs.h +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -61,6 +61,8 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { public: ~HadoopFileSystem() override; + std::string type_name() const override { return "hdfs"; } + /// \cond FALSE using FileSystem::GetTargetStats; /// \endcond diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h index ebc07d78cae..ed25efb424f 100644 --- a/cpp/src/arrow/filesystem/localfs.h +++ b/cpp/src/arrow/filesystem/localfs.h @@ -48,6 +48,8 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem { explicit LocalFileSystem(const LocalFileSystemOptions&); ~LocalFileSystem() override; + std::string type_name() const override { return "local"; } + /// \cond FALSE using FileSystem::GetTargetStats; /// \endcond diff --git a/cpp/src/arrow/filesystem/mockfs.h b/cpp/src/arrow/filesystem/mockfs.h index e925f03e7f6..b0e8e417bff 100644 --- a/cpp/src/arrow/filesystem/mockfs.h +++ b/cpp/src/arrow/filesystem/mockfs.h @@ -60,6 +60,8 @@ class ARROW_EXPORT MockFileSystem : public FileSystem { explicit MockFileSystem(TimePoint current_time); ~MockFileSystem() override; + std::string type_name() const override { return "mock"; } + // XXX It's not 
very practical to have to explicitly declare inheritance // of default overrides. using FileSystem::GetTargetStats; diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index d3f83cc319f..c57c3a8819f 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -79,6 +79,8 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { public: ~S3FileSystem() override; + std::string type_name() const override { return "s3"; } + /// \cond FALSE using FileSystem::GetTargetStats; /// \endcond diff --git a/cpp/src/arrow/python/datetime.h b/cpp/src/arrow/python/datetime.h index efeb13e6add..30d58372e77 100644 --- a/cpp/src/arrow/python/datetime.h +++ b/cpp/src/arrow/python/datetime.h @@ -113,6 +113,11 @@ inline int64_t PyDateTime_to_ns(PyDateTime_DateTime* pydatetime) { return PyDateTime_to_us(pydatetime) * 1000; } +ARROW_PYTHON_EXPORT +inline TimePoint PyDateTime_to_TimePoint(PyDateTime_DateTime* pydatetime) { + return TimePoint(TimePoint::duration(PyDateTime_to_ns(pydatetime))); +} + ARROW_PYTHON_EXPORT inline int64_t PyDelta_to_s(PyDateTime_Delta* pytimedelta) { int64_t total_seconds = 0; diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index a248582b7b3..466da68af46 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -60,6 +60,7 @@ endif() if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF) option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF) + option(PYARROW_BUILD_DATASET "Build the PyArrow Dataset integration" OFF) option(PYARROW_BUILD_GANDIVA "Build the PyArrow Gandiva integration" OFF) option(PYARROW_BUILD_PARQUET "Build the PyArrow Parquet integration" OFF) option(PYARROW_PARQUET_USE_SHARED "Rely on parquet shared libraries where relevant" ON) @@ -395,6 +396,22 @@ if(PYARROW_BUILD_CUDA) set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _cuda) endif() +# Dataset +if(PYARROW_BUILD_DATASET) + # Arrow Dataset + find_package(ArrowDataset REQUIRED) + + if(PYARROW_BUNDLE_ARROW_CPP) + bundle_arrow_lib(ARROW_DATASET_SHARED_LIB SO_VERSION ${ARROW_SO_VERSION}) + if(MSVC) + bundle_arrow_import_lib(ARROW_DATASET_IMPORT_LIB) + endif() + endif() + + set(DATASET_LINK_LIBS arrow_dataset_shared) + set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _dataset) +endif() + if(PYARROW_BUILD_PARQUET) # Parquet find_package(Parquet REQUIRED) @@ -567,6 +584,10 @@ if(PYARROW_BUILD_FLIGHT) target_link_libraries(_flight ${FLIGHT_LINK_LIBS}) endif() +if(PYARROW_BUILD_DATASET) + target_link_libraries(_dataset ${DATASET_LINK_LIBS}) +endif() + if(PYARROW_BUILD_GANDIVA) target_link_libraries(gandiva ${GANDIVA_LINK_LIBS}) endif() diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx new file mode 100644 index 00000000000..c180e0cb144 --- /dev/null +++ b/python/pyarrow/_dataset.pyx @@ -0,0 +1,976 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +from __future__ import absolute_import + +from cython.operator cimport dereference as deref + +from pyarrow.lib cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.compat import frombytes, tobytes +from pyarrow._fs cimport FileSystem, FileStats, Selector + + +def _forbid_instantiation(klass, subclasses_instead=True): + msg = '{} is an abstract class thus cannot be initialized.'.format( + klass.__name__ + ) + if subclasses_instead: + subclasses = [cls.__name__ for cls in klass.__subclasses__] + msg += ' Use one of the subclasses instead: {}'.format( + ', '.join(subclasses) + ) + raise TypeError(msg) + + +cdef class FileFormat: + + cdef: + shared_ptr[CFileFormat] wrapped + CFileFormat* format + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef void init(self, const shared_ptr[CFileFormat]& sp): + self.wrapped = sp + self.format = sp.get() + + @staticmethod + cdef wrap(shared_ptr[CFileFormat]& sp): + cdef FileFormat self + + typ = frombytes(sp.get().type_name()) + if typ == 'parquet': + self = ParquetFileFormat.__new__(ParquetFileFormat) + else: + raise TypeError(typ) + + self.init(sp) + return self + + cdef inline shared_ptr[CFileFormat] unwrap(self): + return self.wrapped + + +cdef class ParquetFileFormat(FileFormat): + + def __init__(self): + self.init(shared_ptr[CFileFormat](new CParquetFileFormat())) + + +cdef class PartitionScheme: + + cdef: + shared_ptr[CPartitionScheme] wrapped + CPartitionScheme* scheme + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef init(self, const shared_ptr[CPartitionScheme]& sp): + self.wrapped = sp + self.scheme = sp.get() + + @staticmethod + cdef wrap(const shared_ptr[CPartitionScheme]& sp): + cdef PartitionScheme self + + typ = frombytes(sp.get().type_name()) + if typ == 'default': + self = DefaultPartitionScheme.__new__(DefaultPartitionScheme) + elif typ == 'schema': + self = SchemaPartitionScheme.__new__(SchemaPartitionScheme) + elif typ == 'hive': + self = HivePartitionScheme.__new__(HivePartitionScheme) + else: + raise TypeError(typ) + + self.init(sp) + return self + + cdef inline shared_ptr[CPartitionScheme] unwrap(self): + return self.wrapped + + def parse(self, path): + cdef CResult[shared_ptr[CExpression]] result + result = self.scheme.Parse(tobytes(path)) + return Expression.wrap(GetResultValue(result)) + + @property + def schema(self): + return pyarrow_wrap_schema(self.scheme.schema()) + + +cdef class DefaultPartitionScheme(PartitionScheme): + + cdef: + CDefaultPartitionScheme* default_scheme + + def __init__(self): + cdef shared_ptr[CDefaultPartitionScheme] scheme + scheme = make_shared[CDefaultPartitionScheme]() + self.init( scheme) + + cdef init(self, const shared_ptr[CPartitionScheme]& sp): + PartitionScheme.init(self, sp) + self.default_scheme = sp.get() + + +cdef class SchemaPartitionScheme(PartitionScheme): + + cdef: + CSchemaPartitionScheme* schema_scheme + + def __init__(self, Schema schema not None): + cdef shared_ptr[CSchemaPartitionScheme] scheme + scheme = make_shared[CSchemaPartitionScheme]( + pyarrow_unwrap_schema(schema) + ) 
+ self.init( scheme) + + cdef init(self, const shared_ptr[CPartitionScheme]& sp): + PartitionScheme.init(self, sp) + self.schema_scheme = sp.get() + + +cdef class HivePartitionScheme(PartitionScheme): + + cdef: + CHivePartitionScheme* hive_scheme + + def __init__(self, Schema schema not None): + cdef shared_ptr[CHivePartitionScheme] scheme + scheme = make_shared[CHivePartitionScheme]( + pyarrow_unwrap_schema(schema) + ) + self.init( scheme) + + cdef init(self, const shared_ptr[CPartitionScheme]& sp): + PartitionScheme.init(self, sp) + self.hive_scheme = sp.get() + + +cdef class FileSystemDiscoveryOptions: + + cdef: + CFileSystemDiscoveryOptions options + + __slots__ = () # avoid mistakingly creating attributes + + def __init__(self, partition_base_dir=None, exclude_invalid_files=None, + list ignore_prefixes=None): + if partition_base_dir is not None: + self.partition_base_dir = partition_base_dir + if exclude_invalid_files is not None: + self.exclude_invalid_files = exclude_invalid_files + if ignore_prefixes is not None: + self.ignore_prefixes = ignore_prefixes + + cdef inline CFileSystemDiscoveryOptions unwrap(self): + return self.options + + @property + def partition_base_dir(self): + return frombytes(self.options.partition_base_dir) + + @partition_base_dir.setter + def partition_base_dir(self, value): + self.options.partition_base_dir = tobytes(value) + + @property + def exclude_invalid_files(self): + return self.options.exclude_invalid_files + + @exclude_invalid_files.setter + def exclude_invalid_files(self, bint value): + self.options.exclude_invalid_files = value + + @property + def ignore_prefixes(self): + return [frombytes(p) for p in self.options.ignore_prefixes] + + @ignore_prefixes.setter + def ignore_prefixes(self, values): + self.options.ignore_prefixes = [tobytes(v) for v in values] + + +cdef class DataSourceDiscovery: + + cdef: + shared_ptr[CDataSourceDiscovery] wrapped + CDataSourceDiscovery* discovery + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef init(self, shared_ptr[CDataSourceDiscovery]& sp): + self.wrapped = sp + self.discovery = sp.get() + + @staticmethod + cdef wrap(shared_ptr[CDataSourceDiscovery]& sp): + cdef DataSourceDiscovery self = \ + DataSourceDiscovery.__new__(DataSourceDiscovery) + self.init(sp) + return self + + cdef inline shared_ptr[CDataSourceDiscovery] unwrap(self) nogil: + return self.wrapped + + @property + def partition_scheme(self): + cdef shared_ptr[CPartitionScheme] scheme + scheme = self.discovery.partition_scheme() + if scheme.get() == nullptr: + return None + else: + return PartitionScheme.wrap(scheme) + + @partition_scheme.setter + def partition_scheme(self, PartitionScheme scheme not None): + check_status(self.discovery.SetPartitionScheme(scheme.unwrap())) + + @property + def root_partition(self): + cdef shared_ptr[CExpression] expr = self.discovery.root_partition() + if expr.get() == nullptr: + return None + else: + return Expression.wrap(expr) + + @root_partition.setter + def root_partition(self, Expression expr): + check_status(self.discovery.SetRootPartition(expr.unwrap())) + + def inspect(self): + cdef CResult[shared_ptr[CSchema]] result + with nogil: + result = self.discovery.Inspect() + return pyarrow_wrap_schema(GetResultValue(result)) + + def finish(self): + cdef CResult[shared_ptr[CDataSource]] result + with nogil: + result = self.discovery.Finish() + return DataSource.wrap(GetResultValue(result)) + + +cdef class FileSystemDataSourceDiscovery(DataSourceDiscovery): + + cdef: + 
CFileSystemDataSourceDiscovery* filesystem_discovery + + def __init__(self, FileSystem filesystem not None, paths_or_selector, + FileFormat format not None, + FileSystemDiscoveryOptions options=None): + cdef: + FileStats file_stats + vector[CFileStats] stats + CResult[shared_ptr[CDataSourceDiscovery]] result + + options = options or FileSystemDiscoveryOptions() + for file_stats in filesystem.get_target_stats(paths_or_selector): + stats.push_back(file_stats.unwrap()) + + result = CFileSystemDataSourceDiscovery.MakeFromFileStats( + filesystem.unwrap(), + stats, + format.unwrap(), + options.unwrap() + ) + self.init(GetResultValue(result)) + + cdef init(self, shared_ptr[CDataSourceDiscovery]& sp): + DataSourceDiscovery.init(self, sp) + self.filesystem_discovery = sp.get() + + +cdef class DataSource: + """Basic component of a Dataset which yields zero or more data fragments. + + A DataSource acts as a discovery mechanism of data fragments and + partitions, e.g. files deeply nested in a directory. + """ + + cdef: + shared_ptr[CDataSource] wrapped + CDataSource* source + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef void init(self, const shared_ptr[CDataSource]& sp): + self.wrapped = sp + self.source = sp.get() + + @staticmethod + cdef wrap(shared_ptr[CDataSource]& sp): + cdef DataSource self + + typ = frombytes(sp.get().type_name()) + if typ == 'tree': + self = TreeDataSource.__new__(TreeDataSource) + elif typ == 'filesystem': + self = FileSystemDataSource.__new__(FileSystemDataSource) + else: + raise TypeError(typ) + + self.init(sp) + return self + + cdef shared_ptr[CDataSource] unwrap(self) nogil: + return self.wrapped + + @property + def partition_expression(self): + cdef shared_ptr[CExpression] expression + expression = self.source.partition_expression() + if expression.get() == nullptr: + return None + else: + return Expression.wrap(expression) + + +cdef class TreeDataSource(DataSource): + """A DataSource created from other data source objects""" + + cdef: + CTreeDataSource* tree_source + + def __init__(self, data_sources): + cdef: + DataSource child + CDataSourceVector children + shared_ptr[CTreeDataSource] tree_source + + for child in data_sources: + children.push_back(child.wrapped) + + tree_source = make_shared[CTreeDataSource](children) + self.init( tree_source) + + cdef void init(self, const shared_ptr[CDataSource]& sp): + DataSource.init(self, sp) + self.tree_source = sp.get() + + +cdef class FileSystemDataSource(DataSource): + """A DataSource created from a set of files on a particular filesystem""" + + cdef: + CFileSystemDataSource* filesystem_source + + def __init__(self, FileSystem filesystem not None, paths_or_selector, + partitions, Expression source_partition, + FileFormat file_format not None): + """Create a FileSystemDataSource + + Parameters + ---------- + filesystem : FileSystem + Filesystem to discover. + paths_or_selector : Union[Selector, List[FileStats]] + The file stats object can be queried by the + filesystem.get_target_stats method. 
+            partitions : List[Expression]
+            source_partition : Expression
+            file_format : FileFormat
+        """
+        cdef:
+            FileStats stats
+            Expression expression
+            vector[CFileStats] c_file_stats
+            shared_ptr[CExpression] c_source_partition
+            vector[shared_ptr[CExpression]] c_partitions
+            CResult[shared_ptr[CDataSource]] result
+
+        for stats in filesystem.get_target_stats(paths_or_selector):
+            c_file_stats.push_back(stats.unwrap())
+
+        for expression in partitions:
+            c_partitions.push_back(expression.unwrap())
+
+        if c_file_stats.size() != c_partitions.size():
+            raise ValueError(
+                'The number of files resulting from paths_or_selector must be '
+                'equal to the number of partitions.'
+            )
+
+        if source_partition is not None:
+            c_source_partition = source_partition.unwrap()
+
+        result = CFileSystemDataSource.Make(
+            filesystem.unwrap(),
+            c_file_stats,
+            c_partitions,
+            c_source_partition,
+            file_format.unwrap()
+        )
+        self.init(GetResultValue(result))
+
+    cdef void init(self, const shared_ptr[CDataSource]& sp):
+        DataSource.init(self, sp)
+        self.filesystem_source = <CFileSystemDataSource*> sp.get()
+
+
+cdef class Dataset:
+    """Collection of data fragments coming from possibly multiple sources."""
+
+    cdef:
+        shared_ptr[CDataset] wrapped
+        CDataset* dataset
+
+    def __init__(self, data_sources, Schema schema not None):
+        """Create a dataset
+
+        A schema must be passed because most of the data sources' schema is
+        unknown before executing a possibly expensive scanning operation, but
+        projecting, filtering, and predicate pushdown require a well-defined
+        schema to work on.
+
+        Parameters
+        ----------
+        data_sources : list of DataSource
+            One or more input data sources
+        schema : Schema
+            A known schema to conform to.
+        """
+        cdef:
+            DataSource source
+            CDataSourceVector sources
+            CResult[CDatasetPtr] result
+
+        for source in data_sources:
+            sources.push_back(source.unwrap())
+
+        result = CDataset.Make(sources, pyarrow_unwrap_schema(schema))
+        self.init(GetResultValue(result))
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        self.wrapped = sp
+        self.dataset = sp.get()
+
+    cdef inline shared_ptr[CDataset] unwrap(self) nogil:
+        return self.wrapped
+
+    def new_scan(self, MemoryPool memory_pool=None):
+        """Begin to build a new Scan operation against this Dataset."""
+        cdef:
+            shared_ptr[CScanContext] context = make_shared[CScanContext]()
+            CResult[shared_ptr[CScannerBuilder]] result
+        context.get().pool = maybe_unbox_memory_pool(memory_pool)
+        result = self.dataset.NewScanWithContext(context)
+        return ScannerBuilder.wrap(GetResultValue(result))
+
+    @property
+    def sources(self):
+        cdef vector[shared_ptr[CDataSource]] sources = self.dataset.sources()
+        return [DataSource.wrap(source) for source in sources]
+
+    @property
+    def schema(self):
+        return pyarrow_wrap_schema(self.dataset.schema())
+
+
+cdef class ScanTask:
+    """Read record batches from a range of a single data fragment.
+
+    A ScanTask is meant to be a unit of work to be dispatched.
+    """
+
+    cdef:
+        shared_ptr[CScanTask] wrapped
+        CScanTask* task
+
+    def __init__(self):
+        _forbid_instantiation(self.__class__, subclasses_instead=False)
+
+    cdef init(self, shared_ptr[CScanTask]& sp):
+        self.wrapped = sp
+        self.task = self.wrapped.get()
+
+    @staticmethod
+    cdef wrap(shared_ptr[CScanTask]& sp):
+        cdef SimpleScanTask self = SimpleScanTask.__new__(SimpleScanTask)
+        self.init(sp)
+        return self
+
+    cdef inline shared_ptr[CScanTask] unwrap(self) nogil:
+        return self.wrapped
+
+    def execute(self):
+        """Iterate through sequence of materialized record batches.
+
+        Execution semantics are encapsulated in the particular ScanTask
+        implementation.
+
+        Returns
+        -------
+        record_batches : iterator of RecordBatch
+        """
+        cdef:
+            CRecordBatchIterator iterator
+            shared_ptr[CRecordBatch] record_batch
+
+        iterator = move(GetResultValue(move(self.task.Execute())))
+
+        while True:
+            record_batch = GetResultValue(iterator.Next())
+            if record_batch.get() == nullptr:
+                raise StopIteration()
+            else:
+                yield pyarrow_wrap_batch(record_batch)
+
+
+cdef class SimpleScanTask(ScanTask):
+    """A trivial ScanTask that yields the RecordBatch of an array."""
+
+    cdef:
+        CSimpleScanTask* simple_task
+
+    cdef init(self, shared_ptr[CScanTask]& sp):
+        ScanTask.init(self, sp)
+        self.simple_task = <CSimpleScanTask*> sp.get()
+
+
+cdef class ScannerBuilder:
+    """Factory class to construct a Scanner.
+
+    It is used to pass information, notably a potential filter expression and a
+    subset of columns to materialize.
+    """
+
+    cdef:
+        shared_ptr[CScannerBuilder] wrapped
+        CScannerBuilder* builder
+
+    def __init__(self, Dataset dataset not None, MemoryPool memory_pool=None):
+        cdef:
+            shared_ptr[CScannerBuilder] builder
+            shared_ptr[CScanContext] context = make_shared[CScanContext]()
+        context.get().pool = maybe_unbox_memory_pool(memory_pool)
+        builder = make_shared[CScannerBuilder](dataset.unwrap(), context)
+        self.init(builder)
+
+    cdef void init(self, shared_ptr[CScannerBuilder]& sp):
+        self.wrapped = sp
+        self.builder = sp.get()
+
+    @staticmethod
+    cdef wrap(shared_ptr[CScannerBuilder]& sp):
+        cdef ScannerBuilder self = ScannerBuilder.__new__(ScannerBuilder)
+        self.init(sp)
+        return self
+
+    cdef inline shared_ptr[CScannerBuilder] unwrap(self) nogil:
+        return self.wrapped
+
+    def project(self, columns):
+        """Set the subset of columns to materialize.
+
+        This subset will be passed down to DataSources and corresponding
+        data fragments. The goal is to avoid loading, copying, and
+        deserializing columns that will not be required further down the
+        compute chain.
+
+        It alters the object in place and returns the object itself, enabling
+        method chaining. Raises an exception if any of the referenced column
+        names does not exist in the dataset's Schema.
+
+        Parameters
+        ----------
+        columns : list of str
+            List of columns to project. Order and duplicates will be preserved.
+
+        Returns
+        -------
+        self : ScannerBuilder
+        """
+        cdef vector[c_string] cols = [tobytes(c) for c in columns]
+        check_status(self.builder.Project(cols))
+
+        return self
+
+    def finish(self):
+        """Return the constructed now-immutable Scanner object
+
+        Returns
+        -------
+        scanner : Scanner
+        """
+        return Scanner.wrap(GetResultValue(self.builder.Finish()))
+
+    def filter(self, Expression filter_expression not None):
+        """Set the filter expression to return only rows matching the filter.
+
+        The predicate will be passed down to DataSources and corresponding
+        data fragments to exploit predicate pushdown if possible using
+        partition information or internal metadata, e.g. Parquet statistics.
+        Otherwise filters the loaded RecordBatches before yielding them.
+
+        It alters the object in place and returns the object itself, enabling
+        method chaining. Raises an exception if any of the referenced column
+        names does not exist in the dataset's Schema.
+
+        Parameters
+        ----------
+        filter_expression : Expression
+            Boolean expression to filter rows with.
+
+        Returns
+        -------
+        self : ScannerBuilder
+        """
+        check_status(self.builder.Filter(filter_expression.unwrap()))
+        return self
+
+    def use_threads(self, bint value):
+        """Set whether the Scanner should make use of the thread pool.
+
+        It alters the object in place and returns the object itself,
+        enabling method chaining.
+
+        Parameters
+        ----------
+        value : boolean
+
+        Returns
+        -------
+        self : ScannerBuilder
+        """
+        check_status(self.builder.UseThreads(value))
+        return self
+
+    @property
+    def schema(self):
+        return pyarrow_wrap_schema(self.builder.schema())
+
+
+cdef class Scanner:
+    """A materialized scan operation with context and options bound.
+
+    A scanner is the class that glues the scan tasks, data fragments and data
+    sources together.
+    """
+
+    cdef:
+        shared_ptr[CScanner] wrapped
+        CScanner* scanner
+
+    def __init__(self):
+        raise TypeError('Scanner cannot be initialized directly, use '
+                        'ScannerBuilder instead')
+
+    cdef void init(self, shared_ptr[CScanner]& sp):
+        self.wrapped = sp
+        self.scanner = sp.get()
+
+    @staticmethod
+    cdef wrap(shared_ptr[CScanner]& sp):
+        cdef Scanner self = Scanner.__new__(Scanner)
+        self.init(sp)
+        return self
+
+    def scan(self):
+        """Returns a stream of ScanTasks
+
+        The caller is responsible for dispatching or scheduling these tasks.
+        Tasks should be safe to run in a concurrent fashion and outlive the
+        iterator.
+
+        Returns
+        -------
+        scan_tasks : iterator of ScanTask
+        """
+        cdef:
+            CScanTaskIterator iterator
+            shared_ptr[CScanTask] task
+
+        iterator = move(GetResultValue(move(self.scanner.Scan())))
+
+        while True:
+            task = GetResultValue(iterator.Next())
+            if task.get() == nullptr:
+                raise StopIteration()
+            else:
+                yield ScanTask.wrap(task)
+
+    def to_table(self):
+        """Convert a Scanner into a Table.
+
+        Use this convenience utility with care. This will serially materialize
+        the Scan result in memory before creating the Table.
+ + Returns + ------- + table : Table + """ + cdef CResult[shared_ptr[CTable]] result + + with nogil: + result = self.scanner.ToTable() + + return pyarrow_wrap_table(GetResultValue(result)) + + +cdef class Expression: + + cdef: + shared_ptr[CExpression] wrapped + CExpression* expression + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef void init(self, const shared_ptr[CExpression]& sp): + self.wrapped = sp + self.expression = sp.get() + + @staticmethod + cdef wrap(const shared_ptr[CExpression]& sp): + cdef Expression self + + typ = sp.get().type() + if typ == CExpressionType_FIELD: + self = FieldExpression.__new__(FieldExpression) + elif typ == CExpressionType_SCALAR: + self = ScalarExpression.__new__(ScalarExpression) + elif typ == CExpressionType_NOT: + self = NotExpression.__new__(NotExpression) + elif typ == CExpressionType_CAST: + self = CastExpression.__new__(CastExpression) + elif typ == CExpressionType_AND: + self = AndExpression.__new__(AndExpression) + elif typ == CExpressionType_OR: + self = OrExpression.__new__(OrExpression) + elif typ == CExpressionType_COMPARISON: + self = ComparisonExpression.__new__(ComparisonExpression) + elif typ == CExpressionType_IS_VALID: + self = IsValidExpression.__new__(IsValidExpression) + elif typ == CExpressionType_IN: + self = InExpression.__new__(InExpression) + else: + raise TypeError(typ) + + self.init(sp) + return self + + cdef inline shared_ptr[CExpression] unwrap(self): + return self.wrapped + + def equals(self, Expression other): + return self.expression.Equals(other.unwrap()) + + def __str__(self): + return frombytes(self.expression.ToString()) + + def validate(self, Schema schema not None): + cdef: + shared_ptr[CSchema] sp_schema + CResult[shared_ptr[CDataType]] result + sp_schema = pyarrow_unwrap_schema(schema) + result = self.expression.Validate(deref(sp_schema)) + return pyarrow_wrap_data_type(GetResultValue(result)) + + def assume(self, Expression given): + return Expression.wrap(self.expression.Assume(given.unwrap())) + + +cdef class UnaryExpression(Expression): + + cdef CUnaryExpression* unary + + cdef void init(self, const shared_ptr[CExpression]& sp): + Expression.init(self, sp) + self.unary = sp.get() + + +cdef class BinaryExpression(Expression): + + cdef CBinaryExpression* binary + + cdef void init(self, const shared_ptr[CExpression]& sp): + Expression.init(self, sp) + self.binary = sp.get() + + @property + def left_operand(self): + return Expression.wrap(self.binary.left_operand()) + + @property + def right_operand(self): + return Expression.wrap(self.binary.right_operand()) + + +cdef class ScalarExpression(Expression): + + cdef CScalarExpression* scalar + + def __init__(self, value): + cdef: + shared_ptr[CScalar] scalar + shared_ptr[CScalarExpression] expression + + if isinstance(value, bool): + scalar = MakeScalar(value) + elif isinstance(value, float): + scalar = MakeScalar(value) + elif isinstance(value, int): + scalar = MakeScalar(value) + else: + raise TypeError('Not yet supported scalar value: {}'.format(value)) + + expression.reset(new CScalarExpression(scalar)) + self.init( expression) + + cdef void init(self, const shared_ptr[CExpression]& sp): + Expression.init(self, sp) + self.scalar = sp.get() + + # TODO(kszucs): implement once we have proper Scalar bindings + # @property + # def value(self): + # return pyarrow_wrap_scalar(self.scalar.value()) + + +cdef class FieldExpression(Expression): + + cdef CFieldExpression* scalar + + def __init__(self, name): + cdef: + c_string field_name = 
tobytes(name) + shared_ptr[CExpression] expression + expression.reset(new CFieldExpression(field_name)) + self.init(expression) + + cdef void init(self, const shared_ptr[CExpression]& sp): + Expression.init(self, sp) + self.scalar = sp.get() + + def name(self): + return frombytes(self.scalar.name()) + + +cpdef enum CompareOperator: + Equal = CCompareOperator_EQUAL + NotEqual = CCompareOperator_NOT_EQUAL + Greater = CCompareOperator_GREATER + GreaterEqual = CCompareOperator_GREATER_EQUAL + Less = CCompareOperator_LESS + LessEqual = CCompareOperator_LESS_EQUAL + + +cdef class ComparisonExpression(BinaryExpression): + + cdef CComparisonExpression* comparison + + def __init__(self, CompareOperator op, + Expression left_operand not None, + Expression right_operand not None): + cdef shared_ptr[CComparisonExpression] expression + expression.reset( + new CComparisonExpression( + op, + left_operand.unwrap(), + right_operand.unwrap() + ) + ) + self.init( expression) + + cdef void init(self, const shared_ptr[CExpression]& sp): + BinaryExpression.init(self, sp) + self.comparison = sp.get() + + def op(self): + return self.comparison.op() + + +cdef class IsValidExpression(UnaryExpression): + + def __init__(self, Expression operand not None): + cdef shared_ptr[CIsValidExpression] expression + expression = make_shared[CIsValidExpression](operand.unwrap()) + self.init( expression) + + +cdef class CastExpression(UnaryExpression): + + def __init__(self, Expression operand not None, DataType to not None, + bint safe=True): + # TODO(kszucs): safe is consitently used across pyarrow, but on long + # term we should expose the CastOptions object + cdef: + CastOptions options + shared_ptr[CExpression] expression + options = CastOptions.safe() if safe else CastOptions.unsafe() + expression.reset(new CCastExpression( + operand.unwrap(), + pyarrow_unwrap_data_type(to), + options.unwrap() + )) + self.init(expression) + + +cdef class InExpression(UnaryExpression): + + def __init__(self, Expression operand not None, Array haystack not None): + cdef shared_ptr[CExpression] expression + expression.reset( + new CInExpression(operand.unwrap(), pyarrow_unwrap_array(haystack)) + ) + self.init(expression) + + +cdef class NotExpression(UnaryExpression): + + def __init__(self, Expression operand not None): + cdef shared_ptr[CNotExpression] expression + expression = MakeNotExpression(operand.unwrap()) + self.init( expression) + + +cdef class AndExpression(BinaryExpression): + + def __init__(self, Expression left_operand not None, + Expression right_operand not None, + *additional_operands): + cdef: + Expression operand + vector[shared_ptr[CExpression]] exprs + exprs.push_back(left_operand.unwrap()) + exprs.push_back(right_operand.unwrap()) + for operand in additional_operands: + exprs.push_back(operand.unwrap()) + self.init(MakeAndExpression(exprs)) + + +cdef class OrExpression(BinaryExpression): + + def __init__(self, Expression left_operand not None, + Expression right_operand not None, + *additional_operands): + cdef: + Expression operand + vector[shared_ptr[CExpression]] exprs + exprs.push_back(left_operand.unwrap()) + exprs.push_back(right_operand.unwrap()) + for operand in additional_operands: + exprs.push_back(operand.unwrap()) + self.init(MakeOrExpression(exprs)) diff --git a/python/pyarrow/_fs.pxd b/python/pyarrow/_fs.pxd index 401b6dce845..9a8d487d04f 100644 --- a/python/pyarrow/_fs.pxd +++ b/python/pyarrow/_fs.pxd @@ -39,13 +39,17 @@ cdef class FileStats: CFileStats stats @staticmethod - cdef FileStats wrap(CFileStats 
stats) + cdef wrap(CFileStats stats) + + cdef inline CFileStats unwrap(self) nogil cdef class Selector: cdef: CSelector selector + cdef inline CSelector unwrap(self) nogil + cdef class FileSystem: cdef: @@ -54,6 +58,11 @@ cdef class FileSystem: cdef init(self, const shared_ptr[CFileSystem]& wrapped) + @staticmethod + cdef wrap(shared_ptr[CFileSystem]& sp) + + cdef inline shared_ptr[CFileSystem] unwrap(self) nogil + cdef class LocalFileSystem(FileSystem): cdef: @@ -67,3 +76,10 @@ cdef class SubTreeFileSystem(FileSystem): CSubTreeFileSystem* subtreefs cdef init(self, const shared_ptr[CFileSystem]& wrapped) + + +cdef class _MockFileSystem(FileSystem): + cdef: + CMockFileSystem* mockfs + + cdef init(self, const shared_ptr[CFileSystem]& wrapped) diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx index afd68849347..5d44b2cd9c9 100644 --- a/python/pyarrow/_fs.pyx +++ b/python/pyarrow/_fs.pyx @@ -18,10 +18,13 @@ # cython: language_level = 3 import six +from cpython.datetime cimport datetime, PyDateTime_DateTime from pyarrow.compat import frombytes, tobytes from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport PyDateTime_from_TimePoint +from pyarrow.includes.libarrow cimport ( + PyDateTime_from_TimePoint, PyDateTime_to_TimePoint +) from pyarrow.lib import _detect_compression from pyarrow.lib cimport * @@ -45,11 +48,14 @@ cdef class FileStats: "FileSystem.get_target_stats method instead.") @staticmethod - cdef FileStats wrap(CFileStats stats): + cdef wrap(CFileStats stats): cdef FileStats self = FileStats.__new__(FileStats) self.stats = stats return self + cdef inline CFileStats unwrap(self) nogil: + return self.stats + def __repr__(self): def getvalue(attr): try: @@ -144,6 +150,9 @@ cdef class Selector: self.recursive = recursive self.allow_non_existent = allow_non_existent + cdef inline CSelector unwrap(self) nogil: + return self.selector + @property def base_dir(self): return frombytes(self.selector.base_dir) @@ -181,6 +190,32 @@ cdef class FileSystem: self.wrapped = wrapped self.fs = wrapped.get() + @staticmethod + cdef wrap(shared_ptr[CFileSystem]& sp): + cdef FileSystem self + + typ = frombytes(sp.get().type_name()) + if typ == 'local': + self = LocalFileSystem.__new__(LocalFileSystem) + elif typ == 'mock': + self = _MockFileSystem.__new__(_MockFileSystem) + elif typ == 'subtree': + self = SubTreeFileSystem.__new__(SubTreeFileSystem) + elif typ == 's3': + from pyarrow._s3fs import S3FileSystem + self = S3FileSystem.__new__(S3FileSystem) + elif typ == 'hdfs': + from pyarrow._hdfs import HadoopFileSystem + self = HadoopFileSystem.__new__(HadoopFileSystem) + else: + raise TypeError('Cannot wrap FileSystem pointer') + + self.init(sp) + return self + + cdef inline shared_ptr[CFileSystem] unwrap(self) nogil: + return self.wrapped + def get_target_stats(self, paths_or_selector): """Get statistics for the given target. 
@@ -547,3 +582,20 @@ cdef class SubTreeFileSystem(FileSystem): cdef init(self, const shared_ptr[CFileSystem]& wrapped): FileSystem.init(self, wrapped) self.subtreefs = wrapped.get() + + +cdef class _MockFileSystem(FileSystem): + + def __init__(self, datetime current_time=None): + cdef shared_ptr[CMockFileSystem] wrapped + + current_time = current_time or datetime.now() + wrapped = make_shared[CMockFileSystem]( + PyDateTime_to_TimePoint( current_time) + ) + + self.init( wrapped) + + cdef init(self, const shared_ptr[CFileSystem]& wrapped): + FileSystem.init(self, wrapped) + self.mockfs = wrapped.get() diff --git a/python/pyarrow/compute.pxi b/python/pyarrow/compute.pxi new file mode 100644 index 00000000000..d5cc366bfed --- /dev/null +++ b/python/pyarrow/compute.pxi @@ -0,0 +1,92 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +cdef class CastOptions: + + __slots__ = () # avoid mistakingly creating attributes + + def __init__(self, allow_int_overflow=None, + allow_time_truncate=None, allow_time_overflow=None, + allow_float_truncate=None, allow_invalid_utf8=None): + if allow_int_overflow is not None: + self.allow_int_overflow = allow_int_overflow + if allow_time_truncate is not None: + self.allow_time_truncate = allow_time_truncate + if allow_time_overflow is not None: + self.allow_time_overflow = allow_time_overflow + if allow_float_truncate is not None: + self.allow_float_truncate = allow_float_truncate + if allow_invalid_utf8 is not None: + self.allow_invalid_utf8 = allow_invalid_utf8 + + @staticmethod + cdef wrap(CCastOptions options): + cdef CastOptions self = CastOptions.__new__(CastOptions) + self.options = options + return self + + @staticmethod + def safe(): + return CastOptions.wrap(CCastOptions.Safe()) + + @staticmethod + def unsafe(): + return CastOptions.wrap(CCastOptions.Unsafe()) + + cdef inline CCastOptions unwrap(self) nogil: + return self.options + + @property + def allow_int_overflow(self): + return self.options.allow_int_overflow + + @allow_int_overflow.setter + def allow_int_overflow(self, bint flag): + self.options.allow_int_overflow = flag + + @property + def allow_time_truncate(self): + return self.options.allow_time_truncate + + @allow_time_truncate.setter + def allow_time_truncate(self, bint flag): + self.options.allow_time_truncate = flag + + @property + def allow_time_overflow(self): + return self.options.allow_time_overflow + + @allow_time_overflow.setter + def allow_time_overflow(self, bint flag): + self.options.allow_time_overflow = flag + + @property + def allow_float_truncate(self): + return self.options.allow_float_truncate + + @allow_float_truncate.setter + def allow_float_truncate(self, bint flag): + self.options.allow_float_truncate = flag + + @property + def allow_invalid_utf8(self): + return 
self.options.allow_invalid_utf8 + + @allow_invalid_utf8.setter + def allow_invalid_utf8(self, bint flag): + self.options.allow_invalid_utf8 = flag diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py new file mode 100644 index 00000000000..6a9dd460acb --- /dev/null +++ b/python/pyarrow/dataset.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Dataset is currently unstable. APIs subject to change without notice.""" + +from __future__ import absolute_import + +import sys + +if sys.version_info < (3,): + raise ImportError("Python Dataset bindings require Python 3") + +from pyarrow._dataset import ( # noqa + AndExpression, + CastExpression, + CompareOperator, + ComparisonExpression, + Dataset, + DataSource, + DefaultPartitionScheme, + Expression, + FieldExpression, + FileFormat, + FileSystemDataSource, + FileSystemDataSourceDiscovery, + FileSystemDiscoveryOptions, + HivePartitionScheme, + InExpression, + IsValidExpression, + NotExpression, + OrExpression, + ParquetFileFormat, + PartitionScheme, + ScalarExpression, + Scanner, + ScannerBuilder, + ScanTask, + SchemaPartitionScheme, + TreeDataSource, +) diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 95b875a5a7d..b01739d899a 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -24,7 +24,8 @@ FileSystem, LocalFileSystem, LocalFileSystemOptions, - SubTreeFileSystem + SubTreeFileSystem, + _MockFileSystem ) try: diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index 0bbdb864efc..ba4cda8d332 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -27,12 +27,31 @@ from libcpp.unordered_map cimport unordered_map from libcpp.unordered_set cimport unordered_set from cpython cimport PyObject +from cpython.datetime cimport PyDateTime_DateTime cimport cpython cdef extern from * namespace "std" nogil: cdef shared_ptr[T] static_pointer_cast[T, U](shared_ptr[U]) +# vendored from the cymove project https://github.com/ozars/cymove +cdef extern from * namespace "cymove" nogil: + """ + #include + #include + namespace cymove { + template + inline typename std::remove_reference::type&& cymove(T& t) { + return std::move(t); + } + template + inline typename std::remove_reference::type&& cymove(T&& t) { + return std::move(t); + } + } // namespace cymove + """ + cdef T move" cymove::cymove"[T](T) + cdef extern from "arrow/python/platform.h": pass diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index d906c2d635e..67178687639 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -791,6 +791,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: cdef cppclass CDoubleScalar" arrow::DoubleScalar"(CScalar): double value + 
shared_ptr[CScalar] MakeScalar[Value](Value value) + CStatus ConcatenateTables(const vector[shared_ptr[CTable]]& tables, shared_ptr[CTable]* result) @@ -1363,11 +1365,15 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: cdef cppclass CCastOptions" arrow::compute::CastOptions": CCastOptions() CCastOptions(c_bool safe) + @staticmethod CCastOptions Safe() + @staticmethod CCastOptions Unsafe() c_bool allow_int_overflow c_bool allow_time_truncate + c_bool allow_time_overflow c_bool allow_float_truncate + c_bool allow_invalid_utf8 cdef cppclass CTakeOptions" arrow::compute::TakeOptions": pass @@ -1420,6 +1426,16 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: CFunctionContext* context, const CDatum& values, const CDatum& filter, CDatum* out) + enum CCompareOperator "arrow::compute::CompareOperator": + CCompareOperator_EQUAL "arrow::compute::CompareOperator::EQUAL" + CCompareOperator_NOT_EQUAL "arrow::compute::CompareOperator::NOT_EQUAL" + CCompareOperator_GREATER "arrow::compute::CompareOperator::GREATER" + CCompareOperator_GREATER_EQUAL \ + "arrow::compute::CompareOperator::GREATER_EQUAL" + CCompareOperator_LESS "arrow::compute::CompareOperator::LESS" + CCompareOperator_LESS_EQUAL \ + "arrow::compute::CompareOperator::LESS_EQUAL" + cdef extern from "arrow/python/api.h" namespace "arrow::py": # Requires GIL @@ -1572,13 +1588,13 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil: cdef extern from "arrow/python/api.h" namespace "arrow::py::internal" nogil: - cdef cppclass CTimePoint "arrow::py::internal::TimePoint": pass - cdef CStatus PyDateTime_from_int(int64_t val, const TimeUnit unit, - PyObject** out) - cdef CStatus PyDateTime_from_TimePoint(CTimePoint val, PyObject** out) + CStatus PyDateTime_from_int(int64_t val, const TimeUnit unit, + PyObject** out) + CStatus PyDateTime_from_TimePoint(CTimePoint val, PyObject** out) + CTimePoint PyDateTime_to_TimePoint(PyDateTime_DateTime* pydatetime) cdef extern from 'arrow/python/init.h': @@ -1668,6 +1684,12 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil: int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) +cdef extern from 'arrow/util/iterator.h' namespace 'arrow' nogil: + cdef cppclass CIterator" arrow::Iterator"[T]: + CResult[T] Next() + CStatus Visit[Visitor](Visitor&& visitor) + + cdef extern from 'arrow/util/thread_pool.h' namespace 'arrow' nogil: int GetCpuThreadPoolCapacity() CStatus SetCpuThreadPoolCapacity(int threads) @@ -1675,7 +1697,3 @@ cdef extern from 'arrow/util/thread_pool.h' namespace 'arrow' nogil: cdef extern from 'arrow/array/concatenate.h' namespace 'arrow' nogil: CStatus Concatenate(const vector[shared_ptr[CArray]]& arrays, CMemoryPool* pool, shared_ptr[CArray]* result) - -cdef extern from "" namespace "std": - # Work around https://github.com/cython/cython/issues/2169 - unique_ptr[CCodec] move(unique_ptr[CCodec]) nogil diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd new file mode 100644 index 00000000000..4941be22aa2 --- /dev/null +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -0,0 +1,362 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from libcpp.unordered_map cimport unordered_map + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_fs cimport * + + +cdef extern from "arrow/api.h" namespace "arrow" nogil: + + cdef cppclass CRecordBatchIterator "arrow::RecordBatchIterator"( + CIterator[shared_ptr[CRecordBatch]]): + pass + + +cdef extern from "arrow/dataset/api.h" namespace "arrow::fs" nogil: + + ctypedef shared_ptr[CFileSystem] CFileSystemPtr "arrow::fs::FileSystemPtr" + ctypedef vector[CFileStats] CFileStatsVector "arrow::fs::FileStatsVector" + + +cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: + + cdef enum CExpressionType "arrow::dataset::ExpressionType::type": + CExpressionType_FIELD "arrow::dataset::ExpressionType::type::FIELD" + CExpressionType_SCALAR "arrow::dataset::ExpressionType::type::SCALAR" + CExpressionType_NOT "arrow::dataset::ExpressionType::type::NOT" + CExpressionType_CAST "arrow::dataset::ExpressionType::type::CAST" + CExpressionType_AND "arrow::dataset::ExpressionType::type::AND" + CExpressionType_OR "arrow::dataset::ExpressionType::type::OR" + CExpressionType_COMPARISON \ + "arrow::dataset::ExpressionType::type::COMPARISON" + CExpressionType_IS_VALID \ + "arrow::dataset::ExpressionType::type::IS_VALID" + CExpressionType_IN "arrow::dataset::ExpressionType::type::IN" + CExpressionType_CUSTOM "arrow::dataset::ExpressionType::type::CUSTOM" + + cdef cppclass CExpression "arrow::dataset::Expression": + CExpression(CExpressionType type) + c_bool Equals(const CExpression& other) const + c_bool Equals(const shared_ptr[CExpression]& other) const + c_bool IsNull() const + CResult[shared_ptr[CDataType]] Validate(const CSchema& schema) const + shared_ptr[CExpression] Assume(const CExpression& given) const + shared_ptr[CExpression] Assume( + const shared_ptr[CExpression]& given) const + c_string ToString() const + CExpressionType type() const + shared_ptr[CExpression] Copy() const + + ctypedef shared_ptr[CExpression] CExpressionPtr \ + "arrow::dataset::ExpressionPtr" + ctypedef vector[CExpressionPtr] CExpressionVector \ + "arrow::dataset::ExpressionVector" + + cdef cppclass CUnaryExpression "arrow::dataset::UnaryExpression"( + CExpression): + const CExpressionPtr& operand() const + + cdef cppclass CBinaryExpression "arrow::dataset::BinaryExpression"( + CExpression): + const CExpressionPtr& left_operand() const + const CExpressionPtr& right_operand() const + + cdef cppclass CScalarExpression "arrow::dataset::ScalarExpression"( + CExpression): + CScalarExpression(const shared_ptr[CScalar]& value) + const shared_ptr[CScalar]& value() const + + cdef cppclass CFieldExpression "arrow::dataset::FieldExpression"( + CExpression): + CFieldExpression(c_string name) + c_string name() const + + cdef cppclass CComparisonExpression "arrow::dataset::ComparisonExpression"( + CBinaryExpression): + CComparisonExpression(CCompareOperator op, 
CExpressionPtr left_operand, + CExpressionPtr right_operand) + CCompareOperator op() const + + cdef cppclass CAndExpression "arrow::dataset::AndExpression"( + CBinaryExpression): + pass + + cdef cppclass COrExpression "arrow::dataset::OrExpression"( + CBinaryExpression): + pass + + cdef cppclass CNotExpression "arrow::dataset::NotExpression"( + CUnaryExpression): + pass + + cdef cppclass CIsValidExpression "arrow::dataset::IsValidExpression"( + CUnaryExpression): + pass + + cdef cppclass CCastExpression "arrow::dataset::CastExpression"( + CUnaryExpression): + CCastExpression(CExpressionPtr operand, shared_ptr[CDataType] to, + CCastOptions options) + + cdef cppclass CInExpression "arrow::dataset::InExpression"( + CUnaryExpression): + CInExpression(CExpressionPtr operand, shared_ptr[CArray] set) + + cdef shared_ptr[CNotExpression] MakeNotExpression "arrow::dataset::not_"( + CExpressionPtr operand) + cdef CExpressionPtr MakeAndExpression "arrow::dataset::and_"( + const CExpressionVector& subexpressions) + cdef CExpressionPtr MakeOrExpression "arrow::dataset::or_"( + const CExpressionVector& subexpressions) + + cdef cppclass CFilter "arrow::dataset::Filter": + pass + + cdef cppclass CWriteOptions "arrow::dataset::WriteOptions": + pass + + cdef cppclass CScanOptions "arrow::dataset::ScanOptions": + CExpressionPtr filter + shared_ptr[CSchema] schema + c_bool use_threads + # shared_ptr[CExpressionEvaluator] evaluator + # shared_ptr[CRecordBatchProjector] projector + + @staticmethod + shared_ptr[CScanOptions] Defaults() + + cdef cppclass CScanContext "arrow::dataset::ScanContext": + CMemoryPool* pool + + ctypedef shared_ptr[CScanOptions] CScanOptionsPtr \ + "arrow::dataset::ScanOptionsPtr" + ctypedef shared_ptr[CScanContext] CScanContextPtr \ + "arrow::dataset::ScanContextPtr" + + cdef cppclass CScanTask" arrow::dataset::ScanTask": + CResult[CRecordBatchIterator] Execute() + + cdef cppclass CSimpleScanTask "arrow::dataset::SimpleScanTask"(CScanTask): + pass + + ctypedef shared_ptr[CScanTask] CScanTaskPtr "arrow::dataset::ScanTaskPtr" + ctypedef CIterator[CScanTaskPtr] CScanTaskIterator \ + "arrow::dataset::ScanTaskIterator" + + cdef cppclass CScanner "arrow::dataset::Scanner": + CResult[CScanTaskIterator] Scan() + CResult[shared_ptr[CTable]] ToTable() + + ctypedef shared_ptr[CScanner] CScannerPtr "arrow::dataset::ScannerPtr" + + cdef cppclass CScannerBuilder "arrow::dataset::ScannerBuilder": + CScannerBuilder(shared_ptr[CDataset], CScanContextPtr scan_context) + CStatus Project(const vector[c_string]& columns) + CStatus Filter(const CExpression& filter) + CStatus Filter(CExpressionPtr filter) + CStatus UseThreads(c_bool use_threads) + CResult[CScannerPtr] Finish() + shared_ptr[CSchema] schema() const + + ctypedef shared_ptr[CScannerBuilder] CScannerBuilderPtr \ + "arrow::dataset::ScannerBuilderPtr" + + cdef cppclass CDataFragment "arrow::dataset::DataFragment": + CResult[CScanTaskIterator] Scan(CScanContextPtr context) + c_bool splittable() + CScanOptionsPtr scan_options() + + ctypedef shared_ptr[CDataFragment] CDataFragmentPtr \ + "arrow::dataset::DataFragmentPtr" + ctypedef vector[CDataFragmentPtr] CDataFragmentVector \ + "arrow::dataset::DataFragmentVector" + ctypedef CIterator[CDataFragmentPtr] CDataFragmentIterator \ + "arrow::dataset::DataFragmentIterator" + + cdef cppclass CSimpleDataFragment "arrow::dataset::SimpleDataFragment"( + CDataFragment): + CSimpleDataFragment(vector[shared_ptr[CRecordBatch]] record_batches, + CScanOptionsPtr scan_options) + + cdef cppclass CDataSource 
"arrow::dataset::DataSource": + CDataFragmentIterator GetFragments(CScanOptionsPtr options) + const CExpressionPtr& partition_expression() + c_string type_name() + + ctypedef shared_ptr[CDataSource] CDataSourcePtr \ + "arrow::dataset::DataSourcePtr" + ctypedef vector[CDataSourcePtr] CDataSourceVector \ + "arrow::dataset::DataSourceVector" + + cdef cppclass CTreeDataSource "arrow::dataset::TreeDataSource"( + CDataSource): + CTreeDataSource(CDataSourceVector children) + + cdef cppclass CDataset "arrow::dataset::Dataset": + @staticmethod + CResult[shared_ptr[CDataset]] Make(CDataSourceVector sources, + shared_ptr[CSchema] schema) + CResult[CScannerBuilderPtr] NewScanWithContext "NewScan"( + CScanContextPtr context) + CResult[CScannerBuilderPtr] NewScan() + const CDataSourceVector& sources() + shared_ptr[CSchema] schema() + + ctypedef shared_ptr[CDataset] CDatasetPtr "arrow::dataset::DatasetPtr" + + cdef cppclass CFileScanOptions "arrow::dataset::FileScanOptions"( + CScanOptions): + c_string file_type() + + cdef cppclass CFileSource "arrow::dataset::FileSource": + CFileSource(c_string path, CFileSystem* filesystem, + CompressionType compression) + c_bool operator==(const CFileSource& other) const + CompressionType compression() + c_string path() + CFileSystem* filesystem() + shared_ptr[CBuffer] buffer() + CStatus Open(shared_ptr[CRandomAccessFile]* out) + + cdef cppclass CFileWriteOptions "arrow::dataset::WriteOptions"( + CWriteOptions): + c_string file_type() + + cdef cppclass CFileFormat "arrow::dataset::FileFormat": + c_string type_name() + CStatus IsSupported(const CFileSource& source, c_bool* supported) const + CStatus Inspect(const CFileSource& source, + shared_ptr[CSchema]* out) const + CStatus ScanFile(const CFileSource& source, + CScanOptionsPtr scan_options, + CScanContextPtr scan_context, + CScanTaskIterator* out) const + CStatus MakeFragment(const CFileSource& location, + CScanOptionsPtr opts, + CDataFragmentPtr* out) + + ctypedef shared_ptr[CFileFormat] CFileFormatPtr \ + "arrow::dataset::FileFormatPtr" + + cdef cppclass CFileDataFragment "arrow::dataset::FileDataFragment"( + CDataFragment): + CFileDataFragment(const CFileSource& source, CFileFormatPtr format, + CScanOptionsPtr scan_options) + CStatus Scan(CScanContextPtr scan_context, + shared_ptr[CScanTaskIterator]* out) + const CFileSource& source() + CFileFormatPtr format() + CScanOptionsPtr scan_options() + + cdef cppclass CParquetDataFragment "arrow::dataset::ParquetFragment"( + CFileDataFragment): + CParquetDataFragment(const CFileSource& source, + CScanOptionsPtr options) + + ctypedef unordered_map[c_string, CExpressionPtr] CPathPartitions \ + "arrow::dataset::PathPartitions" + + cdef cppclass CFileSystemDataSource \ + "arrow::dataset::FileSystemDataSource"(CDataSource): + @staticmethod + CResult[CDataSourcePtr] Make(CFileSystemPtr filesystem, + CFileStatsVector stats, + CExpressionVector partitions, + CExpressionPtr source_partition, + CFileFormatPtr format) + c_string type() + shared_ptr[CDataFragmentIterator] GetFragments(CScanOptionsPtr options) + + cdef cppclass CParquetScanOptions "arrow::dataset::ParquetScanOptions"( + CFileScanOptions): + c_string file_type() + + cdef cppclass CParquetWriterOptions "arrow::dataset::ParquetWriterOptions"( + CFileWriteOptions): + c_string file_type() + + cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"( + CFileFormat): + pass + + cdef cppclass CParquetFragment "arrow::dataset::ParquetFragment"( + CFileDataFragment): + CParquetFragment(const CFileSource& source, + 
shared_ptr[CScanOptions] options) + c_bool splittable() + + cdef cppclass CPartitionScheme "arrow::dataset::PartitionScheme": + c_string type_name() const + CResult[CExpressionPtr] Parse(const c_string& path) const + const shared_ptr[CSchema]& schema() + + ctypedef shared_ptr[CPartitionScheme] CPartitionSchemePtr \ + "arrow::dataset::PartitionSchemePtr" + + cdef cppclass CDefaultPartitionScheme \ + "arrow::dataset::DefaultPartitionScheme"(CPartitionScheme): + CDefaultPartitionScheme() + + cdef cppclass CSchemaPartitionScheme \ + "arrow::dataset::SchemaPartitionScheme"(CPartitionScheme): + CSchemaPartitionScheme(shared_ptr[CSchema] schema) + + cdef cppclass CHivePartitionScheme \ + "arrow::dataset::HivePartitionScheme"(CPartitionScheme): + CHivePartitionScheme(shared_ptr[CSchema] schema) + + cdef cppclass CFileSystemDiscoveryOptions \ + "arrow::dataset::FileSystemDiscoveryOptions": + c_string partition_base_dir + c_bool exclude_invalid_files + vector[c_string] ignore_prefixes + + cdef cppclass CDataSourceDiscovery "arrow::dataset::DataSourceDiscovery": + CResult[shared_ptr[CSchema]] Inspect() + CResult[CDataSourcePtr] Finish() + shared_ptr[CSchema] schema() + CStatus SetSchema(shared_ptr[CSchema]) + CPartitionSchemePtr partition_scheme() + CStatus SetPartitionScheme(CPartitionSchemePtr partition_scheme) + CExpressionPtr root_partition() + CStatus SetRootPartition(CExpressionPtr partition) + + ctypedef shared_ptr[CDataSourceDiscovery] CDataSourceDiscoveryPtr \ + "arrow::dataset::DataSourceDiscovery" + + cdef cppclass CFileSystemDataSourceDiscovery \ + "arrow::dataset::FileSystemDataSourceDiscovery"( + CDataSourceDiscovery): + @staticmethod + CResult[CDataSourceDiscoveryPtr] MakeFromFileStats "Make"( + CFileSystemPtr filesytem, + CFileStatsVector paths, + CFileFormatPtr format, + CFileSystemDiscoveryOptions options + ) + @staticmethod + CResult[CDataSourceDiscoveryPtr] MakeFromSelector "Make"( + CFileSystemPtr filesytem, + CSelector, + CFileFormatPtr format, + CFileSystemDiscoveryOptions options + ) diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd index f57a1c1ab33..34cef1201b8 100644 --- a/python/pyarrow/includes/libarrow_flight.pxd +++ b/python/pyarrow/includes/libarrow_flight.pxd @@ -486,9 +486,3 @@ cdef extern from "arrow/python/flight.h" namespace "arrow::py::flight" nogil: cdef CStatus SerializeBasicAuth" arrow::py::flight::SerializeBasicAuth"( CBasicAuth basic_auth, c_string* out) - -cdef extern from "" namespace "std": - # Work around https://github.com/cython/cython/issues/2169 - unique_ptr[CFlightDataStream] move(unique_ptr[CFlightDataStream]) nogil - unique_ptr[CServerAuthHandler] move(unique_ptr[CServerAuthHandler]) nogil - unique_ptr[CClientAuthHandler] move(unique_ptr[CClientAuthHandler]) nogil diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd index e2e50985f40..d68e6e85609 100644 --- a/python/pyarrow/includes/libarrow_fs.pxd +++ b/python/pyarrow/includes/libarrow_fs.pxd @@ -59,6 +59,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: c_bool recursive cdef cppclass CFileSystem "arrow::fs::FileSystem": + c_string type_name() const CResult[CFileStats] GetTargetStats(const c_string& path) CResult[vector[CFileStats]] GetTargetStats( const vector[c_string]& paths) @@ -147,3 +148,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: @staticmethod CResult[shared_ptr[CHadoopFileSystem]] Make( const CHdfsOptions& options) + + cdef cppclass 
CMockFileSystem "arrow::fs::internal::MockFileSystem"( + CFileSystem): + CMockFileSystem(CTimePoint current_time) diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 8415c4dfc66..059fd69f69d 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -19,12 +19,11 @@ from __future__ import absolute_import -from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport * -from pyarrow.includes.libarrow cimport CStatus from cpython cimport PyObject from libcpp cimport nullptr from libcpp.cast cimport dynamic_cast +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * cdef extern from "Python.h": @@ -530,6 +529,18 @@ cdef class _CRecordBatchReader: shared_ptr[CRecordBatchReader] reader +cdef class CastOptions: + cdef: + CCastOptions options + + @staticmethod + cdef wrap(CCastOptions options) + + cdef inline CCastOptions unwrap(self) nogil + + +cdef CompressionType _get_compression_type(object name) except * + cdef get_input_stream(object source, c_bool use_memory_map, shared_ptr[CInputStream]* reader) cdef get_reader(object source, c_bool use_memory_map, @@ -547,6 +558,7 @@ cdef object pyarrow_wrap_metadata( # Public Cython API for 3rd party code # +cdef public object pyarrow_wrap_scalar(const shared_ptr[CScalar]& sp_scalar) cdef public object pyarrow_wrap_array(const shared_ptr[CArray]& sp_array) cdef public object pyarrow_wrap_chunked_array( const shared_ptr[CChunkedArray]& sp_array) @@ -565,6 +577,7 @@ cdef public object pyarrow_wrap_sparse_coo_tensor( cdef public object pyarrow_wrap_sparse_csr_matrix( const shared_ptr[CSparseCSRMatrix]& sp_sparse_tensor) +cdef public shared_ptr[CScalar] pyarrow_unwrap_scalar(object scalar) cdef public shared_ptr[CArray] pyarrow_unwrap_array(object array) cdef public shared_ptr[CRecordBatch] pyarrow_unwrap_batch(object batch) cdef public shared_ptr[CBuffer] pyarrow_unwrap_buffer(object buffer) diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 676fea5c981..8aaefc27407 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -129,6 +129,9 @@ include "table.pxi" # Tensors include "tensor.pxi" +# Compute +include "compute.pxi" + # File IO include "io.pxi" include "io-hdfs.pxi" diff --git a/python/pyarrow/public-api.pxi b/python/pyarrow/public-api.pxi index b97ddd6b413..fd303ba482c 100644 --- a/python/pyarrow/public-api.pxi +++ b/python/pyarrow/public-api.pxi @@ -224,6 +224,20 @@ cdef api object pyarrow_wrap_chunked_array( arr.init(sp_array) return arr + +cdef api bint pyarrow_is_scalar(object value): + return isinstance(value, ScalarValue) + + +cdef api shared_ptr[CScalar] pyarrow_unwrap_scalar(object scalar): + cdef ScalarValue value + if pyarrow_is_scalar(scalar): + value = (scalar) + return value.sp_scalar + + return shared_ptr[CScalar]() + + cdef api object pyarrow_wrap_scalar(const shared_ptr[CScalar]& sp_scalar): if sp_scalar.get() == NULL: raise ValueError('Scalar was NULL') diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 7707d650a10..421fc551443 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -41,9 +41,9 @@ # examples try: pytest pyarrow -sv --only-hypothesis --hypothesis-profile=debug h.settings.load_profile(os.environ.get('HYPOTHESIS_PROFILE', 'dev')) - groups = [ 'cython', + 'dataset', 'hypothesis', 'fastparquet', 'gandiva', @@ -61,9 +61,9 @@ 'requires_testing_data', ] - defaults = { 'cython': False, + 'dataset': False, 'fastparquet': False, 'hypothesis': False, 'gandiva': False, 
@@ -94,11 +94,17 @@ pass try: - import pyarrow.gandiva # noqa + import pyarrow.gandiva # noqa defaults['gandiva'] = True except ImportError: pass +try: + import pyarrow.dataset # noqa + defaults['dataset'] = True +except ImportError: + pass + try: import pyarrow.orc # noqa defaults['orc'] = True diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py new file mode 100644 index 00000000000..57421aef737 --- /dev/null +++ b/python/pyarrow/tests/test_dataset.py @@ -0,0 +1,352 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest + +import pyarrow as pa +import pyarrow.fs as fs + +try: + import pyarrow.dataset as ds +except ImportError: + ds = None + +# Marks all of the tests in this module +# Ignore these with pytest ... -m 'not dataset' +pytestmark = pytest.mark.dataset + + +@pytest.fixture +@pytest.mark.parquet +def mockfs(): + import pyarrow.parquet as pq + + mockfs = fs._MockFileSystem() + + data = [ + list(range(5)), + list(map(float, range(5))) + ] + schema = pa.schema([ + pa.field('i64', pa.int64()), + pa.field('f64', pa.float64()) + ]) + batch = pa.record_batch(data, schema=schema) + table = pa.Table.from_batches([batch]) + + directories = [ + 'subdir/1/xxx', + 'subdir/2/yyy', + ] + + for i, directory in enumerate(directories): + path = '{}/file{}.parquet'.format(directory, i) + mockfs.create_dir(directory) + with mockfs.open_output_stream(path) as out: + pq.write_table(table, out) + + return mockfs + + +@pytest.fixture +def dataset(mockfs): + format = ds.ParquetFileFormat() + selector = fs.Selector('subdir', recursive=True) + options = ds.FileSystemDiscoveryOptions('subdir') + discovery = ds.FileSystemDataSourceDiscovery(mockfs, selector, format, + options) + discovery.partition_scheme = ds.SchemaPartitionScheme( + pa.schema([ + pa.field('group', pa.int32()), + pa.field('key', pa.string()) + ]) + ) + source = discovery.finish() + schema = discovery.inspect() + return ds.Dataset([source], schema) + + +def test_filesystem_data_source(mockfs): + file_format = ds.ParquetFileFormat() + + paths = ['subdir/1/xxx/file0.parquet', 'subdir/2/yyy/file1.parquet'] + partitions = [ds.ScalarExpression(True), ds.ScalarExpression(True)] + + source = ds.FileSystemDataSource(mockfs, paths, partitions, + source_partition=None, + file_format=file_format) + + source_partition = ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('source'), + ds.ScalarExpression(1337) + ) + partitions = [ + ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('part'), + ds.ScalarExpression(1) + ), + ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('part'), + ds.ScalarExpression(2) + ) + ] + source = ds.FileSystemDataSource(mockfs, paths, partitions, + 
source_partition=source_partition, + file_format=file_format) + assert source.partition_expression.equals(source_partition) + + +def test_dataset(dataset): + assert isinstance(dataset, ds.Dataset) + assert isinstance(dataset.schema, pa.Schema) + + # TODO(kszucs): test non-boolean expressions for filter do raise + builder = dataset.new_scan() + assert isinstance(builder, ds.ScannerBuilder) + + scanner = builder.finish() + assert isinstance(scanner, ds.Scanner) + assert len(list(scanner.scan())) == 2 + + expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) + expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) + for task in scanner.scan(): + assert isinstance(task, ds.ScanTask) + for batch in task.execute(): + assert batch.column(0).equals(expected_i64) + assert batch.column(1).equals(expected_f64) + + table = scanner.to_table() + assert isinstance(table, pa.Table) + assert len(table) == 10 + + condition = ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('i64'), + ds.ScalarExpression(1) + ) + scanner = dataset.new_scan().use_threads(True).filter(condition).finish() + result = scanner.to_table() + + assert result.to_pydict() == { + 'i64': [1, 1], + 'f64': [1., 1.], + 'group': [1, 2], + 'key': ['xxx', 'yyy'] + } + + +def test_scanner_builder(dataset): + builder = ds.ScannerBuilder(dataset, memory_pool=pa.default_memory_pool()) + scanner = builder.finish() + assert isinstance(scanner, ds.Scanner) + assert len(list(scanner.scan())) == 2 + + with pytest.raises(pa.ArrowInvalid): + dataset.new_scan().project(['unknown']) + + builder = dataset.new_scan(memory_pool=pa.default_memory_pool()) + scanner = builder.project(['i64']).finish() + + assert isinstance(scanner, ds.Scanner) + assert len(list(scanner.scan())) == 2 + for task in scanner.scan(): + for batch in task.execute(): + assert batch.num_columns == 1 + + +def test_abstract_classes(): + classes = [ + ds.FileFormat, + ds.Scanner, + ds.DataSource, + ds.Expression, + ds.PartitionScheme, + ] + for klass in classes: + with pytest.raises(TypeError): + klass() + + +def test_partition_scheme(): + schema = pa.schema([ + pa.field('i64', pa.int64()), + pa.field('f64', pa.float64()) + ]) + for klass in [ds.SchemaPartitionScheme, ds.HivePartitionScheme]: + scheme = klass(schema) + assert isinstance(scheme, ds.PartitionScheme) + + scheme = ds.SchemaPartitionScheme( + pa.schema([ + pa.field('group', pa.int64()), + pa.field('key', pa.float64()) + ]) + ) + expr = scheme.parse('/3/3.14') + assert isinstance(expr, ds.Expression) + + expected = ds.AndExpression( + ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('group'), + ds.ScalarExpression(3) + ), + ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('key'), + ds.ScalarExpression(3.14) + ) + ) + assert expr.equals(expected) + + with pytest.raises(pa.ArrowInvalid): + scheme.parse('/prefix/3/aaa') + + scheme = ds.HivePartitionScheme( + pa.schema([ + pa.field('alpha', pa.int64()), + pa.field('beta', pa.int64()) + ]) + ) + expr = scheme.parse('/alpha=0/beta=3') + expected = ds.AndExpression( + ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('alpha'), + ds.ScalarExpression(0) + ), + ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('beta'), + ds.ScalarExpression(3) + ) + ) + assert expr.equals(expected) + + +def test_expression(): + a = ds.ScalarExpression(1) + b = ds.ScalarExpression(1.1) + c = ds.ScalarExpression(True) + + equal = ds.ComparisonExpression(ds.CompareOperator.Equal, 
a, b) + assert equal.op() == ds.CompareOperator.Equal + + and_ = ds.AndExpression(a, b) + assert and_.left_operand.equals(a) + assert and_.right_operand.equals(b) + assert and_.equals(ds.AndExpression(a, b)) + assert and_.equals(and_) + + ds.AndExpression(a, b, c) + ds.OrExpression(a, b) + ds.OrExpression(a, b, c) + ds.NotExpression(ds.OrExpression(a, b, c)) + ds.IsValidExpression(a) + ds.CastExpression(a, pa.int32()) + ds.CastExpression(a, pa.int32(), safe=True) + ds.InExpression(a, pa.array([1, 2, 3])) + + condition = ds.ComparisonExpression( + ds.CompareOperator.Greater, + ds.FieldExpression('i64'), + ds.ScalarExpression(5) + ) + schema = pa.schema([ + pa.field('i64', pa.int64()), + pa.field('f64', pa.float64()) + ]) + assert condition.validate(schema) == pa.bool_() + + i64_is_5 = ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('i64'), + ds.ScalarExpression(5) + ) + i64_is_7 = ds.ComparisonExpression( + ds.CompareOperator.Equal, + ds.FieldExpression('i64'), + ds.ScalarExpression(7) + ) + assert condition.assume(i64_is_5).equals(ds.ScalarExpression(False)) + assert condition.assume(i64_is_7).equals(ds.ScalarExpression(True)) + assert str(condition) == "(i64 > 5:int64)" + + +@pytest.mark.parametrize('paths_or_selector', [ + fs.Selector('subdir', recursive=True), + [ + 'subdir', + 'subdir/1', + 'subdir/1/xxx', + 'subdir/1/xxx/file0.parquet', + 'subdir/2', + 'subdir/2/yyy', + 'subdir/2/yyy/file1.parquet', + ] +]) +def test_file_system_discovery(mockfs, paths_or_selector): + format = ds.ParquetFileFormat() + + options = ds.FileSystemDiscoveryOptions('subdir') + assert options.partition_base_dir == 'subdir' + assert options.ignore_prefixes == ['.', '_'] + assert options.exclude_invalid_files is True + + discovery = ds.FileSystemDataSourceDiscovery( + mockfs, paths_or_selector, format, options + ) + assert isinstance(discovery.inspect(), pa.Schema) + assert isinstance(discovery.finish(), ds.FileSystemDataSource) + assert isinstance(discovery.partition_scheme, ds.DefaultPartitionScheme) + assert discovery.root_partition.equals(ds.ScalarExpression(True)) + + discovery.partition_scheme = ds.SchemaPartitionScheme( + pa.schema([ + pa.field('group', pa.int32()), + pa.field('key', pa.string()) + ]) + ) + data_source = discovery.finish() + assert isinstance(data_source, ds.DataSource) + + inspected_schema = discovery.inspect() + dataset = ds.Dataset([data_source], inspected_schema) + + scanner = dataset.new_scan().finish() + assert len(list(scanner.scan())) == 2 + + expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) + expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) + for task, group, key in zip(scanner.scan(), [1, 2], ['xxx', 'yyy']): + expected_group_column = pa.array([group] * 5, type=pa.int32()) + expected_key_column = pa.array([key] * 5, type=pa.string()) + for batch in task.execute(): + assert batch.num_columns == 4 + assert batch[0].equals(expected_i64) + assert batch[1].equals(expected_f64) + assert batch[2].equals(expected_group_column) + assert batch[3].equals(expected_key_column) + + table = scanner.to_table() + assert isinstance(table, pa.Table) + assert len(table) == 10 + assert table.num_columns == 4 diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 5519a6a97ea..0b0f1a30825 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -26,7 +26,8 @@ import pyarrow as pa from pyarrow.tests.test_io import gzip_compress, gzip_decompress from pyarrow.fs import (FileType, Selector, 
FileSystem, LocalFileSystem, - LocalFileSystemOptions, SubTreeFileSystem) + LocalFileSystemOptions, SubTreeFileSystem, + _MockFileSystem) @pytest.fixture @@ -40,6 +41,17 @@ def localfs(request, tempdir): ) +@pytest.fixture +def mockfs(request): + return dict( + fs=_MockFileSystem(), + pathfn=lambda p: p, + allow_copy_file=True, + allow_move_dir=True, + allow_append_to_file=True, + ) + + @pytest.fixture def localfs_with_mmap(request, tempdir): return dict( @@ -141,6 +153,10 @@ def hdfs(request, hdfs_server): pytest.lazy_fixture('hdfs'), id='HadoopFileSystem' ), + pytest.param( + pytest.lazy_fixture('mockfs'), + id='_MockFileSystem()' + ), ]) def filesystem_config(request): return request.param diff --git a/python/setup.py b/python/setup.py index 652500faad2..4dd66590950 100755 --- a/python/setup.py +++ b/python/setup.py @@ -106,6 +106,7 @@ def run(self): 'namespace of boost (default: boost)'), ('with-cuda', None, 'build the Cuda extension'), ('with-flight', None, 'build the Flight extension'), + ('with-dataset', None, 'build the Dataset extension'), ('with-parquet', None, 'build the Parquet extension'), ('with-static-parquet', None, 'link parquet statically'), ('with-static-boost', None, 'link boost statically'), @@ -149,6 +150,8 @@ def initialize_options(self): os.environ.get('PYARROW_WITH_CUDA', '0')) self.with_flight = strtobool( os.environ.get('PYARROW_WITH_FLIGHT', '0')) + self.with_dataset = strtobool( + os.environ.get('PYARROW_WITH_DATASET', '0')) self.with_parquet = strtobool( os.environ.get('PYARROW_WITH_PARQUET', '0')) self.with_static_parquet = strtobool( @@ -177,6 +180,7 @@ def initialize_options(self): '_json', '_cuda', '_flight', + '_dataset', '_parquet', '_orc', '_plasma', @@ -230,6 +234,7 @@ def append_cmake_bool(value, varname): append_cmake_bool(self.with_cuda, 'PYARROW_BUILD_CUDA') append_cmake_bool(self.with_flight, 'PYARROW_BUILD_FLIGHT') append_cmake_bool(self.with_gandiva, 'PYARROW_BUILD_GANDIVA') + append_cmake_bool(self.with_dataset, 'PYARROW_BUILD_DATASET') append_cmake_bool(self.with_orc, 'PYARROW_BUILD_ORC') append_cmake_bool(self.with_parquet, 'PYARROW_BUILD_PARQUET') append_cmake_bool(self.with_plasma, 'PYARROW_BUILD_PLASMA') @@ -355,6 +360,8 @@ def append_cmake_bool(value, varname): move_shared_libs(build_prefix, build_lib, "arrow_flight") move_shared_libs(build_prefix, build_lib, "arrow_python_flight") + if self.with_dataset: + move_shared_libs(build_prefix, build_lib, "arrow_dataset") if self.with_plasma: move_shared_libs(build_prefix, build_lib, "plasma") if self.with_gandiva: @@ -408,6 +415,8 @@ def _failure_permitted(self, name): return True if name == '_hdfs' and not self.with_hdfs: return True + if name == '_dataset' and not self.with_dataset: + return True if name == '_cuda' and not self.with_cuda: return True if name == 'gandiva' and not self.with_gandiva: