Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/csv/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ struct ARROW_EXPORT ConvertOptions {

/// Whether to try to automatically dict-encode string / binary data.
/// If true, then when type inference detects a string or binary column,
/// it it dict-encoded up to `auto_dict_max_cardinality` distinct values
/// it is dict-encoded up to `auto_dict_max_cardinality` distinct values
/// (per chunk), after which it switches to regular encoding.
///
/// This setting is ignored for non-inferred columns (those in `column_types`).
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ set(ARROW_DATASET_SRCS
set(ARROW_DATASET_LINK_STATIC arrow_static)
set(ARROW_DATASET_LINK_SHARED arrow_shared)

if(ARROW_CSV)
set(ARROW_DATASET_SRCS ${ARROW_DATASET_SRCS} file_csv.cc)
endif()

if(ARROW_PARQUET)
set(ARROW_DATASET_LINK_STATIC ${ARROW_DATASET_LINK_STATIC} parquet_static)
set(ARROW_DATASET_LINK_SHARED ${ARROW_DATASET_LINK_SHARED} parquet_shared)
Expand Down Expand Up @@ -108,6 +112,10 @@ add_arrow_dataset_test(filter_test)
add_arrow_dataset_test(partition_test)
add_arrow_dataset_test(scanner_test)

if(ARROW_CSV)
add_arrow_dataset_test(file_csv_test)
endif()

if(ARROW_PARQUET)
add_arrow_dataset_test(file_parquet_test)
endif()
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/discovery.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_csv.h"
#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/filter.h"
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/dataset/dataset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ namespace dataset {

/// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
/// flattened FragmentIterator.
static inline FragmentIterator GetFragmentsFromDatasets(
const DatasetVector& datasets, std::shared_ptr<Expression> predicate) {
inline FragmentIterator GetFragmentsFromDatasets(const DatasetVector& datasets,
std::shared_ptr<Expression> predicate) {
// Iterator<Dataset>
auto datasets_it = MakeVectorIterator(datasets);

Expand All @@ -51,12 +51,11 @@ static inline FragmentIterator GetFragmentsFromDatasets(
return MakeFlattenIterator(std::move(fragments_it));
}

static inline RecordBatchIterator IteratorFromReader(
std::shared_ptr<RecordBatchReader> reader) {
inline RecordBatchIterator IteratorFromReader(std::shared_ptr<RecordBatchReader> reader) {
return MakeFunctionIterator([reader] { return reader->Next(); });
}

static inline std::shared_ptr<Schema> SchemaFromColumnNames(
inline std::shared_ptr<Schema> SchemaFromColumnNames(
const std::shared_ptr<Schema>& input, const std::vector<std::string>& column_names) {
std::vector<std::shared_ptr<Field>> columns;
for (FieldRef ref : column_names) {
Expand Down
136 changes: 136 additions & 0 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/dataset/file_csv.h"

#include <algorithm>
#include <memory>
#include <string>
#include <utility>

#include "arrow/csv/options.h"
#include "arrow/csv/reader.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/filter.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/result.h"
#include "arrow/type.h"
#include "arrow/util/iterator.h"

namespace arrow {
namespace dataset {

using internal::checked_cast;
using internal::checked_pointer_cast;

static inline Result<csv::ConvertOptions> GetConvertOptions(
const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
auto options = csv::ConvertOptions::Defaults();
if (scan_options != nullptr) {
// This is set to true to match behavior with other formats; a missing column
// will be materialized as null.
options.include_missing_columns = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect the FilterAndProjectScanTask to fix this in a more efficient way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to stop conversion from erroring if a column is projected but absent from the file. Another way to handle this would be: restrict include_columns to columns present in the file, then let the projector handle the rest as you say — but that would require knowledge of the file's columns, which we don't have at this stage.


for (const auto& field : scan_options->schema()->fields()) {
options.column_types[field->name()] = field->type();
options.include_columns.push_back(field->name());
}

// FIXME(bkietz) also acquire types of fields materialized but not projected.
for (auto&& name : FieldsInExpression(scan_options->filter)) {
ARROW_ASSIGN_OR_RAISE(auto match,
FieldRef(name).FindOneOrNone(*scan_options->schema()));
if (match.indices().empty()) {
options.include_columns.push_back(std::move(name));
}
}
}
return options;
}

static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
auto options = csv::ReadOptions::Defaults();
// Multithreaded conversion of individual files would lead to excessive thread
// contention when ScanTasks are also executed in multiple threads, so we disable it
// here.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be good reasons for it, but this makes it different than eg parquet, where you also get multithreaded reading when only reading a single file (at least with ToTable).

In addition, it could maybe still be exposed to the user?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multithreaded reading of a single CSV file without excessive contention in the many file case would require refactoring the CSV readers to expose their parallelism as tasks which could be productively scheduled alongside the ScanTasks. That's a significant change and should certainly wait for a follow up.

Even after we have such an API, the decision to use threading or not would be derived from scan parameters (see ScannerBuilder::UseThreads) and so it doesn't belong in the format.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my education: the parquet reader has such APIs, and thus this is not a problem there?

use threading or not would be derived from scan parameters (see ScannerBuilder::UseThreads) and so it doesn't belong in the format.

Yep, that's true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parquet as a format does not require chunking; it is already subdivided into rowgroups. We simply wrap each of those in scan tasks and in this way scan individual files in parallel.

There are several ways we could augment CSV to support something like this: exposing a task interface as I mentioned, including a sidecar file with cached chunk start offsets, using csv::Chunker directly when generating ScanTasks (so each ScanTask wraps a single independently parseable chunk), ...

options.use_threads = false;
return options;
}

static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& options = nullptr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd take a ConvertOption instead of a ScanTask.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean

MemoryPool* pool = default_memory_pool()) {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());

auto reader_options = GetReadOptions(format);
const auto& parse_options = format.parse_options;
ARROW_ASSIGN_OR_RAISE(auto convert_options, GetConvertOptions(format, options));
auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options,
parse_options, convert_options);
if (!maybe_reader.ok()) {
return maybe_reader.status().WithMessage("Could not open CSV input source '",
source.path(), "': ", maybe_reader.status());
}

return std::move(maybe_reader).ValueOrDie();
}

/// \brief A ScanTask backed by a CSV file.
///
/// Holds the format (for parse options) and the file source; the reader is
/// not opened until Execute() is called.
class CsvScanTask : public ScanTask {
 public:
  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
      : ScanTask(std::move(options), std::move(context)),
        format_(std::move(format)),
        source_(std::move(source)) {}

  /// \brief Open the CSV file and return an iterator over its record batches.
  ///
  /// Opening happens lazily here (not at construction) so that I/O errors
  /// surface when the task is executed. Allocations use the scan context's
  /// memory pool.
  Result<RecordBatchIterator> Execute() override {
    ARROW_ASSIGN_OR_RAISE(auto reader,
                          OpenReader(source_, *format_, options(), context()->pool));
    return IteratorFromReader(std::move(reader));
  }

 private:
  std::shared_ptr<const CsvFileFormat> format_;
  FileSource source_;
};

Result<bool> CsvFileFormat::IsSupported(const FileSource& source) const {
RETURN_NOT_OK(source.Open().status());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you let OpenReader fail?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If source fails to open (for example if it points to a file which doesn't exist) that should raise an error rather than returning false, which is what this line detects.

return OpenReader(source, *this).ok();
}

/// \brief Infer the schema of a CSV file by opening a streaming reader on it.
///
/// Constructing the streaming reader parses the header (and performs type
/// inference on the first block), which is sufficient to produce a schema.
Result<std::shared_ptr<Schema>> CsvFileFormat::Inspect(const FileSource& source) const {
  auto maybe_reader = OpenReader(source, *this);
  RETURN_NOT_OK(maybe_reader.status());
  return std::move(maybe_reader).ValueOrDie()->schema();
}

/// \brief Produce the scan tasks for a CSV file.
///
/// A CSV file is not currently subdivided into independently parseable
/// chunks, so the whole file is scanned as a single task.
Result<ScanTaskIterator> CsvFileFormat::ScanFile(
    const FileSource& source, std::shared_ptr<ScanOptions> options,
    std::shared_ptr<ScanContext> context) const {
  // Keep this format alive for the lifetime of the task via shared ownership.
  auto self = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
  std::vector<std::shared_ptr<ScanTask>> tasks;
  tasks.push_back(std::make_shared<CsvScanTask>(std::move(self), source,
                                                std::move(options), std::move(context)));
  return MakeVectorIterator(std::move(tasks));
}

} // namespace dataset
} // namespace arrow
54 changes: 54 additions & 0 deletions cpp/src/arrow/dataset/file_csv.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "arrow/csv/options.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/result.h"

namespace arrow {
namespace dataset {

/// \brief A FileFormat implementation that reads from and writes to CSV files
class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
 public:
  /// Options affecting the parsing of CSV files
  csv::ParseOptions parse_options = csv::ParseOptions::Defaults();

  /// \brief The identifier of this format ("csv").
  std::string type_name() const override { return "csv"; }

  /// \brief Return true if the source can be opened and read as CSV.
  /// An error is raised if the source cannot be opened at all (e.g. a
  /// nonexistent file), rather than returning false.
  Result<bool> IsSupported(const FileSource& source) const override;

  /// \brief Return the schema of the file if possible.
  Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;

  /// \brief Open a file for scanning
  Result<ScanTaskIterator> ScanFile(const FileSource& source,
                                    std::shared_ptr<ScanOptions> options,
                                    std::shared_ptr<ScanContext> context) const override;
};

} // namespace dataset
} // namespace arrow
Loading