diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h
index 331ad3d81c6..b28bf7a14a4 100644
--- a/cpp/src/arrow/dataset/dataset_internal.h
+++ b/cpp/src/arrow/dataset/dataset_internal.h
@@ -185,5 +185,27 @@ inline bool operator==(const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded& r) {
          l.partition_expression == r.partition_expression;
 }
 
+/// Get fragment scan options of the expected type.
+/// \return Fragment scan options if provided on the scan options, else the default
+///     options if set, else a default-constructed value. If options are provided
+///     but of the wrong type, an error is returned.
+template <typename T>
+arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
+    const std::string& type_name, ScanOptions* scan_options,
+    const std::shared_ptr<FragmentScanOptions>& default_options) {
+  auto source = default_options;
+  if (scan_options && scan_options->fragment_scan_options) {
+    source = scan_options->fragment_scan_options;
+  }
+  if (!source) {
+    return std::make_shared<T>();
+  }
+  if (source->type_name() != type_name) {
+    return Status::Invalid("FragmentScanOptions of type ", source->type_name(),
+                           " were provided for scanning a fragment of type ",
+                           type_name);
+  }
+  return internal::checked_pointer_cast<T>(source);
+}
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index ca77f43eded..9c613c00aff 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -124,6 +124,11 @@ class ARROW_DS_EXPORT FileSource {
 /// \brief Base class for file format implementation
 class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileFormat> {
  public:
+  /// Options affecting how this format is scanned.
+  ///
+  /// The options here can be overridden at scan time.
+  std::shared_ptr<FragmentScanOptions> default_fragment_scan_options;
+
   virtual ~FileFormat() = default;
 
   /// \brief The name identifying the kind of file format
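[Note: illustrative sketch, not part of the patch. The helper above resolves options with a fixed precedence — explicit scan-time options, then the format's default_fragment_scan_options (the new FileFormat member), then a default-constructed value — and rejects a mismatched option type with Status::Invalid instead of blindly casting. A caller would use it roughly as follows; GetFragmentScanOptions lives in an internal header and the function name here is hypothetical.]

    // Sketch: resolve CSV options the way file_csv.cc does later in this diff.
    #include "arrow/dataset/dataset_internal.h"
    #include "arrow/dataset/file_csv.h"

    namespace arrow {
    namespace dataset {

    Status ResolveCsvOptionsExample(const CsvFileFormat& format,
                                    ScanOptions* scan_options) {
      // Precedence: scan_options->fragment_scan_options (if set)
      //   > format.default_fragment_scan_options (if set)
      //   > std::make_shared<CsvFragmentScanOptions>()
      ARROW_ASSIGN_OR_RAISE(auto csv_options,
                            GetFragmentScanOptions<CsvFragmentScanOptions>(
                                kCsvTypeName, scan_options,
                                format.default_fragment_scan_options));
      // csv_options->convert_options / ->read_options now hold whichever
      // source won; passing e.g. non-CSV options fails with Status::Invalid.
      (void)csv_options;  // silence unused-variable warning in this sketch
      return Status::OK();
    }

    }  // namespace dataset
    }  // namespace arrow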
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index b7c7f3290da..e736d06753b 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -82,14 +82,11 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(
   ARROW_ASSIGN_OR_RAISE(auto column_names,
                         GetColumnNames(format.parse_options, first_block, pool));
 
-  auto convert_options = csv::ConvertOptions::Defaults();
-  if (scan_options && scan_options->fragment_scan_options &&
-      scan_options->fragment_scan_options->type_name() == kCsvTypeName) {
-    auto csv_scan_options = internal::checked_pointer_cast<CsvFragmentScanOptions>(
-        scan_options->fragment_scan_options);
-    convert_options = csv_scan_options->convert_options;
-  }
-
+  ARROW_ASSIGN_OR_RAISE(
+      auto csv_scan_options,
+      GetFragmentScanOptions<CsvFragmentScanOptions>(
+          kCsvTypeName, scan_options.get(), format.default_fragment_scan_options));
+  auto convert_options = csv_scan_options->convert_options;
   for (FieldRef ref : scan_options->MaterializedFields()) {
     ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOne(*scan_options->dataset_schema));
 
@@ -99,8 +96,13 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(
   return convert_options;
 }
 
-static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
-  auto read_options = csv::ReadOptions::Defaults();
+static inline Result<csv::ReadOptions> GetReadOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  ARROW_ASSIGN_OR_RAISE(
+      auto csv_scan_options,
+      GetFragmentScanOptions<CsvFragmentScanOptions>(
+          kCsvTypeName, scan_options.get(), format.default_fragment_scan_options));
+  auto read_options = csv_scan_options->read_options;
   // Multithreaded conversion of individual files would lead to excessive thread
   // contention when ScanTasks are also executed in multiple threads, so we disable it
   // here.
@@ -112,7 +114,7 @@ static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
     const FileSource& source, const CsvFileFormat& format,
     const std::shared_ptr<ScanOptions>& scan_options = nullptr,
     MemoryPool* pool = default_memory_pool()) {
-  auto reader_options = GetReadOptions(format);
+  ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
   util::string_view first_block;
 
   ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h
index 4c66e291a47..b235195c5e3 100644
--- a/cpp/src/arrow/dataset/file_csv.h
+++ b/cpp/src/arrow/dataset/file_csv.h
@@ -62,11 +62,17 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
   std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return NULLPTR; }
 };
 
-class ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
- public:
+/// \brief Per-scan options for CSV fragments
+struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
   std::string type_name() const override { return kCsvTypeName; }
 
+  /// CSV conversion options
   csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults();
+
+  /// CSV reading options
+  ///
+  /// Note that use_threads is always ignored.
+  csv::ReadOptions read_options = csv::ReadOptions::Defaults();
 };
 
 }  // namespace dataset
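[Note: sketch, not part of the patch — setting format-wide defaults from C++, mirroring what the C++ test and the R bindings later in this diff do. The option values are examples only.]

    #include "arrow/dataset/file_csv.h"

    std::shared_ptr<arrow::dataset::CsvFileFormat> MakeCsvFormatWithDefaults() {
      auto format = std::make_shared<arrow::dataset::CsvFileFormat>();
      auto defaults = std::make_shared<arrow::dataset::CsvFragmentScanOptions>();
      defaults->read_options.skip_rows = 1;                // e.g. skip a preamble line
      defaults->convert_options.null_values = {"MYNULL"};  // example null sentinel
      defaults->convert_options.strings_can_be_null = true;
      // Applies to every scan over this format unless overridden per scan.
      format->default_fragment_scan_options = std::move(defaults);
      return format;
    }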
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index c3c8796e17e..99ca7cc0f42 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -134,6 +134,43 @@ bar)");
   ASSERT_EQ(null_count, 1);
 }
 
+TEST_P(TestCsvFileFormat, CustomReadOptions) {
+  auto source = GetFileSource(R"(header_skipped
+str
+foo
+MYNULL
+N/A
+bar)");
+  SetSchema({field("str", utf8())});
+  auto defaults = std::make_shared<CsvFragmentScanOptions>();
+  defaults->read_options.skip_rows = 1;
+  format_->default_fragment_scan_options = defaults;
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+  ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema());
+  AssertSchemaEqual(opts_->dataset_schema, physical_schema);
+
+  {
+    int64_t rows = 0;
+    for (auto maybe_batch : Batches(fragment.get())) {
+      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+      rows += batch->GetColumnByName("str")->length();
+    }
+    ASSERT_EQ(rows, 4);
+  }
+  {
+    // These options completely override the default ones
+    auto fragment_scan_options = std::make_shared<CsvFragmentScanOptions>();
+    fragment_scan_options->read_options.block_size = 1 << 22;
+    opts_->fragment_scan_options = fragment_scan_options;
+    int64_t rows = 0;
+    for (auto maybe_batch : Batches(fragment.get())) {
+      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+      rows += batch->GetColumnByName("header_skipped")->length();
+    }
+    ASSERT_EQ(rows, 5);
+  }
+}
+
 TEST_P(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
   auto source = GetFileSource(R"(f64
 1.0
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 1aca9fa4882..dee96ceb836 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -149,6 +149,12 @@ Status ScannerBuilder::Pool(MemoryPool* pool) {
   return Status::OK();
 }
 
+Status ScannerBuilder::FragmentScanOptions(
+    std::shared_ptr<dataset::FragmentScanOptions> fragment_scan_options) {
+  scan_options_->fragment_scan_options = std::move(fragment_scan_options);
+  return Status::OK();
+}
+
 Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
   if (!scan_options_->projection.IsBound()) {
     RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names()));
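[Note: sketch, not part of the patch — the per-scan override path through the new builder method. Assumes an already-constructed dataset; Dataset::NewScan() predates this diff. Options set on the builder take precedence over any default_fragment_scan_options on the format.]

    #include "arrow/dataset/api.h"

    arrow::Result<std::shared_ptr<arrow::Table>> ScanWithCsvOverride(
        const std::shared_ptr<arrow::dataset::Dataset>& dataset) {
      auto overrides = std::make_shared<arrow::dataset::CsvFragmentScanOptions>();
      overrides->convert_options.null_values = {"MYNULL"};
      overrides->convert_options.strings_can_be_null = true;

      ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
      // The new setter added in scanner.cc above.
      RETURN_NOT_OK(builder->FragmentScanOptions(std::move(overrides)));
      ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
      return scanner->ToTable();
    }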
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 6e06af06066..df5f7954afe 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -240,6 +240,9 @@ class ARROW_DS_EXPORT ScannerBuilder {
   /// \brief Set the pool from which materialized and scanned arrays will be allocated.
   Status Pool(MemoryPool* pool);
 
+  /// \brief Set fragment-specific scan options.
+  Status FragmentScanOptions(
+      std::shared_ptr<dataset::FragmentScanOptions> fragment_scan_options);
+
   /// \brief Return the constructed now-immutable Scanner object
   Result<std::shared_ptr<Scanner>> Finish();
 
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index 62395ad1a6e..d148d4ee2d3 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -46,6 +46,8 @@ class Fragment;
 using FragmentIterator = Iterator<std::shared_ptr<Fragment>>;
 using FragmentVector = std::vector<std::shared_ptr<Fragment>>;
 
+class FragmentScanOptions;
+
 class FileSource;
 class FileFormat;
 class FileFragment;
@@ -58,6 +60,7 @@ struct FileSystemDatasetWriteOptions;
 class InMemoryDataset;
 
 class CsvFileFormat;
+struct CsvFragmentScanOptions;
 
 class IpcFileFormat;
 class IpcFileWriter;
diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd
index 2d9d24aea57..f8e12f16bc8 100644
--- a/python/pyarrow/_csv.pxd
+++ b/python/pyarrow/_csv.pxd
@@ -21,9 +21,26 @@ from pyarrow.includes.libarrow cimport *
 from pyarrow.lib cimport _Weakrefable
 
 
+cdef class ConvertOptions(_Weakrefable):
+    cdef:
+        CCSVConvertOptions options
+
+    @staticmethod
+    cdef ConvertOptions wrap(CCSVConvertOptions options)
+
+
 cdef class ParseOptions(_Weakrefable):
     cdef:
         CCSVParseOptions options
 
     @staticmethod
     cdef ParseOptions wrap(CCSVParseOptions options)
+
+
+cdef class ReadOptions(_Weakrefable):
+    cdef:
+        CCSVReadOptions options
+        public object encoding
+
+    @staticmethod
+    cdef ReadOptions wrap(CCSVReadOptions options)
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index cce44d1d8c8..a98160cfa99 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -73,9 +73,6 @@ cdef class ReadOptions(_Weakrefable):
         The character encoding of the CSV data.  Columns that cannot
         decode using this encoding can still be read as Binary.
     """
-    cdef:
-        CCSVReadOptions options
-        public object encoding
 
     # Avoid mistakingly creating attributes
     __slots__ = ()
@@ -161,6 +158,40 @@ cdef class ReadOptions(_Weakrefable):
     def autogenerate_column_names(self, value):
         self.options.autogenerate_column_names = value
 
+    def equals(self, ReadOptions other):
+        return (
+            self.use_threads == other.use_threads and
+            self.block_size == other.block_size and
+            self.skip_rows == other.skip_rows and
+            self.column_names == other.column_names and
+            self.autogenerate_column_names ==
+            other.autogenerate_column_names and
+            self.encoding == other.encoding
+        )
+
+    @staticmethod
+    cdef ReadOptions wrap(CCSVReadOptions options):
+        out = ReadOptions()
+        out.options = options
+        out.encoding = 'utf8'  # No way to know this
+        return out
+
+    def __getstate__(self):
+        return (self.use_threads, self.block_size, self.skip_rows,
+                self.column_names, self.autogenerate_column_names,
+                self.encoding)
+
+    def __setstate__(self, state):
+        (self.use_threads, self.block_size, self.skip_rows,
+         self.column_names, self.autogenerate_column_names,
+         self.encoding) = state
+
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False
+
 
 cdef class ParseOptions(_Weakrefable):
     """
@@ -320,6 +351,12 @@ cdef class ParseOptions(_Weakrefable):
          self.escape_char, self.newlines_in_values,
          self.ignore_empty_lines) = state
 
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False
+
 
 cdef class _ISO8601(_Weakrefable):
     """
@@ -391,9 +428,6 @@ cdef class ConvertOptions(_Weakrefable):
         `column_types`, or null by default).
         This option is ignored if `include_columns` is empty.
""" - cdef: - CCSVConvertOptions options - # Avoid mistakingly creating attributes __slots__ = () @@ -603,6 +637,48 @@ cdef class ConvertOptions(_Weakrefable): self.options.timestamp_parsers = move(c_parsers) + @staticmethod + cdef ConvertOptions wrap(CCSVConvertOptions options): + out = ConvertOptions() + out.options = options + return out + + def equals(self, ConvertOptions other): + return ( + self.check_utf8 == other.check_utf8 and + self.column_types == other.column_types and + self.null_values == other.null_values and + self.true_values == other.true_values and + self.false_values == other.false_values and + self.timestamp_parsers == other.timestamp_parsers and + self.strings_can_be_null == other.strings_can_be_null and + self.auto_dict_encode == other.auto_dict_encode and + self.auto_dict_max_cardinality == + other.auto_dict_max_cardinality and + self.include_columns == other.include_columns and + self.include_missing_columns == other.include_missing_columns + ) + + def __getstate__(self): + return (self.check_utf8, self.column_types, self.null_values, + self.true_values, self.false_values, self.timestamp_parsers, + self.strings_can_be_null, self.auto_dict_encode, + self.auto_dict_max_cardinality, self.include_columns, + self.include_missing_columns) + + def __setstate__(self, state): + (self.check_utf8, self.column_types, self.null_values, + self.true_values, self.false_values, self.timestamp_parsers, + self.strings_can_be_null, self.auto_dict_encode, + self.auto_dict_max_cardinality, self.include_columns, + self.include_missing_columns) = state + + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + cdef _get_reader(input_file, ReadOptions read_options, shared_ptr[CInputStream]* out): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 4f559f21e4c..169c6e17353 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -29,7 +29,7 @@ from pyarrow.lib cimport * from pyarrow.lib import frombytes, tobytes from pyarrow.includes.libarrow_dataset cimport * from pyarrow._fs cimport FileSystem, FileInfo, FileSelector -from pyarrow._csv cimport ParseOptions +from pyarrow._csv cimport ConvertOptions, ParseOptions, ReadOptions from pyarrow.util import _is_path_like, _stringify_path from pyarrow._parquet cimport ( @@ -377,6 +377,9 @@ cdef class Dataset(_Weakrefable): memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. + fragment_scan_options : FragmentScanOptions, default None + Options specific to a particular scan and fragment type, which + can change between different scans of the same dataset. 
 
         Returns
         -------
@@ -714,6 +717,23 @@ cdef class FileFormat(_Weakrefable):
     def default_extname(self):
         return frombytes(self.format.type_name())
 
+    @property
+    def default_fragment_scan_options(self):
+        return FragmentScanOptions.wrap(
+            self.wrapped.get().default_fragment_scan_options)
+
+    @default_fragment_scan_options.setter
+    def default_fragment_scan_options(self, FragmentScanOptions options):
+        if options is None:
+            self.wrapped.get().default_fragment_scan_options = \
+                <shared_ptr[CFragmentScanOptions]> nullptr
+        else:
+            self._set_default_fragment_scan_options(options)
+
+    cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+        raise ValueError(f"Cannot set fragment scan options for "
+                         f"'{options.type_name}' on {self.__class__.__name__}")
+
     def __eq__(self, other):
         try:
             return self.equals(other)
@@ -816,6 +836,9 @@ cdef class Fragment(_Weakrefable):
         memory_pool : MemoryPool, default None
             For memory allocations, if required. If not specified, uses the
             default pool.
+        fragment_scan_options : FragmentScanOptions, default None
+            Options specific to a particular scan and fragment type, which
+            can change between different scans of the same dataset.
 
         Returns
         -------
@@ -966,6 +989,45 @@ class RowGroupInfo:
         return self.id == other.id
 
 
+cdef class FragmentScanOptions(_Weakrefable):
+    """Scan options specific to a particular fragment and scan operation."""
+
+    cdef:
+        shared_ptr[CFragmentScanOptions] wrapped
+
+    def __init__(self):
+        _forbid_instantiation(self.__class__)
+
+    cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+        self.wrapped = sp
+
+    @staticmethod
+    cdef wrap(const shared_ptr[CFragmentScanOptions]& sp):
+        if not sp:
+            return None
+
+        type_name = frombytes(sp.get().type_name())
+
+        classes = {
+            'csv': CsvFragmentScanOptions,
+        }
+
+        class_ = classes.get(type_name, None)
+        if class_ is None:
+            raise TypeError(type_name)
+
+        cdef FragmentScanOptions self = class_.__new__(class_)
+        self.init(sp)
+        return self
+
+    @property
+    def type_name(self):
+        return frombytes(self.wrapped.get().type_name())
+
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False
+
+
 cdef class ParquetFileFragment(FileFragment):
     """A Fragment representing a parquet file."""
 
@@ -1363,10 +1425,18 @@ cdef class CsvFileFormat(FileFormat):
     cdef:
         CCsvFileFormat* csv_format
 
-    def __init__(self, ParseOptions parse_options=None):
+    # Avoid mistakingly creating attributes
+    __slots__ = ()
+
+    def __init__(self, ParseOptions parse_options=None,
+                 ConvertOptions convert_options=None,
+                 ReadOptions read_options=None):
         self.init(shared_ptr[CFileFormat](new CCsvFileFormat()))
         if parse_options is not None:
             self.parse_options = parse_options
+        if convert_options is not None or read_options is not None:
+            self.default_fragment_scan_options = CsvFragmentScanOptions(
+                convert_options=convert_options, read_options=read_options)
 
     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
@@ -1383,12 +1453,68 @@ cdef class CsvFileFormat(FileFormat):
     def parse_options(self, ParseOptions parse_options not None):
         self.csv_format.parse_options = parse_options.options
 
+    cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+        if options.type_name == 'csv':
+            self.csv_format.default_fragment_scan_options = options.wrapped
+        else:
+            super()._set_default_fragment_scan_options(options)
+
     def equals(self, CsvFileFormat other):
         return self.parse_options.equals(other.parse_options)
 
     def __reduce__(self):
         return CsvFileFormat, (self.parse_options,)
 
+    def __repr__(self):
+        return f"<CsvFileFormat parse_options={self.parse_options}>"
+
+cdef class CsvFragmentScanOptions(FragmentScanOptions):
+    """Scan-specific options for CSV fragments."""
+
+    cdef:
+        CCsvFragmentScanOptions* csv_options
+
+    # Avoid mistakingly creating attributes
+    __slots__ = ()
+
+    def __init__(self, ConvertOptions convert_options=None,
+                 ReadOptions read_options=None):
+        self.init(shared_ptr[CFragmentScanOptions](
+            new CCsvFragmentScanOptions()))
+        if convert_options is not None:
+            self.convert_options = convert_options
+        if read_options is not None:
+            self.read_options = read_options
+
+    cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+        FragmentScanOptions.init(self, sp)
+        self.csv_options = <CCsvFragmentScanOptions*> sp.get()
+
+    @property
+    def convert_options(self):
+        return ConvertOptions.wrap(self.csv_options.convert_options)
+
+    @convert_options.setter
+    def convert_options(self, ConvertOptions convert_options not None):
+        self.csv_options.convert_options = convert_options.options
+
+    @property
+    def read_options(self):
+        return ReadOptions.wrap(self.csv_options.read_options)
+
+    @read_options.setter
+    def read_options(self, ReadOptions read_options not None):
+        self.csv_options.read_options = read_options.options
+
+    def equals(self, CsvFragmentScanOptions other):
+        return (self.convert_options.equals(other.convert_options) and
+                self.read_options.equals(other.read_options))
+
+    def __reduce__(self):
+        return CsvFragmentScanOptions, (self.convert_options,
+                                        self.read_options)
+
 
 cdef class Partitioning(_Weakrefable):
 
@@ -2192,7 +2318,9 @@ cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
                             list columns=None, Expression filter=None,
                             int batch_size=_DEFAULT_BATCH_SIZE,
                             bint use_threads=True,
-                            MemoryPool memory_pool=None) except *:
+                            MemoryPool memory_pool=None,
+                            FragmentScanOptions fragment_scan_options=None)\
+        except *:
     cdef:
         CScannerBuilder *builder
     builder = ptr.get()
@@ -2207,6 +2335,9 @@ cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
         check_status(builder.UseThreads(use_threads))
     if memory_pool:
         check_status(builder.Pool(maybe_unbox_memory_pool(memory_pool)))
+    if fragment_scan_options:
+        check_status(
+            builder.FragmentScanOptions(fragment_scan_options.wrapped))
 
 
 cdef class Scanner(_Weakrefable):
@@ -2269,7 +2400,8 @@ cdef class Scanner(_Weakrefable):
     def from_dataset(Dataset dataset not None,
                      bint use_threads=True, MemoryPool memory_pool=None,
                      list columns=None, Expression filter=None,
-                     int batch_size=_DEFAULT_BATCH_SIZE):
+                     int batch_size=_DEFAULT_BATCH_SIZE,
+                     FragmentScanOptions fragment_scan_options=None):
         cdef:
             shared_ptr[CScanOptions] options = make_shared[CScanOptions]()
             shared_ptr[CScannerBuilder] builder
@@ -2278,7 +2410,8 @@ cdef class Scanner(_Weakrefable):
         builder = make_shared[CScannerBuilder](dataset.unwrap(), options)
         _populate_builder(builder, columns=columns, filter=filter,
                           batch_size=batch_size, use_threads=use_threads,
-                          memory_pool=memory_pool)
+                          memory_pool=memory_pool,
+                          fragment_scan_options=fragment_scan_options)
 
         scanner = GetResultValue(builder.get().Finish())
         return Scanner.wrap(scanner)
@@ -2287,7 +2420,8 @@ cdef class Scanner(_Weakrefable):
     def from_fragment(Fragment fragment not None, Schema schema=None,
                       bint use_threads=True, MemoryPool memory_pool=None,
                       list columns=None, Expression filter=None,
-                      int batch_size=_DEFAULT_BATCH_SIZE):
+                      int batch_size=_DEFAULT_BATCH_SIZE,
+                      FragmentScanOptions fragment_scan_options=None):
         cdef:
             shared_ptr[CScanOptions] options = make_shared[CScanOptions]()
             shared_ptr[CScannerBuilder] builder
@@ -2299,7 +2433,8 @@ cdef class Scanner(_Weakrefable):
             fragment.unwrap(), options)
         _populate_builder(builder, columns=columns, filter=filter,
                           batch_size=batch_size, use_threads=use_threads,
-                          memory_pool=memory_pool)
+                          memory_pool=memory_pool,
+                          fragment_scan_options=fragment_scan_options)
 
         scanner = GetResultValue(builder.get().Finish())
         return Scanner.wrap(scanner)
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index a2cb87a1f7a..195d414b047 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -22,6 +22,7 @@ from pyarrow._dataset import (  # noqa
     CsvFileFormat,
+    CsvFragmentScanOptions,
     Expression,
     Dataset,
     DatasetFactory,
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index f37f49f463d..309d3530eec 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -59,6 +59,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         @staticmethod
         shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema)
 
+    cdef cppclass CFragmentScanOptions "arrow::dataset::FragmentScanOptions":
+        c_string type_name() const
+
     ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \
         "arrow::dataset::ScanTaskIterator"
 
@@ -101,6 +104,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CStatus UseThreads(c_bool use_threads)
         CStatus Pool(CMemoryPool* pool)
         CStatus BatchSize(int64_t batch_size)
+        CStatus FragmentScanOptions(
+            shared_ptr[CFragmentScanOptions] fragment_scan_options)
         CResult[shared_ptr[CScanner]] Finish()
         shared_ptr[CSchema] schema() const
 
@@ -164,6 +169,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         c_string type_name() const
 
     cdef cppclass CFileFormat "arrow::dataset::FileFormat":
+        shared_ptr[CFragmentScanOptions] default_fragment_scan_options
         c_string type_name() const
         CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const
         CResult[shared_ptr[CFileFragment]] MakeFragment(
@@ -252,6 +258,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
                                       CFileFormat):
         CCSVParseOptions parse_options
 
+    cdef cppclass CCsvFragmentScanOptions \
+            "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions):
+        CCSVConvertOptions convert_options
+        CCSVReadOptions read_options
+
     cdef cppclass CPartitioning "arrow::dataset::Partitioning":
         c_string type_name() const
         CResult[CExpression] Parse(const c_string & path) const
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index e34700838df..8e6bf9c3217 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -526,6 +526,10 @@ def test_file_format_pickling():
         ds.CsvFileFormat(),
         ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t',
                                              ignore_empty_lines=True)),
+        ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
+            skip_rows=3, column_names=['foo'])),
+        ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
+            skip_rows=3, block_size=2**20)),
         ds.ParquetFileFormat(),
         ds.ParquetFileFormat(
             read_options=ds.ParquetReadOptions(use_buffered_stream=True)
@@ -541,6 +545,18 @@ def test_file_format_pickling():
         assert pickle.loads(pickle.dumps(file_format)) == file_format
 
 
+def test_fragment_scan_options_pickling():
+    options = [
+        ds.CsvFragmentScanOptions(),
+        ds.CsvFragmentScanOptions(
+            convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)),
+        ds.CsvFragmentScanOptions(
+            read_options=pa.csv.ReadOptions(block_size=2**16)),
+    ]
+    for option in options:
+        assert pickle.loads(pickle.dumps(option)) == option
+
+
 @pytest.mark.parametrize('paths_or_selector', [
     fs.FileSelector('subdir', recursive=True),
     [
@@ -2242,6 +2258,51 @@ def test_csv_format_compressed(tempdir, compression):
     assert result.equals(table)
 
 
+def test_csv_format_options(tempdir):
+    path = str(tempdir / 'test.csv')
+    with open(path, 'w') as sink:
+        sink.write('skipped\ncol0\nfoo\nbar\n')
+    dataset = ds.dataset(path, format='csv')
+    result = dataset.to_table()
+    assert result.equals(
+        pa.table({'skipped': pa.array(['col0', 'foo', 'bar'])}))
+
+    dataset = ds.dataset(path, format=ds.CsvFileFormat(
+        read_options=pa.csv.ReadOptions(skip_rows=1)))
+    result = dataset.to_table()
+    assert result.equals(pa.table({'col0': pa.array(['foo', 'bar'])}))
+
+    dataset = ds.dataset(path, format=ds.CsvFileFormat(
+        read_options=pa.csv.ReadOptions(column_names=['foo'])))
+    result = dataset.to_table()
+    assert result.equals(
+        pa.table({'foo': pa.array(['skipped', 'col0', 'foo', 'bar'])}))
+
+
+def test_csv_fragment_options(tempdir):
+    path = str(tempdir / 'test.csv')
+    with open(path, 'w') as sink:
+        sink.write('col0\nfoo\nspam\nMYNULL\n')
+    dataset = ds.dataset(path, format='csv')
+    convert_options = pyarrow.csv.ConvertOptions(null_values=['MYNULL'],
+                                                 strings_can_be_null=True)
+    options = ds.CsvFragmentScanOptions(
+        convert_options=convert_options,
+        read_options=pa.csv.ReadOptions(block_size=2**16))
+    result = dataset.to_table(fragment_scan_options=options)
+    assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])}))
+
+    csv_format = ds.CsvFileFormat(convert_options=convert_options)
+    dataset = ds.dataset(path, format=csv_format)
+    result = dataset.to_table()
+    assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])}))
+
+    options = ds.CsvFragmentScanOptions()
+    result = dataset.to_table(fragment_scan_options=options)
+    assert result.equals(
+        pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])}))
+
+
 def test_feather_format(tempdir):
     from pyarrow.feather import write_feather
 
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 0d0d3d30f8d..2c13537f0c0 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -428,8 +428,16 @@ dataset___IpcFileFormat__Make <- function(){
   .Call(`_arrow_dataset___IpcFileFormat__Make`)
 }
 
-dataset___CsvFileFormat__Make <- function(parse_options){
-  .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options)
+dataset___CsvFileFormat__Make <- function(parse_options, convert_options, read_options){
+  .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options, convert_options, read_options)
+}
+
+dataset___FragmentScanOptions__type_name <- function(fragment_scan_options){
+  .Call(`_arrow_dataset___FragmentScanOptions__type_name`, fragment_scan_options)
+}
+
+dataset___CsvFragmentScanOptions__Make <- function(convert_options, read_options){
+  .Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options, read_options)
 }
 
 dataset___DirectoryPartitioning <- function(schm){
@@ -468,6 +476,10 @@ dataset___ScannerBuilder__BatchSize <- function(sb, batch_size){
   invisible(.Call(`_arrow_dataset___ScannerBuilder__BatchSize`, sb, batch_size))
 }
 
+dataset___ScannerBuilder__FragmentScanOptions <- function(sb, options){
+  invisible(.Call(`_arrow_dataset___ScannerBuilder__FragmentScanOptions`, sb, options))
+}
+
 dataset___ScannerBuilder__schema <- function(sb){
   .Call(`_arrow_dataset___ScannerBuilder__schema`, sb)
 }
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index f1bf601c720..cd54a300606 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -40,11 +40,18 @@
 #' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB.
 #' * `dict_columns`: Names of columns which should be read as dictionaries.
 #'
-#' `format = "text"`: see [CsvReadOptions]. Note that you can specify them either
+#' `format = "text"`: see [CsvParseOptions]. Note that you can specify them either
 #' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
 #' `readr`-style naming used in [read_csv_arrow()] ("delim", "quote", etc.).
 #' Not all `readr` options are currently supported; please file an issue if
-#' you encounter one that `arrow` should support.
+#' you encounter one that `arrow` should support. Also, the following options are
+#' supported. From [CsvReadOptions]:
+#' * `skip_rows`
+#' * `column_names`
+#' * `autogenerate_column_names`
+#' From [CsvFragmentScanOptions] (these values can be overridden at scan time):
+#' * `convert_options`: a [CsvConvertOptions]
+#' * `block_size`
 #'
 #' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`)
 #' @rdname FileFormat
@@ -101,13 +108,21 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
 #' @rdname FileFormat
 #' @export
 CsvFileFormat <- R6Class("CsvFileFormat", inherit = FileFormat)
-CsvFileFormat$create <- function(..., opts = csv_file_format_parse_options(...)) {
-  dataset___CsvFileFormat__Make(opts)
+CsvFileFormat$create <- function(..., opts = csv_file_format_parse_options(...),
+                                 convert_options = csv_file_format_convert_options(...),
+                                 read_options = csv_file_format_read_options(...)) {
+  dataset___CsvFileFormat__Make(opts, convert_options, read_options)
 }
 
 # Support both readr-style option names and Arrow C++ option names
 csv_file_format_parse_options <- function(...) {
-  opt_names <- names(list(...))
+  opts <- list(...)
+  # Filter out arguments meant for CsvConvertOptions/CsvReadOptions
+  convert_opts <- names(formals(CsvConvertOptions$create))
+  read_opts <- names(formals(CsvReadOptions$create))
+  opts[convert_opts] <- NULL
+  opts[read_opts] <- NULL
+  opt_names <- names(opts)
   # Catch any readr-style options specified with full option names that are
   # supported by read_delim_arrow() (and its wrappers) but are not yet
   # supported here
@@ -163,12 +178,89 @@ csv_file_format_parse_options <- function(...) {
     stop("Use either Arrow parse options or readr parse options, not both",
          call. = FALSE)
   }
 
-    readr_to_csv_parse_options(...) # all options have readr-style names
+    do.call(readr_to_csv_parse_options, opts) # all options have readr-style names
+  } else {
+    do.call(CsvParseOptions$create, opts) # all options have Arrow C++ names
+  }
+}
+
+csv_file_format_convert_options <- function(...) {
+  opts <- list(...)
+  # Filter out arguments meant for CsvParseOptions/CsvReadOptions
+  arrow_opts <- names(formals(CsvParseOptions$create))
+  readr_opts <- names(formals(readr_to_csv_parse_options))
+  read_opts <- names(formals(CsvReadOptions$create))
+  opts[arrow_opts] <- NULL
+  opts[readr_opts] <- NULL
+  opts[read_opts] <- NULL
+  do.call(CsvConvertOptions$create, opts)
+}
+
+csv_file_format_read_options <- function(...) {
+  opts <- list(...)
+  # Filter out arguments meant for CsvParseOptions/CsvConvertOptions
+  arrow_opts <- names(formals(CsvParseOptions$create))
+  readr_opts <- names(formals(readr_to_csv_parse_options))
+  convert_opts <- names(formals(CsvConvertOptions$create))
+  opts[arrow_opts] <- NULL
+  opts[readr_opts] <- NULL
+  opts[convert_opts] <- NULL
+  do.call(CsvReadOptions$create, opts)
+}
+
+#' Format-specific scan options
+#'
+#' @description
+#' A `FragmentScanOptions` holds options specific to a `FileFormat` and a scan
+#' operation.
+#'
+#' @section Factory:
+#' `FragmentScanOptions$create()` takes the following arguments:
+#' * `format`: A string identifier of the file format. Currently supported values:
+#'   * "csv"/"text", aliases for the same format.
+#' * `...`: Additional format-specific options
+#'
+#'   `format = "text"`: see [CsvConvertOptions]. Note that options can only be
+#'   specified with the Arrow C++ library naming. Also, "block_size" from
+#'   [CsvReadOptions] may be given.
+#'
+#' It returns the appropriate subclass of `FragmentScanOptions`
+#' (e.g. `CsvFragmentScanOptions`).
+#' @rdname FragmentScanOptions
+#' @name FragmentScanOptions
+#' @export
+FragmentScanOptions <- R6Class("FragmentScanOptions", inherit = ArrowObject,
+  active = list(
+    # @description
+    # Return the `FragmentScanOptions`'s type
+    type = function() dataset___FragmentScanOptions__type_name(self)
+  )
+)
+FragmentScanOptions$create <- function(format, ...) {
+  opt_names <- names(list(...))
+  if (format %in% c("csv", "text", "tsv")) {
+    CsvFragmentScanOptions$create(...)
   } else {
-    CsvParseOptions$create(...) # all options have Arrow C++ names
+    stop("Unsupported file format: ", format, call. = FALSE)
   }
 }
 
+#' @export
+as.character.FragmentScanOptions <- function(x, ...) {
+  x$type
+}
+
+#' @usage NULL
+#' @format NULL
+#' @rdname FragmentScanOptions
+#' @export
+CsvFragmentScanOptions <- R6Class("CsvFragmentScanOptions", inherit = FragmentScanOptions)
+CsvFragmentScanOptions$create <- function(...,
+                                          convert_opts = csv_file_format_convert_options(...),
+                                          read_opts = csv_file_format_read_options(...)) {
+  dataset___CsvFragmentScanOptions__Make(convert_opts, read_opts)
+}
+
 #' Format-specific write options
 #'
 #' @description
diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R
index 1c71bf481b5..7e148863226 100644
--- a/r/R/dataset-scan.R
+++ b/r/R/dataset-scan.R
@@ -67,6 +67,7 @@ Scanner$create <- function(dataset,
                            filter = TRUE,
                            use_threads = option_use_threads(),
                            batch_size = NULL,
+                           fragment_scan_options = NULL,
                            ...) {
   if (inherits(dataset, "arrow_dplyr_query")) {
     if (inherits(dataset$.data, "ArrowTabular")) {
@@ -78,6 +79,8 @@ Scanner$create <- function(dataset,
       dataset$selected_columns,
       dataset$filtered_rows,
       use_threads,
+      batch_size,
+      fragment_scan_options,
       ...
     ))
   }
 
@@ -99,6 +102,9 @@ Scanner$create <- function(dataset,
   if (is_integerish(batch_size)) {
     scanner_builder$BatchSize(batch_size)
   }
+  if (!is.null(fragment_scan_options)) {
+    scanner_builder$FragmentScanOptions(fragment_scan_options)
+  }
   scanner_builder$Finish()
 }
 
@@ -185,6 +191,10 @@ ScannerBuilder <- R6Class("ScannerBuilder", inherit = ArrowObject,
       dataset___ScannerBuilder__BatchSize(self, batch_size)
       self
     },
+    FragmentScanOptions = function(options) {
+      dataset___ScannerBuilder__FragmentScanOptions(self, options)
+      self
+    },
     Finish = function() dataset___ScannerBuilder__Finish(self)
   ),
   active = list(
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 7229e60b649..697451d0dd9 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1105,19 +1105,52 @@ extern "C" SEXP _arrow_dataset___IpcFileFormat__Make(){
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::FileFormat> dataset___CsvFileFormat__Make(const std::shared_ptr<arrow::csv::ParseOptions>& parse_options);
-extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp){
+std::shared_ptr<ds::FileFormat> dataset___CsvFileFormat__Make(const std::shared_ptr<arrow::csv::ParseOptions>& parse_options, const std::shared_ptr<arrow::csv::ConvertOptions>& convert_options, const std::shared_ptr<arrow::csv::ReadOptions>& read_options);
+extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP convert_options_sexp, SEXP read_options_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<arrow::csv::ParseOptions>&>::type parse_options(parse_options_sexp);
-	return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options));
+	arrow::r::Input<const std::shared_ptr<arrow::csv::ConvertOptions>&>::type convert_options(convert_options_sexp);
+	arrow::r::Input<const std::shared_ptr<arrow::csv::ReadOptions>&>::type read_options(read_options_sexp);
+	return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options, convert_options, read_options));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp){
+extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP convert_options_sexp, SEXP read_options_sexp){
 	Rf_error("Cannot call dataset___CsvFileFormat__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
 
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
+std::string dataset___FragmentScanOptions__type_name(const std::shared_ptr<ds::FragmentScanOptions>& fragment_scan_options);
+extern "C" SEXP _arrow_dataset___FragmentScanOptions__type_name(SEXP fragment_scan_options_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<ds::FragmentScanOptions>&>::type fragment_scan_options(fragment_scan_options_sexp);
+	return cpp11::as_sexp(dataset___FragmentScanOptions__type_name(fragment_scan_options));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___FragmentScanOptions__type_name(SEXP fragment_scan_options_sexp){
+	Rf_error("Cannot call dataset___FragmentScanOptions__type_name(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
"); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_DATASET) +std::shared_ptr dataset___CsvFragmentScanOptions__Make(const std::shared_ptr& convert_options, const std::shared_ptr& read_options); +extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_options_sexp, SEXP read_options_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type convert_options(convert_options_sexp); + arrow::r::Input&>::type read_options(read_options_sexp); + return cpp11::as_sexp(dataset___CsvFragmentScanOptions__Make(convert_options, read_options)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_options_sexp, SEXP read_options_sexp){ + Rf_error("Cannot call dataset___CsvFragmentScanOptions__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___DirectoryPartitioning(const std::shared_ptr& schm); @@ -1265,6 +1298,23 @@ extern "C" SEXP _arrow_dataset___ScannerBuilder__BatchSize(SEXP sb_sexp, SEXP ba } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_DATASET) +void dataset___ScannerBuilder__FragmentScanOptions(const std::shared_ptr& sb, const std::shared_ptr& options); +extern "C" SEXP _arrow_dataset___ScannerBuilder__FragmentScanOptions(SEXP sb_sexp, SEXP options_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type sb(sb_sexp); + arrow::r::Input&>::type options(options_sexp); + dataset___ScannerBuilder__FragmentScanOptions(sb, options); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___ScannerBuilder__FragmentScanOptions(SEXP sb_sexp, SEXP options_sexp){ + Rf_error("Cannot call dataset___ScannerBuilder__FragmentScanOptions(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___ScannerBuilder__schema(const std::shared_ptr& sb); @@ -4222,7 +4272,9 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, { "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update1, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, - { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, + { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 3}, + { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1}, + { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, { "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1}, { "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 2}, @@ -4232,6 +4284,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ScannerBuilder__Filter", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Filter, 2}, { "_arrow_dataset___ScannerBuilder__UseThreads", (DL_FUNC) &_arrow_dataset___ScannerBuilder__UseThreads, 2}, { "_arrow_dataset___ScannerBuilder__BatchSize", (DL_FUNC) &_arrow_dataset___ScannerBuilder__BatchSize, 2}, + { "_arrow_dataset___ScannerBuilder__FragmentScanOptions", (DL_FUNC) &_arrow_dataset___ScannerBuilder__FragmentScanOptions, 2}, { "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC) &_arrow_dataset___ScannerBuilder__schema, 1}, { "_arrow_dataset___ScannerBuilder__Finish", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Finish, 1}, { "_arrow_dataset___Scanner__ToTable", (DL_FUNC) &_arrow_dataset___Scanner__ToTable, 1}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 83c7cbb844c..89c3e4d56d8 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -272,12 +272,36 @@ std::shared_ptr dataset___IpcFileFormat__Make() { // [[dataset::export]] std::shared_ptr dataset___CsvFileFormat__Make( - const std::shared_ptr& parse_options) { + const std::shared_ptr& parse_options, + const std::shared_ptr& convert_options, + const std::shared_ptr& read_options) { auto format = std::make_shared(); format->parse_options = *parse_options; + auto scan_options = std::make_shared(); + if (convert_options) scan_options->convert_options = *convert_options; + if (read_options) scan_options->read_options = *read_options; + format->default_fragment_scan_options = std::move(scan_options); return format; } +// FragmentScanOptions, CsvFragmentScanOptions + +// [[dataset::export]] +std::string dataset___FragmentScanOptions__type_name( + const std::shared_ptr& fragment_scan_options) { + return fragment_scan_options->type_name(); +} + +// [[dataset::export]] +std::shared_ptr dataset___CsvFragmentScanOptions__Make( + const std::shared_ptr& convert_options, + const std::shared_ptr& read_options) { + auto options = std::make_shared(); + options->convert_options = *convert_options; + options->read_options = *read_options; + return options; +} + // DirectoryPartitioning, HivePartitioning // [[dataset::export]] @@ -346,6 +370,13 @@ void dataset___ScannerBuilder__BatchSize(const 
                                          int64_t batch_size) {
   StopIfNotOk(sb->BatchSize(batch_size));
 }
 
+// [[dataset::export]]
+void dataset___ScannerBuilder__FragmentScanOptions(
+    const std::shared_ptr<ds::ScannerBuilder>& sb,
+    const std::shared_ptr<ds::FragmentScanOptions>& options) {
+  StopIfNotOk(sb->FragmentScanOptions(options));
+}
+
 // [[dataset::export]]
 std::shared_ptr<arrow::Schema> dataset___ScannerBuilder__schema(
     const std::shared_ptr<ds::ScannerBuilder>& sb) {
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 67fd5004320..e6db0bcea17 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -295,6 +295,45 @@ test_that("CSV dataset", {
   )
 })
 
+test_that("CSV scan options", {
+  options <- FragmentScanOptions$create("text")
+  expect_equal(options$type, "csv")
+  options <- FragmentScanOptions$create("csv",
+                                        null_values = c("mynull"),
+                                        strings_can_be_null = TRUE)
+  expect_equal(options$type, "csv")
+
+  dst_dir <- make_temp_dir()
+  dst_file <- file.path(dst_dir, "data.csv")
+  df <- tibble(chr = c("foo", "mynull"))
+  write.csv(df, dst_file, row.names = FALSE, quote = FALSE)
+
+  ds <- open_dataset(dst_dir, format = "csv")
+  expect_equivalent(ds %>% collect(), df)
+
+  sb <- ds$NewScan()
+  sb$FragmentScanOptions(options)
+
+  tab <- sb$Finish()$ToTable()
+  expect_equivalent(as.data.frame(tab), tibble(chr = c("foo", NA)))
+
+  # Set default convert options in CsvFileFormat
+  csv_format <- CsvFileFormat$create(null_values = c("mynull"),
+                                     strings_can_be_null = TRUE)
+  ds <- open_dataset(dst_dir, format = csv_format)
+  expect_equivalent(ds %>% collect(), tibble(chr = c("foo", NA)))
+
+  # Set both parse and convert options
+  df <- tibble(chr = c("foo", "mynull"), chr2 = c("bar", "baz"))
+  write.table(df, dst_file, row.names = FALSE, quote = FALSE, sep = "\t")
+  ds <- open_dataset(dst_dir, format = "csv",
+                     delimiter = "\t",
+                     null_values = c("mynull"),
+                     strings_can_be_null = TRUE)
+  expect_equivalent(ds %>% collect(), tibble(chr = c("foo", NA),
+                                             chr2 = c("bar", "baz")))
+})
+
 test_that("compressed CSV dataset", {
   skip_if_not_available("gzip")
   dst_dir <- make_temp_dir()
@@ -318,6 +357,33 @@ test_that("compressed CSV dataset", {
   )
 })
 
+test_that("CSV dataset options", {
+  dst_dir <- make_temp_dir()
+  dst_file <- file.path(dst_dir, "data.csv")
+  df <- tibble(chr = letters[1:10])
+  write.csv(df, dst_file, row.names = FALSE, quote = FALSE)
+
+  format <- FileFormat$create("csv", skip_rows = 1)
+  ds <- open_dataset(dst_dir, format = format)
+
+  expect_equivalent(
+    ds %>%
+      select(string = a) %>%
+      collect(),
+    df[-1, ] %>%
+      select(string = chr)
+  )
+
+  ds <- open_dataset(dst_dir, format = "csv", column_names = c("foo"))
+
+  expect_equivalent(
+    ds %>%
+      select(string = foo) %>%
+      collect(),
+    tibble(foo = c("chr", letters[1:10]))
+  )
+})
+
 test_that("Other text delimited dataset", {
   ds1 <- open_dataset(tsv_dir, partitioning = "part", format = "tsv")
   expect_equivalent(