From a4a0ae0094b0008c4048d321e978faf639e9b562 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Sat, 3 Jan 2026 14:45:34 -0800
Subject: [PATCH 1/5] GH-36889: [C++][Python] Fix duplicate CSV header when
 first batch is empty

When writing CSV, if the first record batch was empty, the header would be
written twice. This happened because:

1. The header is written to data_buffer_ and flushed during initialization.
2. TranslateMinimalBatch returns early for empty batches without modifying
   data_buffer_.
3. The loop then writes data_buffer_, which still contains the header.

The fix clears the buffer (resize to 0) when encountering an empty batch,
so the subsequent write outputs nothing.

Added C++ and Python tests for empty batches at the start and in the middle
of tables.

Claude-Generated-By: Claude Code (cli/claude-opus-4-5=1%)
Claude-Steers: 2
Claude-Permission-Prompts: 2
Claude-Escapes: 1
---
 cpp/src/arrow/csv/writer.cc      |  2 ++
 cpp/src/arrow/csv/writer_test.cc | 44 ++++++++++++++++++++++++++++++++
 python/pyarrow/tests/test_csv.py | 33 ++++++++++++++++++++++++
 3 files changed, 79 insertions(+)

diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc
index 5d14fe4b9b1..cb3d42e9348 100644
--- a/cpp/src/arrow/csv/writer.cc
+++ b/cpp/src/arrow/csv/writer.cc
@@ -659,6 +659,8 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
   Status TranslateMinimalBatch(const RecordBatch& batch) {
     if (batch.num_rows() == 0) {
+      // GH-36889: Clear buffer to avoid writing stale content (e.g., header)
+      RETURN_NOT_OK(data_buffer_->Resize(0, /*shrink_to_fit=*/false));
       return Status::OK();
     }
     offsets_.resize(batch.num_rows());
diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc
index 783d7631ab3..82c8cd7b068 100644
--- a/cpp/src/arrow/csv/writer_test.cc
+++ b/cpp/src/arrow/csv/writer_test.cc
@@ -28,6 +28,7 @@
 #include "arrow/ipc/writer.h"
 #include "arrow/record_batch.h"
 #include "arrow/result.h"
+#include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/matchers.h"
 #include "arrow/type.h"
@@ -405,5 +406,48 @@ INSTANTIATE_TEST_SUITE_P(
         "\n2016-02-29 10:42:23-0700,2016-02-29 17:42:23Z\n")));
 #endif
 
+// GH-36889: Empty batches at the start should not cause duplicate headers
+TEST(TestWriteCSV, EmptyBatchAtStart) {
+  auto schema = arrow::schema({field("col1", utf8())});
+  auto empty_batch = RecordBatchFromJSON(schema, "[]");
+  auto data_batch = RecordBatchFromJSON(schema, R"([{"col1": "a"}, {"col1": "b"}])");
+
+  // Concatenate empty table with data table
+  ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
+  ASSERT_OK_AND_ASSIGN(auto data_table, Table::FromRecordBatches(schema, {data_batch}));
+  ASSERT_OK_AND_ASSIGN(auto combined_table,
+                       ConcatenateTables({empty_table, data_table}));
+
+  ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
+  ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
+  ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
+
+  std::string result(reinterpret_cast<const char*>(buffer->data()), buffer->size());
+  // Should have exactly one header, not two
+  EXPECT_EQ(result, "\"col1\"\n\"a\"\n\"b\"\n");
+}
+
+// GH-36889: Empty batches in the middle should not cause issues
+TEST(TestWriteCSV, EmptyBatchInMiddle) {
+  auto schema = arrow::schema({field("col1", utf8())});
+  auto batch1 = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
+  auto empty_batch = RecordBatchFromJSON(schema, "[]");
+  auto batch2 = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");
+
+  ASSERT_OK_AND_ASSIGN(auto table1, Table::FromRecordBatches(schema, {batch1}));
+  ASSERT_OK_AND_ASSIGN(auto empty_table,
+                       Table::FromRecordBatches(schema, {empty_batch}));
+  ASSERT_OK_AND_ASSIGN(auto table2, Table::FromRecordBatches(schema, {batch2}));
+  ASSERT_OK_AND_ASSIGN(auto combined_table,
+                       ConcatenateTables({table1, empty_table, table2}));
+
+  ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
+  ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
+  ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
+
+  std::string result(reinterpret_cast<const char*>(buffer->data()), buffer->size());
+  EXPECT_EQ(result, "\"col1\"\n\"a\"\n\"b\"\n");
+}
+
 }  // namespace csv
 }  // namespace arrow
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index f510c6dbe23..cf681c1fdcc 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -2065,3 +2065,36 @@ def readinto(self, *args):
     for i in range(20):
         with pytest.raises(pa.ArrowInvalid):
             read_csv(MyBytesIO(data))
+
+
+def test_write_csv_empty_batch_no_duplicate_header():
+    # GH-36889: Empty batches at the start should not cause duplicate headers
+    table = pa.table({"col1": ["a", "b", "c"]})
+
+    # Concatenate empty table with data table
+    empty_table = table.schema.empty_table()
+    combined = pa.concat_tables([empty_table, table])
+
+    buf = io.BytesIO()
+    write_csv(combined, buf)
+    buf.seek(0)
+    result = buf.read()
+
+    # Should have exactly one header, not two
+    assert result == b'"col1"\n"a"\n"b"\n"c"\n'
+
+
+def test_write_csv_empty_batch_in_middle():
+    # GH-36889: Empty batches in the middle should not cause issues
+    table1 = pa.table({"col1": ["a"]})
+    table2 = pa.table({"col1": ["b"]})
+    empty_table = table1.schema.empty_table()
+
+    combined = pa.concat_tables([table1, empty_table, table2])
+
+    buf = io.BytesIO()
+    write_csv(combined, buf)
+    buf.seek(0)
+    result = buf.read()
+
+    assert result == b'"col1"\n"a"\n"b"\n'

From 664e11d976389b40e091eb2355522251ce4624c6 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Sun, 4 Jan 2026 09:48:14 -0800
Subject: [PATCH 2/5] Fix clang-format styling in writer_test.cc

Signed-off-by: Ruiyang Wang
---
 cpp/src/arrow/csv/writer_test.cc | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc
index 82c8cd7b068..6f68f019d3f 100644
--- a/cpp/src/arrow/csv/writer_test.cc
+++ b/cpp/src/arrow/csv/writer_test.cc
@@ -415,8 +415,7 @@ TEST(TestWriteCSV, EmptyBatchAtStart) {
   // Concatenate empty table with data table
   ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
   ASSERT_OK_AND_ASSIGN(auto data_table, Table::FromRecordBatches(schema, {data_batch}));
-  ASSERT_OK_AND_ASSIGN(auto combined_table,
-                       ConcatenateTables({empty_table, data_table}));
+  ASSERT_OK_AND_ASSIGN(auto combined_table, ConcatenateTables({empty_table, data_table}));
 
   ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
   ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
@@ -435,8 +434,7 @@ TEST(TestWriteCSV, EmptyBatchInMiddle) {
   auto batch2 = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");
 
   ASSERT_OK_AND_ASSIGN(auto table1, Table::FromRecordBatches(schema, {batch1}));
-  ASSERT_OK_AND_ASSIGN(auto empty_table,
-                       Table::FromRecordBatches(schema, {empty_batch}));
+  ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
   ASSERT_OK_AND_ASSIGN(auto table2, Table::FromRecordBatches(schema, {batch2}));
   ASSERT_OK_AND_ASSIGN(auto combined_table,
                        ConcatenateTables({table1, empty_table, table2}));

From 84841ea1ae17ffb29033b6f358024cbf602ce093 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Mon, 12 Jan 2026 18:30:51 -0800
Subject: [PATCH 3/5] Refactor: Use WriteAndClearBuffer helper to prevent
 stale content

Addresses review feedback from HuaHuaY: Instead of clearing the buffer in
TranslateMinimalBatch for empty batches, use a WriteAndClearBuffer() helper
that writes and clears the buffer in all write paths.

This is cleaner because:
- Every write follows the same pattern (write -> clear)
- Easier to reason about for future write stages
- The invariant is explicit: buffer is always clean after flush
---
 cpp/src/arrow/csv/writer.cc | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc
index cb3d42e9348..d6bf1e1ef9f 100644
--- a/cpp/src/arrow/csv/writer.cc
+++ b/cpp/src/arrow/csv/writer.cc
@@ -541,7 +541,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     for (auto maybe_slice : iterator) {
       ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      RETURN_NOT_OK(WriteAndClearBuffer());
       stats_.num_record_batches++;
     }
     return Status::OK();
@@ -554,7 +554,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      RETURN_NOT_OK(WriteAndClearBuffer());
       RETURN_NOT_OK(reader.ReadNext(&batch));
       stats_.num_record_batches++;
     }
@@ -590,6 +590,13 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     return Status::OK();
   }
 
+  // GH-36889: Write buffer to sink and clear it to avoid stale content
+  // being written again if the next batch is empty.
+  Status WriteAndClearBuffer() {
+    RETURN_NOT_OK(sink_->Write(data_buffer_));
+    return data_buffer_->Resize(0, /*shrink_to_fit=*/false);
+  }
+
   int64_t CalculateHeaderSize(QuotingStyle quoting_style) const {
     int64_t header_length = 0;
     for (int col = 0; col < schema_->num_fields(); col++) {
@@ -654,13 +661,11 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     next += options_.eol.size();
     DCHECK_EQ(reinterpret_cast<const uint8_t*>(next),
               data_buffer_->data() + data_buffer_->size());
-    return sink_->Write(data_buffer_);
+    return WriteAndClearBuffer();
   }
 
   Status TranslateMinimalBatch(const RecordBatch& batch) {
     if (batch.num_rows() == 0) {
-      // GH-36889: Clear buffer to avoid writing stale content (e.g., header)
-      RETURN_NOT_OK(data_buffer_->Resize(0, /*shrink_to_fit=*/false));
       return Status::OK();
     }
     offsets_.resize(batch.num_rows());

From 2a16d2a225dee943d96ea46e92111fc180adf3e1 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
Date: Mon, 12 Jan 2026 22:28:12 -0800
Subject: [PATCH 4/5] Update cpp/src/arrow/csv/writer_test.cc

Co-authored-by: Gang Wu
---
 cpp/src/arrow/csv/writer_test.cc | 58 +++++++++++++-------------------
 1 file changed, 24 insertions(+), 34 deletions(-)

diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc
index 6f68f019d3f..ce4d8ab16d0 100644
--- a/cpp/src/arrow/csv/writer_test.cc
+++ b/cpp/src/arrow/csv/writer_test.cc
@@ -406,45 +406,35 @@ INSTANTIATE_TEST_SUITE_P(
         "\n2016-02-29 10:42:23-0700,2016-02-29 17:42:23Z\n")));
 #endif
 
-// GH-36889: Empty batches at the start should not cause duplicate headers
-TEST(TestWriteCSV, EmptyBatchAtStart) {
+TEST(TestWriteCSV, EmptyBatchShouldNotPolluteOutput) {
   auto schema = arrow::schema({field("col1", utf8())});
   auto empty_batch = RecordBatchFromJSON(schema, "[]");
-  auto data_batch = RecordBatchFromJSON(schema, R"([{"col1": "a"}, {"col1": "b"}])");
+  auto batch_a = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
+  auto batch_b = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");
 
-  // Concatenate empty table with data table
-  ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
-  ASSERT_OK_AND_ASSIGN(auto data_table, Table::FromRecordBatches(schema, {data_batch}));
-  ASSERT_OK_AND_ASSIGN(auto combined_table, ConcatenateTables({empty_table, data_table}));
-
-  ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
-  ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
-  ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
-
-  std::string result(reinterpret_cast<const char*>(buffer->data()), buffer->size());
-  // Should have exactly one header, not two
-  EXPECT_EQ(result, "\"col1\"\n\"a\"\n\"b\"\n");
-}
-
-// GH-36889: Empty batches in the middle should not cause issues
-TEST(TestWriteCSV, EmptyBatchInMiddle) {
-  auto schema = arrow::schema({field("col1", utf8())});
-  auto batch1 = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
-  auto empty_batch = RecordBatchFromJSON(schema, "[]");
-  auto batch2 = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");
-
-  ASSERT_OK_AND_ASSIGN(auto table1, Table::FromRecordBatches(schema, {batch1}));
-  ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
-  ASSERT_OK_AND_ASSIGN(auto table2, Table::FromRecordBatches(schema, {batch2}));
-  ASSERT_OK_AND_ASSIGN(auto combined_table,
-                       ConcatenateTables({table1, empty_table, table2}));
+  struct TestParam {
+    std::shared_ptr<Table> table;
+    std::string expected_output;
+  };
 
-  ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
-  ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
-  ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
+  std::vector<TestParam> test_params = {
+      // Empty batch in the beginning
+      {Table::FromRecordBatches(schema, {empty_batch, batch_a, batch_b}).ValueOrDie(),
+       "\"col1\"\n\"a\"\n\"b\"\n"},
+      // Empty batch in the middle
+      {Table::FromRecordBatches(schema, {batch_a, empty_batch, batch_b}).ValueOrDie(),
+       "\"col1\"\n\"a\"\n\"b\"\n"},
+      // Empty batch in the end
+      {Table::FromRecordBatches(schema, {batch_a, batch_b, empty_batch}).ValueOrDie(),
+       "\"col1\"\n\"a\"\n\"b\"\n"},
+  };
 
-  std::string result(reinterpret_cast<const char*>(buffer->data()), buffer->size());
-  EXPECT_EQ(result, "\"col1\"\n\"a\"\n\"b\"\n");
+  for (const auto& param : test_params) {
+    ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
+    ASSERT_OK(WriteCSV(*param.table, WriteOptions::Defaults(), out.get()));
+    ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
+    EXPECT_EQ(buffer->ToString(), param.expected_output);
+  }
 }
 
 }  // namespace csv

From 902ca154ca695350d5eb884841ac42b4ce082b3d Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Mon, 12 Jan 2026 22:30:31 -0800
Subject: [PATCH 5/5] Address review feedback: rename to FlushToSink and
 consolidate Python tests

- Rename WriteAndClearBuffer to FlushToSink (shorter, clearer intent)
- Consolidate Python tests into a single parameterized test with 3 cases:
  empty batch at beginning, middle, and end
---
 cpp/src/arrow/csv/writer.cc      | 10 +++----
 python/pyarrow/tests/test_csv.py | 51 ++++++++++++++++----------------
 2 files changed, 31 insertions(+), 30 deletions(-)

diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc
index d6bf1e1ef9f..2db0dba2de7 100644
--- a/cpp/src/arrow/csv/writer.cc
+++ b/cpp/src/arrow/csv/writer.cc
@@ -541,7 +541,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     for (auto maybe_slice : iterator) {
       ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(WriteAndClearBuffer());
+      RETURN_NOT_OK(FlushToSink());
       stats_.num_record_batches++;
     }
     return Status::OK();
@@ -554,7 +554,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(WriteAndClearBuffer());
+      RETURN_NOT_OK(FlushToSink());
       RETURN_NOT_OK(reader.ReadNext(&batch));
       stats_.num_record_batches++;
     }
@@ -590,9 +590,9 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     return Status::OK();
   }
 
-  // GH-36889: Write buffer to sink and clear it to avoid stale content
+  // GH-36889: Flush buffer to sink and clear it to avoid stale content
   // being written again if the next batch is empty.
-  Status WriteAndClearBuffer() {
+  Status FlushToSink() {
     RETURN_NOT_OK(sink_->Write(data_buffer_));
     return data_buffer_->Resize(0, /*shrink_to_fit=*/false);
   }
@@ -661,7 +661,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     next += options_.eol.size();
     DCHECK_EQ(reinterpret_cast<const uint8_t*>(next),
               data_buffer_->data() + data_buffer_->size());
-    return WriteAndClearBuffer();
+    return FlushToSink();
   }
 
   Status TranslateMinimalBatch(const RecordBatch& batch) {
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index cf681c1fdcc..dce605c7156 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -2067,34 +2067,35 @@ def readinto(self, *args):
             read_csv(MyBytesIO(data))
 
 
-def test_write_csv_empty_batch_no_duplicate_header():
-    # GH-36889: Empty batches at the start should not cause duplicate headers
-    table = pa.table({"col1": ["a", "b", "c"]})
-
-    # Concatenate empty table with data table
-    empty_table = table.schema.empty_table()
-    combined = pa.concat_tables([empty_table, table])
-
-    buf = io.BytesIO()
-    write_csv(combined, buf)
-    buf.seek(0)
-    result = buf.read()
-
-    # Should have exactly one header, not two
-    assert result == b'"col1"\n"a"\n"b"\n"c"\n'
-
-
-def test_write_csv_empty_batch_in_middle():
-    # GH-36889: Empty batches in the middle should not cause issues
-    table1 = pa.table({"col1": ["a"]})
-    table2 = pa.table({"col1": ["b"]})
-    empty_table = table1.schema.empty_table()
-
-    combined = pa.concat_tables([table1, empty_table, table2])
+@pytest.mark.parametrize("tables,expected", [
+    # GH-36889: Empty batch at the beginning
+    (
+        lambda: [pa.table({"col1": []}).cast(pa.schema([("col1", pa.string())])),
+                 pa.table({"col1": ["a"]}),
+                 pa.table({"col1": ["b"]})],
+        b'"col1"\n"a"\n"b"\n'
+    ),
+    # GH-36889: Empty batch in the middle
+    (
+        lambda: [pa.table({"col1": ["a"]}),
+                 pa.table({"col1": []}).cast(pa.schema([("col1", pa.string())])),
+                 pa.table({"col1": ["b"]})],
+        b'"col1"\n"a"\n"b"\n'
+    ),
+    # GH-36889: Empty batch at the end
+    (
+        lambda: [pa.table({"col1": ["a"]}),
+                 pa.table({"col1": ["b"]}),
+                 pa.table({"col1": []}).cast(pa.schema([("col1", pa.string())]))],
+        b'"col1"\n"a"\n"b"\n'
+    ),
+])
+def test_write_csv_empty_batch_should_not_pollute_output(tables, expected):
+    combined = pa.concat_tables(tables())
 
     buf = io.BytesIO()
     write_csv(combined, buf)
    buf.seek(0)
     result = buf.read()
 
-    assert result == b'"col1"\n"a"\n"b"\n'
+    assert result == expected
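
The failure mode is easy to reproduce from Python. Below is a minimal sketch
mirroring the Python tests in this series (with the series applied the final
assertion passes; without it the header bytes would appear twice):

    import io

    import pyarrow as pa
    from pyarrow import csv

    # A table whose first chunk is an empty record batch: concat_tables
    # preserves the empty chunk, so the CSV writer sees an empty batch first.
    schema = pa.schema([("col1", pa.string())])
    combined = pa.concat_tables([schema.empty_table(),
                                 pa.table({"col1": ["a", "b"]})])

    buf = io.BytesIO()
    csv.write_csv(combined, buf)

    # Exactly one header line, followed by the data rows.
    assert buf.getvalue() == b'"col1"\n"a"\n"b"\n'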