From 7eaab503486963ab807d384d2aa314c6155ef743 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 3 Feb 2021 06:38:13 +0000 Subject: [PATCH 01/18] ARROW-2229: [C++][Python] Add WriteCsv functionality. This offers possibly performance naive CSV writer with limited options to keep the initial PR down. Obvious potential improvements to this approach are: - Smarter casts for dictionaries - Arena allocation for intermediate cast results The implementation also means that for all primitive type support we might have to fill in gaps in our cast function. --- cpp/src/arrow/CMakeLists.txt | 3 +- cpp/src/arrow/csv/api.h | 1 + cpp/src/arrow/csv/options.h | 8 + cpp/src/arrow/csv/writer.cc | 397 +++++++++++++++++++++++++++ cpp/src/arrow/csv/writer.h | 47 ++++ cpp/src/arrow/ipc/json_simple.cc | 6 +- cpp/src/arrow/record_batch.cc | 25 ++ cpp/src/arrow/record_batch.h | 5 + cpp/src/arrow/util/iterator.h | 3 +- python/pyarrow/_csv.pyx | 54 +++- python/pyarrow/csv.py | 2 +- python/pyarrow/includes/libarrow.pxd | 7 + python/pyarrow/tests/test_csv.py | 20 +- 13 files changed, 571 insertions(+), 7 deletions(-) create mode 100644 cpp/src/arrow/csv/writer.cc create mode 100644 cpp/src/arrow/csv/writer.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 382a851c159..e00c8448bbe 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -348,7 +348,8 @@ if(ARROW_CSV) csv/column_decoder.cc csv/options.cc csv/parser.cc - csv/reader.cc) + csv/reader.cc + csv/writer.cc) list(APPEND ARROW_TESTING_SRCS csv/test_common.cc) endif() diff --git a/cpp/src/arrow/csv/api.h b/cpp/src/arrow/csv/api.h index df88843f51b..4af1835cd70 100644 --- a/cpp/src/arrow/csv/api.h +++ b/cpp/src/arrow/csv/api.h @@ -19,3 +19,4 @@ #include "arrow/csv/options.h" #include "arrow/csv/reader.h" +#include "arrow/csv/writer.h" diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 82153ed466a..2786a673654 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -137,5 +137,13 @@ struct ARROW_EXPORT ReadOptions { static ReadOptions Defaults(); }; +/// Experimental +struct WriteOptions { + bool include_header = true; + // The writer processes batches of rows together. This is the + // maximum number of rows processed at a time. + int32_t batch_size = 1024; +}; + } // namespace csv } // namespace arrow diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc new file mode 100644 index 00000000000..cb6f0e63313 --- /dev/null +++ b/cpp/src/arrow/csv/writer.cc @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/csv/writer.h" +#include "arrow/array.h" +#include "arrow/compute/cast.h" +#include "arrow/io/interfaces.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/result_internal.h" +#include "arrow/stl_allocator.h" +#include "arrow/util/make_unique.h" + +#include "arrow/visitor_inline.h" + +namespace arrow { +namespace csv { +// This implementation is intentionally light on configurability to minimize the size of +// the initial PR. Aditional features can be added as there is demand and interest to +// implement them. +// +// The algorithm used here at a high level is to break RecordBatches/Tables into slices +// and convert each slice independently. A slice is then converted to CSV by first +// scanning each column to determine how the size its contents will take in the CSV. For +// non-string types this requires casting the value to string (which is cached). This +// data is used to understand the precise length of each row and a single allocation for +// the final CSV data buffer. Once the final size is known each column is then iterated +// over again to place its contents into the CSV data buffer. The rationale for choosing +// this approach is it allows for reuse of the cast functionality in the compute module // +// and inline data visiting functionality in the core library. A performance comparison +// has not been done using a naive single-pass approach. This approach might still be +// competitive due to reduction in the number of per row branches necessary with a single +// pass approach. Profiling would likely yield further opportunities for optimization with +// this approach. + +namespace { + +// Counts the number of characters that need escaping in s. +int64_t CountEscapes(util::string_view s) { + return static_cast(std::count(s.begin(), s.end(), '"')); +} + +// Matching quote pair character length. +constexpr int64_t kQuoteCount = 2; + +// Interface for generating CSV data per column. +// The intended usage is to iteratively call UpdateRowLengths for a column and +// then PopulateColumns. +class ColumnPopulator { + public: + ColumnPopulator(MemoryPool* pool, char end_char) : end_char_(end_char), pool_(pool) {} + virtual ~ColumnPopulator() = default; + // Adds the number of characters each entry in data will add to to elements + // in row_lengths. + Status UpdateRowLengths(const Array& data, int32_t* row_lengths) { + compute::ExecContext ctx(pool_); + // Populators are intented to be applied to reasonably small data. In most cases + // threading overhead would not be justified. + ctx.set_use_threads(false); + ASSIGN_OR_RAISE( + std::shared_ptr casted, + compute::Cast(data, /*to_type=*/utf8(), compute::CastOptions(), &ctx)); + casted_array_ = internal::checked_pointer_cast(casted); + return UpdateRowLengths(row_lengths); + } + + // Places string data onto each row in output and updates the corresponding row + // row pointers in preparation for calls to other ColumnPopulators. + virtual void PopulateColumns(char** output) const = 0; + + protected: + virtual Status UpdateRowLengths(int32_t* row_lengths) = 0; + std::shared_ptr casted_array_; + const char end_char_; + + private: + MemoryPool* const pool_; +}; + +// Copies the contents of to out properly escaping any necessary charaters. +char* Escape(arrow::util::string_view s, char* out) { + for (const char* val = s.data(); val < s.data() + s.length(); val++, out++) { + if (*val == '"') { + *out = *val; + out++; + } + *out = *val; + } + return out; +} + +// Populator for non-string types. 
This populator relies on compute Cast functionality to +// String if it doesn't exist it will be an error. it also assumes the resulting string +// from a cast does not require quoting or escaping. +class UnquotedColumnPopulator : public ColumnPopulator { + public: + explicit UnquotedColumnPopulator(MemoryPool* memory_pool, char end_char) + : ColumnPopulator(memory_pool, end_char) {} + Status UpdateRowLengths(int32_t* row_lengths) override { + for (int x = 0; x < casted_array_->length(); x++) { + row_lengths[x] += casted_array_->value_length(x); + } + return Status::OK(); + } + + void PopulateColumns(char** rows) const override { + VisitArrayDataInline( + *casted_array_->data(), + [&](arrow::util::string_view s) { + int64_t next_column_offset = s.length() + /*end_char*/ 1; + memcpy(*rows, s.data(), s.length()); + *(*rows + s.length()) = end_char_; + *rows += next_column_offset; + rows++; + }, + [&]() { + // Nulls are empty (unquoted) to distinguish with empty string. + **rows = end_char_; + *rows += 1; + rows++; + }); + } +}; + +// Strings need special handling to ensure they are escaped properly. +// This class handles escaping assuming that all strings will be quoted +// and that the only character within the string that needs to escaped is +// a quote character (") and escaping is done my adding another quote. +class QuotedColumnPopulator : public ColumnPopulator { + public: + QuotedColumnPopulator(MemoryPool* pool, char end_char) + : ColumnPopulator(pool, end_char) {} + + Status UpdateRowLengths(int32_t* row_lengths) override { + const StringArray& input = *casted_array_; + extra_chars_count_.resize(input.length()); + auto extra_chars = extra_chars_count_.begin(); + VisitArrayDataInline( + *input.data(), + [&](arrow::util::string_view s) { + int64_t escaped_count = CountEscapes(s); + *extra_chars = escaped_count + kQuoteCount; + extra_chars++; + }, + [&]() { + *extra_chars = 0; + extra_chars++; + }); + + for (int x = 0; x < input.length(); x++) { + row_lengths[x] += extra_chars_count_[x] + input.value_length(x); + } + return Status::OK(); + } + + void PopulateColumns(char** rows) const override { + const int32_t* extra_chars = extra_chars_count_.data(); + VisitArrayDataInline( + *(casted_array_->data()), + [&](arrow::util::string_view s) { + int64_t next_column_offset = *extra_chars + s.length() + /*end_char*/ 1; + **rows = '"'; + if (*extra_chars == kQuoteCount) { + memcpy((*rows + 1), s.data(), s.length()); + } else { + Escape(s, (*rows + 1)); + } + *(*rows + next_column_offset - 2) = '"'; + *(*rows + next_column_offset - 1) = end_char_; + *rows += next_column_offset; + extra_chars++; + rows++; + }, + [&]() { + // Nulls are empty (unquoted) to distinguish with empty string. 
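+          // A null contributes no characters of its own, so write just the
+          // delimiter and advance this row's write position by one.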
+ **rows = end_char_; + *rows += 1; + rows++; + extra_chars++; + }); + } + + private: + std::vector> extra_chars_count_; +}; + +struct PopulatorFactory { + template + enable_if_t::value || + std::is_same::value, + Status> + Visit(const TypeClass& type) { + populator = new QuotedColumnPopulator(pool, end_char); + return Status::OK(); + } + + template + enable_if_dictionary Visit(const TypeClass& type) { + return VisitTypeInline(*type.value_type(), this); + } + + template + enable_if_t::value || is_extension_type::value, + Status> + Visit(const TypeClass& type) { + return Status::Invalid("Nested and extension types not supported"); + } + + template + enable_if_t::value || is_decimal_type::value || + is_null_type::value || is_temporal_type::value, + Status> + Visit(const TypeClass& type) { + populator = new UnquotedColumnPopulator(pool, end_char); + return Status::OK(); + } + + char end_char; + MemoryPool* pool; + ColumnPopulator* populator; +}; + +Result> MakePopulator(const Field& field, char end_char, + MemoryPool* pool) { + PopulatorFactory factory{end_char, pool, nullptr}; + RETURN_NOT_OK(VisitTypeInline(*field.type(), &factory)); + return std::unique_ptr(factory.populator); +} + +class CsvConverter { + public: + static Result> Make(std::shared_ptr schema, + MemoryPool* pool) { + std::vector> populators(schema->num_fields()); + for (int col = 0; col < schema->num_fields(); col++) { + char end_char = col < schema->num_fields() - 1 ? ',' : '\n'; + ASSIGN_OR_RAISE(populators[col], + MakePopulator(*schema->field(col), end_char, pool)); + } + return std::unique_ptr( + new CsvConverter(std::move(schema), std::move(populators), pool)); + } + static constexpr int64_t kColumnSizeGuess = 8; + Status WriteCsv(const RecordBatch& batch, const WriteOptions& options, + io::OutputStream* out) { + RETURN_NOT_OK(PrepareForContentsWrite(options, out)); + RecordBatchIterator iterator = batch.SliceIterator(options.batch_size); + for (auto maybe_slice : iterator) { + ASSIGN_OR_RAISE(std::shared_ptr slice, maybe_slice); + RETURN_NOT_OK(TranslateMininalBatch(*slice)); + RETURN_NOT_OK(out->Write(data_buffer_)); + } + return Status::OK(); + } + + Status WriteCsv(const Table& table, const WriteOptions& options, + io::OutputStream* out) { + TableBatchReader reader(table); + reader.set_chunksize(options.batch_size); + RETURN_NOT_OK(PrepareForContentsWrite(options, out)); + std::shared_ptr batch; + RETURN_NOT_OK(reader.ReadNext(&batch)); + while (batch != nullptr) { + RETURN_NOT_OK(TranslateMininalBatch(*batch)); + RETURN_NOT_OK(out->Write(data_buffer_)); + RETURN_NOT_OK(reader.ReadNext(&batch)); + } + + return Status::OK(); + } + + private: + CsvConverter(std::shared_ptr schema, + std::vector> populators, MemoryPool* pool) + : schema_(std::move(schema)), + column_populators_(std::move(populators)), + row_positions_(1024, nullptr, arrow::stl::allocator(pool)), + pool_(pool) {} + + const std::shared_ptr schema_; + + Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) { + if (data_buffer_ == nullptr) { + ASSIGN_OR_RAISE( + data_buffer_, + AllocateResizableBuffer( + options.batch_size * schema_->num_fields() * kColumnSizeGuess, pool_)); + } + if (options.include_header) { + RETURN_NOT_OK(WriteHeader(out)); + } + return Status::OK(); + } + + int64_t CalculateHeaderSize() const { + int64_t header_length = 0; + for (int col = 0; col < schema_->num_fields(); col++) { + const std::string& col_name = schema_->field(col)->name(); + header_length += col_name.size(); + header_length += 
CountEscapes(col_name); + } + return header_length + (3 * schema_->num_fields()); + } + + Status WriteHeader(io::OutputStream* out) { + RETURN_NOT_OK(data_buffer_->Resize(CalculateHeaderSize(), /*shrink_to_fit=*/false)); + char* next = reinterpret_cast(data_buffer_->mutable_data()); + for (int col = 0; col < schema_->num_fields(); col++) { + *next++ = '"'; + next = Escape(schema_->field(col)->name(), next); + *next++ = '"'; + *next++ = ','; + } + next--; + *next = '\n'; + return out->Write(data_buffer_); + } + + Status TranslateMininalBatch(const RecordBatch& batch) { + if (batch.num_rows() == 0) { + return Status::OK(); + } + std::vector> offsets( + batch.num_rows(), 0, arrow::stl::allocator(pool_)); + + // Calculate relative offsets for each row (excluding delimiters) + for (size_t col = 0; col < column_populators_.size(); col++) { + RETURN_NOT_OK( + column_populators_[col]->UpdateRowLengths(*batch.column(col), offsets.data())); + } + // Calculate cumulalative offsets for each row (including delimiters). + offsets[0] += batch.num_columns(); + for (int64_t row = 1; row < batch.num_rows(); row++) { + offsets[row] += offsets[row - 1] + /*delimiter lengths*/ batch.num_columns(); + } + // Resize the target buffer to required size. We assume batch to batch sizes + // should be pretty close so don't shrink the buffer to avoid allocation churn. + RETURN_NOT_OK(data_buffer_->Resize(offsets.back(), /*shrink_to_fit=*/false)); + + // Calculate pointers to the start of each row. + row_positions_.resize(batch.num_rows()); + row_positions_[0] = reinterpret_cast(data_buffer_->mutable_data()); + for (size_t row = 1; row < row_positions_.size(); row++) { + row_positions_[row] = + reinterpret_cast(data_buffer_->mutable_data()) + offsets[row - 1]; + } + // Use the pointers to populate all of the data. + for (const auto& populator : column_populators_) { + populator->PopulateColumns(row_positions_.data()); + } + return Status::OK(); + } + std::vector> column_populators_; + std::vector> row_positions_; + std::shared_ptr data_buffer_; + MemoryPool* pool_; +}; + +} // namespace + +Status WriteCsv(const Table& table, const WriteOptions& options, MemoryPool* pool, + arrow::io::OutputStream* output) { + if (pool == nullptr) { + pool = default_memory_pool(); + } + ASSIGN_OR_RAISE(std::unique_ptr converter, + CsvConverter::Make(table.schema(), pool)); + return converter->WriteCsv(table, options, output); +} + +Status WriteCsv(const RecordBatch& batch, const WriteOptions& options, MemoryPool* pool, + arrow::io::OutputStream* output) { + if (pool == nullptr) { + pool = default_memory_pool(); + } + + ASSIGN_OR_RAISE(std::unique_ptr converter, + CsvConverter::Make(batch.schema(), pool)); + return converter->WriteCsv(batch, options, output); +} + +} // namespace csv +} // namespace arrow diff --git a/cpp/src/arrow/csv/writer.h b/cpp/src/arrow/csv/writer.h new file mode 100644 index 00000000000..90d0fa06d3b --- /dev/null +++ b/cpp/src/arrow/csv/writer.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/csv/options.h" +#include "arrow/io/interfaces.h" +#include "arrow/record_batch.h" +#include "arrow/table.h" + +namespace arrow { +namespace csv { +// Functionality for converting Arrow data to Comma separated value text. +// This library supports all primitive types that can be cast to a StringArrays. +// It applies to following formatting rules: +// - For non-binary types no quotes surround values. Nulls are represented as the empty +// string. +// - For binary types all non-null data is quoted (and quotes within data are escaped +// with an additional quote). +// Null values are empty and unquoted. +// - LF (\n) is always used as a line ending. + +/// \brief Converts table to a CSV and writes the results to output. +/// Experimental +ARROW_EXPORT Status WriteCsv(const Table& table, const WriteOptions& options, + MemoryPool* pool, arrow::io::OutputStream* output); +/// \brief Converts batch to CSV and writes the results to output. +/// Experimental +ARROW_EXPORT Status WriteCsv(const RecordBatch& batch, const WriteOptions& options, + MemoryPool* pool, arrow::io::OutputStream* output); + +} // namespace csv +} // namespace arrow diff --git a/cpp/src/arrow/ipc/json_simple.cc b/cpp/src/arrow/ipc/json_simple.cc index fba8194aeb1..caf6fd06b9c 100644 --- a/cpp/src/arrow/ipc/json_simple.cc +++ b/cpp/src/arrow/ipc/json_simple.cc @@ -43,6 +43,7 @@ #include #include #include +#include namespace rj = arrow::rapidjson; @@ -652,8 +653,11 @@ class StructConverter final : public ConcreteConverter { } } if (remaining > 0) { + rj::StringBuffer sb; + rj::Writer writer(sb); + json_obj.Accept(writer); return Status::Invalid("Unexpected members in JSON object for type ", - type_->ToString()); + type_->ToString(), " Object: ", sb.GetString()); } return builder_->Append(); } diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index cb8b77e2be8..62bf2186b8a 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -37,6 +37,26 @@ namespace arrow { +namespace { +// If there will only be one slice returned it is cheaper to just return the original +// RecordBatch (no overhead from vector and std::shared_ptr copying on underlying arrays). 
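+// SliceIteratorFunctor satisfies the Iterator contract described in
+// arrow/util/iterator.h: any type exposing Result<T> Next() and returning
+// IterationTraits<T>::End() to signal completion can be wrapped in an Iterator<T>.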
+ +struct SliceIteratorFunctor { + Result> Next() { + if (current_offset < batch->num_rows()) { + std::shared_ptr next = batch->Slice(current_offset, slice_size); + current_offset += slice_size; + return next; + } + return IterationTraits>::End(); + } + const RecordBatch* const batch; + const int64_t slice_size; + int64_t current_offset; +}; + +} // namespace + Result> RecordBatch::AddColumn( int i, std::string field_name, const std::shared_ptr& column) const { auto field = ::arrow::field(std::move(field_name), column->type()); @@ -196,6 +216,11 @@ Result> RecordBatch::FromStructArray( array->data()->child_data); } +RecordBatchIterator RecordBatch::SliceIterator(int64_t slice_size) const { + SliceIteratorFunctor functor = {this, slice_size, /*offset=*/static_cast(0)}; + return RecordBatchIterator(std::move(functor)); +} + Result> RecordBatch::ToStructArray() const { if (num_columns() != 0) { return StructArray::Make(columns(), schema()->fields()); diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 95229eb78d5..f779b06386b 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -25,6 +25,7 @@ #include "arrow/result.h" #include "arrow/status.h" #include "arrow/type_fwd.h" +#include "arrow/util/iterator.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -163,6 +164,10 @@ class ARROW_EXPORT RecordBatch { /// \return new record batch virtual std::shared_ptr Slice(int64_t offset, int64_t length) const = 0; + // Returns an iterator for maximum slice size over this record batch. The Iterator + // Becomes invalid when this object goes out of scope. + RecordBatchIterator SliceIterator(int64_t slice_size) const; + /// \return PrettyPrint representation suitable for debugging std::string ToString() const; diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 75ccf283aa5..771b209a406 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -64,7 +64,8 @@ template class Iterator : public util::EqualityComparable> { public: /// \brief Iterator may be constructed from any type which has a member function - /// with signature Status Next(T*); + /// with signature Result Next(); + /// End of iterator is signalled by returning IteratorTraits::End(); /// /// The argument is moved or copied to the heap and kept in a unique_ptr. 
Only /// its destructor and its Next method (which are stored in function pointers) are diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 4068a0b9141..e1f92096b37 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -30,10 +30,11 @@ from pyarrow.includes.libarrow cimport * from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema, RecordBatchReader, ensure_type, maybe_unbox_memory_pool, get_input_stream, - native_transcoding_input_stream, + get_writer, native_transcoding_input_stream, pyarrow_wrap_schema, pyarrow_wrap_table, - pyarrow_wrap_data_type, pyarrow_unwrap_data_type) + pyarrow_wrap_data_type, pyarrow_unwrap_data_type, Table, RecordBatch) from pyarrow.lib import frombytes, tobytes +from pyarrow.util import _stringify_path cdef unsigned char _single_char(s) except 0: @@ -763,3 +764,52 @@ def open_csv(input_file, read_options=None, parse_options=None, move(c_convert_options), maybe_unbox_memory_pool(memory_pool)) return reader + +def write_csv(output_file, data, include_header=True, + MemoryPool memory_pool=None): + """ + + Parameters + ---------- + output_file: string, path, pyarrow.OutputStream or file-like object + The location of CSV data. + data: The data to write. + Either a pyarrow.RecordBatch or a pyarrow.Table + include_header: bool, optional + Include header based on schema field names when writing out + (defaults to true). + memory_pool: MemoryPool, optional + Pool to allocate Table memory from + + Returns + ------- + None + """ + cdef: + shared_ptr[COutputStream] stream + CCSVWriteOptions c_write_options + CMemoryPool* c_memory_pool + CRecordBatch* batch + CTable* table + try: + where = _stringify_path(output_file) + except TypeError: + get_writer(output_file, &stream) + else: + c_where = tobytes(where) + stream = GetResultValue(FileOutputStream.Open(c_where)) + + c_write_options.include_header = include_header + c_memory_pool = maybe_unbox_memory_pool(memory_pool) + if isinstance(data, RecordBatch): + batch = (data).batch + with nogil: + check_status(WriteCsv(deref(batch), c_write_options, c_memory_pool, + stream.get())) + elif isinstance(data, Table): + table = (data).table + with nogil: + check_status(WriteCsv(deref(table), c_write_options, c_memory_pool, + stream.get())) + else: + raise ValueError(type(data)) diff --git a/python/pyarrow/csv.py b/python/pyarrow/csv.py index b116ea11d83..31bf158ad67 100644 --- a/python/pyarrow/csv.py +++ b/python/pyarrow/csv.py @@ -18,4 +18,4 @@ from pyarrow._csv import ( # noqa ReadOptions, ParseOptions, ConvertOptions, ISO8601, - open_csv, read_csv, CSVStreamingReader) + open_csv, read_csv, CSVStreamingReader, write_csv) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index ba3c3ad7d2b..ca045e17b6b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1618,6 +1618,10 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: @staticmethod CCSVReadOptions Defaults() + cdef cppclass CCSVWriteOptions" arrow::csv::WriteOptions": + c_bool include_header + int32_t batch_size + cdef cppclass CCSVReader" arrow::csv::TableReader": @staticmethod CResult[shared_ptr[CCSVReader]] Make( @@ -1633,6 +1637,9 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: CMemoryPool*, shared_ptr[CInputStream], CCSVReadOptions, CCSVParseOptions, CCSVConvertOptions) + cdef CStatus WriteCsv(CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) + cdef CStatus WriteCsv(CRecordBatch&, CCSVWriteOptions& 
options, CMemoryPool*, COutputStream*) + cdef extern from "arrow/json/options.h" nogil: diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 462fe011492..4c13ce3a048 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -36,7 +36,7 @@ import pyarrow as pa from pyarrow.csv import ( - open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601) + open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601, write_csv) def generate_col_names(): @@ -1257,3 +1257,21 @@ def test_read_csv_does_not_close_passed_file_handles(): buf = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6") read_csv(buf) assert not buf.closed + +def test_write_read_round_trip(): + t = pa.Table.from_arrays([[1,2,3], ["a", "b", "c"]], ["c1", "c2"]) + record_batch = t.to_batches(max_chunksize=4)[0] + for data in [t, record_batch]: + buf = io.BytesIO() + # test with header + write_csv(buf, data, include_header=True) + buf.seek(0) + assert t == read_csv(buf) + + # Test without header + buf = io.BytesIO() + write_csv(buf, data, include_header=False) + buf.seek(0) + + read_options = ReadOptions(column_names=t.column_names) + assert t == read_csv(buf, read_options=read_options) From ecc508b37f3208511c17a35374aa22418629babd Mon Sep 17 00:00:00 2001 From: emkornfield Date: Mon, 15 Feb 2021 23:38:06 -0800 Subject: [PATCH 02/18] Update cpp/src/arrow/csv/writer.cc --- cpp/src/arrow/csv/writer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index cb6f0e63313..2a47a1e91af 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -35,7 +35,7 @@ namespace csv { // // The algorithm used here at a high level is to break RecordBatches/Tables into slices // and convert each slice independently. A slice is then converted to CSV by first -// scanning each column to determine how the size its contents will take in the CSV. For +// scanning each column to determine the size of its contents when rendered as a string in CSV. For // non-string types this requires casting the value to string (which is cached). This // data is used to understand the precise length of each row and a single allocation for // the final CSV data buffer. 
Once the final size is known each column is then iterated From ac7b3f470653ce38010c1fb2952c3c2bdee1fcd0 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 16 Feb 2021 07:49:01 +0000 Subject: [PATCH 03/18] fix lint --- python/pyarrow/_csv.pyx | 15 ++++++++------- python/pyarrow/tests/test_csv.py | 27 ++++++++++++++------------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index e1f92096b37..863aeb2dcd5 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -765,6 +765,7 @@ def open_csv(input_file, read_options=None, parse_options=None, maybe_unbox_memory_pool(memory_pool)) return reader + def write_csv(output_file, data, include_header=True, MemoryPool memory_pool=None): """ @@ -792,12 +793,12 @@ def write_csv(output_file, data, include_header=True, CRecordBatch* batch CTable* table try: - where = _stringify_path(output_file) + where = _stringify_path(output_file) except TypeError: - get_writer(output_file, &stream) + get_writer(output_file, &stream) else: - c_where = tobytes(where) - stream = GetResultValue(FileOutputStream.Open(c_where)) + c_where = tobytes(where) + stream = GetResultValue(FileOutputStream.Open(c_where)) c_write_options.include_header = include_header c_memory_pool = maybe_unbox_memory_pool(memory_pool) @@ -805,11 +806,11 @@ def write_csv(output_file, data, include_header=True, batch = (data).batch with nogil: check_status(WriteCsv(deref(batch), c_write_options, c_memory_pool, - stream.get())) + stream.get())) elif isinstance(data, Table): table = (
data).table with nogil: check_status(WriteCsv(deref(table), c_write_options, c_memory_pool, - stream.get())) + stream.get())) else: - raise ValueError(type(data)) + raise ValueError(type(data)) diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 4c13ce3a048..ea5305bcc37 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -1258,20 +1258,21 @@ def test_read_csv_does_not_close_passed_file_handles(): read_csv(buf) assert not buf.closed + def test_write_read_round_trip(): t = pa.Table.from_arrays([[1,2,3], ["a", "b", "c"]], ["c1", "c2"]) record_batch = t.to_batches(max_chunksize=4)[0] for data in [t, record_batch]: - buf = io.BytesIO() - # test with header - write_csv(buf, data, include_header=True) - buf.seek(0) - assert t == read_csv(buf) - - # Test without header - buf = io.BytesIO() - write_csv(buf, data, include_header=False) - buf.seek(0) - - read_options = ReadOptions(column_names=t.column_names) - assert t == read_csv(buf, read_options=read_options) + # test with header + buf = io.BytesIO() + write_csv(buf, data, include_header=True) + buf.seek(0) + assert t == read_csv(buf) + + # Test without header + buf = io.BytesIO() + write_csv(buf, data, include_header=False) + buf.seek(0) + + read_options = ReadOptions(column_names=t.column_names) + assert t == read_csv(buf, read_options=read_options) From 8dcef6ab201a3d220a9e611aa6963b17bba45b11 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 16 Feb 2021 07:50:14 +0000 Subject: [PATCH 04/18] add missing test --- cpp/src/arrow/csv/writer_test.cc | 102 +++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 cpp/src/arrow/csv/writer_test.cc diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc new file mode 100644 index 00000000000..4d026208c16 --- /dev/null +++ b/cpp/src/arrow/csv/writer_test.cc @@ -0,0 +1,102 @@ +#include "gtest/gtest.h" + +#include +#include + +#include "arrow/buffer.h" +#include "arrow/csv/writer.h" +#include "arrow/io/memory.h" +#include "arrow/record_batch.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" + +namespace arrow { +namespace csv { + +struct TestParams { + std::shared_ptr record_batch; + WriteOptions options; + std::string expected_output; +}; + +WriteOptions DefaultTestOptions(bool include_header) { + WriteOptions options; + options.batch_size = 5; + options.include_header = include_header; + return options; +} + +std::vector GenerateTestCases() { + auto abc_schema = schema({ + {field("a", uint64())}, + {field("b\"", utf8())}, + {field("c ", int32())}, + }); + auto empty_batch = + RecordBatch::Make(abc_schema, /*num_rows=*/0, + { + ArrayFromJSON(abc_schema->field(0)->type(), "[]"), + ArrayFromJSON(abc_schema->field(1)->type(), "[]"), + ArrayFromJSON(abc_schema->field(2)->type(), "[]"), + }); + auto populated_batch = RecordBatchFromJSON(abc_schema, R"([{"a": 1, "c ": -1}, + { "a": 1, "b\"": "abc\"efg", "c ": 2324}, + { "b\"": "abcd", "c ": 5467}, + { }, + { "a": 546, "b\"": "", "c ": 517 }, + { "a": 124, "b\"": "a\"\"b\"" }])"); + std::string expected_without_header = std::string("1,,-1") + "\n" + // line 1 + +R"(1,"abc""efg",2324)" + "\n" + // line 2 + R"(,"abcd",5467)" + "\n" + // line 3 + R"(,,)" + "\n" + // line 4 + R"(546,"",517)" + "\n" + // line 5 + R"(124,"a""""b""",)" + "\n"; // line 6 + std::string expected_header = std::string(R"("a","b""","c ")") + "\n"; + + return std::vector{ + {empty_batch, 
DefaultTestOptions(/*header=*/false), ""}, + {empty_batch, DefaultTestOptions(/*header=*/true), expected_header}, + {populated_batch, DefaultTestOptions(/*header=*/false), expected_without_header}, + {populated_batch, DefaultTestOptions(/*header=*/true), + expected_header + expected_without_header}}; +} + +class TestWriteCsv : public ::testing::TestWithParam {}; + +TEST_P(TestWriteCsv, TestWrite) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + io::BufferOutputStream::Create()); + WriteOptions options = GetParam().options; + + ASSERT_OK( + WriteCsv(*GetParam().record_batch, options, default_memory_pool(), out.get())); + ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, out->Finish()); + EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), + GetParam().expected_output); + ASSERT_OK(out->Reset()); + + // Batch size shouldn't matter. + options.batch_size /= 2; + ASSERT_OK( + WriteCsv(*GetParam().record_batch, options, default_memory_pool(), out.get())); + ASSERT_OK_AND_ASSIGN(buffer, out->Finish()); + EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), + GetParam().expected_output); + ASSERT_OK(out->Reset()); + + // Table and Record batch should work identically. + ASSERT_OK_AND_ASSIGN(std::shared_ptr
table, + Table::FromRecordBatches({GetParam().record_batch})); + ASSERT_OK(WriteCsv(*table, options, default_memory_pool(), out.get())); + ASSERT_OK_AND_ASSIGN(buffer, out->Finish()); + EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), + GetParam().expected_output); + ASSERT_OK(out->Reset()); +} + +INSTANTIATE_TEST_SUITE_P(WriteCsvTest, TestWriteCsv, + ::testing::ValuesIn(GenerateTestCases())); + +} // namespace csv +} // namespace arrow From efffe810a23eaeddfc3d1eb2cfb59600f494a322 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 16 Feb 2021 08:03:08 +0000 Subject: [PATCH 05/18] lint and appveyor fixes --- cpp/src/arrow/csv/writer.cc | 27 ++++++++++++++------------- cpp/src/arrow/csv/writer_test.cc | 17 +++++++++++++++++ python/pyarrow/_csv.pyx | 11 ++++++----- python/pyarrow/tests/test_csv.py | 2 +- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index 2a47a1e91af..815c34a1bb9 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -35,17 +35,17 @@ namespace csv { // // The algorithm used here at a high level is to break RecordBatches/Tables into slices // and convert each slice independently. A slice is then converted to CSV by first -// scanning each column to determine the size of its contents when rendered as a string in CSV. For -// non-string types this requires casting the value to string (which is cached). This -// data is used to understand the precise length of each row and a single allocation for -// the final CSV data buffer. Once the final size is known each column is then iterated -// over again to place its contents into the CSV data buffer. The rationale for choosing -// this approach is it allows for reuse of the cast functionality in the compute module // -// and inline data visiting functionality in the core library. A performance comparison -// has not been done using a naive single-pass approach. This approach might still be -// competitive due to reduction in the number of per row branches necessary with a single -// pass approach. Profiling would likely yield further opportunities for optimization with -// this approach. +// scanning each column to determine the size of its contents when rendered as a string in +// CSV. For non-string types this requires casting the value to string (which is cached). +// This data is used to understand the precise length of each row and a single allocation +// for the final CSV data buffer. Once the final size is known each column is then +// iterated over again to place its contents into the CSV data buffer. The rationale for +// choosing this approach is it allows for reuse of the cast functionality in the compute +// module // and inline data visiting functionality in the core library. A performance +// comparison has not been done using a naive single-pass approach. This approach might +// still be competitive due to reduction in the number of per row branches necessary with +// a single pass approach. Profiling would likely yield further opportunities for +// optimization with this approach. namespace { @@ -153,7 +153,8 @@ class QuotedColumnPopulator : public ColumnPopulator { *input.data(), [&](arrow::util::string_view s) { int64_t escaped_count = CountEscapes(s); - *extra_chars = escaped_count + kQuoteCount; + // TODO: Maybe use 64 bit row lengths or safe cast? 
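+          // CountEscapes returns int64_t while row lengths are tracked as int32_t,
+          // hence the narrowing cast below.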
+ *extra_chars = static_cast(escaped_count) + kQuoteCount; extra_chars++; }, [&]() { @@ -348,7 +349,7 @@ class CsvConverter { offsets[row] += offsets[row - 1] + /*delimiter lengths*/ batch.num_columns(); } // Resize the target buffer to required size. We assume batch to batch sizes - // should be pretty close so don't shrink the buffer to avoid allocation churn. + // should be pretty close so don't shrink the buffer to avoid allocation churn. RETURN_NOT_OK(data_buffer_->Resize(offsets.back(), /*shrink_to_fit=*/false)); // Calculate pointers to the start of each row. diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc index 4d026208c16..335468db4f3 100644 --- a/cpp/src/arrow/csv/writer_test.cc +++ b/cpp/src/arrow/csv/writer_test.cc @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #include "gtest/gtest.h" #include diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 863aeb2dcd5..01c4df9771e 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -32,7 +32,8 @@ from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema, maybe_unbox_memory_pool, get_input_stream, get_writer, native_transcoding_input_stream, pyarrow_wrap_schema, pyarrow_wrap_table, - pyarrow_wrap_data_type, pyarrow_unwrap_data_type, Table, RecordBatch) + pyarrow_wrap_data_type, pyarrow_unwrap_data_type, + Table, RecordBatch) from pyarrow.lib import frombytes, tobytes from pyarrow.util import _stringify_path @@ -805,12 +806,12 @@ def write_csv(output_file, data, include_header=True, if isinstance(data, RecordBatch): batch = (data).batch with nogil: - check_status(WriteCsv(deref(batch), c_write_options, c_memory_pool, - stream.get())) + check_status(WriteCsv(deref(batch), c_write_options, c_memory_pool, + stream.get())) elif isinstance(data, Table): table = (
data).table with nogil: - check_status(WriteCsv(deref(table), c_write_options, c_memory_pool, - stream.get())) + check_status(WriteCsv(deref(table), c_write_options, c_memory_pool, + stream.get())) else: raise ValueError(type(data)) diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index ea5305bcc37..7121de84443 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -1260,7 +1260,7 @@ def test_read_csv_does_not_close_passed_file_handles(): def test_write_read_round_trip(): - t = pa.Table.from_arrays([[1,2,3], ["a", "b", "c"]], ["c1", "c2"]) + t = pa.Table.from_arrays([[1, 2, 3], ["a", "b", "c"]], ["c1", "c2"]) record_batch = t.to_batches(max_chunksize=4)[0] for data in [t, record_batch]: # test with header From 77943072e34ef62cf0a1f048706e171522d92757 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sat, 20 Feb 2021 07:18:49 +0000 Subject: [PATCH 06/18] start addressing feedback --- cpp/src/arrow/csv/writer.cc | 235 ++++++++++++++++++------------- cpp/src/arrow/csv/writer.h | 4 +- cpp/src/arrow/csv/writer_test.cc | 12 +- cpp/src/arrow/record_batch.cc | 25 ---- cpp/src/arrow/record_batch.h | 5 - python/pyarrow/_csv.pyx | 37 ++++- python/pyarrow/csv.py | 3 +- 7 files changed, 176 insertions(+), 145 deletions(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index 815c34a1bb9..33a0a911a18 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -23,6 +23,8 @@ #include "arrow/result.h" #include "arrow/result_internal.h" #include "arrow/stl_allocator.h" +#include "arrow/util/iterator.h" +#include "arrow/util/logging.h" #include "arrow/util/make_unique.h" #include "arrow/visitor_inline.h" @@ -41,7 +43,7 @@ namespace csv { // for the final CSV data buffer. Once the final size is known each column is then // iterated over again to place its contents into the CSV data buffer. The rationale for // choosing this approach is it allows for reuse of the cast functionality in the compute -// module // and inline data visiting functionality in the core library. A performance +// module and inline data visiting functionality in the core library. A performance // comparison has not been done using a naive single-pass approach. This approach might // still be competitive due to reduction in the number of per row branches necessary with // a single pass approach. Profiling would likely yield further opportunities for @@ -49,6 +51,26 @@ namespace csv { namespace { +struct SliceIteratorFunctor { + Result> Next() { + if (current_offset < batch->num_rows()) { + std::shared_ptr next = batch->Slice(current_offset, slice_size); + current_offset += slice_size; + return next; + } + return IterationTraits>::End(); + } + const RecordBatch* const batch; + const int64_t slice_size; + int64_t current_offset; +}; + +RecordBatchIterator RecordBatchSliceIterator(const RecordBatch& batch, + int64_t slice_size) { + SliceIteratorFunctor functor = {&batch, slice_size, /*offset=*/static_cast(0)}; + return RecordBatchIterator(std::move(functor)); +} + // Counts the number of characters that need escaping in s. int64_t CountEscapes(util::string_view s) { return static_cast(std::count(s.begin(), s.end(), '"')); @@ -56,14 +78,18 @@ int64_t CountEscapes(util::string_view s) { // Matching quote pair character length. constexpr int64_t kQuoteCount = 2; +constexpr int64_t kQuoteDelimiterCount = kQuoteCount + /*end_char*/ 1; // Interface for generating CSV data per column. 
// The intended usage is to iteratively call UpdateRowLengths for a column and -// then PopulateColumns. +// then PopulateColumns. PopulateColumns must be called in the reverse order of the +// populators (it populates data backwards). class ColumnPopulator { public: ColumnPopulator(MemoryPool* pool, char end_char) : end_char_(end_char), pool_(pool) {} + virtual ~ColumnPopulator() = default; + // Adds the number of characters each entry in data will add to to elements // in row_lengths. Status UpdateRowLengths(const Array& data, int32_t* row_lengths) { @@ -79,8 +105,12 @@ class ColumnPopulator { } // Places string data onto each row in output and updates the corresponding row - // row pointers in preparation for calls to other ColumnPopulators. - virtual void PopulateColumns(char** output) const = 0; + // row pointers in preparation for calls to other (preceding) ColumnPopulators. + // Args: + // output: character buffer to write to. + // offsets: an array of end of row column within the the output buffer (values are + // one past the end of the position to write to). + virtual void PopulateColumns(char* output, int32_t* offsets) const = 0; protected: virtual Status UpdateRowLengths(int32_t* row_lengths) = 0; @@ -91,16 +121,17 @@ class ColumnPopulator { MemoryPool* const pool_; }; -// Copies the contents of to out properly escaping any necessary charaters. -char* Escape(arrow::util::string_view s, char* out) { - for (const char* val = s.data(); val < s.data() + s.length(); val++, out++) { +// Copies the contents of to out properly escaping any necessary characters. +// Returns the position prior to last copied character (out_end is decremented). +char* EscapeReverse(arrow::util::string_view s, char* out_end) { + for (const char* val = s.data() + s.length() - 1; val >= s.data(); val--, out_end--) { if (*val == '"') { - *out = *val; - out++; + *out_end = *val; + out_end--; } - *out = *val; + *out_end = *val; } - return out; + return out_end; } // Populator for non-string types. This populator relies on compute Cast functionality to @@ -110,6 +141,7 @@ class UnquotedColumnPopulator : public ColumnPopulator { public: explicit UnquotedColumnPopulator(MemoryPool* memory_pool, char end_char) : ColumnPopulator(memory_pool, end_char) {} + Status UpdateRowLengths(int32_t* row_lengths) override { for (int x = 0; x < casted_array_->length(); x++) { row_lengths[x] += casted_array_->value_length(x); @@ -117,21 +149,21 @@ class UnquotedColumnPopulator : public ColumnPopulator { return Status::OK(); } - void PopulateColumns(char** rows) const override { + void PopulateColumns(char* output, int32_t* offsets) const override { VisitArrayDataInline( *casted_array_->data(), [&](arrow::util::string_view s) { int64_t next_column_offset = s.length() + /*end_char*/ 1; - memcpy(*rows, s.data(), s.length()); - *(*rows + s.length()) = end_char_; - *rows += next_column_offset; - rows++; + memcpy((output + *offsets - next_column_offset), s.data(), s.length()); + *(output + *offsets - 1) = end_char_; + *offsets -= next_column_offset; + offsets++; }, [&]() { // Nulls are empty (unquoted) to distinguish with empty string. 
- **rows = end_char_; - *rows += 1; - rows++; + *(output + *offsets - 1) = end_char_; + *offsets -= 1; + offsets++; }); } }; @@ -143,60 +175,64 @@ class UnquotedColumnPopulator : public ColumnPopulator { class QuotedColumnPopulator : public ColumnPopulator { public: QuotedColumnPopulator(MemoryPool* pool, char end_char) - : ColumnPopulator(pool, end_char) {} + : ColumnPopulator(pool, end_char), + row_needs_escaping_(::arrow::stl::allocator(pool)) {} Status UpdateRowLengths(int32_t* row_lengths) override { const StringArray& input = *casted_array_; - extra_chars_count_.resize(input.length()); - auto extra_chars = extra_chars_count_.begin(); + int row_number = 0; + row_needs_escaping_.resize(casted_array_->length()); VisitArrayDataInline( *input.data(), [&](arrow::util::string_view s) { int64_t escaped_count = CountEscapes(s); // TODO: Maybe use 64 bit row lengths or safe cast? - *extra_chars = static_cast(escaped_count) + kQuoteCount; - extra_chars++; + row_needs_escaping_[row_number] = escaped_count > 0; + row_lengths[row_number] += s.length() + escaped_count + kQuoteCount; + row_number++; }, [&]() { - *extra_chars = 0; - extra_chars++; + row_needs_escaping_[row_number] = false; + row_number++; }); - - for (int x = 0; x < input.length(); x++) { - row_lengths[x] += extra_chars_count_[x] + input.value_length(x); - } return Status::OK(); } - void PopulateColumns(char** rows) const override { - const int32_t* extra_chars = extra_chars_count_.data(); + void PopulateColumns(char* output, int32_t* offsets) const override { + auto needs_escaping = row_needs_escaping_.begin(); VisitArrayDataInline( *(casted_array_->data()), [&](arrow::util::string_view s) { - int64_t next_column_offset = *extra_chars + s.length() + /*end_char*/ 1; - **rows = '"'; - if (*extra_chars == kQuoteCount) { - memcpy((*rows + 1), s.data(), s.length()); + // still needs string content length to be added + char* row_end = output + *offsets; + int32_t next_column_offset = 0; + if (!*needs_escaping) { + next_column_offset = s.length() + kQuoteDelimiterCount; + memcpy(row_end - next_column_offset + /*quote_offset=*/1, s.data(), + s.length()); } else { - Escape(s, (*rows + 1)); + // Adjust row_end by 3: 1 quote char, 1 end char and 1 to position at the + // first position to write to. + next_column_offset = row_end - EscapeReverse(s, row_end - 3); } - *(*rows + next_column_offset - 2) = '"'; - *(*rows + next_column_offset - 1) = end_char_; - *rows += next_column_offset; - extra_chars++; - rows++; + *(row_end - next_column_offset) = '"'; + *(row_end - 2) = '"'; + *(row_end - 1) = end_char_; + *offsets -= next_column_offset; + offsets++; + needs_escaping++; }, [&]() { // Nulls are empty (unquoted) to distinguish with empty string. 
- **rows = end_char_; - *rows += 1; - rows++; - extra_chars++; + *(output + *offsets - 1) = end_char_; + *offsets -= 1; + offsets++; + needs_escaping++; }); } private: - std::vector> extra_chars_count_; + std::vector> row_needs_escaping_; }; struct PopulatorFactory { @@ -218,7 +254,7 @@ struct PopulatorFactory { enable_if_t::value || is_extension_type::value, Status> Visit(const TypeClass& type) { - return Status::Invalid("Nested and extension types not supported"); + return Status::Invalid("Unsupported Type:", type.ToString()); } template @@ -242,9 +278,9 @@ Result> MakePopulator(const Field& field, char return std::unique_ptr(factory.populator); } -class CsvConverter { +class CSVConverter { public: - static Result> Make(std::shared_ptr schema, + static Result> Make(std::shared_ptr schema, MemoryPool* pool) { std::vector> populators(schema->num_fields()); for (int col = 0; col < schema->num_fields(); col++) { @@ -252,23 +288,23 @@ class CsvConverter { ASSIGN_OR_RAISE(populators[col], MakePopulator(*schema->field(col), end_char, pool)); } - return std::unique_ptr( - new CsvConverter(std::move(schema), std::move(populators), pool)); + return std::unique_ptr( + new CSVConverter(std::move(schema), std::move(populators), pool)); } - static constexpr int64_t kColumnSizeGuess = 8; - Status WriteCsv(const RecordBatch& batch, const WriteOptions& options, + + Status WriteCSV(const RecordBatch& batch, const WriteOptions& options, io::OutputStream* out) { RETURN_NOT_OK(PrepareForContentsWrite(options, out)); - RecordBatchIterator iterator = batch.SliceIterator(options.batch_size); + RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size); for (auto maybe_slice : iterator) { ASSIGN_OR_RAISE(std::shared_ptr slice, maybe_slice); - RETURN_NOT_OK(TranslateMininalBatch(*slice)); + RETURN_NOT_OK(TranslateMinimalBatch(*slice)); RETURN_NOT_OK(out->Write(data_buffer_)); } return Status::OK(); } - Status WriteCsv(const Table& table, const WriteOptions& options, + Status WriteCSV(const Table& table, const WriteOptions& options, io::OutputStream* out) { TableBatchReader reader(table); reader.set_chunksize(options.batch_size); @@ -276,7 +312,7 @@ class CsvConverter { std::shared_ptr batch; RETURN_NOT_OK(reader.ReadNext(&batch)); while (batch != nullptr) { - RETURN_NOT_OK(TranslateMininalBatch(*batch)); + RETURN_NOT_OK(TranslateMinimalBatch(*batch)); RETURN_NOT_OK(out->Write(data_buffer_)); RETURN_NOT_OK(reader.ReadNext(&batch)); } @@ -285,15 +321,13 @@ class CsvConverter { } private: - CsvConverter(std::shared_ptr schema, + CSVConverter(std::shared_ptr schema, std::vector> populators, MemoryPool* pool) - : schema_(std::move(schema)), - column_populators_(std::move(populators)), - row_positions_(1024, nullptr, arrow::stl::allocator(pool)), + : column_populators_(std::move(populators)), + offsets_(0, 0, ::arrow::stl::allocator(pool)), + schema_(std::move(schema)), pool_(pool) {} - const std::shared_ptr schema_; - Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) { if (data_buffer_ == nullptr) { ASSIGN_OR_RAISE( @@ -314,84 +348,85 @@ class CsvConverter { header_length += col_name.size(); header_length += CountEscapes(col_name); } - return header_length + (3 * schema_->num_fields()); + return header_length + (kQuoteDelimiterCount * schema_->num_fields()); } Status WriteHeader(io::OutputStream* out) { RETURN_NOT_OK(data_buffer_->Resize(CalculateHeaderSize(), /*shrink_to_fit=*/false)); - char* next = reinterpret_cast(data_buffer_->mutable_data()); - for (int 
col = 0; col < schema_->num_fields(); col++) { - *next++ = '"'; - next = Escape(schema_->field(col)->name(), next); - *next++ = '"'; - *next++ = ','; + char* next = + reinterpret_cast(data_buffer_->mutable_data() + data_buffer_->size() - 1); + for (int col = schema_->num_fields() - 1; col >= 0; col--) { + *next-- = ','; + *next-- = '"'; + next = EscapeReverse(schema_->field(col)->name(), next); + *next-- = '"'; } - next--; - *next = '\n'; + *(data_buffer_->mutable_data() + data_buffer_->size() - 1) = '\n'; + DCHECK_EQ(reinterpret_cast(next + 1), data_buffer_->data()); return out->Write(data_buffer_); } - Status TranslateMininalBatch(const RecordBatch& batch) { + Status TranslateMinimalBatch(const RecordBatch& batch) { if (batch.num_rows() == 0) { return Status::OK(); } - std::vector> offsets( - batch.num_rows(), 0, arrow::stl::allocator(pool_)); + offsets_.resize(batch.num_rows()); + std::fill(offsets_.begin(), offsets_.end(), 0); // Calculate relative offsets for each row (excluding delimiters) for (size_t col = 0; col < column_populators_.size(); col++) { RETURN_NOT_OK( - column_populators_[col]->UpdateRowLengths(*batch.column(col), offsets.data())); + column_populators_[col]->UpdateRowLengths(*batch.column(col), offsets_.data())); } // Calculate cumulalative offsets for each row (including delimiters). - offsets[0] += batch.num_columns(); + offsets_[0] += batch.num_columns(); for (int64_t row = 1; row < batch.num_rows(); row++) { - offsets[row] += offsets[row - 1] + /*delimiter lengths*/ batch.num_columns(); + offsets_[row] += offsets_[row - 1] + /*delimiter lengths*/ batch.num_columns(); } // Resize the target buffer to required size. We assume batch to batch sizes // should be pretty close so don't shrink the buffer to avoid allocation churn. - RETURN_NOT_OK(data_buffer_->Resize(offsets.back(), /*shrink_to_fit=*/false)); - - // Calculate pointers to the start of each row. - row_positions_.resize(batch.num_rows()); - row_positions_[0] = reinterpret_cast(data_buffer_->mutable_data()); - for (size_t row = 1; row < row_positions_.size(); row++) { - row_positions_[row] = - reinterpret_cast(data_buffer_->mutable_data()) + offsets[row - 1]; - } - // Use the pointers to populate all of the data. - for (const auto& populator : column_populators_) { - populator->PopulateColumns(row_positions_.data()); + RETURN_NOT_OK(data_buffer_->Resize(offsets_.back(), /*shrink_to_fit=*/false)); + + // Use the offsets to populate contents. 
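+    // Populators are applied in reverse order because each one writes its column
+    // contents backwards from the per-row end offsets computed above.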
+ for (auto populator = column_populators_.rbegin(); + populator != column_populators_.rend(); populator++) { + (*populator) + ->PopulateColumns(reinterpret_cast(data_buffer_->mutable_data()), + offsets_.data()); } + DCHECK_EQ(0, offsets_[0]); return Status::OK(); } + + static constexpr int64_t kColumnSizeGuess = 8; std::vector> column_populators_; - std::vector> row_positions_; + std::vector> offsets_; std::shared_ptr data_buffer_; + const std::shared_ptr schema_; MemoryPool* pool_; }; } // namespace -Status WriteCsv(const Table& table, const WriteOptions& options, MemoryPool* pool, +Status WriteCSV(const Table& table, const WriteOptions& options, MemoryPool* pool, arrow::io::OutputStream* output) { if (pool == nullptr) { pool = default_memory_pool(); } - ASSIGN_OR_RAISE(std::unique_ptr converter, - CsvConverter::Make(table.schema(), pool)); - return converter->WriteCsv(table, options, output); + ASSIGN_OR_RAISE(std::unique_ptr converter, + CSVConverter::Make(table.schema(), pool)); + return converter->WriteCSV(table, options, output); } -Status WriteCsv(const RecordBatch& batch, const WriteOptions& options, MemoryPool* pool, +Status WriteCSV(const RecordBatch& batch, const WriteOptions& options, MemoryPool* pool, arrow::io::OutputStream* output) { if (pool == nullptr) { pool = default_memory_pool(); } - ASSIGN_OR_RAISE(std::unique_ptr converter, - CsvConverter::Make(batch.schema(), pool)); - return converter->WriteCsv(batch, options, output); + ASSIGN_OR_RAISE(std::unique_ptr converter, + CSVConverter::Make(batch.schema(), pool)); + return converter->WriteCSV(batch, options, output); } } // namespace csv diff --git a/cpp/src/arrow/csv/writer.h b/cpp/src/arrow/csv/writer.h index 90d0fa06d3b..c009d7849f4 100644 --- a/cpp/src/arrow/csv/writer.h +++ b/cpp/src/arrow/csv/writer.h @@ -36,11 +36,11 @@ namespace csv { /// \brief Converts table to a CSV and writes the results to output. /// Experimental -ARROW_EXPORT Status WriteCsv(const Table& table, const WriteOptions& options, +ARROW_EXPORT Status WriteCSV(const Table& table, const WriteOptions& options, MemoryPool* pool, arrow::io::OutputStream* output); /// \brief Converts batch to CSV and writes the results to output. /// Experimental -ARROW_EXPORT Status WriteCsv(const RecordBatch& batch, const WriteOptions& options, +ARROW_EXPORT Status WriteCSV(const RecordBatch& batch, const WriteOptions& options, MemoryPool* pool, arrow::io::OutputStream* output); } // namespace csv diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc index 335468db4f3..d68ae4ba580 100644 --- a/cpp/src/arrow/csv/writer_test.cc +++ b/cpp/src/arrow/csv/writer_test.cc @@ -79,15 +79,15 @@ std::vector GenerateTestCases() { expected_header + expected_without_header}}; } -class TestWriteCsv : public ::testing::TestWithParam {}; +class TestWriteCSV : public ::testing::TestWithParam {}; -TEST_P(TestWriteCsv, TestWrite) { +TEST_P(TestWriteCSV, TestWrite) { ASSERT_OK_AND_ASSIGN(std::shared_ptr out, io::BufferOutputStream::Create()); WriteOptions options = GetParam().options; ASSERT_OK( - WriteCsv(*GetParam().record_batch, options, default_memory_pool(), out.get())); + WriteCSV(*GetParam().record_batch, options, default_memory_pool(), out.get())); ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, out->Finish()); EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), GetParam().expected_output); @@ -96,7 +96,7 @@ TEST_P(TestWriteCsv, TestWrite) { // Batch size shouldn't matter. 
options.batch_size /= 2; ASSERT_OK( - WriteCsv(*GetParam().record_batch, options, default_memory_pool(), out.get())); + WriteCSV(*GetParam().record_batch, options, default_memory_pool(), out.get())); ASSERT_OK_AND_ASSIGN(buffer, out->Finish()); EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), GetParam().expected_output); @@ -105,14 +105,14 @@ TEST_P(TestWriteCsv, TestWrite) { // Table and Record batch should work identically. ASSERT_OK_AND_ASSIGN(std::shared_ptr
table, Table::FromRecordBatches({GetParam().record_batch})); - ASSERT_OK(WriteCsv(*table, options, default_memory_pool(), out.get())); + ASSERT_OK(WriteCSV(*table, options, default_memory_pool(), out.get())); ASSERT_OK_AND_ASSIGN(buffer, out->Finish()); EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), GetParam().expected_output); ASSERT_OK(out->Reset()); } -INSTANTIATE_TEST_SUITE_P(WriteCsvTest, TestWriteCsv, +INSTANTIATE_TEST_SUITE_P(WriteCSVTest, TestWriteCSV, ::testing::ValuesIn(GenerateTestCases())); } // namespace csv diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 62bf2186b8a..cb8b77e2be8 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -37,26 +37,6 @@ namespace arrow { -namespace { -// If there will only be one slice returned it is cheaper to just return the original -// RecordBatch (no overhead from vector and std::shared_ptr copying on underlying arrays). - -struct SliceIteratorFunctor { - Result> Next() { - if (current_offset < batch->num_rows()) { - std::shared_ptr next = batch->Slice(current_offset, slice_size); - current_offset += slice_size; - return next; - } - return IterationTraits>::End(); - } - const RecordBatch* const batch; - const int64_t slice_size; - int64_t current_offset; -}; - -} // namespace - Result> RecordBatch::AddColumn( int i, std::string field_name, const std::shared_ptr& column) const { auto field = ::arrow::field(std::move(field_name), column->type()); @@ -216,11 +196,6 @@ Result> RecordBatch::FromStructArray( array->data()->child_data); } -RecordBatchIterator RecordBatch::SliceIterator(int64_t slice_size) const { - SliceIteratorFunctor functor = {this, slice_size, /*offset=*/static_cast(0)}; - return RecordBatchIterator(std::move(functor)); -} - Result> RecordBatch::ToStructArray() const { if (num_columns() != 0) { return StructArray::Make(columns(), schema()->fields()); diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index f779b06386b..95229eb78d5 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -25,7 +25,6 @@ #include "arrow/result.h" #include "arrow/status.h" #include "arrow/type_fwd.h" -#include "arrow/util/iterator.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -164,10 +163,6 @@ class ARROW_EXPORT RecordBatch { /// \return new record batch virtual std::shared_ptr Slice(int64_t offset, int64_t length) const = 0; - // Returns an iterator for maximum slice size over this record batch. The Iterator - // Becomes invalid when this object goes out of scope. - RecordBatchIterator SliceIterator(int64_t slice_size) const; - /// \return PrettyPrint representation suitable for debugging std::string ToString() const; diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 01c4df9771e..dd4bbd3b55c 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -766,6 +766,28 @@ def open_csv(input_file, read_options=None, parse_options=None, maybe_unbox_memory_pool(memory_pool)) return reader +cdef class WriterOptions(_Weakrefable): + """ + Options for writing CSV files. 
+ + Parameters + ---------- + include_header : bool, optional (default True) + Whether to include the header + batch_size : int, optional (default 1024) + How many rows to process together when converting and writing + CSV + """ + cdef: + CCSVWriteOptions options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(include_header=True, batch_size=1024): + options.include_header = include_header + options.batch_size = 1024 + def write_csv(output_file, data, include_header=True, MemoryPool memory_pool=None): @@ -777,11 +799,9 @@ def write_csv(output_file, data, include_header=True, The location of CSV data. data: The data to write. Either a pyarrow.RecordBatch or a pyarrow.Table - include_header: bool, optional - Include header based on schema field names when writing out - (defaults to true). + write_options: pyarrow.csv.WriteOptions memory_pool: MemoryPool, optional - Pool to allocate Table memory from + Pool for temporary allocations. Returns ------- @@ -793,6 +813,11 @@ def write_csv(output_file, data, include_header=True, CMemoryPool* c_memory_pool CRecordBatch* batch CTable* table + if write_options is None: + c_write_options = CCSVWriteOptions.Defaults() + else: + c_write_options = write_options.options + try: where = _stringify_path(output_file) except TypeError: @@ -806,12 +831,12 @@ def write_csv(output_file, data, include_header=True, if isinstance(data, RecordBatch): batch = (data).batch with nogil: - check_status(WriteCsv(deref(batch), c_write_options, c_memory_pool, + check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool, stream.get())) elif isinstance(data, Table): table = (
data).table with nogil: - check_status(WriteCsv(deref(table), c_write_options, c_memory_pool, + check_status(WriteCSV(deref(table), c_write_options, c_memory_pool, stream.get())) else: raise ValueError(type(data)) diff --git a/python/pyarrow/csv.py b/python/pyarrow/csv.py index 31bf158ad67..dafe1073209 100644 --- a/python/pyarrow/csv.py +++ b/python/pyarrow/csv.py @@ -18,4 +18,5 @@ from pyarrow._csv import ( # noqa ReadOptions, ParseOptions, ConvertOptions, ISO8601, - open_csv, read_csv, CSVStreamingReader, write_csv) + open_csv, read_csv, CSVStreamingReader, write_csv, + WriteOptions) From 4deb8a8b32cc5427eb08244c8b8271e26d848f02 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sat, 20 Feb 2021 22:14:57 +0000 Subject: [PATCH 07/18] add cmake configuration --- cpp/cmake_modules/SetupCxxFlags.cmake | 5 +++++ cpp/src/arrow/CMakeLists.txt | 6 ++++-- cpp/src/arrow/csv/CMakeLists.txt | 22 ++++++++++++++-------- cpp/src/arrow/csv/api.h | 4 ++++ 4 files changed, 27 insertions(+), 10 deletions(-) diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake index b534552c3c0..b7cb4cd07ec 100644 --- a/cpp/cmake_modules/SetupCxxFlags.cmake +++ b/cpp/cmake_modules/SetupCxxFlags.cmake @@ -629,3 +629,8 @@ if(MSVC) set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${MSVC_LINKER_FLAGS}") endif() endif() + +#-------------------------------------------------------------------------------------- +if(ARROW_COMPUTE) + add_definitions(-DARROW_HAVE_COMPUTE_MODULE) +endif() diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index e00c8448bbe..abd5428b3d7 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -348,8 +348,10 @@ if(ARROW_CSV) csv/column_decoder.cc csv/options.cc csv/parser.cc - csv/reader.cc - csv/writer.cc) + csv/reader.cc) + if(ARROW_COMPUTE) + list(APPEND ARROW_SRCS csv/writer.cc) + endif() list(APPEND ARROW_TESTING_SRCS csv/test_common.cc) endif() diff --git a/cpp/src/arrow/csv/CMakeLists.txt b/cpp/src/arrow/csv/CMakeLists.txt index 2766cfd3bd2..561faf1b584 100644 --- a/cpp/src/arrow/csv/CMakeLists.txt +++ b/cpp/src/arrow/csv/CMakeLists.txt @@ -15,14 +15,20 @@ # specific language governing permissions and limitations # under the License. -add_arrow_test(csv-test - SOURCES - chunker_test.cc - column_builder_test.cc - column_decoder_test.cc - converter_test.cc - parser_test.cc - reader_test.cc) +set(CSV_TEST_SRCS + chunker_test.cc + column_builder_test.cc + column_decoder_test.cc + converter_test.cc + parser_test.cc + reader_test.cc) + +# Writer depends on compute's cast functionality +if(ARROW_COMPUTE) + list(APPEND CSV_TEST_SRCS writer_test.cc) +endif() + +add_arrow_test(csv-test SOURCES ${CSV_TEST_SRCS}) add_arrow_benchmark(converter_benchmark PREFIX "arrow-csv") add_arrow_benchmark(parser_benchmark PREFIX "arrow-csv") diff --git a/cpp/src/arrow/csv/api.h b/cpp/src/arrow/csv/api.h index 4af1835cd70..620ee9ff180 100644 --- a/cpp/src/arrow/csv/api.h +++ b/cpp/src/arrow/csv/api.h @@ -19,4 +19,8 @@ #include "arrow/csv/options.h" #include "arrow/csv/reader.h" + +// The writer depends on compute module for casting. 
+#ifdef ARROW_HAVE_COMPUTE_MODULE #include "arrow/csv/writer.h" +#endif From 8b40ed7fca69d45953571b7fd7ff6bfa408801a0 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sun, 21 Feb 2021 01:17:58 +0000 Subject: [PATCH 08/18] update python --- cpp/src/arrow/csv/options.cc | 1 + cpp/src/arrow/csv/options.h | 3 +++ python/pyarrow/_csv.pyx | 31 +++++++++++++++++----------- python/pyarrow/includes/libarrow.pxd | 12 +++++++++-- python/pyarrow/tests/test_csv.py | 7 ++++--- 5 files changed, 37 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/csv/options.cc b/cpp/src/arrow/csv/options.cc index b6f1346bcd3..a515abf2cf4 100644 --- a/cpp/src/arrow/csv/options.cc +++ b/cpp/src/arrow/csv/options.cc @@ -34,6 +34,7 @@ ConvertOptions ConvertOptions::Defaults() { } ReadOptions ReadOptions::Defaults() { return ReadOptions(); } +WriteOptions WriteOptions::Defaults() { return WriteOptions(); } } // namespace csv } // namespace arrow diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 2786a673654..35ca5fdb0d2 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -143,6 +143,9 @@ struct WriteOptions { // The writer processes batches of rows together. This is the // maximum number of rows processed at a time. int32_t batch_size = 1024; + + // Create write options with default values. + static WriteOptions Defaults(); }; } // namespace csv diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index dd4bbd3b55c..933285e4c33 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -766,7 +766,7 @@ def open_csv(input_file, read_options=None, parse_options=None, maybe_unbox_memory_pool(memory_pool)) return reader -cdef class WriterOptions(_Weakrefable): +cdef class WriteOptions(_Weakrefable): """ Options for writing CSV files. @@ -784,22 +784,33 @@ cdef class WriterOptions(_Weakrefable): # Avoid mistakingly creating attributes __slots__ = () - def __init__(include_header=True, batch_size=1024): - options.include_header = include_header - options.batch_size = 1024 + def __init__(self, *, include_header=None, batch_size=None): + self.options = CCSVWriteOptions.Defaults() + if include_header is not None: + self.options.include_header = include_header + if batch_size is not None: + self.options.batch_size = 1024 -def write_csv(output_file, data, include_header=True, +cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): + if write_options is None: + out[0] = CCSVWriteOptions.Defaults() + else: + out[0] = write_options.options + + +def write_csv(data, output_file, write_options=None, MemoryPool memory_pool=None): """ Parameters ---------- - output_file: string, path, pyarrow.OutputStream or file-like object - The location of CSV data. data: The data to write. Either a pyarrow.RecordBatch or a pyarrow.Table + output_file: string, path, pyarrow.OutputStream or file-like object + The location of CSV data. write_options: pyarrow.csv.WriteOptions + Options to configure writing the CSV file. memory_pool: MemoryPool, optional Pool for temporary allocations. 
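For reference, a minimal sketch of how the signature revised in this patch is meant to be called (data first, then the destination, then an explicit WriteOptions); the column names and the in-memory buffer mirror the round-trip test further down and are illustrative only:

import io

import pyarrow as pa
from pyarrow import csv

table = pa.table({"c1": [1, 2, 3], "c2": ["a", "b", "c"]})
buf = io.BytesIO()
# Data first, destination second; WriteOptions replaces the old include_header flag.
csv.write_csv(table, buf, csv.WriteOptions(include_header=True))
# Header and string cells come out quoted, numeric cells unquoted.
print(buf.getvalue())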
@@ -813,10 +824,7 @@ def write_csv(output_file, data, include_header=True, CMemoryPool* c_memory_pool CRecordBatch* batch CTable* table - if write_options is None: - c_write_options = CCSVWriteOptions.Defaults() - else: - c_write_options = write_options.options + _get_write_options(write_options, &c_write_options) try: where = _stringify_path(output_file) @@ -826,7 +834,6 @@ def write_csv(output_file, data, include_header=True, c_where = tobytes(where) stream = GetResultValue(FileOutputStream.Open(c_where)) - c_write_options.include_header = include_header c_memory_pool = maybe_unbox_memory_pool(memory_pool) if isinstance(data, RecordBatch): batch = (data).batch diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index ca045e17b6b..a3aa79291db 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1622,6 +1622,10 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: c_bool include_header int32_t batch_size + @staticmethod + CCSVWriteOptions Defaults() + + cdef cppclass CCSVReader" arrow::csv::TableReader": @staticmethod CResult[shared_ptr[CCSVReader]] Make( @@ -1637,8 +1641,12 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: CMemoryPool*, shared_ptr[CInputStream], CCSVReadOptions, CCSVParseOptions, CCSVConvertOptions) - cdef CStatus WriteCsv(CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) - cdef CStatus WriteCsv(CRecordBatch&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) + +# Writer is included explicity to avoid having to set additional +# C-Processor definitions in setup.py for cmake. +cdef extern from "arrow/csv/writer.h" namespace "arrow::csv" nogil: + cdef CStatus WriteCSV(CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) + cdef CStatus WriteCSV(CRecordBatch&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) cdef extern from "arrow/json/options.h" nogil: diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 7121de84443..89f5322edd1 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -36,7 +36,8 @@ import pyarrow as pa from pyarrow.csv import ( - open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601, write_csv) + open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601, + write_csv, WriteOptions) def generate_col_names(): @@ -1265,13 +1266,13 @@ def test_write_read_round_trip(): for data in [t, record_batch]: # test with header buf = io.BytesIO() - write_csv(buf, data, include_header=True) + write_csv(data, buf, WriteOptions(include_header=True)) buf.seek(0) assert t == read_csv(buf) # Test without header buf = io.BytesIO() - write_csv(buf, data, include_header=False) + write_csv(data, buf, WriteOptions(include_header=False)) buf.seek(0) read_options = ReadOptions(column_names=t.column_names) From 61856a4b0135208e3fbc8b036bc51b27f3611581 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 22 Feb 2021 01:55:03 +0000 Subject: [PATCH 09/18] refactor c++ tests --- cpp/src/arrow/csv/writer_test.cc | 50 +++++++++++++++++++------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc index d68ae4ba580..dc59fefa8fe 100644 --- a/cpp/src/arrow/csv/writer_test.cc +++ b/cpp/src/arrow/csv/writer_test.cc @@ -24,6 +24,7 @@ #include "arrow/csv/writer.h" #include "arrow/io/memory.h" #include "arrow/record_batch.h" +#include "arrow/result_internal.h" 
#include "arrow/testing/gtest_util.h" #include "arrow/type.h" #include "arrow/type_fwd.h" @@ -79,41 +80,50 @@ std::vector GenerateTestCases() { expected_header + expected_without_header}}; } -class TestWriteCSV : public ::testing::TestWithParam {}; +class TestWriteCSV : public ::testing::TestWithParam { + protected: + template + Result ToCsvString(const Data& data, const WriteOptions& options) { + std::shared_ptr out; + ASSIGN_OR_RAISE(out, io::BufferOutputStream::Create()); + + RETURN_NOT_OK(WriteCSV(data, options, default_memory_pool(), out.get())); + ASSIGN_OR_RAISE(std::shared_ptr buffer, out->Finish()); + return std::string(reinterpret_cast(buffer->data()), buffer->size()); + } +}; TEST_P(TestWriteCSV, TestWrite) { ASSERT_OK_AND_ASSIGN(std::shared_ptr out, io::BufferOutputStream::Create()); WriteOptions options = GetParam().options; - - ASSERT_OK( - WriteCSV(*GetParam().record_batch, options, default_memory_pool(), out.get())); - ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, out->Finish()); - EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), - GetParam().expected_output); - ASSERT_OK(out->Reset()); + std::string csv; + ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*GetParam().record_batch, options)); + EXPECT_EQ(csv, GetParam().expected_output); // Batch size shouldn't matter. options.batch_size /= 2; - ASSERT_OK( - WriteCSV(*GetParam().record_batch, options, default_memory_pool(), out.get())); - ASSERT_OK_AND_ASSIGN(buffer, out->Finish()); - EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), - GetParam().expected_output); - ASSERT_OK(out->Reset()); + ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*GetParam().record_batch, options)); + EXPECT_EQ(csv, GetParam().expected_output); // Table and Record batch should work identically. ASSERT_OK_AND_ASSIGN(std::shared_ptr
table, Table::FromRecordBatches({GetParam().record_batch})); - ASSERT_OK(WriteCSV(*table, options, default_memory_pool(), out.get())); - ASSERT_OK_AND_ASSIGN(buffer, out->Finish()); - EXPECT_EQ(std::string(reinterpret_cast(buffer->data()), buffer->size()), - GetParam().expected_output); - ASSERT_OK(out->Reset()); + ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*table, options)); + EXPECT_EQ(csv, GetParam().expected_output); } -INSTANTIATE_TEST_SUITE_P(WriteCSVTest, TestWriteCSV, +INSTANTIATE_TEST_SUITE_P(MultiColumnWriteCSVTest, TestWriteCSV, ::testing::ValuesIn(GenerateTestCases())); +INSTANTIATE_TEST_SUITE_P( + SingleColumnWriteCSVTest, TestWriteCSV, + ::testing::Values(TestParams{ + RecordBatchFromJSON(schema({field("int64", int64())}), + R"([{ "int64": 9999}, {}, { "int64": -15}])"), + WriteOptions(), + R"("int64")" + "\n9999\n\n-15\n"})); + } // namespace csv } // namespace arrow From 5c833a8a17582251207c9b3a1dfdb299d846bdb8 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 22 Feb 2021 03:31:26 +0000 Subject: [PATCH 10/18] hopefully fix lint --- python/pyarrow/_csv.pyx | 5 +++-- python/pyarrow/csv.py | 2 +- python/pyarrow/includes/libarrow.pxd | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 933285e4c33..033367b928c 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -766,6 +766,7 @@ def open_csv(input_file, read_options=None, parse_options=None, maybe_unbox_memory_pool(memory_pool)) return reader + cdef class WriteOptions(_Weakrefable): """ Options for writing CSV files. @@ -787,9 +788,9 @@ cdef class WriteOptions(_Weakrefable): def __init__(self, *, include_header=None, batch_size=None): self.options = CCSVWriteOptions.Defaults() if include_header is not None: - self.options.include_header = include_header + self.options.include_header = include_header if batch_size is not None: - self.options.batch_size = 1024 + self.options.batch_size = 1024 cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): diff --git a/python/pyarrow/csv.py b/python/pyarrow/csv.py index dafe1073209..fc1dcafba0b 100644 --- a/python/pyarrow/csv.py +++ b/python/pyarrow/csv.py @@ -18,5 +18,5 @@ from pyarrow._csv import ( # noqa ReadOptions, ParseOptions, ConvertOptions, ISO8601, - open_csv, read_csv, CSVStreamingReader, write_csv, + open_csv, read_csv, CSVStreamingReader, write_csv, WriteOptions) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index a3aa79291db..87ba397215f 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1625,7 +1625,6 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: @staticmethod CCSVWriteOptions Defaults() - cdef cppclass CCSVReader" arrow::csv::TableReader": @staticmethod CResult[shared_ptr[CCSVReader]] Make( From 73826b82d1c4b8afb4a5e19d705216d09b226ecd Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 22 Feb 2021 03:39:58 +0000 Subject: [PATCH 11/18] hopefully make windows builds happy --- cpp/src/arrow/csv/writer.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index 33a0a911a18..a4d45927ff4 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -156,7 +156,7 @@ class UnquotedColumnPopulator : public ColumnPopulator { int64_t next_column_offset = s.length() + /*end_char*/ 1; memcpy((output + *offsets - next_column_offset), s.data(), s.length()); 
*(output + *offsets - 1) = end_char_; - *offsets -= next_column_offset; + *offsets -= static_cast(next_column_offset); offsets++; }, [&]() { @@ -188,7 +188,8 @@ class QuotedColumnPopulator : public ColumnPopulator { int64_t escaped_count = CountEscapes(s); // TODO: Maybe use 64 bit row lengths or safe cast? row_needs_escaping_[row_number] = escaped_count > 0; - row_lengths[row_number] += s.length() + escaped_count + kQuoteCount; + row_lengths[row_number] += static_cast(s.length()) + + static_cast(escaped_count + kQuoteCount); row_number++; }, [&]() { @@ -213,7 +214,8 @@ class QuotedColumnPopulator : public ColumnPopulator { } else { // Adjust row_end by 3: 1 quote char, 1 end char and 1 to position at the // first position to write to. - next_column_offset = row_end - EscapeReverse(s, row_end - 3); + next_column_offset = + static_cast(row_end - EscapeReverse(s, row_end - 3)); } *(row_end - next_column_offset) = '"'; *(row_end - 2) = '"'; @@ -375,8 +377,8 @@ class CSVConverter { // Calculate relative offsets for each row (excluding delimiters) for (size_t col = 0; col < column_populators_.size(); col++) { - RETURN_NOT_OK( - column_populators_[col]->UpdateRowLengths(*batch.column(col), offsets_.data())); + RETURN_NOT_OK(column_populators_[static_cast(col)]->UpdateRowLengths( + *batch.column(col), offsets_.data())); } // Calculate cumulalative offsets for each row (including delimiters). offsets_[0] += batch.num_columns(); From c5d76eb71521cc71a4bc713aedaf6e3a81c8b4ed Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 22 Feb 2021 03:55:38 +0000 Subject: [PATCH 12/18] more lint and windows --- cpp/src/arrow/csv/writer.cc | 8 ++++---- python/pyarrow/_csv.pyx | 2 +- python/pyarrow/includes/libarrow.pxd | 6 ++++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index a4d45927ff4..9487f52e4da 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -208,7 +208,7 @@ class QuotedColumnPopulator : public ColumnPopulator { char* row_end = output + *offsets; int32_t next_column_offset = 0; if (!*needs_escaping) { - next_column_offset = s.length() + kQuoteDelimiterCount; + next_column_offset = static_cast(s.length() + kQuoteDelimiterCount); memcpy(row_end - next_column_offset + /*quote_offset=*/1, s.data(), s.length()); } else { @@ -376,9 +376,9 @@ class CSVConverter { std::fill(offsets_.begin(), offsets_.end(), 0); // Calculate relative offsets for each row (excluding delimiters) - for (size_t col = 0; col < column_populators_.size(); col++) { - RETURN_NOT_OK(column_populators_[static_cast(col)]->UpdateRowLengths( - *batch.column(col), offsets_.data())); + for (int32_t col = 0; col < static_cast(column_populators_.size()); col++) { + RETURN_NOT_OK( + column_populators_[col]->UpdateRowLengths(*batch.column(col), offsets_.data())); } // Calculate cumulalative offsets for each row (including delimiters). 
offsets_[0] += batch.num_columns(); diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 033367b928c..8c38712a851 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -774,7 +774,7 @@ cdef class WriteOptions(_Weakrefable): Parameters ---------- include_header : bool, optional (default True) - Whether to include the header + Whether to include the header batch_size : int, optional (default 1024) How many rows to process together when converting and writing CSV diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 87ba397215f..ece8bf31b85 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1644,8 +1644,10 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: # Writer is included explicity to avoid having to set additional # C-Processor definitions in setup.py for cmake. cdef extern from "arrow/csv/writer.h" namespace "arrow::csv" nogil: - cdef CStatus WriteCSV(CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) - cdef CStatus WriteCSV(CRecordBatch&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) + cdef CStatus WriteCSV( + CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) + cdef CStatus WriteCSV( + CRecordBatch&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) cdef extern from "arrow/json/options.h" nogil: From 26958913b3abb0c9547018130d62b2c3da5a07b4 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 24 Feb 2021 05:03:56 +0000 Subject: [PATCH 13/18] address comments --- cpp/cmake_modules/SetupCxxFlags.cmake | 5 ----- cpp/src/arrow/csv/api.h | 2 +- cpp/src/arrow/util/config.h.cmake | 2 ++ python/pyarrow/_csv.pyx | 21 ++++++--------------- python/pyarrow/includes/libarrow.pxd | 4 ---- 5 files changed, 9 insertions(+), 25 deletions(-) diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake index b7cb4cd07ec..b534552c3c0 100644 --- a/cpp/cmake_modules/SetupCxxFlags.cmake +++ b/cpp/cmake_modules/SetupCxxFlags.cmake @@ -629,8 +629,3 @@ if(MSVC) set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${MSVC_LINKER_FLAGS}") endif() endif() - -#-------------------------------------------------------------------------------------- -if(ARROW_COMPUTE) - add_definitions(-DARROW_HAVE_COMPUTE_MODULE) -endif() diff --git a/cpp/src/arrow/csv/api.h b/cpp/src/arrow/csv/api.h index 620ee9ff180..7bf39315767 100644 --- a/cpp/src/arrow/csv/api.h +++ b/cpp/src/arrow/csv/api.h @@ -21,6 +21,6 @@ #include "arrow/csv/reader.h" // The writer depends on compute module for casting. 
-#ifdef ARROW_HAVE_COMPUTE_MODULE +#ifdef ARROW_COMPUTE #include "arrow/csv/writer.h" #endif diff --git a/cpp/src/arrow/util/config.h.cmake b/cpp/src/arrow/util/config.h.cmake index 8f8dea0c6c8..be6686f253e 100644 --- a/cpp/src/arrow/util/config.h.cmake +++ b/cpp/src/arrow/util/config.h.cmake @@ -34,6 +34,8 @@ #define ARROW_PACKAGE_KIND "@ARROW_PACKAGE_KIND@" +#cmakedefine ARROW_COMPUTE + #cmakedefine ARROW_S3 #cmakedefine ARROW_USE_NATIVE_INT128 diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 8c38712a851..47a1c3000be 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -31,6 +31,7 @@ from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema, RecordBatchReader, ensure_type, maybe_unbox_memory_pool, get_input_stream, get_writer, native_transcoding_input_stream, + pyarrow_unwrap_batch, pyarrow_unwrap_table, pyarrow_wrap_schema, pyarrow_wrap_table, pyarrow_wrap_data_type, pyarrow_unwrap_data_type, Table, RecordBatch) @@ -803,6 +804,7 @@ cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): def write_csv(data, output_file, write_options=None, MemoryPool memory_pool=None): """ + Writes data to output_file. Parameters ---------- @@ -814,10 +816,6 @@ def write_csv(data, output_file, write_options=None, Options to configure writing the CSV file. memory_pool: MemoryPool, optional Pool for temporary allocations. - - Returns - ------- - None """ cdef: shared_ptr[COutputStream] stream @@ -827,24 +825,17 @@ def write_csv(data, output_file, write_options=None, CTable* table _get_write_options(write_options, &c_write_options) - try: - where = _stringify_path(output_file) - except TypeError: - get_writer(output_file, &stream) - else: - c_where = tobytes(where) - stream = GetResultValue(FileOutputStream.Open(c_where)) - + get_writer(output_file, &stream) c_memory_pool = maybe_unbox_memory_pool(memory_pool) if isinstance(data, RecordBatch): - batch = (data).batch + batch = pyarrow_unwrap_batch(data).get() with nogil: check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool, stream.get())) elif isinstance(data, Table): - table = (
data).table + table = pyarrow_unwrap_table(data).get() with nogil: check_status(WriteCSV(deref(table), c_write_options, c_memory_pool, stream.get())) else: - raise ValueError(type(data)) + raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'") diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index ece8bf31b85..a4f6f186284 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1640,10 +1640,6 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: CMemoryPool*, shared_ptr[CInputStream], CCSVReadOptions, CCSVParseOptions, CCSVConvertOptions) - -# Writer is included explicity to avoid having to set additional -# C-Processor definitions in setup.py for cmake. -cdef extern from "arrow/csv/writer.h" namespace "arrow::csv" nogil: cdef CStatus WriteCSV( CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*) cdef CStatus WriteCSV( From da03cd696c6cec7fef223c30f3589ce26212da9e Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 24 Feb 2021 05:04:44 +0000 Subject: [PATCH 14/18] add back options --- cpp/src/arrow/csv/options.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 35ca5fdb0d2..134181d8427 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -138,7 +138,7 @@ struct ARROW_EXPORT ReadOptions { }; /// Experimental -struct WriteOptions { +struct ARROW_EXPORT WriteOptions { bool include_header = true; // The writer processes batches of rows together. This is the // maximum number of rows processed at a time. From 2eb225138f595c8a592c6605a55ec790f789b81c Mon Sep 17 00:00:00 2001 From: emkornfield Date: Wed, 24 Feb 2021 08:36:29 -0800 Subject: [PATCH 15/18] Try uint8_t instead of bool. 
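For reference, the Python entry point after the PATCH 13 changes above now behaves roughly as sketched here; the batch contents and the BytesIO sink are made up for illustration:

import io

import pyarrow as pa
from pyarrow import csv

batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["x"])
sink = io.BytesIO()
# RecordBatch and Table are both unwrapped and written through the same stream.
csv.write_csv(batch, sink)
try:
    csv.write_csv({"x": [1, 2, 3]}, sink)
except TypeError as exc:
    # Anything else is rejected early: Expected Table or RecordBatch, got '<class 'dict'>'
    print(exc)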
--- cpp/src/arrow/csv/writer.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index 9487f52e4da..7b88afafa7a 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -176,7 +176,7 @@ class QuotedColumnPopulator : public ColumnPopulator { public: QuotedColumnPopulator(MemoryPool* pool, char end_char) : ColumnPopulator(pool, end_char), - row_needs_escaping_(::arrow::stl::allocator<bool>(pool)) {} + row_needs_escaping_(::arrow::stl::allocator<uint8_t>(pool)) {} Status UpdateRowLengths(int32_t* row_lengths) override { const StringArray& input = *casted_array_; @@ -234,7 +234,7 @@ class QuotedColumnPopulator : public ColumnPopulator { } private: - std::vector<bool, ::arrow::stl::allocator<bool>> row_needs_escaping_; + std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> row_needs_escaping_; }; struct PopulatorFactory { From 23a2b0cfa28cf69b24c3b893253e02101382ca81 Mon Sep 17 00:00:00 2001 From: emkornfield Date: Wed, 24 Feb 2021 12:46:09 -0800 Subject: [PATCH 16/18] change back to bool and remove custom allocator --- cpp/src/arrow/csv/writer.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index 7b88afafa7a..f55a964cc3c 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -176,7 +176,7 @@ class QuotedColumnPopulator : public ColumnPopulator { public: QuotedColumnPopulator(MemoryPool* pool, char end_char) : ColumnPopulator(pool, end_char), - row_needs_escaping_(::arrow::stl::allocator<uint8_t>(pool)) {} + row_needs_escaping_() {} Status UpdateRowLengths(int32_t* row_lengths) override { const StringArray& input = *casted_array_; @@ -234,7 +234,10 @@ class QuotedColumnPopulator : public ColumnPopulator { } private: - std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> row_needs_escaping_; + // Older versions of GCC don't support custom allocators; + // at some point we should change this to use a memory_pool + // backed allocator.
+ std::vector row_needs_escaping_; }; struct PopulatorFactory { From 7e33776459873d2a51957168c541f44f34e29699 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 24 Feb 2021 22:11:10 -0800 Subject: [PATCH 17/18] remove from initialize list --- cpp/src/arrow/csv/writer.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index f55a964cc3c..ddd59b46fc1 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -175,8 +175,7 @@ class UnquotedColumnPopulator : public ColumnPopulator { class QuotedColumnPopulator : public ColumnPopulator { public: QuotedColumnPopulator(MemoryPool* pool, char end_char) - : ColumnPopulator(pool, end_char), - row_needs_escaping_() {} + : ColumnPopulator(pool, end_char) {} Status UpdateRowLengths(int32_t* row_lengths) override { const StringArray& input = *casted_array_; From a5ee563f65696cba958e025d2e0f22b47fad4cc4 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 25 Feb 2021 14:14:41 +0100 Subject: [PATCH 18/18] Improve docstrings Add Python properties and test them --- cpp/src/arrow/csv/options.h | 10 +++++--- python/pyarrow/_csv.pyx | 41 +++++++++++++++++++++++++------- python/pyarrow/tests/test_csv.py | 17 ++++++++++++- 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 134181d8427..5c912e7fd85 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -139,12 +139,16 @@ struct ARROW_EXPORT ReadOptions { /// Experimental struct ARROW_EXPORT WriteOptions { + /// Whether to write an initial header line with column names bool include_header = true; - // The writer processes batches of rows together. This is the - // maximum number of rows processed at a time. + + /// \brief Maximum number of rows processed at a time + /// + /// The CSV writer converts and writes data in batches of N rows. + /// This number can impact performance. int32_t batch_size = 1024; - // Create write options with default values. + /// Create write options with default values static WriteOptions Defaults(); }; diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 47a1c3000be..f5b8e4d5fba 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -775,10 +775,10 @@ cdef class WriteOptions(_Weakrefable): Parameters ---------- include_header : bool, optional (default True) - Whether to include the header + Whether to write an initial header line with column names batch_size : int, optional (default 1024) How many rows to process together when converting and writing - CSV + CSV data """ cdef: CCSVWriteOptions options @@ -789,9 +789,32 @@ cdef class WriteOptions(_Weakrefable): def __init__(self, *, include_header=None, batch_size=None): self.options = CCSVWriteOptions.Defaults() if include_header is not None: - self.options.include_header = include_header + self.include_header = include_header if batch_size is not None: - self.options.batch_size = 1024 + self.batch_size = batch_size + + @property + def include_header(self): + """ + Whether to write an initial header line with column names. + """ + return self.options.include_header + + @include_header.setter + def include_header(self, value): + self.options.include_header = value + + @property + def batch_size(self): + """ + How many rows to process together when converting and writing + CSV data. 
+ """ + return self.options.batch_size + + @batch_size.setter + def batch_size(self, value): + self.options.batch_size = value cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): @@ -804,16 +827,16 @@ cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): def write_csv(data, output_file, write_options=None, MemoryPool memory_pool=None): """ - Writes data to output_file. + Write record batch or table to a CSV file. Parameters ---------- - data: The data to write. - Either a pyarrow.RecordBatch or a pyarrow.Table + data: pyarrow.RecordBatch or pyarrow.Table + The data to write. output_file: string, path, pyarrow.OutputStream or file-like object - The location of CSV data. + The location where to write the CSV data. write_options: pyarrow.csv.WriteOptions - Options to configure writing the CSV file. + Options to configure writing the CSV data. memory_pool: MemoryPool, optional Pool for temporary allocations. """ diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 89f5322edd1..5ca31aefebc 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -204,6 +204,21 @@ def test_convert_options(): assert opts.timestamp_parsers == [ISO8601, '%Y-%m-%d'] +def test_write_options(): + cls = WriteOptions + opts = cls() + + check_options_class( + cls, include_header=[True, False]) + + assert opts.batch_size > 0 + opts.batch_size = 12345 + assert opts.batch_size == 12345 + + opts = cls(batch_size=9876) + assert opts.batch_size == 9876 + + class BaseTestCSVRead: def read_bytes(self, b, **kwargs): @@ -1264,7 +1279,7 @@ def test_write_read_round_trip(): t = pa.Table.from_arrays([[1, 2, 3], ["a", "b", "c"]], ["c1", "c2"]) record_batch = t.to_batches(max_chunksize=4)[0] for data in [t, record_batch]: - # test with header + # Test with header buf = io.BytesIO() write_csv(data, buf, WriteOptions(include_header=True)) buf.seek(0)