diff --git a/cpp/src/arrow/compute/exec/options.cc b/cpp/src/arrow/compute/exec/options.cc
index ff66649e297..9e9da7ad831 100644
--- a/cpp/src/arrow/compute/exec/options.cc
+++ b/cpp/src/arrow/compute/exec/options.cc
@@ -51,16 +51,27 @@ std::string ToString(JoinType t) {
 }
 
 Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
-    const Table& table, arrow::internal::Executor* exc) {
+    const Table& table, arrow::internal::Executor* executor) {
   std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(table);
 
-  if (exc == nullptr) return Status::TypeError("No executor provided.");
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
 
   // Map the RecordBatchReader to a SourceNode
-  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc));
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
 
   return std::make_shared<SourceNodeOptions>(table.schema(), batch_gen);
 }
 
+Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromRecordBatchReader(
+    std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+    arrow::internal::Executor* executor) {
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
+
+  // Map the RecordBatchReader to a SourceNode
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
+
+  return std::make_shared<SourceNodeOptions>(std::move(schema), std::move(batch_gen));
+}
+
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 8600b113489..5b2c984eec3 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -65,6 +65,10 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   static Result<std::shared_ptr<SourceNodeOptions>> FromTable(const Table& table,
                                                               arrow::internal::Executor*);
 
+  static Result<std::shared_ptr<SourceNodeOptions>> FromRecordBatchReader(
+      std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+      arrow::internal::Executor*);
+
   std::shared_ptr<Schema> output_schema;
   std::function<Future<std::optional<ExecBatch>>()> generator;
 };
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index b4baab5a09f..27746f5be8e 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -927,12 +927,17 @@ Status ScannerBuilder::Backpressure(compute::BackpressureOptions backpressure) {
   return Status::OK();
 }
 
-Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
+Result<std::shared_ptr<ScanOptions>> ScannerBuilder::GetScanOptions() {
   if (!scan_options_->projection.IsBound()) {
     RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names()));
   }
 
-  return std::make_shared<AsyncScanner>(dataset_, scan_options_);
+  return scan_options_;
+}
+
+Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
+  ARROW_ASSIGN_OR_RAISE(auto scan_options, GetScanOptions());
+  return std::make_shared<AsyncScanner>(dataset_, scan_options);
 }
 
 namespace {
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 7d6dd0e1a8c..dbd0eeb9165 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -520,6 +520,9 @@ class ARROW_DS_EXPORT ScannerBuilder {
   /// \brief Override default backpressure configuration
   Status Backpressure(compute::BackpressureOptions backpressure);
 
+  /// \brief Return the current scan options for the builder.
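+  ///
+  /// These are the options Finish() would use to construct the Scanner.
+  /// If no projection has been set yet, the default projection (all fields
+  /// of the dataset schema) is bound first, mirroring what Finish() does.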
+ Result> GetScanOptions(); + /// \brief Return the constructed now-immutable Scanner object Result> Finish(); diff --git a/docs/source/python/compute.rst b/docs/source/python/compute.rst index fe7f333300f..2ef4a1f754b 100644 --- a/docs/source/python/compute.rst +++ b/docs/source/python/compute.rst @@ -368,8 +368,19 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter: nums: [[6,8,10]] chars: [["f","h","l"]] -:class:`.Dataset` currently can be filtered using :meth:`.Dataset.to_table` method -passing a ``filter`` argument. See :ref:`py-filter-dataset` in Dataset documentation. +:class:`.Dataset` can similarly be filtered with the :meth:`.Dataset.filter` method. +The method will return an instance of :class:`.Dataset` which will lazily +apply the filter as soon as actual data of the dataset is accessed: + + >>> dataset = ds.dataset(table) + >>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2) + >>> filtered.to_table() + pyarrow.Table + nums: int64 + chars: string + ---- + nums: [[3,4]] + chars: [["c","d"]] User-Defined Functions diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd index a512477d501..d626b42e238 100644 --- a/python/pyarrow/_dataset.pxd +++ b/python/pyarrow/_dataset.pxd @@ -47,6 +47,7 @@ cdef class Dataset(_Weakrefable): cdef: shared_ptr[CDataset] wrapped CDataset* dataset + public dict _scan_options cdef void init(self, const shared_ptr[CDataset]& sp) @@ -56,6 +57,22 @@ cdef class Dataset(_Weakrefable): cdef shared_ptr[CDataset] unwrap(self) nogil +cdef class Scanner(_Weakrefable): + cdef: + shared_ptr[CScanner] wrapped + CScanner* scanner + + cdef void init(self, const shared_ptr[CScanner]& sp) + + @staticmethod + cdef wrap(const shared_ptr[CScanner]& sp) + + cdef shared_ptr[CScanner] unwrap(self) + + @staticmethod + cdef shared_ptr[CScanOptions] _make_scan_options(Dataset dataset, dict py_scanoptions) except * + + cdef class FragmentScanOptions(_Weakrefable): cdef: diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 5d4cf95087d..26c9f503bd8 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -156,6 +156,7 @@ cdef class Dataset(_Weakrefable): cdef void init(self, const shared_ptr[CDataset]& sp): self.wrapped = sp self.dataset = sp.get() + self._scan_options = dict() @staticmethod cdef wrap(const shared_ptr[CDataset]& sp): @@ -200,8 +201,14 @@ cdef class Dataset(_Weakrefable): The new dataset schema. """ cdef shared_ptr[CDataset] copy = GetResultValue( - self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema))) - return Dataset.wrap(move(copy)) + self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema)) + ) + + d = Dataset.wrap(move(copy)) + if self._scan_options: + # Preserve scan options if set. + d._scan_options = self._scan_options.copy() + return d def get_fragments(self, Expression filter=None): """Returns an iterator over the fragments in this dataset. @@ -217,6 +224,18 @@ cdef class Dataset(_Weakrefable): ------- fragments : iterator of Fragment """ + if self._scan_options.get("filter") is not None: + # Accessing fragments of a filtered dataset is not supported. + # It would be unclear if you wanted to filter the fragments + # or the rows in those fragments. + raise ValueError( + "Retrieving fragments of a filtered or projected " + "dataset is not allowed. Remove the filtering." 
+ ) + + return self._get_fragments(filter) + + def _get_fragments(self, Expression filter): cdef: CExpression c_filter CFragmentIterator c_iterator @@ -231,6 +250,24 @@ cdef class Dataset(_Weakrefable): for maybe_fragment in c_fragments: yield Fragment.wrap(GetResultValue(move(maybe_fragment))) + def _scanner_options(self, options): + """Returns the default options to create a new Scanner. + + This is automatically invoked by :meth:`Dataset.scanner` + and there is no need to use it. + """ + new_options = options.copy() + + # at the moment only support filter + requested_filter = options.get("filter") + current_filter = self._scan_options.get("filter") + if requested_filter is not None and current_filter is not None: + new_options["filter"] = current_filter & requested_filter + elif current_filter is not None: + new_options["filter"] = current_filter + + return new_options + def scanner(self, **kwargs): """ Build a scan operation against the dataset. @@ -239,7 +276,7 @@ cdef class Dataset(_Weakrefable): which exposes further operations (e.g. loading all data as a table, counting rows). - See the `Scanner.from_dataset` method for further information. + See the :meth:`Scanner.from_dataset` method for further information. Parameters ---------- @@ -385,6 +422,32 @@ cdef class Dataset(_Weakrefable): """The common schema of the full Dataset""" return pyarrow_wrap_schema(self.dataset.schema()) + def filter(self, expression not None): + """ + Apply a row filter to the dataset. + + Parameters + ---------- + expression : Expression + The filter that should be applied to the dataset. + + Returns + ------- + Dataset + """ + cdef: + Dataset filtered_dataset + + new_filter = expression + current_filter = self._scan_options.get("filter") + if current_filter is not None and new_filter is not None: + new_filter = current_filter & new_filter + + filtered_dataset = self.__class__.__new__(self.__class__) + filtered_dataset.init(self.wrapped) + filtered_dataset._scan_options = dict(filter=new_filter) + return filtered_dataset + def join(self, right_dataset, keys, right_keys=None, join_type="left outer", left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True): @@ -2187,6 +2250,9 @@ cdef class RecordBatchIterator(_Weakrefable): self.iterator = make_shared[CRecordBatchIterator](move(iterator)) return self + cdef inline shared_ptr[CRecordBatchIterator] unwrap(self) nogil: + return self.iterator + def __iter__(self): return self @@ -2354,10 +2420,6 @@ cdef class Scanner(_Weakrefable): default pool. """ - cdef: - shared_ptr[CScanner] wrapped - CScanner* scanner - def __init__(self): _forbid_instantiation(self.__class__) @@ -2375,8 +2437,34 @@ cdef class Scanner(_Weakrefable): return self.wrapped @staticmethod - def from_dataset(Dataset dataset not None, *, object columns=None, - Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, + cdef shared_ptr[CScanOptions] _make_scan_options(Dataset dataset, dict py_scanoptions) except *: + cdef: + shared_ptr[CScannerBuilder] builder = make_shared[CScannerBuilder](dataset.unwrap()) + + py_scanoptions = dataset._scanner_options(py_scanoptions) + + # Need to explicitly expand the arguments as Cython doesn't support + # keyword expansion in cdef functions. 
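+        # Unspecified options fall back to the same defaults that
+        # Scanner.from_dataset uses, and any filter attached to the dataset
+        # via Dataset.filter() has already been merged into py_scanoptions
+        # by the _scanner_options() call above.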
+ _populate_builder( + builder, + columns=py_scanoptions.get("columns"), + filter=py_scanoptions.get("filter"), + batch_size=py_scanoptions.get("batch_size", _DEFAULT_BATCH_SIZE), + batch_readahead=py_scanoptions.get( + "batch_readahead", _DEFAULT_BATCH_READAHEAD), + fragment_readahead=py_scanoptions.get( + "fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD), + use_threads=py_scanoptions.get("use_threads", True), + memory_pool=py_scanoptions.get("memory_pool"), + fragment_scan_options=py_scanoptions.get("fragment_scan_options")) + + return GetResultValue(deref(builder).GetScanOptions()) + + @staticmethod + def from_dataset(Dataset dataset not None, *, + object columns=None, + Expression filter=None, + int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, @@ -2440,7 +2528,7 @@ cdef class Scanner(_Weakrefable): default pool. """ cdef: - shared_ptr[CScanOptions] options = make_shared[CScanOptions]() + shared_ptr[CScanOptions] options shared_ptr[CScannerBuilder] builder shared_ptr[CScanner] scanner @@ -2449,13 +2537,14 @@ cdef class Scanner(_Weakrefable): 'effect. It will be removed in the next release.', FutureWarning) + options = Scanner._make_scan_options( + dataset, + dict(columns=columns, filter=filter, batch_size=batch_size, + batch_readahead=batch_readahead, + fragment_readahead=fragment_readahead, use_threads=use_threads, + memory_pool=memory_pool, fragment_scan_options=fragment_scan_options) + ) builder = make_shared[CScannerBuilder](dataset.unwrap(), options) - _populate_builder(builder, columns=columns, filter=filter, - batch_size=batch_size, batch_readahead=batch_readahead, - fragment_readahead=fragment_readahead, use_threads=use_threads, - memory_pool=memory_pool, - fragment_scan_options=fragment_scan_options) - scanner = GetResultValue(builder.get().Finish()) return Scanner.wrap(scanner) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 167a5030f5c..5e48bf70766 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -27,10 +27,11 @@ from cython.operator cimport dereference as deref, preincrement as inc from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_dataset cimport * -from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table) +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) from pyarrow.lib import tobytes from pyarrow._compute cimport Expression, _true -from pyarrow._dataset cimport Dataset +from pyarrow._dataset cimport Dataset, Scanner from pyarrow._dataset import InMemoryDataset Initialize() # Initialise support for Datasets in ExecPlan @@ -58,6 +59,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads CExecutor *c_executor shared_ptr[CExecContext] c_exec_context shared_ptr[CExecPlan] c_exec_plan + CDeclaration current_decl vector[CDeclaration] c_decls vector[CExecNode*] _empty vector[CExecNode*] c_final_node_vec @@ -66,11 +68,13 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads shared_ptr[CTable] c_in_table shared_ptr[CTable] c_out_table shared_ptr[CTableSourceNodeOptions] c_tablesourceopts + shared_ptr[CScanner] c_dataset_scanner shared_ptr[CScanNodeOptions] c_scanopts shared_ptr[CExecNodeOptions] c_input_node_opts shared_ptr[CSinkNodeOptions] c_sinkopts 
shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen shared_ptr[CRecordBatchReader] c_recordbatchreader + shared_ptr[CRecordBatchReader] c_recordbatchreader_in vector[CDeclaration].iterator plan_iter vector[CDeclaration.Input] no_c_inputs CStatus c_plan_status @@ -89,35 +93,45 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads # Create source nodes for each input for ipt in inputs: if isinstance(ipt, Table): - node_factory = "table_source" c_in_table = pyarrow_unwrap_table(ipt) c_tablesourceopts = make_shared[CTableSourceNodeOptions]( c_in_table) c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions]( c_tablesourceopts) + + current_decl = CDeclaration( + tobytes("table_source"), no_c_inputs, c_input_node_opts) elif isinstance(ipt, Dataset): - node_factory = "scan" c_in_dataset = (ipt).unwrap() c_scanopts = make_shared[CScanNodeOptions]( - c_in_dataset, make_shared[CScanOptions]()) - deref(deref(c_scanopts).scan_options).use_threads = use_threads + c_in_dataset, Scanner._make_scan_options(ipt, {"use_threads": use_threads})) c_input_node_opts = static_pointer_cast[CExecNodeOptions, CScanNodeOptions]( c_scanopts) + + # Filters applied in CScanNodeOptions are "best effort" for the scan node itself, + # so we always need to inject an additional Filter node to apply them for real. + current_decl = CDeclaration( + tobytes("filter"), + no_c_inputs, + static_pointer_cast[CExecNodeOptions, CFilterNodeOptions]( + make_shared[CFilterNodeOptions]( + deref(deref(c_scanopts).scan_options).filter + ) + ) + ) + current_decl.inputs.push_back( + CDeclaration.Input( + CDeclaration(tobytes("scan"), no_c_inputs, c_input_node_opts)) + ) else: raise TypeError("Unsupported type") if plan_iter != plan.end(): # Flag the source as the input of the first plan node. - deref(plan_iter).inputs.push_back(CDeclaration.Input( - CDeclaration(tobytes(node_factory), - no_c_inputs, c_input_node_opts) - )) + deref(plan_iter).inputs.push_back(CDeclaration.Input(current_decl)) else: # Empty plan, make the source the first plan node. - c_decls.push_back( - CDeclaration(tobytes(node_factory), - no_c_inputs, c_input_node_opts) - ) + c_decls.push_back(current_decl) # Add Here additional nodes while plan_iter != plan.end(): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 6fbe00eace1..adbf064a736 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -472,6 +472,14 @@ def _union_dataset(children, schema=None, **kwargs): # unify the children datasets' schemas schema = pa.unify_schemas([child.schema for child in children]) + for child in children: + if getattr(child, "_scan_options", None): + raise ValueError( + "Creating an UnionDataset from filtered or projected Datasets " + "is currently not supported. Union the unfiltered datasets " + "and apply the filter to the resulting union." 
+ ) + # create datasets with the requested schema children = [child.replace_schema(schema) for child in children] diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9cea340a309..3813ec05d55 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2545,6 +2545,9 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil cdef cppclass CExecNodeOptions "arrow::compute::ExecNodeOptions": pass + cdef cppclass CSourceNodeOptions "arrow::compute::SourceNodeOptions"(CExecNodeOptions): + pass + cdef cppclass CTableSourceNodeOptions "arrow::compute::TableSourceNodeOptions"(CExecNodeOptions): CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t max_batch_size) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index e69c3cbcaf2..b75eafcdeea 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -54,6 +54,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CSchema] dataset_schema shared_ptr[CSchema] projected_schema c_bool use_threads + CExpression filter cdef cppclass CScanNodeOptions "arrow::dataset::ScanNodeOptions"(CExecNodeOptions): CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options) @@ -126,6 +127,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CStatus FragmentReadahead(int32_t fragment_readahead) CStatus FragmentScanOptions( shared_ptr[CFragmentScanOptions] fragment_scan_options) + CResult[shared_ptr[CScanOptions]] GetScanOptions() CResult[shared_ptr[CScanner]] Finish() shared_ptr[CSchema] schema() const diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 6262e3a2fdf..e293c917429 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4845,21 +4845,149 @@ def test_dataset_join_collisions(tempdir): ], names=["colA", "colB", "colVals", "colB_r", "colVals_r"]) -@pytest.mark.dataset -def test_dataset_filter(tempdir): +@pytest.mark.parametrize('dstype', [ + "fs", "mem" +]) +def test_dataset_filter(tempdir, dstype): t1 = pa.table({ - "colA": [1, 2, 6], - "col2": ["a", "b", "f"] + "colA": [1, 2, 6, 8], + "col2": ["a", "b", "f", "g"] }) - ds.write_dataset(t1, tempdir / "t1", format="ipc") - ds1 = ds.dataset(tempdir / "t1", format="ipc") + if dstype == "fs": + ds.write_dataset(t1, tempdir / "t1", format="ipc") + ds1 = ds.dataset(tempdir / "t1", format="ipc") + elif dstype == "mem": + ds1 = ds.dataset(t1) + else: + raise NotImplementedError + + # Ensure chained filtering works. + result = ds1.filter(pc.field("colA") < 3).filter(pc.field("col2") == "a") + assert type(result) == (ds.FileSystemDataset if dstype == + "fs" else ds.InMemoryDataset) - result = ds1.scanner(filter=pc.field("colA") < 3) assert result.to_table() == pa.table({ + "colA": [1], + "col2": ["a"] + }) + + assert result.head(5) == pa.table({ + "colA": [1], + "col2": ["a"] + }) + + # Ensure that further filtering with scanners works too + r2 = ds1.filter(pc.field("colA") < 8).filter( + pc.field("colA") > 1).scanner(filter=pc.field("colA") != 6) + assert r2.to_table() == pa.table({ + "colA": [2], + "col2": ["b"] + }) + + # Ensure that writing back to disk works. 
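+    # The filter is applied while the dataset is scanned during the write,
+    # so only the matching rows should end up in the files written below.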
+ ds.write_dataset(result, tempdir / "filtered", format="ipc") + filtered = ds.dataset(tempdir / "filtered", format="ipc") + assert filtered.to_table() == pa.table({ + "colA": [1], + "col2": ["a"] + }) + + # Ensure that joining to a filtered Dataset works. + joined = result.join(ds.dataset(pa.table({ + "colB": [10, 20], + "col2": ["a", "b"] + })), keys="col2", join_type="right outer") + assert joined.to_table().sort_by("colB") == pa.table({ + "colA": [1, None], + "colB": [10, 20], + "col2": ["a", "b"] + }) + + # Filter with None doesn't work for now + with pytest.raises(TypeError): + ds1.filter(None) + + # Can't get fragments of a filtered dataset + with pytest.raises(ValueError): + result.get_fragments() + + # Ensure replacing schema preserves the filter. + schema_without_col2 = ds1.schema.remove(1) + newschema = ds1.filter( + pc.field("colA") < 3 + ).replace_schema(schema_without_col2) + assert newschema.to_table() == pa.table({ "colA": [1, 2], + }) + with pytest.raises(pa.ArrowInvalid): + # The schema might end up being replaced with + # something that makes the filter invalid. + # Let's make sure we error nicely. + result.replace_schema(schema_without_col2).to_table() + + +@pytest.mark.parametrize('dstype', [ + "fs", "mem" +]) +def test_union_dataset_filter(tempdir, dstype): + t1 = pa.table({ + "colA": [1, 2, 6, 8], + "col2": ["a", "b", "f", "g"] + }) + t2 = pa.table({ + "colA": [9, 10, 11], + "col2": ["h", "i", "l"] + }) + if dstype == "fs": + ds.write_dataset(t1, tempdir / "t1", format="ipc") + ds1 = ds.dataset(tempdir / "t1", format="ipc") + ds.write_dataset(t2, tempdir / "t2", format="ipc") + ds2 = ds.dataset(tempdir / "t2", format="ipc") + elif dstype == "mem": + ds1 = ds.dataset(t1) + ds2 = ds.dataset(t2) + else: + raise NotImplementedError + + filtered_union_ds = ds.dataset((ds1, ds2)).filter( + (pc.field("colA") < 3) | (pc.field("colA") == 9) + ) + assert filtered_union_ds.to_table() == pa.table({ + "colA": [1, 2, 9], + "col2": ["a", "b", "h"] + }) + + joined = filtered_union_ds.join(ds.dataset(pa.table({ + "colB": [10, 20], "col2": ["a", "b"] + })), keys="col2", join_type="left outer") + assert joined.to_table().sort_by("colA") == pa.table({ + "colA": [1, 2, 9], + "col2": ["a", "b", "h"], + "colB": [10, 20, None] }) + filtered_ds1 = ds1.filter(pc.field("colA") < 3) + filtered_ds2 = ds2.filter(pc.field("colA") < 10) + + with pytest.raises(ValueError, match="currently not supported"): + ds.dataset((filtered_ds1, filtered_ds2)) + + +def test_parquet_dataset_filter(tempdir): + root_path = tempdir / "test_parquet_dataset_filter" + metadata_path, _ = _create_parquet_dataset_simple(root_path) + dataset = ds.parquet_dataset(metadata_path) + + result = dataset.to_table() + assert result.num_rows == 40 + + filtered_ds = dataset.filter(pc.field("f1") < 2) + assert filtered_ds.to_table().num_rows == 20 + + with pytest.raises(ValueError): + filtered_ds.get_fragments() + def test_write_dataset_with_scanner_use_projected_schema(tempdir): """