From 259f7f2ace39dadee62202ba6d5190862f7c9524 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 21 Jun 2022 18:53:07 +0200 Subject: [PATCH 01/31] Proof of concept --- python/pyarrow/_dataset.pxd | 1 + python/pyarrow/_dataset.pyx | 17 ++++++++++++++++- python/pyarrow/tests/test_dataset.py | 15 ++++++++++++--- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd index a512477d501..d337656b1bf 100644 --- a/python/pyarrow/_dataset.pxd +++ b/python/pyarrow/_dataset.pxd @@ -47,6 +47,7 @@ cdef class Dataset(_Weakrefable): cdef: shared_ptr[CDataset] wrapped CDataset* dataset + public dict _scanner_options cdef void init(self, const shared_ptr[CDataset]& sp) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 5d4cf95087d..5ff6696ecd3 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -156,6 +156,7 @@ cdef class Dataset(_Weakrefable): cdef void init(self, const shared_ptr[CDataset]& sp): self.wrapped = sp self.dataset = sp.get() + self._scanner_options = {} @staticmethod cdef wrap(const shared_ptr[CDataset]& sp): @@ -296,7 +297,8 @@ cdef class Dataset(_Weakrefable): n_legs: [[2,4,4,100]] animal: [["Parrot","Dog","Horse","Centipede"]] """ - return Scanner.from_dataset(self, **kwargs) + scanner_options = {**self._scanner_options, **kwargs} + return Scanner.from_dataset(self, **scanner_options) def to_batches(self, **kwargs): """ @@ -385,6 +387,19 @@ cdef class Dataset(_Weakrefable): """The common schema of the full Dataset""" return pyarrow_wrap_schema(self.dataset.schema()) + def filter(self, expression): + cdef: + Dataset filtered_dataset + + if "filter" in self._scanner_options: + new_filter = self._scanner_options["filter"] & expression + else: + new_filter = expression + filtered_dataset = self.__class__.__new__(self.__class__) + filtered_dataset.init(self.wrapped) + filtered_dataset._scanner_options = dict(self._scanner_options, filter=new_filter) + return filtered_dataset + def join(self, right_dataset, keys, right_keys=None, join_type="left outer", left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 6262e3a2fdf..20f9d8cb61f 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4854,10 +4854,19 @@ def test_dataset_filter(tempdir): ds.write_dataset(t1, tempdir / "t1", format="ipc") ds1 = ds.dataset(tempdir / "t1", format="ipc") - result = ds1.scanner(filter=pc.field("colA") < 3) + result = ds1.filter(pc.field("colA") < 3).filter(pc.field("col2") == "a") assert result.to_table() == pa.table({ - "colA": [1, 2], - "col2": ["a", "b"] + "colA": [1], + "col2": ["a"] + }) + + assert type(result) == ds.FileSystemDataset + + ds.write_dataset(result, tempdir / "filtered", format="ipc") + filtered = ds.dataset(tempdir / "filtered", format="ipc") + assert filtered.to_table() == pa.table({ + "colA": [1], + "col2": ["a"] }) From e3981e5596f5af7d685c697753c47fc3ceb2afbe Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 30 Jun 2022 14:42:58 +0200 Subject: [PATCH 02/31] Working joins --- cpp/src/arrow/compute/exec/options.cc | 12 ++++++ cpp/src/arrow/compute/exec/options.h | 4 ++ python/pyarrow/_dataset.pxd | 22 +++++++++- python/pyarrow/_dataset.pyx | 62 +++++++++++++++++++-------- python/pyarrow/_exec_plan.pyx | 23 +++++++++- python/pyarrow/dataset.py | 1 + python/pyarrow/includes/libarrow.pxd | 6 +++ python/pyarrow/tests/test_dataset.py | 26 +++++++++-- 8 files changed, 131 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.cc b/cpp/src/arrow/compute/exec/options.cc index ff66649e297..bb46b175a27 100644 --- a/cpp/src/arrow/compute/exec/options.cc +++ b/cpp/src/arrow/compute/exec/options.cc @@ -62,5 +62,17 @@ Result> SourceNodeOptions::FromTable( return std::make_shared(table.schema(), batch_gen); } +Result> SourceNodeOptions::FromRecordBatchReader( + std::shared_ptr reader, std::shared_ptr schema, + arrow::internal::Executor* exc) { + if (exc == nullptr) return Status::TypeError("No executor provided."); + + // Map the RecordBatchReader to a SourceNode + ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc)); + + return std::shared_ptr( + new SourceNodeOptions(schema, batch_gen)); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 8600b113489..430c691bee0 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -65,6 +65,10 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { static Result> FromTable(const Table& table, arrow::internal::Executor*); + static Result> FromRecordBatchReader( + std::shared_ptr reader, std::shared_ptr schema, + arrow::internal::Executor* exc); + std::shared_ptr output_schema; std::function>()> generator; }; diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd index d337656b1bf..e8afc1da1e3 100644 --- a/python/pyarrow/_dataset.pxd +++ b/python/pyarrow/_dataset.pxd @@ -47,7 +47,6 @@ cdef class Dataset(_Weakrefable): cdef: shared_ptr[CDataset] wrapped CDataset* dataset - public dict _scanner_options cdef void init(self, const shared_ptr[CDataset]& sp) @@ -57,6 +56,27 @@ cdef class Dataset(_Weakrefable): cdef shared_ptr[CDataset] unwrap(self) nogil +cdef class FilteredDataset(Dataset): + + cdef: + public object _filter + + cdef Scanner _make_scanner(self, options) + + +cdef class Scanner(_Weakrefable): + cdef: + shared_ptr[CScanner] wrapped + CScanner* scanner + + cdef void init(self, const shared_ptr[CScanner]& sp) + + @staticmethod + cdef wrap(const shared_ptr[CScanner]& sp) + + cdef shared_ptr[CScanner] unwrap(self) + + cdef class FragmentScanOptions(_Weakrefable): cdef: diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 5ff6696ecd3..b2c7c4ec1cc 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -156,7 +156,6 @@ cdef class Dataset(_Weakrefable): cdef void init(self, const shared_ptr[CDataset]& sp): self.wrapped = sp self.dataset = sp.get() - self._scanner_options = {} @staticmethod cdef wrap(const shared_ptr[CDataset]& sp): @@ -297,8 +296,7 @@ cdef class Dataset(_Weakrefable): n_legs: [[2,4,4,100]] animal: [["Parrot","Dog","Horse","Centipede"]] """ - scanner_options = {**self._scanner_options, **kwargs} - return Scanner.from_dataset(self, **scanner_options) + return Scanner.from_dataset(self, **kwargs) def to_batches(self, **kwargs): """ @@ -388,17 +386,7 @@ cdef class Dataset(_Weakrefable): return pyarrow_wrap_schema(self.dataset.schema()) def filter(self, expression): - cdef: - Dataset filtered_dataset - - if "filter" in self._scanner_options: - new_filter = self._scanner_options["filter"] & expression - else: - new_filter = expression - filtered_dataset = self.__class__.__new__(self.__class__) - filtered_dataset.init(self.wrapped) - filtered_dataset._scanner_options = dict(self._scanner_options, filter=new_filter) - return filtered_dataset + return FilteredDataset(self, expression) def join(self, right_dataset, keys, right_keys=None, join_type="left outer", left_suffix=None, right_suffix=None, coalesce_keys=True, @@ -448,6 +436,45 @@ cdef class Dataset(_Weakrefable): use_threads=use_threads, coalesce_keys=coalesce_keys, output_type=InMemoryDataset) +cdef class FilteredDataset(Dataset): + """ + A Dataset with an applied filter. + + Parameters + ---------- + dataset : Dataset + The dataset to which the filter should be applied. + expression : Expression + The filter that should be applied to the dataset. + """ + def __init__(self, dataset, expression): + self.init((dataset).wrapped) + self._filter = expression + + cdef void init(self, const shared_ptr[CDataset]& sp): + Dataset.init(self, sp) + self._filter = None + + def filter(self, expression): + cdef: + FilteredDataset filtered_dataset + + if self._filter is not None: + new_filter = self._filter & expression + else: + new_filter = expression + filtered_dataset = self.__class__.__new__(self.__class__) + filtered_dataset.init(self.wrapped) + filtered_dataset._filter = new_filter + return filtered_dataset + + cdef Scanner _make_scanner(self, options): + scanner_options = dict(options, filter=self._filter) + return Scanner.from_dataset(self, **scanner_options) + + def scanner(self, **kwargs): + return self._make_scanner(kwargs) + cdef class InMemoryDataset(Dataset): """ @@ -2202,6 +2229,9 @@ cdef class RecordBatchIterator(_Weakrefable): self.iterator = make_shared[CRecordBatchIterator](move(iterator)) return self + cdef inline shared_ptr[CRecordBatchIterator] unwrap(self) nogil: + return self.iterator + def __iter__(self): return self @@ -2369,10 +2399,6 @@ cdef class Scanner(_Weakrefable): default pool. """ - cdef: - shared_ptr[CScanner] wrapped - CScanner* scanner - def __init__(self): _forbid_instantiation(self.__class__) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 167a5030f5c..f60c0f3a5d6 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -27,10 +27,11 @@ from cython.operator cimport dereference as deref, preincrement as inc from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_dataset cimport * -from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table) +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) from pyarrow.lib import tobytes from pyarrow._compute cimport Expression, _true -from pyarrow._dataset cimport Dataset +from pyarrow._dataset cimport Dataset, FilteredDataset, Scanner from pyarrow._dataset import InMemoryDataset Initialize() # Initialise support for Datasets in ExecPlan @@ -66,11 +67,13 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads shared_ptr[CTable] c_in_table shared_ptr[CTable] c_out_table shared_ptr[CTableSourceNodeOptions] c_tablesourceopts + shared_ptr[CScanner] c_dataset_scanner shared_ptr[CScanNodeOptions] c_scanopts shared_ptr[CExecNodeOptions] c_input_node_opts shared_ptr[CSinkNodeOptions] c_sinkopts shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen shared_ptr[CRecordBatchReader] c_recordbatchreader + shared_ptr[CRecordBatchReader] c_recordbatchreader_in vector[CDeclaration].iterator plan_iter vector[CDeclaration.Input] no_c_inputs CStatus c_plan_status @@ -95,6 +98,22 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads c_in_table) c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions]( c_tablesourceopts) + elif isinstance(ipt, FilteredDataset): + node_factory = "source" + c_in_dataset = (ipt).unwrap() + c_dataset_scanner = ( + ((ipt)._make_scanner({})).unwrap() + ) + c_recordbatchreader_in = ( + GetResultValue(deref(c_dataset_scanner).ToRecordBatchReader()) + ) + c_sourceopts = GetResultValue( + CSourceNodeOptions.FromRecordBatchReader(c_recordbatchreader_in, + deref(c_in_dataset).schema(), + c_executor) + ) + c_input_node_opts = static_pointer_cast[CExecNodeOptions, CSourceNodeOptions]( + c_sourceopts) elif isinstance(ipt, Dataset): node_factory = "scan" c_in_dataset = (ipt).unwrap() diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 6fbe00eace1..b41bb8e1e56 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -24,6 +24,7 @@ CsvFileFormat, CsvFragmentScanOptions, Dataset, + FilteredDataset, DatasetFactory, DirectoryPartitioning, FeatherFileFormat, diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9cea340a309..7c5b31df518 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2545,6 +2545,12 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil cdef cppclass CExecNodeOptions "arrow::compute::ExecNodeOptions": pass + cdef cppclass CSourceNodeOptions "arrow::compute::SourceNodeOptions"(CExecNodeOptions): + @staticmethod + CResult[shared_ptr[CSourceNodeOptions]] FromRecordBatchReader( + shared_ptr[CRecordBatchReader] reader, shared_ptr[CSchema] schema, + CExecutor* exc) + cdef cppclass CTableSourceNodeOptions "arrow::compute::TableSourceNodeOptions"(CExecNodeOptions): CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t max_batch_size) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 20f9d8cb61f..9c7f520c4ea 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4846,13 +4846,21 @@ def test_dataset_join_collisions(tempdir): @pytest.mark.dataset -def test_dataset_filter(tempdir): +@pytest.mark.parametrize('dstype', [ + "fs", "mem" +]) +def test_dataset_filter(tempdir, dstype): t1 = pa.table({ "colA": [1, 2, 6], "col2": ["a", "b", "f"] }) - ds.write_dataset(t1, tempdir / "t1", format="ipc") - ds1 = ds.dataset(tempdir / "t1", format="ipc") + if dstype == "fs": + ds.write_dataset(t1, tempdir / "t1", format="ipc") + ds1 = ds.dataset(tempdir / "t1", format="ipc") + elif dstype == "mem": + ds1 = ds.dataset(t1) + else: + raise NotImplementedError result = ds1.filter(pc.field("colA") < 3).filter(pc.field("col2") == "a") assert result.to_table() == pa.table({ @@ -4860,7 +4868,7 @@ def test_dataset_filter(tempdir): "col2": ["a"] }) - assert type(result) == ds.FileSystemDataset + assert type(result) == ds.FilteredDataset ds.write_dataset(result, tempdir / "filtered", format="ipc") filtered = ds.dataset(tempdir / "filtered", format="ipc") @@ -4869,6 +4877,16 @@ def test_dataset_filter(tempdir): "col2": ["a"] }) + joined = result.join(ds.dataset(pa.table({ + "colB": [10, 20], + "col2": ["a", "b"] + })), keys="col2", join_type="right outer") + assert joined.to_table().combine_chunks() == pa.table({ + "colA": [1, None], + "colB": [10, 20], + "col2": ["a", "b"] + }) + def test_write_dataset_with_scanner_use_projected_schema(tempdir): """ From 4f50a8be627cb71636340c770be05d52e66415d8 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 30 Jun 2022 15:01:41 +0200 Subject: [PATCH 03/31] lint --- python/pyarrow/_dataset.pyx | 3 ++- python/pyarrow/_exec_plan.pyx | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index b2c7c4ec1cc..dd6389c4822 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -447,6 +447,7 @@ cdef class FilteredDataset(Dataset): expression : Expression The filter that should be applied to the dataset. """ + def __init__(self, dataset, expression): self.init((dataset).wrapped) self._filter = expression @@ -467,7 +468,7 @@ cdef class FilteredDataset(Dataset): filtered_dataset.init(self.wrapped) filtered_dataset._filter = new_filter return filtered_dataset - + cdef Scanner _make_scanner(self, options): scanner_options = dict(options, filter=self._filter) return Scanner.from_dataset(self, **scanner_options) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index f60c0f3a5d6..ded54e95505 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -27,7 +27,7 @@ from cython.operator cimport dereference as deref, preincrement as inc from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_dataset cimport * -from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, RecordBatchReader) from pyarrow.lib import tobytes from pyarrow._compute cimport Expression, _true @@ -108,8 +108,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads GetResultValue(deref(c_dataset_scanner).ToRecordBatchReader()) ) c_sourceopts = GetResultValue( - CSourceNodeOptions.FromRecordBatchReader(c_recordbatchreader_in, - deref(c_in_dataset).schema(), + CSourceNodeOptions.FromRecordBatchReader(c_recordbatchreader_in, + deref( + c_in_dataset).schema(), c_executor) ) c_input_node_opts = static_pointer_cast[CExecNodeOptions, CSourceNodeOptions]( From 545067ad77669b78a9800b6a7521b851dfbef0e9 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Mon, 4 Jul 2022 18:31:49 +0200 Subject: [PATCH 04/31] Ensure standard dataset methods keep the filter --- python/pyarrow/_dataset.pyx | 8 ++++++++ python/pyarrow/tests/test_dataset.py | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index dd6389c4822..3d20758ef93 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -386,6 +386,14 @@ cdef class Dataset(_Weakrefable): return pyarrow_wrap_schema(self.dataset.schema()) def filter(self, expression): + """ + Apply a row filter to the dataset. + + Parameters + ---------- + expression : Expression + The filter that should be applied to the dataset. + """ return FilteredDataset(self, expression) def join(self, right_dataset, keys, right_keys=None, join_type="left outer", diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 9c7f520c4ea..f6e9dc51d32 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4870,6 +4870,11 @@ def test_dataset_filter(tempdir, dstype): assert type(result) == ds.FilteredDataset + assert result.head(5) == pa.table({ + "colA": [1], + "col2": ["a"] + }) + ds.write_dataset(result, tempdir / "filtered", format="ipc") filtered = ds.dataset(tempdir / "filtered", format="ipc") assert filtered.to_table() == pa.table({ From 4441003455c2ed30aa559808c684372caf1a53f1 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Mon, 4 Jul 2022 18:41:57 +0200 Subject: [PATCH 05/31] Document Dataset.filter --- docs/source/python/compute.rst | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/docs/source/python/compute.rst b/docs/source/python/compute.rst index fe7f333300f..12738151c74 100644 --- a/docs/source/python/compute.rst +++ b/docs/source/python/compute.rst @@ -368,8 +368,19 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter: nums: [[6,8,10]] chars: [["f","h","l"]] -:class:`.Dataset` currently can be filtered using :meth:`.Dataset.to_table` method -passing a ``filter`` argument. See :ref:`py-filter-dataset` in Dataset documentation. +:class:`.Dataset` can be filtered with :meth:`.Dataset.filter` method too. +The method will return an instance of `.FilteredDataset` which will lazily +apply the filter as soon as actual data of the dataset is accessed: + + >>> dataset = ds.dataset(table) + >>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2) + >>> filtered.to_table() + pyarrow.Table + nums: int64 + chars: string + ---- + nums: [[3,4]] + chars: [["c","d"]] User-Defined Functions From 1d33ee31e02c2a01dc3252b9c3616560917402a9 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Mon, 4 Jul 2022 19:01:52 +0200 Subject: [PATCH 06/31] Lint --- cpp/src/arrow/compute/exec/options.cc | 3 +-- cpp/src/arrow/compute/exec/options.h | 4 ++-- docs/source/python/compute.rst | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.cc b/cpp/src/arrow/compute/exec/options.cc index bb46b175a27..58cfb34f526 100644 --- a/cpp/src/arrow/compute/exec/options.cc +++ b/cpp/src/arrow/compute/exec/options.cc @@ -70,8 +70,7 @@ Result> SourceNodeOptions::FromRecordBatchRea // Map the RecordBatchReader to a SourceNode ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc)); - return std::shared_ptr( - new SourceNodeOptions(schema, batch_gen)); + return std::shared_ptr(new SourceNodeOptions(schema, batch_gen)); } } // namespace compute diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 430c691bee0..cb18fd6684b 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -66,8 +66,8 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { arrow::internal::Executor*); static Result> FromRecordBatchReader( - std::shared_ptr reader, std::shared_ptr schema, - arrow::internal::Executor* exc); + std::shared_ptr reader, std::shared_ptr schema, + arrow::internal::Executor* exc); std::shared_ptr output_schema; std::function>()> generator; diff --git a/docs/source/python/compute.rst b/docs/source/python/compute.rst index 12738151c74..8ce6c680d54 100644 --- a/docs/source/python/compute.rst +++ b/docs/source/python/compute.rst @@ -369,7 +369,7 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter: chars: [["f","h","l"]] :class:`.Dataset` can be filtered with :meth:`.Dataset.filter` method too. -The method will return an instance of `.FilteredDataset` which will lazily +The method will return an instance of :class:`.FilteredDataset` which will lazily apply the filter as soon as actual data of the dataset is accessed: >>> dataset = ds.dataset(table) From 9d3b4a97fb276fe0a057dea50513ac58b18f0e47 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Mon, 4 Jul 2022 19:04:03 +0200 Subject: [PATCH 07/31] Add class to reference --- docs/source/python/api/dataset.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/python/api/dataset.rst b/docs/source/python/api/dataset.rst index 866c67440c5..bda3cc1b627 100644 --- a/docs/source/python/api/dataset.rst +++ b/docs/source/python/api/dataset.rst @@ -60,6 +60,7 @@ Classes HivePartitioning FilenamePartitioning Dataset + FilteredDataset FileSystemDataset FileSystemFactoryOptions FileSystemDatasetFactory From e51ac733906213c9df5e4af1f559efcdfb9a23d6 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 5 Jul 2022 13:15:03 +0200 Subject: [PATCH 08/31] tweak variable name --- cpp/src/arrow/compute/exec/options.cc | 12 ++++++------ cpp/src/arrow/compute/exec/options.h | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.cc b/cpp/src/arrow/compute/exec/options.cc index 58cfb34f526..b6c22260cbc 100644 --- a/cpp/src/arrow/compute/exec/options.cc +++ b/cpp/src/arrow/compute/exec/options.cc @@ -51,24 +51,24 @@ std::string ToString(JoinType t) { } Result> SourceNodeOptions::FromTable( - const Table& table, arrow::internal::Executor* exc) { + const Table& table, arrow::internal::Executor* executor) { std::shared_ptr reader = std::make_shared(table); - if (exc == nullptr) return Status::TypeError("No executor provided."); + if (executor == nullptr) return Status::TypeError("No executor provided."); // Map the RecordBatchReader to a SourceNode - ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc)); + ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor)); return std::make_shared(table.schema(), batch_gen); } Result> SourceNodeOptions::FromRecordBatchReader( std::shared_ptr reader, std::shared_ptr schema, - arrow::internal::Executor* exc) { - if (exc == nullptr) return Status::TypeError("No executor provided."); + arrow::internal::Executor* executor) { + if (executor == nullptr) return Status::TypeError("No executor provided."); // Map the RecordBatchReader to a SourceNode - ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc)); + ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor)); return std::shared_ptr(new SourceNodeOptions(schema, batch_gen)); } diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index cb18fd6684b..5b2c984eec3 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -67,7 +67,7 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { static Result> FromRecordBatchReader( std::shared_ptr reader, std::shared_ptr schema, - arrow::internal::Executor* exc); + arrow::internal::Executor*); std::shared_ptr output_schema; std::function>()> generator; From 87a828fff249b9caf9472f0f96d34b8a5ee690aa Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 5 Jul 2022 16:30:24 +0200 Subject: [PATCH 09/31] Update docs/source/python/compute.rst Co-authored-by: Antoine Pitrou --- docs/source/python/compute.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/python/compute.rst b/docs/source/python/compute.rst index 8ce6c680d54..26f29110978 100644 --- a/docs/source/python/compute.rst +++ b/docs/source/python/compute.rst @@ -368,7 +368,7 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter: nums: [[6,8,10]] chars: [["f","h","l"]] -:class:`.Dataset` can be filtered with :meth:`.Dataset.filter` method too. +:class:`.Dataset` can similarly be filtered with the :meth:`.Dataset.filter` method. The method will return an instance of :class:`.FilteredDataset` which will lazily apply the filter as soon as actual data of the dataset is accessed: From c932b97d13a2151b304731e99dc6384999ed6769 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 5 Jul 2022 16:34:05 +0200 Subject: [PATCH 10/31] Update python/pyarrow/_dataset.pyx Co-authored-by: Antoine Pitrou --- python/pyarrow/_dataset.pyx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 3d20758ef93..4e13ec51a00 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -478,8 +478,7 @@ cdef class FilteredDataset(Dataset): return filtered_dataset cdef Scanner _make_scanner(self, options): - scanner_options = dict(options, filter=self._filter) - return Scanner.from_dataset(self, **scanner_options) + return Scanner.from_dataset(self, filter=self._filter, **options) def scanner(self, **kwargs): return self._make_scanner(kwargs) From 30d240937644d80b5118c9fac06e29ab9eff809e Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 5 Jul 2022 16:46:21 +0200 Subject: [PATCH 11/31] Remove unecessary casts --- python/pyarrow/_exec_plan.pyx | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index ded54e95505..76281e5ff67 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -101,11 +101,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads elif isinstance(ipt, FilteredDataset): node_factory = "source" c_in_dataset = (ipt).unwrap() - c_dataset_scanner = ( - ((ipt)._make_scanner({})).unwrap() - ) - c_recordbatchreader_in = ( - GetResultValue(deref(c_dataset_scanner).ToRecordBatchReader()) + c_dataset_scanner = (ipt)._make_scanner({}).unwrap() + c_recordbatchreader_in = GetResultValue( + deref(c_dataset_scanner).ToRecordBatchReader() ) c_sourceopts = GetResultValue( CSourceNodeOptions.FromRecordBatchReader(c_recordbatchreader_in, From 1f406de6cf19e21b16a203cb35ad104b9e5499d1 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 5 Jul 2022 16:49:12 +0200 Subject: [PATCH 12/31] move schema --- cpp/src/arrow/compute/exec/options.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/options.cc b/cpp/src/arrow/compute/exec/options.cc index b6c22260cbc..1b92600da6f 100644 --- a/cpp/src/arrow/compute/exec/options.cc +++ b/cpp/src/arrow/compute/exec/options.cc @@ -70,7 +70,8 @@ Result> SourceNodeOptions::FromRecordBatchRea // Map the RecordBatchReader to a SourceNode ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor)); - return std::shared_ptr(new SourceNodeOptions(schema, batch_gen)); + return std::shared_ptr( + new SourceNodeOptions(std::move(schema), batch_gen)); } } // namespace compute From 6523ed53568f69bd9df41c786fe0d32aa2923132 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 5 Jul 2022 19:24:52 +0200 Subject: [PATCH 13/31] better error and docstrings --- python/pyarrow/_dataset.pyx | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 4e13ec51a00..84e232813a1 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -393,6 +393,10 @@ cdef class Dataset(_Weakrefable): ---------- expression : Expression The filter that should be applied to the dataset. + + Returns + ------- + FilteredDataset """ return FilteredDataset(self, expression) @@ -465,6 +469,10 @@ cdef class FilteredDataset(Dataset): self._filter = None def filter(self, expression): + """Apply an additional row filter to the filtered dataset. + + See :meth:`.Dataset.filter` for documentation. + """ cdef: FilteredDataset filtered_dataset @@ -478,9 +486,18 @@ cdef class FilteredDataset(Dataset): return filtered_dataset cdef Scanner _make_scanner(self, options): + if "filter" in options: + raise ValueError( + "Passing filter in scanner option is not valid for FilteredDataset." + ) return Scanner.from_dataset(self, filter=self._filter, **options) def scanner(self, **kwargs): + """Build a scan operation against the dataset. + + See :meth:`.Dataset.scanner` for list of supported + arguments. + """ return self._make_scanner(kwargs) From 3476dfbf9a85ac8b42c93720b9b4c4c8599533eb Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 5 Jul 2022 19:44:44 +0200 Subject: [PATCH 14/31] Tweak docstrings --- python/pyarrow/_dataset.pyx | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 84e232813a1..ae07b41fa23 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -239,7 +239,7 @@ cdef class Dataset(_Weakrefable): which exposes further operations (e.g. loading all data as a table, counting rows). - See the `Scanner.from_dataset` method for further information. + See the :meth:`Scanner.from_dataset` method for further information. Parameters ---------- @@ -471,7 +471,14 @@ cdef class FilteredDataset(Dataset): def filter(self, expression): """Apply an additional row filter to the filtered dataset. - See :meth:`.Dataset.filter` for documentation. + Parameters + ---------- + expression : Expression + The filter that should be applied to the dataset. + + Returns + ------- + FilteredDataset """ cdef: FilteredDataset filtered_dataset @@ -495,8 +502,16 @@ cdef class FilteredDataset(Dataset): def scanner(self, **kwargs): """Build a scan operation against the dataset. - See :meth:`.Dataset.scanner` for list of supported - arguments. + See :meth:`.Dataset.scanner` for additional information + + Parameters + ---------- + **kwargs : dict, optional + Arguments for :meth:`Scanner.from_dataset`. + + Returns + ------- + scanner : Scanner """ return self._make_scanner(kwargs) From 569cd80ba01aff9ea5940977abea1be16bd2ec60 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 7 Jul 2022 11:40:38 +0200 Subject: [PATCH 15/31] Update cpp/src/arrow/compute/exec/options.cc Co-authored-by: Weston Pace --- cpp/src/arrow/compute/exec/options.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.cc b/cpp/src/arrow/compute/exec/options.cc index 1b92600da6f..9e9da7ad831 100644 --- a/cpp/src/arrow/compute/exec/options.cc +++ b/cpp/src/arrow/compute/exec/options.cc @@ -70,8 +70,7 @@ Result> SourceNodeOptions::FromRecordBatchRea // Map the RecordBatchReader to a SourceNode ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor)); - return std::shared_ptr( - new SourceNodeOptions(std::move(schema), batch_gen)); + return std::make_shared(std::move(schema), std::move(batch_gen)); } } // namespace compute From 4401496510f07793e8246028265be09eec540858 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 13 Jul 2022 16:50:11 +0100 Subject: [PATCH 16/31] Refactoring --- cpp/src/arrow/dataset/scanner.cc | 9 +++++++-- python/pyarrow/_dataset.pyx | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index b4baab5a09f..27746f5be8e 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -927,12 +927,17 @@ Status ScannerBuilder::Backpressure(compute::BackpressureOptions backpressure) { return Status::OK(); } -Result> ScannerBuilder::Finish() { +Result> ScannerBuilder::GetScanOptions() { if (!scan_options_->projection.IsBound()) { RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names())); } - return std::make_shared(dataset_, scan_options_); + return scan_options_; +} + +Result> ScannerBuilder::Finish() { + ARROW_ASSIGN_OR_RAISE(auto scan_options, GetScanOptions()); + return std::make_shared(dataset_, scan_options); } namespace { diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index ae07b41fa23..03036371a2f 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -489,7 +489,7 @@ cdef class FilteredDataset(Dataset): new_filter = expression filtered_dataset = self.__class__.__new__(self.__class__) filtered_dataset.init(self.wrapped) - filtered_dataset._filter = new_filter + filtered_dataset._scan_options = dict(filter=new_filter) return filtered_dataset cdef Scanner _make_scanner(self, options): From b134425aaeafb178e21cf93c1226e5bbeaabbeae Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 15 Sep 2022 11:40:48 +0200 Subject: [PATCH 17/31] Allow to create ScanOptions alone --- cpp/src/arrow/dataset/scanner.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 7d6dd0e1a8c..dbd0eeb9165 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -520,6 +520,9 @@ class ARROW_DS_EXPORT ScannerBuilder { /// \brief Override default backpressure configuration Status Backpressure(compute::BackpressureOptions backpressure); + /// \brief Return the current scan options for the builder. + Result> GetScanOptions(); + /// \brief Return the constructed now-immutable Scanner object Result> Finish(); From 737dac22147dc359a3fbf16ec4437c9d6b1e5e95 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Mon, 10 Oct 2022 17:37:47 +0200 Subject: [PATCH 18/31] Working filter and join on filtered datasets --- python/pyarrow/_dataset.pxd | 7 +- python/pyarrow/_dataset.pyx | 89 +++++++++++--------- python/pyarrow/_exec_plan.pyx | 44 ++++------ python/pyarrow/includes/libarrow_dataset.pxd | 2 + python/pyarrow/tests/test_dataset.py | 2 +- 5 files changed, 75 insertions(+), 69 deletions(-) diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd index e8afc1da1e3..b7c5e436a70 100644 --- a/python/pyarrow/_dataset.pxd +++ b/python/pyarrow/_dataset.pxd @@ -59,9 +59,7 @@ cdef class Dataset(_Weakrefable): cdef class FilteredDataset(Dataset): cdef: - public object _filter - - cdef Scanner _make_scanner(self, options) + public dict _scan_options cdef class Scanner(_Weakrefable): @@ -76,6 +74,9 @@ cdef class Scanner(_Weakrefable): cdef shared_ptr[CScanner] unwrap(self) + @staticmethod + cdef shared_ptr[CScanOptions] _make_scan_options(Dataset dataset, dict py_scanoptions) except * + cdef class FragmentScanOptions(_Weakrefable): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 03036371a2f..1da1bfd1b5d 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -231,6 +231,9 @@ cdef class Dataset(_Weakrefable): for maybe_fragment in c_fragments: yield Fragment.wrap(GetResultValue(move(maybe_fragment))) + def scanner_options(self, options): + return options + def scanner(self, **kwargs): """ Build a scan operation against the dataset. @@ -462,11 +465,11 @@ cdef class FilteredDataset(Dataset): def __init__(self, dataset, expression): self.init((dataset).wrapped) - self._filter = expression + self._scan_options = dict(filter=expression) cdef void init(self, const shared_ptr[CDataset]& sp): Dataset.init(self, sp) - self._filter = None + self._scan_options = dict() def filter(self, expression): """Apply an additional row filter to the filtered dataset. @@ -483,38 +486,27 @@ cdef class FilteredDataset(Dataset): cdef: FilteredDataset filtered_dataset - if self._filter is not None: - new_filter = self._filter & expression - else: + try: + new_filter = self._scan_options["filter"] & expression + except KeyError: new_filter = expression filtered_dataset = self.__class__.__new__(self.__class__) filtered_dataset.init(self.wrapped) filtered_dataset._scan_options = dict(filter=new_filter) return filtered_dataset - cdef Scanner _make_scanner(self, options): - if "filter" in options: - raise ValueError( - "Passing filter in scanner option is not valid for FilteredDataset." - ) - return Scanner.from_dataset(self, filter=self._filter, **options) - - def scanner(self, **kwargs): - """Build a scan operation against the dataset. - - See :meth:`.Dataset.scanner` for additional information - - Parameters - ---------- - **kwargs : dict, optional - Arguments for :meth:`Scanner.from_dataset`. - - Returns - ------- - scanner : Scanner - """ - return self._make_scanner(kwargs) + def scanner_options(self, options): + new_options = options.copy() + # at the moment only support filter + requested_filter = options.get("filter") + current_filter = self._scan_options.get("filter") + if requested_filter is not None and current_filter is not None: + new_options["filter"] = current_filter & requested_filter + elif current_filter is not None: + new_options["filter"] = current_filter + + return new_options cdef class InMemoryDataset(Dataset): """ @@ -2456,8 +2448,32 @@ cdef class Scanner(_Weakrefable): return self.wrapped @staticmethod - def from_dataset(Dataset dataset not None, *, object columns=None, - Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, + cdef shared_ptr[CScanOptions] _make_scan_options(Dataset dataset, dict py_scanoptions) except *: + cdef: + shared_ptr[CScannerBuilder] builder = make_shared[CScannerBuilder](dataset.unwrap()) + + py_scanoptions = dataset.scanner_options(py_scanoptions) + + # Need to explicitly expand the arguments as Cython doesn't support + # keyword expansion in cdef functions. + _populate_builder(builder, + columns=py_scanoptions.get("columns"), + filter=py_scanoptions.get("filter"), + batch_size=py_scanoptions.get("batch_size", _DEFAULT_BATCH_SIZE), + batch_readahead=py_scanoptions.get("batch_readahead", _DEFAULT_BATCH_READAHEAD), + fragment_readahead=py_scanoptions.get("fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD), + use_threads=py_scanoptions.get("use_threads", True), + memory_pool=py_scanoptions.get("memory_pool"), + fragment_scan_options=py_scanoptions.get("fragment_scan_options")) + + return GetResultValue(deref(builder).GetScanOptions()) + + @staticmethod + def from_dataset(Dataset dataset not None, + bint use_threads=True, object use_async=None, + MemoryPool memory_pool=None, + object columns=None, Expression filter=None, + int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, @@ -2521,8 +2537,12 @@ cdef class Scanner(_Weakrefable): default pool. """ cdef: - shared_ptr[CScanOptions] options = make_shared[CScanOptions]() - shared_ptr[CScannerBuilder] builder + shared_ptr[CScanOptions] options = Scanner._make_scan_options(dataset, dict(columns=columns, filter=filter, + batch_size=batch_size, batch_readahead=batch_readahead, + fragment_readahead=fragment_readahead, use_threads=use_threads, + memory_pool=memory_pool, + fragment_scan_options=fragment_scan_options)) + shared_ptr[CScannerBuilder] builder = make_shared[CScannerBuilder](dataset.unwrap(), options) shared_ptr[CScanner] scanner if use_async is not None: @@ -2530,13 +2550,6 @@ cdef class Scanner(_Weakrefable): 'effect. It will be removed in the next release.', FutureWarning) - builder = make_shared[CScannerBuilder](dataset.unwrap(), options) - _populate_builder(builder, columns=columns, filter=filter, - batch_size=batch_size, batch_readahead=batch_readahead, - fragment_readahead=fragment_readahead, use_threads=use_threads, - memory_pool=memory_pool, - fragment_scan_options=fragment_scan_options) - scanner = GetResultValue(builder.get().Finish()) return Scanner.wrap(scanner) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 76281e5ff67..1a52dd33a3a 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -59,6 +59,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads CExecutor *c_executor shared_ptr[CExecContext] c_exec_context shared_ptr[CExecPlan] c_exec_plan + CDeclaration current_decl vector[CDeclaration] c_decls vector[CExecNode*] _empty vector[CExecNode*] c_final_node_vec @@ -92,50 +93,39 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads # Create source nodes for each input for ipt in inputs: if isinstance(ipt, Table): - node_factory = "table_source" c_in_table = pyarrow_unwrap_table(ipt) c_tablesourceopts = make_shared[CTableSourceNodeOptions]( c_in_table) c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions]( c_tablesourceopts) - elif isinstance(ipt, FilteredDataset): - node_factory = "source" - c_in_dataset = (ipt).unwrap() - c_dataset_scanner = (ipt)._make_scanner({}).unwrap() - c_recordbatchreader_in = GetResultValue( - deref(c_dataset_scanner).ToRecordBatchReader() - ) - c_sourceopts = GetResultValue( - CSourceNodeOptions.FromRecordBatchReader(c_recordbatchreader_in, - deref( - c_in_dataset).schema(), - c_executor) - ) - c_input_node_opts = static_pointer_cast[CExecNodeOptions, CSourceNodeOptions]( - c_sourceopts) + + current_decl = CDeclaration(tobytes("table_source"), no_c_inputs, c_input_node_opts) elif isinstance(ipt, Dataset): - node_factory = "scan" c_in_dataset = (ipt).unwrap() c_scanopts = make_shared[CScanNodeOptions]( - c_in_dataset, make_shared[CScanOptions]()) - deref(deref(c_scanopts).scan_options).use_threads = use_threads + c_in_dataset, Scanner._make_scan_options(ipt, {"use_threads": use_threads})) c_input_node_opts = static_pointer_cast[CExecNodeOptions, CScanNodeOptions]( c_scanopts) + + # Filters applied in CScanNodeOptions are "best effort" for the scan node itself, + # so we always need to inject an additional Filter node to apply them for real. + current_decl = CDeclaration(tobytes("filter"), no_c_inputs, + static_pointer_cast[CExecNodeOptions, CFilterNodeOptions]( + make_shared[CFilterNodeOptions](deref(deref(c_scanopts).scan_options).filter, True) + ) + ) + current_decl.inputs.push_back( + CDeclaration.Input(CDeclaration(tobytes("scan"), no_c_inputs, c_input_node_opts)) + ) else: raise TypeError("Unsupported type") if plan_iter != plan.end(): # Flag the source as the input of the first plan node. - deref(plan_iter).inputs.push_back(CDeclaration.Input( - CDeclaration(tobytes(node_factory), - no_c_inputs, c_input_node_opts) - )) + deref(plan_iter).inputs.push_back(CDeclaration.Input(current_decl)) else: # Empty plan, make the source the first plan node. - c_decls.push_back( - CDeclaration(tobytes(node_factory), - no_c_inputs, c_input_node_opts) - ) + c_decls.push_back(current_decl) # Add Here additional nodes while plan_iter != plan.end(): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index e69c3cbcaf2..b75eafcdeea 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -54,6 +54,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CSchema] dataset_schema shared_ptr[CSchema] projected_schema c_bool use_threads + CExpression filter cdef cppclass CScanNodeOptions "arrow::dataset::ScanNodeOptions"(CExecNodeOptions): CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options) @@ -126,6 +127,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CStatus FragmentReadahead(int32_t fragment_readahead) CStatus FragmentScanOptions( shared_ptr[CFragmentScanOptions] fragment_scan_options) + CResult[shared_ptr[CScanOptions]] GetScanOptions() CResult[shared_ptr[CScanner]] Finish() shared_ptr[CSchema] schema() const diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index f6e9dc51d32..282abc977c1 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4886,7 +4886,7 @@ def test_dataset_filter(tempdir, dstype): "colB": [10, 20], "col2": ["a", "b"] })), keys="col2", join_type="right outer") - assert joined.to_table().combine_chunks() == pa.table({ + assert joined.to_table().sort_by("colB") == pa.table({ "colA": [1, None], "colB": [10, 20], "col2": ["a", "b"] From 94e6713db2e3695409b872d153e4771f2c54ce37 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Mon, 10 Oct 2022 17:39:15 +0200 Subject: [PATCH 19/31] lint --- python/pyarrow/_dataset.pyx | 25 ++++++++++++++----------- python/pyarrow/_exec_plan.pyx | 17 ++++++++++------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 1da1bfd1b5d..fd3ea2e1af1 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -504,8 +504,8 @@ cdef class FilteredDataset(Dataset): if requested_filter is not None and current_filter is not None: new_options["filter"] = current_filter & requested_filter elif current_filter is not None: - new_options["filter"] = current_filter - + new_options["filter"] = current_filter + return new_options cdef class InMemoryDataset(Dataset): @@ -2456,12 +2456,15 @@ cdef class Scanner(_Weakrefable): # Need to explicitly expand the arguments as Cython doesn't support # keyword expansion in cdef functions. - _populate_builder(builder, - columns=py_scanoptions.get("columns"), + _populate_builder(builder, + columns=py_scanoptions.get("columns"), filter=py_scanoptions.get("filter"), - batch_size=py_scanoptions.get("batch_size", _DEFAULT_BATCH_SIZE), - batch_readahead=py_scanoptions.get("batch_readahead", _DEFAULT_BATCH_READAHEAD), - fragment_readahead=py_scanoptions.get("fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD), + batch_size=py_scanoptions.get( + "batch_size", _DEFAULT_BATCH_SIZE), + batch_readahead=py_scanoptions.get( + "batch_readahead", _DEFAULT_BATCH_READAHEAD), + fragment_readahead=py_scanoptions.get( + "fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD), use_threads=py_scanoptions.get("use_threads", True), memory_pool=py_scanoptions.get("memory_pool"), fragment_scan_options=py_scanoptions.get("fragment_scan_options")) @@ -2538,10 +2541,10 @@ cdef class Scanner(_Weakrefable): """ cdef: shared_ptr[CScanOptions] options = Scanner._make_scan_options(dataset, dict(columns=columns, filter=filter, - batch_size=batch_size, batch_readahead=batch_readahead, - fragment_readahead=fragment_readahead, use_threads=use_threads, - memory_pool=memory_pool, - fragment_scan_options=fragment_scan_options)) + batch_size=batch_size, batch_readahead=batch_readahead, + fragment_readahead=fragment_readahead, use_threads=use_threads, + memory_pool=memory_pool, + fragment_scan_options=fragment_scan_options)) shared_ptr[CScannerBuilder] builder = make_shared[CScannerBuilder](dataset.unwrap(), options) shared_ptr[CScanner] scanner diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 1a52dd33a3a..aeb47e08bbf 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -98,8 +98,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads c_in_table) c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions]( c_tablesourceopts) - - current_decl = CDeclaration(tobytes("table_source"), no_c_inputs, c_input_node_opts) + + current_decl = CDeclaration( + tobytes("table_source"), no_c_inputs, c_input_node_opts) elif isinstance(ipt, Dataset): c_in_dataset = (ipt).unwrap() c_scanopts = make_shared[CScanNodeOptions]( @@ -109,13 +110,15 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads # Filters applied in CScanNodeOptions are "best effort" for the scan node itself, # so we always need to inject an additional Filter node to apply them for real. - current_decl = CDeclaration(tobytes("filter"), no_c_inputs, - static_pointer_cast[CExecNodeOptions, CFilterNodeOptions]( - make_shared[CFilterNodeOptions](deref(deref(c_scanopts).scan_options).filter, True) - ) + current_decl = CDeclaration(tobytes("filter"), no_c_inputs, + static_pointer_cast[CExecNodeOptions, CFilterNodeOptions]( + make_shared[CFilterNodeOptions]( + deref(deref(c_scanopts).scan_options).filter, True) + ) ) current_decl.inputs.push_back( - CDeclaration.Input(CDeclaration(tobytes("scan"), no_c_inputs, c_input_node_opts)) + CDeclaration.Input(CDeclaration( + tobytes("scan"), no_c_inputs, c_input_node_opts)) ) else: raise TypeError("Unsupported type") From 16afe333a51c13fc157fdffcd04184a0e2306486 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 24 Nov 2022 14:50:01 +0100 Subject: [PATCH 20/31] Test with chained filtering --- python/pyarrow/_dataset.pyx | 7 +++---- python/pyarrow/_exec_plan.pyx | 2 +- python/pyarrow/tests/test_dataset.py | 13 +++++++++++-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index fd3ea2e1af1..a6a1f7c0e11 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2472,10 +2472,9 @@ cdef class Scanner(_Weakrefable): return GetResultValue(deref(builder).GetScanOptions()) @staticmethod - def from_dataset(Dataset dataset not None, - bint use_threads=True, object use_async=None, - MemoryPool memory_pool=None, - object columns=None, Expression filter=None, + def from_dataset(Dataset dataset not None, *, + object columns=None, + Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index aeb47e08bbf..d98b407995c 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -113,7 +113,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads current_decl = CDeclaration(tobytes("filter"), no_c_inputs, static_pointer_cast[CExecNodeOptions, CFilterNodeOptions]( make_shared[CFilterNodeOptions]( - deref(deref(c_scanopts).scan_options).filter, True) + deref(deref(c_scanopts).scan_options).filter) ) ) current_decl.inputs.push_back( diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 282abc977c1..a0d447f8eb3 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4851,8 +4851,8 @@ def test_dataset_join_collisions(tempdir): ]) def test_dataset_filter(tempdir, dstype): t1 = pa.table({ - "colA": [1, 2, 6], - "col2": ["a", "b", "f"] + "colA": [1, 2, 6, 8], + "col2": ["a", "b", "f", "g"] }) if dstype == "fs": ds.write_dataset(t1, tempdir / "t1", format="ipc") @@ -4862,6 +4862,7 @@ def test_dataset_filter(tempdir, dstype): else: raise NotImplementedError + # Ensure chained filtering works. result = ds1.filter(pc.field("colA") < 3).filter(pc.field("col2") == "a") assert result.to_table() == pa.table({ "colA": [1], @@ -4875,6 +4876,14 @@ def test_dataset_filter(tempdir, dstype): "col2": ["a"] }) + # Ensure that further filtering with scanners works too + r2 = ds1.filter(pc.field("colA") < 8).filter(pc.field("colA") > 1).scanner(filter=pc.field("colA") != 6) + assert r2.to_table() == pa.table({ + "colA": [2], + "col2": ["b"] + }) + + # Ensure that writing back to disk works. ds.write_dataset(result, tempdir / "filtered", format="ipc") filtered = ds.dataset(tempdir / "filtered", format="ipc") assert filtered.to_table() == pa.table({ From 3f83890fcae67f303ca6cda933e831e1b2e11ebc Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 24 Nov 2022 15:20:46 +0100 Subject: [PATCH 21/31] Remove usage of FilteredDataset class --- docs/source/python/api/dataset.rst | 1 - python/pyarrow/_dataset.pxd | 7 +-- python/pyarrow/_dataset.pyx | 93 ++++++++++------------------ python/pyarrow/_exec_plan.pyx | 2 +- python/pyarrow/dataset.py | 1 - python/pyarrow/tests/test_dataset.py | 9 ++- 6 files changed, 39 insertions(+), 74 deletions(-) diff --git a/docs/source/python/api/dataset.rst b/docs/source/python/api/dataset.rst index bda3cc1b627..866c67440c5 100644 --- a/docs/source/python/api/dataset.rst +++ b/docs/source/python/api/dataset.rst @@ -60,7 +60,6 @@ Classes HivePartitioning FilenamePartitioning Dataset - FilteredDataset FileSystemDataset FileSystemFactoryOptions FileSystemDatasetFactory diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd index b7c5e436a70..d626b42e238 100644 --- a/python/pyarrow/_dataset.pxd +++ b/python/pyarrow/_dataset.pxd @@ -47,6 +47,7 @@ cdef class Dataset(_Weakrefable): cdef: shared_ptr[CDataset] wrapped CDataset* dataset + public dict _scan_options cdef void init(self, const shared_ptr[CDataset]& sp) @@ -56,12 +57,6 @@ cdef class Dataset(_Weakrefable): cdef shared_ptr[CDataset] unwrap(self) nogil -cdef class FilteredDataset(Dataset): - - cdef: - public dict _scan_options - - cdef class Scanner(_Weakrefable): cdef: shared_ptr[CScanner] wrapped diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index a6a1f7c0e11..ea1055009ae 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -156,6 +156,7 @@ cdef class Dataset(_Weakrefable): cdef void init(self, const shared_ptr[CDataset]& sp): self.wrapped = sp self.dataset = sp.get() + self._scan_options = dict() @staticmethod cdef wrap(const shared_ptr[CDataset]& sp): @@ -231,8 +232,23 @@ cdef class Dataset(_Weakrefable): for maybe_fragment in c_fragments: yield Fragment.wrap(GetResultValue(move(maybe_fragment))) - def scanner_options(self, options): - return options + def _scanner_options(self, options): + """Returns the default options to create a new Scanner. + + This is automatically invoked by :meth:`Dataset.scanner` + and there is no need to use it. + """ + new_options = options.copy() + + # at the moment only support filter + requested_filter = options.get("filter") + current_filter = self._scan_options.get("filter") + if requested_filter is not None and current_filter is not None: + new_options["filter"] = current_filter & requested_filter + elif current_filter is not None: + new_options["filter"] = current_filter + + return new_options def scanner(self, **kwargs): """ @@ -399,9 +415,19 @@ cdef class Dataset(_Weakrefable): Returns ------- - FilteredDataset + Dataset """ - return FilteredDataset(self, expression) + cdef: + Dataset filtered_dataset + + try: + new_filter = self._scan_options["filter"] & expression + except KeyError: + new_filter = expression + filtered_dataset = self.__class__.__new__(self.__class__) + filtered_dataset.init(self.wrapped) + filtered_dataset._scan_options = dict(filter=new_filter) + return filtered_dataset def join(self, right_dataset, keys, right_keys=None, join_type="left outer", left_suffix=None, right_suffix=None, coalesce_keys=True, @@ -451,63 +477,6 @@ cdef class Dataset(_Weakrefable): use_threads=use_threads, coalesce_keys=coalesce_keys, output_type=InMemoryDataset) -cdef class FilteredDataset(Dataset): - """ - A Dataset with an applied filter. - - Parameters - ---------- - dataset : Dataset - The dataset to which the filter should be applied. - expression : Expression - The filter that should be applied to the dataset. - """ - - def __init__(self, dataset, expression): - self.init((dataset).wrapped) - self._scan_options = dict(filter=expression) - - cdef void init(self, const shared_ptr[CDataset]& sp): - Dataset.init(self, sp) - self._scan_options = dict() - - def filter(self, expression): - """Apply an additional row filter to the filtered dataset. - - Parameters - ---------- - expression : Expression - The filter that should be applied to the dataset. - - Returns - ------- - FilteredDataset - """ - cdef: - FilteredDataset filtered_dataset - - try: - new_filter = self._scan_options["filter"] & expression - except KeyError: - new_filter = expression - filtered_dataset = self.__class__.__new__(self.__class__) - filtered_dataset.init(self.wrapped) - filtered_dataset._scan_options = dict(filter=new_filter) - return filtered_dataset - - def scanner_options(self, options): - new_options = options.copy() - - # at the moment only support filter - requested_filter = options.get("filter") - current_filter = self._scan_options.get("filter") - if requested_filter is not None and current_filter is not None: - new_options["filter"] = current_filter & requested_filter - elif current_filter is not None: - new_options["filter"] = current_filter - - return new_options - cdef class InMemoryDataset(Dataset): """ A Dataset wrapping in-memory data. @@ -2452,7 +2421,7 @@ cdef class Scanner(_Weakrefable): cdef: shared_ptr[CScannerBuilder] builder = make_shared[CScannerBuilder](dataset.unwrap()) - py_scanoptions = dataset.scanner_options(py_scanoptions) + py_scanoptions = dataset._scanner_options(py_scanoptions) # Need to explicitly expand the arguments as Cython doesn't support # keyword expansion in cdef functions. diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index d98b407995c..36fcae54e6a 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -31,7 +31,7 @@ from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wra RecordBatchReader) from pyarrow.lib import tobytes from pyarrow._compute cimport Expression, _true -from pyarrow._dataset cimport Dataset, FilteredDataset, Scanner +from pyarrow._dataset cimport Dataset, Scanner from pyarrow._dataset import InMemoryDataset Initialize() # Initialise support for Datasets in ExecPlan diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index b41bb8e1e56..6fbe00eace1 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -24,7 +24,6 @@ CsvFileFormat, CsvFragmentScanOptions, Dataset, - FilteredDataset, DatasetFactory, DirectoryPartitioning, FeatherFileFormat, diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index a0d447f8eb3..30ec91b7abe 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4864,20 +4864,22 @@ def test_dataset_filter(tempdir, dstype): # Ensure chained filtering works. result = ds1.filter(pc.field("colA") < 3).filter(pc.field("col2") == "a") + assert type(result) == (ds.FileSystemDataset if dstype == + "fs" else ds.InMemoryDataset) + assert result.to_table() == pa.table({ "colA": [1], "col2": ["a"] }) - assert type(result) == ds.FilteredDataset - assert result.head(5) == pa.table({ "colA": [1], "col2": ["a"] }) # Ensure that further filtering with scanners works too - r2 = ds1.filter(pc.field("colA") < 8).filter(pc.field("colA") > 1).scanner(filter=pc.field("colA") != 6) + r2 = ds1.filter(pc.field("colA") < 8).filter( + pc.field("colA") > 1).scanner(filter=pc.field("colA") != 6) assert r2.to_table() == pa.table({ "colA": [2], "col2": ["b"] @@ -4891,6 +4893,7 @@ def test_dataset_filter(tempdir, dstype): "col2": ["a"] }) + # Ensure that joining to a filtered Dataset works. joined = result.join(ds.dataset(pa.table({ "colB": [10, 20], "col2": ["a", "b"] From e41fd94c3b59cb929d97d5108b0ba95dea1c5849 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 29 Nov 2022 11:43:24 +0100 Subject: [PATCH 22/31] Update docs/source/python/compute.rst Co-authored-by: Joris Van den Bossche --- docs/source/python/compute.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/python/compute.rst b/docs/source/python/compute.rst index 26f29110978..2ef4a1f754b 100644 --- a/docs/source/python/compute.rst +++ b/docs/source/python/compute.rst @@ -369,7 +369,7 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter: chars: [["f","h","l"]] :class:`.Dataset` can similarly be filtered with the :meth:`.Dataset.filter` method. -The method will return an instance of :class:`.FilteredDataset` which will lazily +The method will return an instance of :class:`.Dataset` which will lazily apply the filter as soon as actual data of the dataset is accessed: >>> dataset = ds.dataset(table) From 9c764eaaad545cde3b727d6727e3f5b97ef379ef Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 30 Nov 2022 17:23:41 +0100 Subject: [PATCH 23/31] Disable Dataset.get_fragments() when filtered --- python/pyarrow/_dataset.pyx | 21 +++++++++++++++++---- python/pyarrow/tests/test_dataset.py | 6 ++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index ea1055009ae..c9284b0ad41 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -218,6 +218,18 @@ cdef class Dataset(_Weakrefable): ------- fragments : iterator of Fragment """ + if self._scan_options.get("filter") is not None: + # Accessing fragments of a filtered dataset is not supported. + # It would be unclear if you wanted to filter the fragments + # or the rows in those fragments. + raise ValueError( + "Retrieving fragments of a filtered or projected " + "dataset is not allowed. Remove the filtering." + ) + + return self._get_fragments(filter) + + def _get_fragments(self, Expression filter): cdef: CExpression c_filter CFragmentIterator c_iterator @@ -420,10 +432,11 @@ cdef class Dataset(_Weakrefable): cdef: Dataset filtered_dataset - try: - new_filter = self._scan_options["filter"] & expression - except KeyError: - new_filter = expression + new_filter = expression + current_filter = self._scan_options.get("filter") + if current_filter is not None and new_filter is not None: + new_filter = current_filter & new_filter + filtered_dataset = self.__class__.__new__(self.__class__) filtered_dataset.init(self.wrapped) filtered_dataset._scan_options = dict(filter=new_filter) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 30ec91b7abe..edf052b4326 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4904,6 +4904,12 @@ def test_dataset_filter(tempdir, dstype): "col2": ["a", "b"] }) + # Can't get fragments of a filtered dataset + with pytest.raises(ValueError): + result.get_fragments() + fragments = list(result.filter(None).get_fragments()) + assert len(fragments) == 1 + def test_write_dataset_with_scanner_use_projected_schema(tempdir): """ From ba387f4c9291a64714d373049d557ce478038d4e Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 2 Dec 2022 17:54:32 +0100 Subject: [PATCH 24/31] Disable passing None as a filter --- python/pyarrow/_dataset.pyx | 4 ++-- python/pyarrow/tests/test_dataset.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index c9284b0ad41..0edd5b06547 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -220,7 +220,7 @@ cdef class Dataset(_Weakrefable): """ if self._scan_options.get("filter") is not None: # Accessing fragments of a filtered dataset is not supported. - # It would be unclear if you wanted to filter the fragments + # It would be unclear if you wanted to filter the fragments # or the rows in those fragments. raise ValueError( "Retrieving fragments of a filtered or projected " @@ -416,7 +416,7 @@ cdef class Dataset(_Weakrefable): """The common schema of the full Dataset""" return pyarrow_wrap_schema(self.dataset.schema()) - def filter(self, expression): + def filter(self, expression not None): """ Apply a row filter to the dataset. diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index edf052b4326..653ad0edd9e 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4907,8 +4907,6 @@ def test_dataset_filter(tempdir, dstype): # Can't get fragments of a filtered dataset with pytest.raises(ValueError): result.get_fragments() - fragments = list(result.filter(None).get_fragments()) - assert len(fragments) == 1 def test_write_dataset_with_scanner_use_projected_schema(tempdir): From 74ba4be9a59b298df303463b77ca96f45be03243 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 2 Dec 2022 17:55:19 +0100 Subject: [PATCH 25/31] 2 lines separation between classes --- python/pyarrow/_dataset.pyx | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 0edd5b06547..1849a4db211 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -490,6 +490,7 @@ cdef class Dataset(_Weakrefable): use_threads=use_threads, coalesce_keys=coalesce_keys, output_type=InMemoryDataset) + cdef class InMemoryDataset(Dataset): """ A Dataset wrapping in-memory data. From 5e96eb7501922af4460cb425edc9fb3e4f50800b Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 2 Dec 2022 18:06:43 +0100 Subject: [PATCH 26/31] Dataset.replace_schema --- python/pyarrow/_dataset.pyx | 10 ++++++++-- python/pyarrow/tests/test_dataset.py | 14 ++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 1849a4db211..c8dc34c462b 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -201,8 +201,14 @@ cdef class Dataset(_Weakrefable): The new dataset schema. """ cdef shared_ptr[CDataset] copy = GetResultValue( - self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema))) - return Dataset.wrap(move(copy)) + self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema)) + ) + + d = Dataset.wrap(move(copy)) + if self._scan_options: + # Preserve scan options if set. + d._scan_options = self._scan_options.copy() + return d def get_fragments(self, Expression filter=None): """Returns an iterator over the fragments in this dataset. diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 653ad0edd9e..36e988fe8ab 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4908,6 +4908,20 @@ def test_dataset_filter(tempdir, dstype): with pytest.raises(ValueError): result.get_fragments() + # Ensure replacing schema preserves the filter. + schema_without_colB = ds1.schema.remove(1) + newschema = ds1.filter( + pc.field("colA") < 3 + ).replace_schema(schema_without_colB) + assert newschema.to_table() == pa.table({ + "colA": [1, 2], + }) + with pytest.raises(pa.ArrowInvalid): + # The schema might end up being replaced with + # something that makes the filter invalid. + # Let's make sure we error nicely. + result.replace_schema(schema_without_colB).to_table() + def test_write_dataset_with_scanner_use_projected_schema(tempdir): """ From b1cf868b507f6c13eced81c323003c820dfa419f Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 2 Dec 2022 18:12:28 +0100 Subject: [PATCH 27/31] Remove CSourceNodeOptions.FromRecordBatchReader we don't use it anymore --- python/pyarrow/includes/libarrow.pxd | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 7c5b31df518..3813ec05d55 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2546,10 +2546,7 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil pass cdef cppclass CSourceNodeOptions "arrow::compute::SourceNodeOptions"(CExecNodeOptions): - @staticmethod - CResult[shared_ptr[CSourceNodeOptions]] FromRecordBatchReader( - shared_ptr[CRecordBatchReader] reader, shared_ptr[CSchema] schema, - CExecutor* exc) + pass cdef cppclass CTableSourceNodeOptions "arrow::compute::TableSourceNodeOptions"(CExecNodeOptions): CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t max_batch_size) From 75fe9fd7b187841215c1274e7cfeb818459d3512 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Mon, 5 Dec 2022 18:17:06 +0100 Subject: [PATCH 28/31] Deal with UnionDataset --- python/pyarrow/dataset.py | 8 +++++ python/pyarrow/tests/test_dataset.py | 50 ++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 6fbe00eace1..50dfadb08cf 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -472,6 +472,14 @@ def _union_dataset(children, schema=None, **kwargs): # unify the children datasets' schemas schema = pa.unify_schemas([child.schema for child in children]) + for child in children: + if getattr(child, "_scan_options", None): + raise ValueError( + "Creating an UnionDataset from filtered or projected Datasets " + "is currently not supported. Union the unfiltered datasets " + "and apply the filtered to the resulting union." + ) + # create datasets with the requested schema children = [child.replace_schema(schema) for child in children] diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 36e988fe8ab..323a7f0a749 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4923,6 +4923,56 @@ def test_dataset_filter(tempdir, dstype): result.replace_schema(schema_without_colB).to_table() +@pytest.mark.dataset +@pytest.mark.parametrize('dstype', [ + "fs", "mem" +]) +def test_union_dataset_filter(tempdir, dstype): + t1 = pa.table({ + "colA": [1, 2, 6, 8], + "col2": ["a", "b", "f", "g"] + }) + t2 = pa.table({ + "colA": [9, 10, 11], + "col2": ["h", "i", "l"] + }) + if dstype == "fs": + ds.write_dataset(t1, tempdir / "t1", format="ipc") + ds1 = ds.dataset(tempdir / "t1", format="ipc") + ds.write_dataset(t2, tempdir / "t2", format="ipc") + ds2 = ds.dataset(tempdir / "t2", format="ipc") + elif dstype == "mem": + ds1 = ds.dataset(t1) + ds2 = ds.dataset(t2) + else: + raise NotImplementedError + + filtered_union_ds = ds.dataset((ds1, ds2)).filter( + (pc.field("colA") < 3) | (pc.field("colA") == 9) + ) + assert filtered_union_ds.to_table() == pa.table({ + "colA": [1, 2, 9], + "col2": ["a", "b", "h"] + }) + + joined = filtered_union_ds.join(ds.dataset(pa.table({ + "colB": [10, 20], + "col2": ["a", "b"] + })), keys="col2", join_type="left outer") + assert joined.to_table().sort_by("colA") == pa.table({ + "colA": [1, 2, 9], + "col2": ["a", "b", "h"], + "colB": [10, 20, None] + }) + + filtered_ds1 = ds1.filter(pc.field("colA") < 3) + filtered_ds2 = ds2.filter(pc.field("colA") < 10) + + with pytest.raises(ValueError) as err: + ds.dataset((filtered_ds1, filtered_ds2)) + assert "currently not supported" in str(err.value) + + def test_write_dataset_with_scanner_use_projected_schema(tempdir): """ Ensure the projected schema is used to validate partitions for scanner From a6c08b130cb596e046522945d1ce88874fa4ce20 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 6 Dec 2022 12:26:21 +0100 Subject: [PATCH 29/31] minor fixes --- python/pyarrow/_dataset.pyx | 38 +++++++++++++++------------- python/pyarrow/_exec_plan.pyx | 17 ++++++++----- python/pyarrow/dataset.py | 2 +- python/pyarrow/tests/test_dataset.py | 15 ++++++----- 4 files changed, 39 insertions(+), 33 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index c8dc34c462b..65eae80a5ff 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2445,18 +2445,16 @@ cdef class Scanner(_Weakrefable): # Need to explicitly expand the arguments as Cython doesn't support # keyword expansion in cdef functions. - _populate_builder(builder, - columns=py_scanoptions.get("columns"), - filter=py_scanoptions.get("filter"), - batch_size=py_scanoptions.get( - "batch_size", _DEFAULT_BATCH_SIZE), - batch_readahead=py_scanoptions.get( - "batch_readahead", _DEFAULT_BATCH_READAHEAD), - fragment_readahead=py_scanoptions.get( - "fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD), - use_threads=py_scanoptions.get("use_threads", True), - memory_pool=py_scanoptions.get("memory_pool"), - fragment_scan_options=py_scanoptions.get("fragment_scan_options")) + _populate_builder( + builder, + columns=py_scanoptions.get("columns"), + filter=py_scanoptions.get("filter"), + batch_size=py_scanoptions.get("batch_size", _DEFAULT_BATCH_SIZE), + batch_readahead=py_scanoptions.get("batch_readahead", _DEFAULT_BATCH_READAHEAD), + fragment_readahead=py_scanoptions.get("fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD), + use_threads=py_scanoptions.get("use_threads", True), + memory_pool=py_scanoptions.get("memory_pool"), + fragment_scan_options=py_scanoptions.get("fragment_scan_options")) return GetResultValue(deref(builder).GetScanOptions()) @@ -2528,12 +2526,8 @@ cdef class Scanner(_Weakrefable): default pool. """ cdef: - shared_ptr[CScanOptions] options = Scanner._make_scan_options(dataset, dict(columns=columns, filter=filter, - batch_size=batch_size, batch_readahead=batch_readahead, - fragment_readahead=fragment_readahead, use_threads=use_threads, - memory_pool=memory_pool, - fragment_scan_options=fragment_scan_options)) - shared_ptr[CScannerBuilder] builder = make_shared[CScannerBuilder](dataset.unwrap(), options) + shared_ptr[CScanOptions] options + shared_ptr[CScannerBuilder] builder shared_ptr[CScanner] scanner if use_async is not None: @@ -2541,6 +2535,14 @@ cdef class Scanner(_Weakrefable): 'effect. It will be removed in the next release.', FutureWarning) + options = Scanner._make_scan_options( + dataset, + dict(columns=columns, filter=filter, batch_size=batch_size, + batch_readahead=batch_readahead, + fragment_readahead=fragment_readahead, use_threads=use_threads, + memory_pool=memory_pool, fragment_scan_options=fragment_scan_options) + ) + builder = make_shared[CScannerBuilder](dataset.unwrap(), options) scanner = GetResultValue(builder.get().Finish()) return Scanner.wrap(scanner) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 36fcae54e6a..5e48bf70766 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -110,15 +110,18 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads # Filters applied in CScanNodeOptions are "best effort" for the scan node itself, # so we always need to inject an additional Filter node to apply them for real. - current_decl = CDeclaration(tobytes("filter"), no_c_inputs, - static_pointer_cast[CExecNodeOptions, CFilterNodeOptions]( - make_shared[CFilterNodeOptions]( - deref(deref(c_scanopts).scan_options).filter) - ) + current_decl = CDeclaration( + tobytes("filter"), + no_c_inputs, + static_pointer_cast[CExecNodeOptions, CFilterNodeOptions]( + make_shared[CFilterNodeOptions]( + deref(deref(c_scanopts).scan_options).filter + ) + ) ) current_decl.inputs.push_back( - CDeclaration.Input(CDeclaration( - tobytes("scan"), no_c_inputs, c_input_node_opts)) + CDeclaration.Input( + CDeclaration(tobytes("scan"), no_c_inputs, c_input_node_opts)) ) else: raise TypeError("Unsupported type") diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 50dfadb08cf..adbf064a736 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -477,7 +477,7 @@ def _union_dataset(children, schema=None, **kwargs): raise ValueError( "Creating an UnionDataset from filtered or projected Datasets " "is currently not supported. Union the unfiltered datasets " - "and apply the filtered to the resulting union." + "and apply the filter to the resulting union." ) # create datasets with the requested schema diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 323a7f0a749..66ef06aaf1e 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4845,7 +4845,6 @@ def test_dataset_join_collisions(tempdir): ], names=["colA", "colB", "colVals", "colB_r", "colVals_r"]) -@pytest.mark.dataset @pytest.mark.parametrize('dstype', [ "fs", "mem" ]) @@ -4904,15 +4903,19 @@ def test_dataset_filter(tempdir, dstype): "col2": ["a", "b"] }) + # Filter with None doesn't work for now + with pytest.raises(TypeError): + ds1.filter(None) + # Can't get fragments of a filtered dataset with pytest.raises(ValueError): result.get_fragments() # Ensure replacing schema preserves the filter. - schema_without_colB = ds1.schema.remove(1) + schema_without_col2 = ds1.schema.remove(1) newschema = ds1.filter( pc.field("colA") < 3 - ).replace_schema(schema_without_colB) + ).replace_schema(schema_without_col2) assert newschema.to_table() == pa.table({ "colA": [1, 2], }) @@ -4920,10 +4923,9 @@ def test_dataset_filter(tempdir, dstype): # The schema might end up being replaced with # something that makes the filter invalid. # Let's make sure we error nicely. - result.replace_schema(schema_without_colB).to_table() + result.replace_schema(schema_without_col2).to_table() -@pytest.mark.dataset @pytest.mark.parametrize('dstype', [ "fs", "mem" ]) @@ -4968,9 +4970,8 @@ def test_union_dataset_filter(tempdir, dstype): filtered_ds1 = ds1.filter(pc.field("colA") < 3) filtered_ds2 = ds2.filter(pc.field("colA") < 10) - with pytest.raises(ValueError) as err: + with pytest.raises(ValueError, match="currently not supported"): ds.dataset((filtered_ds1, filtered_ds2)) - assert "currently not supported" in str(err.value) def test_write_dataset_with_scanner_use_projected_schema(tempdir): From c2ff428c964b4915f6c9b27e927981d6ac3cc619 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 6 Dec 2022 13:07:52 +0100 Subject: [PATCH 30/31] fixup --- python/pyarrow/_dataset.pyx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 65eae80a5ff..26c9f503bd8 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2450,8 +2450,10 @@ cdef class Scanner(_Weakrefable): columns=py_scanoptions.get("columns"), filter=py_scanoptions.get("filter"), batch_size=py_scanoptions.get("batch_size", _DEFAULT_BATCH_SIZE), - batch_readahead=py_scanoptions.get("batch_readahead", _DEFAULT_BATCH_READAHEAD), - fragment_readahead=py_scanoptions.get("fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD), + batch_readahead=py_scanoptions.get( + "batch_readahead", _DEFAULT_BATCH_READAHEAD), + fragment_readahead=py_scanoptions.get( + "fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD), use_threads=py_scanoptions.get("use_threads", True), memory_pool=py_scanoptions.get("memory_pool"), fragment_scan_options=py_scanoptions.get("fragment_scan_options")) From 1f1d7bd3e6b26dc55309a46670f07034729e7297 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Mon, 12 Dec 2022 13:32:59 +0100 Subject: [PATCH 31/31] Ensure filtering works in parquet datasets too --- python/pyarrow/tests/test_dataset.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 66ef06aaf1e..e293c917429 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4974,6 +4974,21 @@ def test_union_dataset_filter(tempdir, dstype): ds.dataset((filtered_ds1, filtered_ds2)) +def test_parquet_dataset_filter(tempdir): + root_path = tempdir / "test_parquet_dataset_filter" + metadata_path, _ = _create_parquet_dataset_simple(root_path) + dataset = ds.parquet_dataset(metadata_path) + + result = dataset.to_table() + assert result.num_rows == 40 + + filtered_ds = dataset.filter(pc.field("f1") < 2) + assert filtered_ds.to_table().num_rows == 20 + + with pytest.raises(ValueError): + filtered_ds.get_fragments() + + def test_write_dataset_with_scanner_use_projected_schema(tempdir): """ Ensure the projected schema is used to validate partitions for scanner