Merged
Commits
31 commits
259f7f2
Proof of concept
amol- Jun 21, 2022
e3981e5
Working joins
amol- Jun 30, 2022
4f50a8b
lint
amol- Jun 30, 2022
545067a
Ensure standard dataset methods keep the filter
amol- Jul 4, 2022
4441003
Document Dataset.filter
amol- Jul 4, 2022
1d33ee3
Lint
amol- Jul 4, 2022
9d3b4a9
Add class to reference
amol- Jul 4, 2022
e51ac73
tweak variable name
amol- Jul 5, 2022
87a828f
Update docs/source/python/compute.rst
amol- Jul 5, 2022
c932b97
Update python/pyarrow/_dataset.pyx
amol- Jul 5, 2022
30d2409
Remove unecessary casts
amol- Jul 5, 2022
1f406de
move schema
amol- Jul 5, 2022
6523ed5
better error and docstrings
amol- Jul 5, 2022
3476dfb
Tweak docstrings
amol- Jul 5, 2022
569cd80
Update cpp/src/arrow/compute/exec/options.cc
amol- Jul 7, 2022
4401496
Refactoring
amol- Jul 13, 2022
b134425
Allow to create ScanOptions alone
amol- Sep 15, 2022
737dac2
Working filter and join on filtered datasets
amol- Oct 10, 2022
94e6713
lint
amol- Oct 10, 2022
16afe33
Test with chained filtering
amol- Nov 24, 2022
3f83890
Remove usage of FilteredDataset class
amol- Nov 24, 2022
e41fd94
Update docs/source/python/compute.rst
amol- Nov 29, 2022
9c764ea
Disable Dataset.get_fragments() when filtered
amol- Nov 30, 2022
ba387f4
Disable passing None as a filter
amol- Dec 2, 2022
74ba4be
2 lines separation between classes
amol- Dec 2, 2022
5e96eb7
Dataset.replace_schema
amol- Dec 2, 2022
b1cf868
Remove CSourceNodeOptions.FromRecordBatchReader we don't use it anymore
amol- Dec 2, 2022
75fe9fd
Deal with UnionDataset
amol- Dec 5, 2022
a6c08b1
minor fixes
jorisvandenbossche Dec 6, 2022
c2ff428
fixup
jorisvandenbossche Dec 6, 2022
1f1d7bd
Ensure filtering works in parquet datasets too
amol- Dec 12, 2022
17 changes: 14 additions & 3 deletions cpp/src/arrow/compute/exec/options.cc
@@ -51,16 +51,27 @@ std::string ToString(JoinType t) {
}

Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
const Table& table, arrow::internal::Executor* exc) {
const Table& table, arrow::internal::Executor* executor) {
std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(table);

if (exc == nullptr) return Status::TypeError("No executor provided.");
if (executor == nullptr) return Status::TypeError("No executor provided.");

// Map the RecordBatchReader to a SourceNode
ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc));
ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));

return std::make_shared<SourceNodeOptions>(table.schema(), batch_gen);
}

Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromRecordBatchReader(
std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
arrow::internal::Executor* executor) {
if (executor == nullptr) return Status::TypeError("No executor provided.");

// Map the RecordBatchReader to a SourceNode
ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));

return std::make_shared<SourceNodeOptions>(std::move(schema), std::move(batch_gen));
}

} // namespace compute
} // namespace arrow
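
FromRecordBatchReader mirrors the existing FromTable factory, but accepts an arbitrary streaming reader together with an explicit schema, so an exec plan can read from a source that never materializes as a Table. Below is a minimal Python-level sketch of the reader/schema pair this factory consumes; it illustrates the input shape only and is not the C++ API itself.

import pyarrow as pa

table = pa.table({"nums": [1, 2, 3, 4]})
# A streaming reader plus its schema: the two inputs that
# SourceNodeOptions::FromRecordBatchReader pairs into a source node.
reader = pa.RecordBatchReader.from_batches(table.schema, table.to_batches())
for batch in reader:  # a source node pulls batches much like this loop
    print(batch.num_rows)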
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/exec/options.h
@@ -65,6 +65,10 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
static Result<std::shared_ptr<SourceNodeOptions>> FromTable(const Table& table,
arrow::internal::Executor*);

static Result<std::shared_ptr<SourceNodeOptions>> FromRecordBatchReader(
std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
arrow::internal::Executor*);

std::shared_ptr<Schema> output_schema;
std::function<Future<std::optional<ExecBatch>>()> generator;
};
9 changes: 7 additions & 2 deletions cpp/src/arrow/dataset/scanner.cc
@@ -927,12 +927,17 @@ Status ScannerBuilder::Backpressure(compute::BackpressureOptions backpressure) {
return Status::OK();
}

Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
Result<std::shared_ptr<ScanOptions>> ScannerBuilder::GetScanOptions() {
if (!scan_options_->projection.IsBound()) {
RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names()));
}

return std::make_shared<AsyncScanner>(dataset_, scan_options_);
return scan_options_;
}

Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
ARROW_ASSIGN_OR_RAISE(auto scan_options, GetScanOptions());
return std::make_shared<AsyncScanner>(dataset_, scan_options);
}

namespace {
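
This refactoring splits the old Finish() in two: GetScanOptions() binds a default projection when none was requested and returns the completed options, while Finish() reuses that step before constructing the scanner. A rough, runnable Python analogue of the new control flow follows; all class and attribute names here are hypothetical stand-ins for the C++ ones.

from typing import Optional

class ScannerBuilderSketch:
    """Python analogue of ScannerBuilder::GetScanOptions/Finish."""

    def __init__(self, schema_names):
        self.schema_names = schema_names
        self.projection: Optional[list] = None

    def project(self, columns):
        self.projection = list(columns)

    def get_scan_options(self):
        # Bind a default projection (all dataset columns) if unset,
        # then hand back the now-complete options.
        if self.projection is None:
            self.project(self.schema_names)
        return {"projection": self.projection}

    def finish(self):
        # Finish() now just resolves options and wraps them in a scanner.
        return ("AsyncScanner", self.get_scan_options())

print(ScannerBuilderSketch(["nums", "chars"]).finish())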
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/scanner.h
@@ -520,6 +520,9 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// \brief Override default backpressure configuration
Status Backpressure(compute::BackpressureOptions backpressure);

/// \brief Return the current scan options for the builder.
Result<std::shared_ptr<ScanOptions>> GetScanOptions();

/// \brief Return the constructed now-immutable Scanner object
Result<std::shared_ptr<Scanner>> Finish();

15 changes: 13 additions & 2 deletions docs/source/python/compute.rst
@@ -368,8 +368,19 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter:
nums: [[6,8,10]]
chars: [["f","h","l"]]

:class:`.Dataset` currently can be filtered using :meth:`.Dataset.to_table` method
passing a ``filter`` argument. See :ref:`py-filter-dataset` in Dataset documentation.
:class:`.Dataset` can similarly be filtered with the :meth:`.Dataset.filter` method.
The method will return an instance of :class:`.Dataset` which will lazily
apply the filter as soon as actual data of the dataset is accessed:

>>> dataset = ds.dataset(table)
>>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2)
>>> filtered.to_table()
pyarrow.Table
nums: int64
chars: string
----
nums: [[3,4]]
chars: [["c","d"]]
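
Chained filter() calls, and any filter passed at scan time, are combined with a logical AND before the scan executes. A small self-contained sketch of that behavior (the in-memory table here is assumed for illustration):

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

table = pa.table({"nums": list(range(1, 11)),
                  "chars": list("abcdefghij")})
dataset = ds.dataset(table)

# Dataset-level filter, applied lazily...
filtered = dataset.filter(pc.field("nums") > 2)
# ...AND-ed with the filter given at scan time: 2 < nums < 5.
result = filtered.to_table(filter=pc.field("nums") < 5)
print(result["nums"])  # chunked array containing [3, 4]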


User-Defined Functions
17 changes: 17 additions & 0 deletions python/pyarrow/_dataset.pxd
@@ -47,6 +47,7 @@ cdef class Dataset(_Weakrefable):
cdef:
shared_ptr[CDataset] wrapped
CDataset* dataset
public dict _scan_options

cdef void init(self, const shared_ptr[CDataset]& sp)

@@ -56,6 +57,22 @@
cdef shared_ptr[CDataset] unwrap(self) nogil


cdef class Scanner(_Weakrefable):
cdef:
shared_ptr[CScanner] wrapped
CScanner* scanner

cdef void init(self, const shared_ptr[CScanner]& sp)

@staticmethod
cdef wrap(const shared_ptr[CScanner]& sp)

cdef shared_ptr[CScanner] unwrap(self)

@staticmethod
cdef shared_ptr[CScanOptions] _make_scan_options(Dataset dataset, dict py_scanoptions) except *


cdef class FragmentScanOptions(_Weakrefable):

cdef:
121 changes: 105 additions & 16 deletions python/pyarrow/_dataset.pyx
@@ -156,6 +156,7 @@ cdef class Dataset(_Weakrefable):
cdef void init(self, const shared_ptr[CDataset]& sp):
self.wrapped = sp
self.dataset = sp.get()
self._scan_options = dict()

@staticmethod
cdef wrap(const shared_ptr[CDataset]& sp):
@@ -200,8 +201,14 @@
The new dataset schema.
"""
cdef shared_ptr[CDataset] copy = GetResultValue(
self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema)))
return Dataset.wrap(move(copy))
self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema))
)

d = Dataset.wrap(move(copy))
if self._scan_options:
# Preserve scan options if set.
d._scan_options = self._scan_options.copy()
return d
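
Because the re-wrapped copy now carries the Python-level scan options, a filter applied before replace_schema survives it. A short sketch, assuming a metadata-only schema change keeps the new schema compatible:

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset(pa.table({"nums": [1, 2, 3]})).filter(pc.field("nums") > 1)
new_schema = dataset.schema.with_metadata({b"origin": b"example"})
# The filter travels with the copy: still 2 rows after the schema swap.
print(dataset.replace_schema(new_schema).to_table().num_rows)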

def get_fragments(self, Expression filter=None):
"""Returns an iterator over the fragments in this dataset.
@@ -217,6 +224,18 @@
-------
fragments : iterator of Fragment
"""
if self._scan_options.get("filter") is not None:
# Accessing fragments of a filtered dataset is not supported.
# It would be unclear if you wanted to filter the fragments
# or the rows in those fragments.
raise ValueError(
"Retrieving fragments of a filtered or projected "
"dataset is not allowed. Remove the filtering."
)

return self._get_fragments(filter)
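
The guard makes the ambiguity explicit: fragments know nothing about the dataset-level filter, so returning them unfiltered would be misleading. A quick sketch of the resulting behavior (in-memory table assumed):

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset(pa.table({"nums": [1, 2, 3]}))
print(len(list(dataset.get_fragments())))  # fine on an unfiltered dataset

try:
    dataset.filter(pc.field("nums") > 1).get_fragments()
except ValueError as exc:
    print(exc)  # "Retrieving fragments of a filtered or projected dataset ..."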

def _get_fragments(self, Expression filter):
cdef:
CExpression c_filter
CFragmentIterator c_iterator
@@ -231,6 +250,24 @@
for maybe_fragment in c_fragments:
yield Fragment.wrap(GetResultValue(move(maybe_fragment)))

def _scanner_options(self, options):
"""Returns the default options to create a new Scanner.

This is automatically invoked by :meth:`Dataset.scanner`
and there is no need to use it.
"""
new_options = options.copy()

# at the moment only support filter
requested_filter = options.get("filter")
current_filter = self._scan_options.get("filter")
if requested_filter is not None and current_filter is not None:
new_options["filter"] = current_filter & requested_filter
elif current_filter is not None:
new_options["filter"] = current_filter

return new_options
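
The merge rule is deliberately simple: when both a dataset-level and a scan-level filter exist they are AND-ed, otherwise whichever one is set wins. A standalone sketch of that rule in plain Python (the function name is a hypothetical stand-in):

import pyarrow.compute as pc

def merge_filters(scan_filter, dataset_filter):
    # Mirrors _scanner_options: AND both filters when present,
    # otherwise fall back to the dataset-level one.
    if scan_filter is not None and dataset_filter is not None:
        return dataset_filter & scan_filter
    return dataset_filter if dataset_filter is not None else scan_filter

expr = merge_filters(pc.field("nums") < 5, pc.field("nums") > 2)
print(expr)  # a combined expression, e.g. ((nums > 2) and (nums < 5))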

def scanner(self, **kwargs):
"""
Build a scan operation against the dataset.
@@ -239,7 +276,7 @@
which exposes further operations (e.g. loading all data as a
table, counting rows).

See the `Scanner.from_dataset` method for further information.
See the :meth:`Scanner.from_dataset` method for further information.

Parameters
----------
@@ -385,6 +422,32 @@
"""The common schema of the full Dataset"""
return pyarrow_wrap_schema(self.dataset.schema())

def filter(self, expression not None):
"""
Apply a row filter to the dataset.

Parameters
----------
expression : Expression
The filter that should be applied to the dataset.

Returns
-------
Dataset
"""
cdef:
Dataset filtered_dataset

new_filter = expression
current_filter = self._scan_options.get("filter")
if current_filter is not None and new_filter is not None:
new_filter = current_filter & new_filter

filtered_dataset = self.__class__.__new__(self.__class__)
filtered_dataset.init(self.wrapped)
filtered_dataset._scan_options = dict(filter=new_filter)
return filtered_dataset
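
Note that filter() never mutates the receiver: it wraps the same underlying C++ dataset in a fresh Python object carrying the combined expression, so the original dataset stays reusable. A short sketch (in-memory table assumed):

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset(pa.table({"nums": [1, 2, 3, 4]}))
filtered = dataset.filter(pc.field("nums") > 2)

print(dataset.to_table().num_rows)   # 4: the original is untouched
print(filtered.to_table().num_rows)  # 2: only rows with nums > 2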

def join(self, right_dataset, keys, right_keys=None, join_type="left outer",
left_suffix=None, right_suffix=None, coalesce_keys=True,
use_threads=True):
@@ -2187,6 +2250,9 @@ cdef class RecordBatchIterator(_Weakrefable):
self.iterator = make_shared[CRecordBatchIterator](move(iterator))
return self

cdef inline shared_ptr[CRecordBatchIterator] unwrap(self) nogil:
return self.iterator

def __iter__(self):
return self

@@ -2354,10 +2420,6 @@ cdef class Scanner(_Weakrefable):
default pool.
"""

cdef:
shared_ptr[CScanner] wrapped
CScanner* scanner

def __init__(self):
_forbid_instantiation(self.__class__)

@@ -2375,8 +2437,34 @@
return self.wrapped

@staticmethod
def from_dataset(Dataset dataset not None, *, object columns=None,
Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE,
cdef shared_ptr[CScanOptions] _make_scan_options(Dataset dataset, dict py_scanoptions) except *:
cdef:
shared_ptr[CScannerBuilder] builder = make_shared[CScannerBuilder](dataset.unwrap())

py_scanoptions = dataset._scanner_options(py_scanoptions)

# Need to explicitly expand the arguments as Cython doesn't support
# keyword expansion in cdef functions.
_populate_builder(
builder,
columns=py_scanoptions.get("columns"),
filter=py_scanoptions.get("filter"),
batch_size=py_scanoptions.get("batch_size", _DEFAULT_BATCH_SIZE),
batch_readahead=py_scanoptions.get(
"batch_readahead", _DEFAULT_BATCH_READAHEAD),
fragment_readahead=py_scanoptions.get(
"fragment_readahead", _DEFAULT_FRAGMENT_READAHEAD),
use_threads=py_scanoptions.get("use_threads", True),
memory_pool=py_scanoptions.get("memory_pool"),
fragment_scan_options=py_scanoptions.get("fragment_scan_options"))

return GetResultValue(deref(builder).GetScanOptions())

@staticmethod
def from_dataset(Dataset dataset not None, *,
object columns=None,
Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
int batch_readahead=_DEFAULT_BATCH_READAHEAD,
int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
FragmentScanOptions fragment_scan_options=None,
@@ -2440,7 +2528,7 @@
default pool.
"""
cdef:
shared_ptr[CScanOptions] options = make_shared[CScanOptions]()
shared_ptr[CScanOptions] options
shared_ptr[CScannerBuilder] builder
shared_ptr[CScanner] scanner

@@ -2449,13 +2537,14 @@
'effect. It will be removed in the next release.',
FutureWarning)

options = Scanner._make_scan_options(
dataset,
dict(columns=columns, filter=filter, batch_size=batch_size,
batch_readahead=batch_readahead,
fragment_readahead=fragment_readahead, use_threads=use_threads,
memory_pool=memory_pool, fragment_scan_options=fragment_scan_options)
)
builder = make_shared[CScannerBuilder](dataset.unwrap(), options)
_populate_builder(builder, columns=columns, filter=filter,
batch_size=batch_size, batch_readahead=batch_readahead,
fragment_readahead=fragment_readahead, use_threads=use_threads,
memory_pool=memory_pool,
fragment_scan_options=fragment_scan_options)

scanner = GetResultValue(builder.get().Finish())
return Scanner.wrap(scanner)
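
With from_dataset now routed through _make_scan_options, a scanner built from a filtered dataset transparently inherits the dataset-level filter and ANDs it with its own. A sketch of the user-visible effect (table values assumed):

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset(pa.table({"nums": [1, 2, 3, 4, 5]}))
filtered = dataset.filter(pc.field("nums") > 1)

# The scan-time filter combines with the dataset-level one: 1 < nums < 4.
scanner = ds.Scanner.from_dataset(filtered, filter=pc.field("nums") < 4)
print(scanner.to_table()["nums"])  # chunked array containing [2, 3]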
