diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 382a851c159..abd5428b3d7 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -349,6 +349,9 @@ if(ARROW_CSV)
       csv/options.cc
       csv/parser.cc
       csv/reader.cc)
+  if(ARROW_COMPUTE)
+    list(APPEND ARROW_SRCS csv/writer.cc)
+  endif()
 
   list(APPEND ARROW_TESTING_SRCS csv/test_common.cc)
 endif()
diff --git a/cpp/src/arrow/csv/CMakeLists.txt b/cpp/src/arrow/csv/CMakeLists.txt
index 2766cfd3bd2..561faf1b584 100644
--- a/cpp/src/arrow/csv/CMakeLists.txt
+++ b/cpp/src/arrow/csv/CMakeLists.txt
@@ -15,14 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_arrow_test(csv-test
-               SOURCES
-               chunker_test.cc
-               column_builder_test.cc
-               column_decoder_test.cc
-               converter_test.cc
-               parser_test.cc
-               reader_test.cc)
+set(CSV_TEST_SRCS
+    chunker_test.cc
+    column_builder_test.cc
+    column_decoder_test.cc
+    converter_test.cc
+    parser_test.cc
+    reader_test.cc)
+
+# Writer depends on compute's cast functionality
+if(ARROW_COMPUTE)
+  list(APPEND CSV_TEST_SRCS writer_test.cc)
+endif()
+
+add_arrow_test(csv-test SOURCES ${CSV_TEST_SRCS})
 
 add_arrow_benchmark(converter_benchmark PREFIX "arrow-csv")
 add_arrow_benchmark(parser_benchmark PREFIX "arrow-csv")
diff --git a/cpp/src/arrow/csv/api.h b/cpp/src/arrow/csv/api.h
index df88843f51b..7bf39315767 100644
--- a/cpp/src/arrow/csv/api.h
+++ b/cpp/src/arrow/csv/api.h
@@ -19,3 +19,8 @@
 
 #include "arrow/csv/options.h"
 #include "arrow/csv/reader.h"
+
+// The writer depends on the compute module for casting.
+#ifdef ARROW_COMPUTE
+#include "arrow/csv/writer.h"
+#endif
diff --git a/cpp/src/arrow/csv/options.cc b/cpp/src/arrow/csv/options.cc
index b6f1346bcd3..a515abf2cf4 100644
--- a/cpp/src/arrow/csv/options.cc
+++ b/cpp/src/arrow/csv/options.cc
@@ -34,6 +34,7 @@ ConvertOptions ConvertOptions::Defaults() {
 }
 
 ReadOptions ReadOptions::Defaults() { return ReadOptions(); }
+WriteOptions WriteOptions::Defaults() { return WriteOptions(); }
 
 }  // namespace csv
 }  // namespace arrow
diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h
index 82153ed466a..5c912e7fd85 100644
--- a/cpp/src/arrow/csv/options.h
+++ b/cpp/src/arrow/csv/options.h
@@ -137,5 +137,20 @@ struct ARROW_EXPORT ReadOptions {
   static ReadOptions Defaults();
 };
 
+/// Experimental
+struct ARROW_EXPORT WriteOptions {
+  /// Whether to write an initial header line with column names
+  bool include_header = true;
+
+  /// \brief Maximum number of rows processed at a time
+  ///
+  /// The CSV writer converts and writes data in batches of N rows.
+  /// This number can impact performance.
+  int32_t batch_size = 1024;
+
+  /// Create write options with default values
+  static WriteOptions Defaults();
+};
+
 }  // namespace csv
 }  // namespace arrow
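Before the implementation, a minimal usage sketch of the API this patch adds (the helper name and in-memory sink are illustrative, not part of the patch):

```cpp
#include <memory>

#include "arrow/csv/writer.h"
#include "arrow/io/memory.h"
#include "arrow/result.h"
#include "arrow/table.h"

// Sketch: write a Table as CSV to an in-memory buffer using the new options.
arrow::Status WriteTableAsCsv(const arrow::Table& table) {
  auto options = arrow::csv::WriteOptions::Defaults();
  options.include_header = true;
  options.batch_size = 2048;  // rows converted per slice
  ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
  // Passing a null MemoryPool falls back to default_memory_pool() (see writer.cc).
  return arrow::csv::WriteCSV(table, options, /*pool=*/nullptr, sink.get());
}
```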
diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc
new file mode 100644
index 00000000000..ddd59b46fc1
--- /dev/null
+++ b/cpp/src/arrow/csv/writer.cc
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/csv/writer.h"
+#include "arrow/array.h"
+#include "arrow/compute/cast.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/result_internal.h"
+#include "arrow/stl_allocator.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+namespace csv {
+// This implementation is intentionally light on configurability to minimize the size of
+// the initial PR. Additional features can be added as there is demand and interest to
+// implement them.
+//
+// The algorithm used here, at a high level, is to break RecordBatches/Tables into
+// slices and convert each slice independently. A slice is converted to CSV by first
+// scanning each column to determine the size of its contents when rendered as a string
+// in CSV. For non-string types this requires casting the value to string (which is
+// cached). This data is used to compute the precise length of each row and to make a
+// single allocation for the final CSV data buffer. Once the final size is known, each
+// column is iterated over again to place its contents into the CSV data buffer. The
+// rationale for choosing this approach is that it allows reuse of the cast
+// functionality in the compute module and the inline data-visiting functionality in the
+// core library. A performance comparison against a naive single-pass approach has not
+// been done, but this approach might still be competitive because it avoids many of the
+// per-row branches a single-pass approach would require. Profiling would likely yield
+// further opportunities for optimization with this approach.
+
+namespace {
+
+struct SliceIteratorFunctor {
+  Result<std::shared_ptr<RecordBatch>> Next() {
+    if (current_offset < batch->num_rows()) {
+      std::shared_ptr<RecordBatch> next = batch->Slice(current_offset, slice_size);
+      current_offset += slice_size;
+      return next;
+    }
+    return IterationTraits<std::shared_ptr<RecordBatch>>::End();
+  }
+  const RecordBatch* const batch;
+  const int64_t slice_size;
+  int64_t current_offset;
+};
+
+RecordBatchIterator RecordBatchSliceIterator(const RecordBatch& batch,
+                                             int64_t slice_size) {
+  SliceIteratorFunctor functor = {&batch, slice_size,
+                                  /*offset=*/static_cast<int64_t>(0)};
+  return RecordBatchIterator(std::move(functor));
+}
+
+// Counts the number of characters that need escaping in s.
+int64_t CountEscapes(util::string_view s) {
+  return static_cast<int64_t>(std::count(s.begin(), s.end(), '"'));
+}
+
+// Matching quote pair character length.
+constexpr int64_t kQuoteCount = 2;
+constexpr int64_t kQuoteDelimiterCount = kQuoteCount + /*end_char*/ 1;
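As a worked example of this accounting (illustrative, not part of the patch): the value `a"b` renders as the quoted field `"a""b"`, i.e. content length plus one byte per embedded quote plus the surrounding quote pair; `kQuoteDelimiterCount` then reserves one more byte for the trailing delimiter or newline:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

int main() {
  const std::string s = "a\"b";
  // Same accounting as CountEscapes() + kQuoteCount above.
  int64_t escapes = std::count(s.begin(), s.end(), '"');          // 1
  int64_t quoted = static_cast<int64_t>(s.size()) + escapes + 2;  // + kQuoteCount
  assert(quoted == 6);  // "a""b" occupies 6 characters
  // kQuoteDelimiterCount adds one more byte for the ',' or '\n'.
  return 0;
}
```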
+// Interface for generating CSV data per column.
+// The intended usage is to iteratively call UpdateRowLengths for a column and
+// then PopulateColumns. PopulateColumns must be called in the reverse order of the
+// populators (it populates data backwards).
+class ColumnPopulator {
+ public:
+  ColumnPopulator(MemoryPool* pool, char end_char) : end_char_(end_char), pool_(pool) {}
+
+  virtual ~ColumnPopulator() = default;
+
+  // Adds the number of characters each entry in data will add to the elements
+  // in row_lengths.
+  Status UpdateRowLengths(const Array& data, int32_t* row_lengths) {
+    compute::ExecContext ctx(pool_);
+    // Populators are intended to be applied to reasonably small data.  In most cases
+    // threading overhead would not be justified.
+    ctx.set_use_threads(false);
+    ASSIGN_OR_RAISE(
+        std::shared_ptr<Array> casted,
+        compute::Cast(data, /*to_type=*/utf8(), compute::CastOptions(), &ctx));
+    casted_array_ = internal::checked_pointer_cast<StringArray>(casted);
+    return UpdateRowLengths(row_lengths);
+  }
+
+  // Places string data onto each row in output and updates the corresponding
+  // row pointers in preparation for calls to other (preceding) ColumnPopulators.
+  // Args:
+  //   output: character buffer to write to.
+  //   offsets: an array of end-of-column offsets within the output buffer
+  //   (values are one past the end of the position to write to).
+  virtual void PopulateColumns(char* output, int32_t* offsets) const = 0;
+
+ protected:
+  virtual Status UpdateRowLengths(int32_t* row_lengths) = 0;
+  std::shared_ptr<StringArray> casted_array_;
+  const char end_char_;
+
+ private:
+  MemoryPool* const pool_;
+};
+
+// Copies the contents of s to out_end, properly escaping any necessary characters.
+// Returns the position prior to the last copied character (out_end is decremented).
+char* EscapeReverse(arrow::util::string_view s, char* out_end) {
+  for (const char* val = s.data() + s.length() - 1; val >= s.data(); val--, out_end--) {
+    if (*val == '"') {
+      *out_end = *val;
+      out_end--;
+    }
+    *out_end = *val;
+  }
+  return out_end;
+}
+
+// Populator for non-string types.  This populator relies on the compute Cast
+// functionality to convert values to string; if no cast to string exists for a type,
+// it is an error.  It also assumes the resulting string from a cast does not require
+// quoting or escaping.
+class UnquotedColumnPopulator : public ColumnPopulator {
+ public:
+  explicit UnquotedColumnPopulator(MemoryPool* memory_pool, char end_char)
+      : ColumnPopulator(memory_pool, end_char) {}
+
+  Status UpdateRowLengths(int32_t* row_lengths) override {
+    for (int x = 0; x < casted_array_->length(); x++) {
+      row_lengths[x] += casted_array_->value_length(x);
+    }
+    return Status::OK();
+  }
+
+  void PopulateColumns(char* output, int32_t* offsets) const override {
+    VisitArrayDataInline<StringType>(
+        *casted_array_->data(),
+        [&](arrow::util::string_view s) {
+          int64_t next_column_offset = s.length() + /*end_char*/ 1;
+          memcpy((output + *offsets - next_column_offset), s.data(), s.length());
+          *(output + *offsets - 1) = end_char_;
+          *offsets -= static_cast<int32_t>(next_column_offset);
+          offsets++;
+        },
+        [&]() {
+          // Nulls are written as empty (unquoted) to distinguish them from empty
+          // strings.
+          *(output + *offsets - 1) = end_char_;
+          *offsets -= 1;
+          offsets++;
+        });
+  }
+};
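A toy model (not part of the patch) of this backward population scheme, showing how the offset bookkeeping produces one row:

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

// Sketch of the reverse fill used by the populators: the offset starts one
// past the end of the row; each column writes its value and trailing
// delimiter at the tail, then shrinks the offset so the preceding column
// lands just before it.
int main() {
  std::string out(5, '?');  // room for "1,ab\n"
  int32_t offset = 5;       // one past the end of the row
  // Last column, value "ab", end_char '\n':
  int32_t next = 2 + 1;     // value length + end char
  std::memcpy(&out[offset - next], "ab", 2);
  out[offset - 1] = '\n';
  offset -= next;
  // First column, value "1", end_char ',':
  next = 1 + 1;
  std::memcpy(&out[offset - next], "1", 1);
  out[offset - 1] = ',';
  offset -= next;
  std::cout << out;  // prints "1,ab\n"; offset ends at 0, matching the
                     // DCHECK in TranslateMinimalBatch below
  return 0;
}
```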
+// Strings need special handling to ensure they are escaped properly.
+// This class handles escaping, assuming that all strings will be quoted,
+// that the only character within a string that needs escaping is the quote
+// character ("), and that escaping is done by adding another quote.
+class QuotedColumnPopulator : public ColumnPopulator {
+ public:
+  QuotedColumnPopulator(MemoryPool* pool, char end_char)
+      : ColumnPopulator(pool, end_char) {}
+
+  Status UpdateRowLengths(int32_t* row_lengths) override {
+    const StringArray& input = *casted_array_;
+    int row_number = 0;
+    row_needs_escaping_.resize(casted_array_->length());
+    VisitArrayDataInline<StringType>(
+        *input.data(),
+        [&](arrow::util::string_view s) {
+          int64_t escaped_count = CountEscapes(s);
+          // TODO: Maybe use 64 bit row lengths or safe cast?
+          row_needs_escaping_[row_number] = escaped_count > 0;
+          row_lengths[row_number] += static_cast<int32_t>(s.length()) +
+                                     static_cast<int32_t>(escaped_count + kQuoteCount);
+          row_number++;
+        },
+        [&]() {
+          row_needs_escaping_[row_number] = false;
+          row_number++;
+        });
+    return Status::OK();
+  }
+
+  void PopulateColumns(char* output, int32_t* offsets) const override {
+    auto needs_escaping = row_needs_escaping_.begin();
+    VisitArrayDataInline<StringType>(
+        *(casted_array_->data()),
+        [&](arrow::util::string_view s) {
+          // The string content length still needs to be added.
+          char* row_end = output + *offsets;
+          int32_t next_column_offset = 0;
+          if (!*needs_escaping) {
+            next_column_offset = static_cast<int32_t>(s.length() + kQuoteDelimiterCount);
+            memcpy(row_end - next_column_offset + /*quote_offset=*/1, s.data(),
+                   s.length());
+          } else {
+            // Adjust row_end by 3: 1 quote char, 1 end char and 1 to position at the
+            // first position to write to.
+            next_column_offset =
+                static_cast<int32_t>(row_end - EscapeReverse(s, row_end - 3));
+          }
+          *(row_end - next_column_offset) = '"';
+          *(row_end - 2) = '"';
+          *(row_end - 1) = end_char_;
+          *offsets -= next_column_offset;
+          offsets++;
+          needs_escaping++;
+        },
+        [&]() {
+          // Nulls are written as empty (unquoted) to distinguish them from empty
+          // strings.
+          *(output + *offsets - 1) = end_char_;
+          *offsets -= 1;
+          offsets++;
+          needs_escaping++;
+        });
+  }
+
+ private:
+  // Older versions of GCC don't support custom allocators here;
+  // at some point this should be changed to use a memory_pool-backed
+  // allocator.
+  std::vector<bool> row_needs_escaping_;
+};
+
+struct PopulatorFactory {
+  template <typename TypeClass>
+  enable_if_t<is_base_binary_type<TypeClass>::value ||
+                  std::is_same<FixedSizeBinaryType, TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    populator = new QuotedColumnPopulator(pool, end_char);
+    return Status::OK();
+  }
+
+  template <typename TypeClass>
+  enable_if_dictionary<TypeClass, Status> Visit(const TypeClass& type) {
+    return VisitTypeInline(*type.value_type(), this);
+  }
+
+  template <typename TypeClass>
+  enable_if_t<is_nested_type<TypeClass>::value || is_extension_type<TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    return Status::Invalid("Unsupported Type:", type.ToString());
+  }
+
+  template <typename TypeClass>
+  enable_if_t<is_primitive_ctype<TypeClass>::value ||
+                  is_decimal_type<TypeClass>::value || is_null_type<TypeClass>::value ||
+                  is_temporal_type<TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    populator = new UnquotedColumnPopulator(pool, end_char);
+    return Status::OK();
+  }
+
+  char end_char;
+  MemoryPool* pool;
+  ColumnPopulator* populator;
+};
+
+Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char end_char,
+                                                       MemoryPool* pool) {
+  PopulatorFactory factory{end_char, pool, nullptr};
+  RETURN_NOT_OK(VisitTypeInline(*field.type(), &factory));
+  return std::unique_ptr<ColumnPopulator>(factory.populator);
+}
+class CSVConverter {
+ public:
+  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
+                                                    MemoryPool* pool) {
+    std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
+    for (int col = 0; col < schema->num_fields(); col++) {
+      char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
+      ASSIGN_OR_RAISE(populators[col],
+                      MakePopulator(*schema->field(col), end_char, pool));
+    }
+    return std::unique_ptr<CSVConverter>(
+        new CSVConverter(std::move(schema), std::move(populators), pool));
+  }
+
+  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
+                  io::OutputStream* out) {
+    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+    for (auto maybe_slice : iterator) {
+      ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
+      RETURN_NOT_OK(TranslateMinimalBatch(*slice));
+      RETURN_NOT_OK(out->Write(data_buffer_));
+    }
+    return Status::OK();
+  }
+
+  Status WriteCSV(const Table& table, const WriteOptions& options,
+                  io::OutputStream* out) {
+    TableBatchReader reader(table);
+    reader.set_chunksize(options.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader.ReadNext(&batch));
+    while (batch != nullptr) {
+      RETURN_NOT_OK(TranslateMinimalBatch(*batch));
+      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(reader.ReadNext(&batch));
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  CSVConverter(std::shared_ptr<Schema> schema,
+               std::vector<std::unique_ptr<ColumnPopulator>> populators,
+               MemoryPool* pool)
+      : column_populators_(std::move(populators)),
+        offsets_(0, 0, ::arrow::stl::allocator<int32_t>(pool)),
+        schema_(std::move(schema)),
+        pool_(pool) {}
+
+  Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) {
+    if (data_buffer_ == nullptr) {
+      ASSIGN_OR_RAISE(
+          data_buffer_,
+          AllocateResizableBuffer(
+              options.batch_size * schema_->num_fields() * kColumnSizeGuess, pool_));
+    }
+    if (options.include_header) {
+      RETURN_NOT_OK(WriteHeader(out));
+    }
+    return Status::OK();
+  }
+
+  int64_t CalculateHeaderSize() const {
+    int64_t header_length = 0;
+    for (int col = 0; col < schema_->num_fields(); col++) {
+      const std::string& col_name = schema_->field(col)->name();
+      header_length += col_name.size();
+      header_length += CountEscapes(col_name);
+    }
+    return header_length + (kQuoteDelimiterCount * schema_->num_fields());
+  }
+
+  Status WriteHeader(io::OutputStream* out) {
+    RETURN_NOT_OK(data_buffer_->Resize(CalculateHeaderSize(), /*shrink_to_fit=*/false));
+    char* next =
+        reinterpret_cast<char*>(data_buffer_->mutable_data() + data_buffer_->size() - 1);
+    for (int col = schema_->num_fields() - 1; col >= 0; col--) {
+      *next-- = ',';
+      *next-- = '"';
+      next = EscapeReverse(schema_->field(col)->name(), next);
+      *next-- = '"';
+    }
+    *(data_buffer_->mutable_data() + data_buffer_->size() - 1) = '\n';
+    DCHECK_EQ(reinterpret_cast<const uint8_t*>(next + 1), data_buffer_->data());
+    return out->Write(data_buffer_);
+  }
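A worked example of this header-size arithmetic (illustrative, not part of the patch), using the column names exercised by the tests later in this patch ("a", "b\"", "c "):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Each name costs its length plus one byte per embedded quote, plus
// kQuoteDelimiterCount (two quotes and one delimiter) per column.
int main() {
  const std::vector<std::string> names = {"a", "b\"", "c "};
  constexpr int64_t kQuoteDelimiterCount = 3;
  int64_t size = 0;
  for (const std::string& name : names) {
    size += static_cast<int64_t>(name.size());
    size += std::count(name.begin(), name.end(), '"');  // escapes
  }
  size += kQuoteDelimiterCount * static_cast<int64_t>(names.size());
  assert(size == 15);  // R"("a","b""","c ")" is 14 bytes, plus the trailing '\n'
  return 0;
}
```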
+  Status TranslateMinimalBatch(const RecordBatch& batch) {
+    if (batch.num_rows() == 0) {
+      return Status::OK();
+    }
+    offsets_.resize(batch.num_rows());
+    std::fill(offsets_.begin(), offsets_.end(), 0);
+
+    // Calculate relative offsets for each row (excluding delimiters).
+    for (int32_t col = 0; col < static_cast<int32_t>(column_populators_.size()); col++) {
+      RETURN_NOT_OK(column_populators_[col]->UpdateRowLengths(*batch.column(col),
+                                                              offsets_.data()));
+    }
+    // Calculate cumulative offsets for each row (including delimiters).
+    offsets_[0] += batch.num_columns();
+    for (int64_t row = 1; row < batch.num_rows(); row++) {
+      offsets_[row] += offsets_[row - 1] + /*delimiter lengths*/ batch.num_columns();
+    }
+    // Resize the target buffer to the required size.  We assume batch-to-batch sizes
+    // should be pretty close, so don't shrink the buffer, to avoid allocation churn.
+    RETURN_NOT_OK(data_buffer_->Resize(offsets_.back(), /*shrink_to_fit=*/false));
+
+    // Use the offsets to populate contents.
+    for (auto populator = column_populators_.rbegin();
+         populator != column_populators_.rend(); populator++) {
+      (*populator)
+          ->PopulateColumns(reinterpret_cast<char*>(data_buffer_->mutable_data()),
+                            offsets_.data());
+    }
+    DCHECK_EQ(0, offsets_[0]);
+    return Status::OK();
+  }
+
+  static constexpr int64_t kColumnSizeGuess = 8;
+  std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
+  std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
+  std::shared_ptr<ResizableBuffer> data_buffer_;
+  const std::shared_ptr<Schema> schema_;
+  MemoryPool* pool_;
+};
+
+}  // namespace
+
+Status WriteCSV(const Table& table, const WriteOptions& options, MemoryPool* pool,
+                arrow::io::OutputStream* output) {
+  if (pool == nullptr) {
+    pool = default_memory_pool();
+  }
+  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
+                  CSVConverter::Make(table.schema(), pool));
+  return converter->WriteCSV(table, options, output);
+}
+
+Status WriteCSV(const RecordBatch& batch, const WriteOptions& options, MemoryPool* pool,
+                arrow::io::OutputStream* output) {
+  if (pool == nullptr) {
+    pool = default_memory_pool();
+  }
+
+  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
+                  CSVConverter::Make(batch.schema(), pool));
+  return converter->WriteCSV(batch, options, output);
+}
+
+}  // namespace csv
+}  // namespace arrow
diff --git a/cpp/src/arrow/csv/writer.h b/cpp/src/arrow/csv/writer.h
new file mode 100644
index 00000000000..c009d7849f4
--- /dev/null
+++ b/cpp/src/arrow/csv/writer.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/csv/options.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+
+namespace arrow {
+namespace csv {
+// Functionality for converting Arrow data to comma-separated value (CSV) text.
+// This library supports all primitive types that can be cast to a StringArray.
+// It applies the following formatting rules:
+// - For non-binary types, no quotes surround values.  Nulls are represented as the
+//   empty string.
+// - For binary types, all non-null data is quoted (and quotes within data are escaped
+//   with an additional quote).  Null values are empty and unquoted.
+// - LF (\n) is always used as a line ending.
+
+/// \brief Converts table to a CSV and writes the results to output.
+/// Experimental
+ARROW_EXPORT Status WriteCSV(const Table& table, const WriteOptions& options,
+                             MemoryPool* pool, arrow::io::OutputStream* output);
+/// \brief Converts batch to CSV and writes the results to output.
+/// Experimental
+ARROW_EXPORT Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
+                             MemoryPool* pool, arrow::io::OutputStream* output);
+
+}  // namespace csv
+}  // namespace arrow
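To make these formatting rules concrete: a hypothetical two-column table (an int32 column x and a utf8 column y, each containing one null) would render as follows. The header is always quoted, both nulls are empty, and only the quoted `""` denotes an empty string:

```
"x","y"
1,"a""b"
,"c"
3,
```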
diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc
new file mode 100644
index 00000000000..dc59fefa8fe
--- /dev/null
+++ b/cpp/src/arrow/csv/writer_test.cc
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include <memory>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/csv/writer.h"
+#include "arrow/io/memory.h"
+#include "arrow/record_batch.h"
+#include "arrow/result_internal.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow {
+namespace csv {
+
+struct TestParams {
+  std::shared_ptr<RecordBatch> record_batch;
+  WriteOptions options;
+  std::string expected_output;
+};
+
+WriteOptions DefaultTestOptions(bool include_header) {
+  WriteOptions options;
+  options.batch_size = 5;
+  options.include_header = include_header;
+  return options;
+}
+
+std::vector<TestParams> GenerateTestCases() {
+  auto abc_schema = schema({
+      {field("a", uint64())},
+      {field("b\"", utf8())},
+      {field("c ", int32())},
+  });
+  auto empty_batch =
+      RecordBatch::Make(abc_schema, /*num_rows=*/0,
+                        {
+                            ArrayFromJSON(abc_schema->field(0)->type(), "[]"),
+                            ArrayFromJSON(abc_schema->field(1)->type(), "[]"),
+                            ArrayFromJSON(abc_schema->field(2)->type(), "[]"),
+                        });
+  auto populated_batch = RecordBatchFromJSON(abc_schema, R"([{"a": 1, "c ": -1},
+                                             { "a": 1, "b\"": "abc\"efg", "c ": 2324},
+                                             { "b\"": "abcd", "c ": 5467},
+                                             { },
+                                             { "a": 546, "b\"": "", "c ": 517 },
+                                             { "a": 124, "b\"": "a\"\"b\"" }])");
+  std::string expected_without_header = std::string("1,,-1") + "\n" +    // line 1
+                                        R"(1,"abc""efg",2324)" + "\n" +  // line 2
+                                        R"(,"abcd",5467)" + "\n" +       // line 3
+                                        R"(,,)" + "\n" +                 // line 4
+                                        R"(546,"",517)" + "\n" +         // line 5
+                                        R"(124,"a""""b""",)" + "\n";     // line 6
+  std::string expected_header = std::string(R"("a","b""","c ")") + "\n";
+
+  return std::vector<TestParams>{
+      {empty_batch, DefaultTestOptions(/*header=*/false), ""},
+      {empty_batch, DefaultTestOptions(/*header=*/true), expected_header},
+      {populated_batch, DefaultTestOptions(/*header=*/false), expected_without_header},
+      {populated_batch, DefaultTestOptions(/*header=*/true),
+       expected_header + expected_without_header}};
+}
+
+class TestWriteCSV : public ::testing::TestWithParam<TestParams> {
+ protected:
+  template <typename Data>
+  Result<std::string> ToCsvString(const Data& data, const WriteOptions& options) {
+    std::shared_ptr<io::BufferOutputStream> out;
+    ASSIGN_OR_RAISE(out, io::BufferOutputStream::Create());
+
+    RETURN_NOT_OK(WriteCSV(data, options, default_memory_pool(), out.get()));
+    ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer, out->Finish());
+    return std::string(reinterpret_cast<const char*>(buffer->data()), buffer->size());
+  }
+};
+
+TEST_P(TestWriteCSV, TestWrite) {
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::BufferOutputStream> out,
+                       io::BufferOutputStream::Create());
+  WriteOptions options = GetParam().options;
+  std::string csv;
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*GetParam().record_batch, options));
+  EXPECT_EQ(csv, GetParam().expected_output);
+
+  // Batch size shouldn't matter.
+  options.batch_size /= 2;
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*GetParam().record_batch, options));
+  EXPECT_EQ(csv, GetParam().expected_output);
+
+  // Table and RecordBatch should work identically.
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
+                       Table::FromRecordBatches({GetParam().record_batch}));
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*table, options));
+  EXPECT_EQ(csv, GetParam().expected_output);
+}
+
+INSTANTIATE_TEST_SUITE_P(MultiColumnWriteCSVTest, TestWriteCSV,
+                         ::testing::ValuesIn(GenerateTestCases()));
+
+INSTANTIATE_TEST_SUITE_P(
+    SingleColumnWriteCSVTest, TestWriteCSV,
+    ::testing::Values(TestParams{
+        RecordBatchFromJSON(schema({field("int64", int64())}),
+                            R"([{ "int64": 9999}, {}, { "int64": -15}])"),
+        WriteOptions(),
+        R"("int64")"
+        "\n9999\n\n-15\n"}));
+
+}  // namespace csv
+}  // namespace arrow
diff --git a/cpp/src/arrow/ipc/json_simple.cc b/cpp/src/arrow/ipc/json_simple.cc
index fba8194aeb1..caf6fd06b9c 100644
--- a/cpp/src/arrow/ipc/json_simple.cc
+++ b/cpp/src/arrow/ipc/json_simple.cc
@@ -43,6 +43,7 @@
 #include <rapidjson/error/en.h>
 #include <rapidjson/rapidjson.h>
 #include <rapidjson/reader.h>
+#include <rapidjson/writer.h>
 
 namespace rj = arrow::rapidjson;
 
@@ -652,8 +653,11 @@ class StructConverter final : public ConcreteConverter {
       }
     }
     if (remaining > 0) {
+      rj::StringBuffer sb;
+      rj::Writer<rj::StringBuffer> writer(sb);
+      json_obj.Accept(writer);
       return Status::Invalid("Unexpected members in JSON object for type ",
-                             type_->ToString());
+                             type_->ToString(), " Object: ", sb.GetString());
     }
     return builder_->Append();
   }
diff --git a/cpp/src/arrow/util/config.h.cmake b/cpp/src/arrow/util/config.h.cmake
index 8f8dea0c6c8..be6686f253e 100644
--- a/cpp/src/arrow/util/config.h.cmake
+++ b/cpp/src/arrow/util/config.h.cmake
@@ -34,6 +34,8 @@
 
 #define ARROW_PACKAGE_KIND "@ARROW_PACKAGE_KIND@"
 
+#cmakedefine ARROW_COMPUTE
+
 #cmakedefine ARROW_S3
 
 #cmakedefine ARROW_USE_NATIVE_INT128
diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h
index 75ccf283aa5..771b209a406 100644
--- a/cpp/src/arrow/util/iterator.h
+++ b/cpp/src/arrow/util/iterator.h
@@ -64,7 +64,8 @@ template <typename T>
 class Iterator : public util::EqualityComparable<Iterator<T>> {
  public:
   /// \brief Iterator may be constructed from any type which has a member function
-  /// with signature Status Next(T*);
+  /// with signature Result<T> Next();
+  /// End of iterator is signalled by returning IterationTraits<T>::End();
   ///
   /// The argument is moved or copied to the heap and kept in a unique_ptr. Only
   /// its destructor and its Next method (which are stored in function pointers) are
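A hedged sketch of a type satisfying this contract (hypothetical, mirroring SliceIteratorFunctor in writer.cc above); note that for int, IterationTraits<int>::End() is a value-initialized 0, so the sketch reserves 0 as the end marker:

```cpp
#include "arrow/result.h"
#include "arrow/util/iterator.h"

// Functor with Result<T> Next(); IterationTraits<T>::End() signals exhaustion.
struct CountingFunctor {
  arrow::Result<int> Next() {
    // Yields 1, 2, 3; IterationTraits<int>::End() (== 0) marks the end.
    return n_ <= 3 ? n_++ : arrow::IterationTraits<int>::End();
  }
  int n_ = 1;
};

arrow::Iterator<int> MakeCountingIterator() {
  return arrow::Iterator<int>(CountingFunctor{});
}
```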
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 4068a0b9141..f5b8e4d5fba 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -30,10 +30,13 @@ from pyarrow.includes.libarrow cimport *
 from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
                           RecordBatchReader, ensure_type,
                           maybe_unbox_memory_pool, get_input_stream,
-                          native_transcoding_input_stream,
+                          get_writer, native_transcoding_input_stream,
+                          pyarrow_unwrap_batch, pyarrow_unwrap_table,
                           pyarrow_wrap_schema, pyarrow_wrap_table,
-                          pyarrow_wrap_data_type, pyarrow_unwrap_data_type)
+                          pyarrow_wrap_data_type, pyarrow_unwrap_data_type,
+                          Table, RecordBatch)
 from pyarrow.lib import frombytes, tobytes
+from pyarrow.util import _stringify_path
 
 
 cdef unsigned char _single_char(s) except 0:
@@ -763,3 +766,99 @@ def open_csv(input_file, read_options=None, parse_options=None,
                         move(c_convert_options),
                         maybe_unbox_memory_pool(memory_pool))
     return reader
+
+
+cdef class WriteOptions(_Weakrefable):
+    """
+    Options for writing CSV files.
+
+    Parameters
+    ----------
+    include_header : bool, optional (default True)
+        Whether to write an initial header line with column names
+    batch_size : int, optional (default 1024)
+        How many rows to process together when converting and writing
+        CSV data
+    """
+    cdef:
+        CCSVWriteOptions options
+
+    # Avoid mistakenly creating attributes
+    __slots__ = ()
+
+    def __init__(self, *, include_header=None, batch_size=None):
+        self.options = CCSVWriteOptions.Defaults()
+        if include_header is not None:
+            self.include_header = include_header
+        if batch_size is not None:
+            self.batch_size = batch_size
+
+    @property
+    def include_header(self):
+        """
+        Whether to write an initial header line with column names.
+        """
+        return self.options.include_header
+
+    @include_header.setter
+    def include_header(self, value):
+        self.options.include_header = value
+
+    @property
+    def batch_size(self):
+        """
+        How many rows to process together when converting and writing
+        CSV data.
+        """
+        return self.options.batch_size
+
+    @batch_size.setter
+    def batch_size(self, value):
+        self.options.batch_size = value
+
+
+cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out):
+    if write_options is None:
+        out[0] = CCSVWriteOptions.Defaults()
+    else:
+        out[0] = write_options.options
+
+
+def write_csv(data, output_file, write_options=None,
+              MemoryPool memory_pool=None):
+    """
+    Write record batch or table to a CSV file.
+
+    Parameters
+    ----------
+    data : pyarrow.RecordBatch or pyarrow.Table
+        The data to write.
+    output_file : string, path, pyarrow.OutputStream or file-like object
+        The location where to write the CSV data.
+    write_options : pyarrow.csv.WriteOptions
+        Options to configure writing the CSV data.
+    memory_pool : MemoryPool, optional
+        Pool for temporary allocations.
+    """
+    cdef:
+        shared_ptr[COutputStream] stream
+        CCSVWriteOptions c_write_options
+        CMemoryPool* c_memory_pool
+        CRecordBatch* batch
+        CTable* table
+    _get_write_options(write_options, &c_write_options)
+
+    get_writer(output_file, &stream)
+    c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+    if isinstance(data, RecordBatch):
+        batch = pyarrow_unwrap_batch(data).get()
+        with nogil:
+            check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool,
+                                  stream.get()))
+    elif isinstance(data, Table):
+        table = pyarrow_unwrap_table(data).get()
+        with nogil:
+            check_status(WriteCSV(deref(table), c_write_options, c_memory_pool,
+                                  stream.get()))
+    else:
+        raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
diff --git a/python/pyarrow/csv.py b/python/pyarrow/csv.py
index b116ea11d83..fc1dcafba0b 100644
--- a/python/pyarrow/csv.py
+++ b/python/pyarrow/csv.py
@@ -18,4 +18,5 @@
 from pyarrow._csv import (  # noqa
     ReadOptions, ParseOptions, ConvertOptions, ISO8601,
-    open_csv, read_csv, CSVStreamingReader)
+    open_csv, read_csv, CSVStreamingReader, write_csv,
+    WriteOptions)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ba3c3ad7d2b..a4f6f186284 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1618,6 +1618,13 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
         @staticmethod
         CCSVReadOptions Defaults()
 
+    cdef cppclass CCSVWriteOptions" arrow::csv::WriteOptions":
+        c_bool include_header
+        int32_t batch_size
+
+        @staticmethod
+        CCSVWriteOptions Defaults()
+
     cdef cppclass CCSVReader" arrow::csv::TableReader":
         @staticmethod
         CResult[shared_ptr[CCSVReader]] Make(
@@ -1633,6 +1640,11 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
             CMemoryPool*, shared_ptr[CInputStream],
             CCSVReadOptions, CCSVParseOptions, CCSVConvertOptions)
 
+    cdef CStatus WriteCSV(
+        CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*)
+    cdef CStatus WriteCSV(
+        CRecordBatch&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*)
+
 
 cdef extern from "arrow/json/options.h" nogil:
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 462fe011492..5ca31aefebc 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -36,7 +36,8 @@ import pyarrow as pa
 
 from pyarrow.csv import (
-    open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601)
+    open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601,
+    write_csv, WriteOptions)
 
 
 def generate_col_names():
@@ -203,6 +204,21 @@ def test_convert_options():
     assert opts.timestamp_parsers == [ISO8601, '%Y-%m-%d']
 
 
+def test_write_options():
+    cls = WriteOptions
+    opts = cls()
+
+    check_options_class(
+        cls, include_header=[True, False])
+
+    assert opts.batch_size > 0
+    opts.batch_size = 12345
+    assert opts.batch_size == 12345
+
+    opts = cls(batch_size=9876)
+    assert opts.batch_size == 9876
+
+
 class BaseTestCSVRead:
 
     def read_bytes(self, b, **kwargs):
@@ -1257,3 +1273,22 @@ def test_read_csv_does_not_close_passed_file_handles():
     buf = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6")
     read_csv(buf)
     assert not buf.closed
+
+
+def test_write_read_round_trip():
+    t = pa.Table.from_arrays([[1, 2, 3], ["a", "b", "c"]], ["c1", "c2"])
+    record_batch = t.to_batches(max_chunksize=4)[0]
+    for data in [t, record_batch]:
+        # Test with header
+        buf = io.BytesIO()
+        write_csv(data, buf, WriteOptions(include_header=True))
+        buf.seek(0)
+        assert t == read_csv(buf)
+
+        # Test without header
+        buf = io.BytesIO()
+        write_csv(data, buf, WriteOptions(include_header=False))
+        buf.seek(0)
+
+        read_options = ReadOptions(column_names=t.column_names)
+        assert t == read_csv(buf, read_options=read_options)
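For reference, a minimal sketch of the new Python API in use, mirroring the round-trip test above (values illustrative):

```python
import io

import pyarrow as pa
from pyarrow.csv import WriteOptions, read_csv, write_csv

table = pa.Table.from_arrays([[1, 2, 3], ["a", "b", "c"]], ["c1", "c2"])
buf = io.BytesIO()
write_csv(table, buf, WriteOptions(include_header=True, batch_size=1024))
buf.seek(0)
assert read_csv(buf) == table  # ints read back as int64, strings as utf8
```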