diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index af50588a856..81071300ee4 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -121,7 +121,6 @@ set(ARROW_SRCS io/hdfs_internal.cc io/interfaces.cc io/memory.cc - io/readahead.cc io/slow.cc testing/util.cc util/basic_decimal.cc @@ -129,6 +128,7 @@ set(ARROW_SRCS util/compression.cc util/cpu_info.cc util/decimal.cc + util/delimiting.cc util/formatting.cc util/int_util.cc util/io_util.cc diff --git a/cpp/src/arrow/csv/chunker.cc b/cpp/src/arrow/csv/chunker.cc index 92ccbbccc69..17de1828796 100644 --- a/cpp/src/arrow/csv/chunker.cc +++ b/cpp/src/arrow/csv/chunker.cc @@ -18,168 +18,248 @@ #include "arrow/csv/chunker.h" #include +#include +#include #include "arrow/status.h" #include "arrow/util/logging.h" +#include "arrow/util/stl.h" +#include "arrow/util/string_view.h" namespace arrow { namespace csv { namespace { -// Find the last newline character in the given data block. -// nullptr is returned if not found (like memchr()). -const char* FindNewlineReverse(const char* data, uint32_t size) { - if (size == 0) { - return nullptr; - } - const char* s = data + size - 1; - while (size > 0) { - if (*s == '\r' || *s == '\n') { - return s; - } - --s; - --size; - } - return nullptr; -} +// NOTE: csvmonkey (https://github.com/dw/csvmonkey) has optimization ideas -} // namespace +template +class Lexer { + public: + enum State { + FIELD_START, + IN_FIELD, + AT_ESCAPE, + IN_QUOTED_FIELD, + AT_QUOTED_QUOTE, + AT_QUOTED_ESCAPE + }; -Chunker::Chunker(ParseOptions options) : options_(options) {} + explicit Lexer(const ParseOptions& options) : options_(options) { + DCHECK_EQ(quoting, options_.quoting); + DCHECK_EQ(escaping, options_.escaping); + } -// NOTE: cvsmonkey (https://github.com/dw/csvmonkey) has optimization ideas + const char* ReadLine(const char* data, const char* data_end) { + // The parsing state machine + char c; + DCHECK_GT(data_end - data, 0); + if (ARROW_PREDICT_TRUE(state_ == FIELD_START)) { + goto FieldStart; + } + switch (state_) { + case FIELD_START: + goto FieldStart; + case IN_FIELD: + goto InField; + case AT_ESCAPE: + goto AtEscape; + case IN_QUOTED_FIELD: + goto InQuotedField; + case AT_QUOTED_QUOTE: + goto AtQuotedQuote; + case AT_QUOTED_ESCAPE: + goto AtQuotedEscape; + } -template -inline const char* Chunker::ReadLine(const char* data, const char* data_end) { - DCHECK_EQ(quoting, options_.quoting); - DCHECK_EQ(escaping, options_.escaping); + FieldStart: + // At the start of a field + if (ARROW_PREDICT_FALSE(data == data_end)) { + state_ = FIELD_START; + goto AbortLine; + } + // Quoting is only recognized at start of field + if (quoting && *data == options_.quote_char) { + data++; + goto InQuotedField; + } else { + goto InField; + } - // The parsing state machine - char c; + InField: + // Inside a non-quoted part of a field + if (ARROW_PREDICT_FALSE(data == data_end)) { + state_ = IN_FIELD; + goto AbortLine; + } + c = *data++; + if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) { + if (ARROW_PREDICT_FALSE(data == data_end)) { + state_ = AT_ESCAPE; + goto AbortLine; + } + data++; + goto InField; + } + if (ARROW_PREDICT_FALSE(c == '\r')) { + if (ARROW_PREDICT_TRUE(data != data_end) && *data == '\n') { + data++; + } + goto LineEnd; + } + if (ARROW_PREDICT_FALSE(c == '\n')) { + goto LineEnd; + } + if (ARROW_PREDICT_FALSE(c == options_.delimiter)) { + goto FieldEnd; + } + goto InField; -FieldStart: - // At the start of a field - // Quoting is only recognized at start of field - if (quoting && ARROW_PREDICT_TRUE(data != data_end) && *data == options_.quote_char) { + AtEscape: + // Coming here if last block ended on a non-quoted escape data++; - goto InQuotedField; - } else { goto InField; - } -InField: - // Inside a non-quoted part of a field - if (ARROW_PREDICT_FALSE(data == data_end)) { - goto AbortLine; - } - c = *data++; - if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) { + InQuotedField: + // Inside a quoted part of a field if (ARROW_PREDICT_FALSE(data == data_end)) { + state_ = IN_QUOTED_FIELD; goto AbortLine; } - data++; - goto InField; - } - if (ARROW_PREDICT_FALSE(c == '\r')) { - if (ARROW_PREDICT_TRUE(data != data_end) && *data == '\n') { + c = *data++; + if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) { + if (ARROW_PREDICT_FALSE(data == data_end)) { + state_ = AT_QUOTED_ESCAPE; + goto AbortLine; + } data++; + goto InQuotedField; } - goto LineEnd; - } - if (ARROW_PREDICT_FALSE(c == '\n')) { - goto LineEnd; - } - if (ARROW_PREDICT_FALSE(c == options_.delimiter)) { - goto FieldEnd; - } - goto InField; - -InQuotedField: - // Inside a quoted part of a field - if (ARROW_PREDICT_FALSE(data == data_end)) { - goto AbortLine; - } - c = *data++; - if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) { - if (data == data_end) { - goto AbortLine; + if (ARROW_PREDICT_FALSE(c == options_.quote_char)) { + if (ARROW_PREDICT_FALSE(data == data_end)) { + state_ = AT_QUOTED_QUOTE; + goto AbortLine; + } + if (options_.double_quote && *data == options_.quote_char) { + // Double-quoting + data++; + } else { + // End of single-quoting + goto InField; + } } + goto InQuotedField; + + AtQuotedEscape: + // Coming here if last block ended on a quoted escape data++; goto InQuotedField; - } - if (ARROW_PREDICT_FALSE(c == options_.quote_char)) { - if (options_.double_quote && data != data_end && *data == options_.quote_char) { + + AtQuotedQuote: + // Coming here if last block ended on a quoted quote + if (options_.double_quote && *data == options_.quote_char) { // Double-quoting data++; + goto InQuotedField; } else { // End of single-quoting goto InField; } - } - goto InQuotedField; -FieldEnd: - // At the end of a field - goto FieldStart; + FieldEnd: + // At the end of a field + goto FieldStart; -LineEnd: - return data; + LineEnd: + return data; -AbortLine: - // Truncated line at end of block - return nullptr; -} + AbortLine: + // Truncated line + return nullptr; + } + protected: + const ParseOptions& options_; + State state_ = FIELD_START; +}; + +// A BoundaryFinder implementation that assumes CSV cells can contain raw newlines, +// and uses actual CSV lexing to delimit them. template -Status Chunker::ProcessSpecialized(const char* start, uint32_t size, uint32_t* out_size) { - DCHECK_EQ(quoting, options_.quoting); - DCHECK_EQ(escaping, options_.escaping); +class LexingBoundaryFinder : public BoundaryFinder { + public: + explicit LexingBoundaryFinder(ParseOptions options) : options_(std::move(options)) {} + + Status FindFirst(util::string_view partial, util::string_view block, + int64_t* out_pos) override { + Lexer lexer(options_); - const char* data = start; - const char* data_end = start + size; + const char* line_end = + lexer.ReadLine(partial.data(), partial.data() + partial.size()); + DCHECK_EQ(line_end, nullptr); // Otherwise `partial` is a whole CSV line + line_end = lexer.ReadLine(block.data(), block.data() + block.size()); - while (data < data_end) { - const char* line_end = ReadLine(data, data_end); if (line_end == nullptr) { - // Cannot read any further - break; + // No complete CSV line + *out_pos = -1; + } else { + *out_pos = static_cast(line_end - block.data()); + DCHECK_GT(*out_pos, 0); } - data = line_end; + return Status::OK(); } - *out_size = static_cast(data - start); - return Status::OK(); -} -Status Chunker::Process(const char* start, uint32_t size, uint32_t* out_size) { - if (!options_.newlines_in_values) { - // In newlines are not accepted in CSV values, we can simply search for - // the last newline character. - // For common block sizes and CSV row sizes, this avoids reading - // most of the data block, making the chunker extremely fast compared - // to the rest of the CSV reading pipeline. - const char* nl = FindNewlineReverse(start, size); - if (nl == nullptr) { - *out_size = 0; + Status FindLast(util::string_view block, int64_t* out_pos) override { + Lexer lexer(options_); + + const char* data = block.data(); + const char* const data_end = block.data() + block.size(); + + while (data < data_end) { + const char* line_end = lexer.ReadLine(data, data_end); + if (line_end == nullptr) { + // Cannot read any further + break; + } + DCHECK_GT(line_end, data); + data = line_end; + } + if (data == block.data()) { + // No complete CSV line + *out_pos = -1; } else { - *out_size = static_cast(nl - start + 1); + *out_pos = static_cast(data - block.data()); + DCHECK_GT(*out_pos, 0); } return Status::OK(); } - if (options_.quoting) { - if (options_.escaping) { - return ProcessSpecialized(start, size, out_size); - } else { - return ProcessSpecialized(start, size, out_size); - } + protected: + ParseOptions options_; +}; + +} // namespace + +std::unique_ptr MakeChunker(const ParseOptions& options) { + std::shared_ptr delimiter; + if (!options.newlines_in_values) { + delimiter = MakeNewlineBoundaryFinder(); } else { - if (options_.escaping) { - return ProcessSpecialized(start, size, out_size); + if (options.quoting) { + if (options.escaping) { + delimiter = std::make_shared>(options); + } else { + delimiter = std::make_shared>(options); + } } else { - return ProcessSpecialized(start, size, out_size); + if (options.escaping) { + delimiter = std::make_shared>(options); + } else { + delimiter = std::make_shared>(options); + } } } + return internal::make_unique(std::move(delimiter)); } } // namespace csv diff --git a/cpp/src/arrow/csv/chunker.h b/cpp/src/arrow/csv/chunker.h index 6c61632614c..ba6077e27c5 100644 --- a/cpp/src/arrow/csv/chunker.h +++ b/cpp/src/arrow/csv/chunker.h @@ -19,49 +19,19 @@ #define ARROW_CSV_CHUNKER_H #include +#include #include "arrow/csv/options.h" #include "arrow/status.h" +#include "arrow/util/delimiting.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" namespace arrow { namespace csv { -/// \class Chunker -/// \brief A reusable block-based chunker for CSV data -/// -/// The chunker takes a block of CSV data and finds a suitable place -/// to cut it up without splitting a row. -/// If the block is truncated (i.e. not all data can be chunked), it is up -/// to the caller to arrange the next block to start with the trailing data. -/// -/// Note: if the previous block ends with CR (0x0d) and a new block starts -/// with LF (0x0a), the chunker will consider the leading newline as an empty line. -class ARROW_EXPORT Chunker { - public: - explicit Chunker(ParseOptions options); - - /// \brief Carve up a chunk in a block of data - /// - /// Process a block of CSV data, reading up to size bytes. - /// The number of bytes in the chunk is returned in out_size. - Status Process(const char* data, uint32_t size, uint32_t* out_size); - - protected: - ARROW_DISALLOW_COPY_AND_ASSIGN(Chunker); - - // Like Process(), but specialized for some parsing options - template - Status ProcessSpecialized(const char* data, uint32_t size, uint32_t* out_size); - - // Detect a single line from the data pointer. Return the line end, - // or nullptr if the remaining line is truncated. - template - inline const char* ReadLine(const char* data, const char* data_end); - - ParseOptions options_; -}; +ARROW_EXPORT +std::unique_ptr MakeChunker(const ParseOptions& options); } // namespace csv } // namespace arrow diff --git a/cpp/src/arrow/csv/chunker_test.cc b/cpp/src/arrow/csv/chunker_test.cc index 951e543247c..85bdf6f728e 100644 --- a/cpp/src/arrow/csv/chunker_test.cc +++ b/cpp/src/arrow/csv/chunker_test.cc @@ -16,11 +16,13 @@ // under the License. #include +#include #include #include #include +#include "arrow/buffer.h" #include "arrow/csv/chunker.h" #include "arrow/csv/options.h" #include "arrow/csv/test_common.h" @@ -30,25 +32,28 @@ namespace arrow { namespace csv { void AssertChunkSize(Chunker& chunker, const std::string& str, uint32_t chunk_size) { - uint32_t actual_chunk_size; - ASSERT_OK( - chunker.Process(str.data(), static_cast(str.size()), &actual_chunk_size)); + std::shared_ptr block, whole, partial; + block = std::make_shared(reinterpret_cast(str.data()), + static_cast(str.size())); + ASSERT_OK(chunker.Process(block, &whole, &partial)); + ASSERT_EQ(block->size(), whole->size() + partial->size()); + auto actual_chunk_size = static_cast(whole->size()); ASSERT_EQ(actual_chunk_size, chunk_size); } template void AssertChunking(Chunker& chunker, const std::string& str, - const IntContainer& lengths) { + const IntContainer& expected_lengths) { uint32_t expected_chunk_size; // First chunkize whole CSV block - expected_chunk_size = - static_cast(std::accumulate(lengths.begin(), lengths.end(), 0ULL)); + expected_chunk_size = static_cast( + std::accumulate(expected_lengths.begin(), expected_lengths.end(), 0ULL)); AssertChunkSize(chunker, str, expected_chunk_size); // Then chunkize incomplete substrings of the block expected_chunk_size = 0; - for (const auto length : lengths) { + for (const auto length : expected_lengths) { AssertChunkSize(chunker, str.substr(0, expected_chunk_size + length - 1), expected_chunk_size); @@ -64,7 +69,10 @@ class BaseChunkerTest : public ::testing::TestWithParam { options_.newlines_in_values = GetParam(); } + void MakeChunker() { chunker_ = ::arrow::csv::MakeChunker(options_); } + ParseOptions options_; + std::unique_ptr chunker_; }; INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true)); @@ -74,42 +82,42 @@ INSTANTIATE_TEST_CASE_P(NoNewlineChunkerTest, BaseChunkerTest, ::testing::Values TEST_P(BaseChunkerTest, Basics) { auto csv = MakeCSVData({"ab,c,\n", "def,,gh\n", ",ij,kl\n"}); auto lengths = {6, 8, 7}; - Chunker chunker(options_); - AssertChunking(chunker, csv, lengths); + MakeChunker(); + AssertChunking(*chunker_, csv, lengths); } TEST_P(BaseChunkerTest, Empty) { - Chunker chunker(options_); + MakeChunker(); { auto csv = MakeCSVData({"\n"}); auto lengths = {1}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { auto csv = MakeCSVData({"\n\n"}); auto lengths = {1, 1}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { auto csv = MakeCSVData({",\n"}); auto lengths = {2}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { auto csv = MakeCSVData({",\n,\n"}); auto lengths = {2, 2}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } } TEST_P(BaseChunkerTest, Newlines) { - Chunker chunker(options_); + MakeChunker(); { auto csv = MakeCSVData({"a\n", "b\r", "c,d\r\n"}); - AssertChunkSize(chunker, csv, static_cast(csv.size())); + AssertChunkSize(*chunker_, csv, static_cast(csv.size())); // Trailing \n after \r is optional - AssertChunkSize(chunker, csv.substr(0, csv.size() - 1), + AssertChunkSize(*chunker_, csv.substr(0, csv.size() - 1), static_cast(csv.size() - 1)); } } @@ -117,30 +125,30 @@ TEST_P(BaseChunkerTest, Newlines) { TEST_P(BaseChunkerTest, QuotingSimple) { auto csv = MakeCSVData({"1,\",3,\",5\n"}); { - Chunker chunker(options_); + MakeChunker(); auto lengths = {csv.size()}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { options_.quoting = false; - Chunker chunker(options_); + MakeChunker(); auto lengths = {csv.size()}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } } TEST_P(BaseChunkerTest, QuotingNewline) { auto csv = MakeCSVData({"a,\"c \n d\",e\n"}); if (options_.newlines_in_values) { - Chunker chunker(options_); + MakeChunker(); auto lengths = {12}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { options_.quoting = false; - Chunker chunker(options_); + MakeChunker(); auto lengths = {6, 6}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } } @@ -148,60 +156,60 @@ TEST_P(BaseChunkerTest, QuotingUnbalanced) { // Quote introduces a quoted field that doesn't end auto csv = MakeCSVData({"a,b\n", "1,\",3,,5\n", "c,d\n"}); if (options_.newlines_in_values) { - Chunker chunker(options_); + MakeChunker(); auto lengths = {4}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { options_.quoting = false; - Chunker chunker(options_); + MakeChunker(); auto lengths = {4, 9, 4}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } } TEST_P(BaseChunkerTest, QuotingEmpty) { - Chunker chunker(options_); + MakeChunker(); { auto csv = MakeCSVData({"\"\"\n", "a\n"}); auto lengths = {3, 2}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { auto csv = MakeCSVData({",\"\"\n", "a\n"}); auto lengths = {4, 2}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { auto csv = MakeCSVData({"\"\",\n", "a\n"}); auto lengths = {4, 2}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } } TEST_P(BaseChunkerTest, QuotingDouble) { { - Chunker chunker(options_); + MakeChunker(); // 4 quotes is a quoted quote auto csv = MakeCSVData({"\"\"\"\"\n", "a\n"}); auto lengths = {5, 2}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } } TEST_P(BaseChunkerTest, QuotesSpecial) { // Some non-trivial cases { - Chunker chunker(options_); + MakeChunker(); auto csv = MakeCSVData({"a,b\"c,d\n", "e\n"}); auto lengths = {8, 2}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } { - Chunker chunker(options_); + MakeChunker(); auto csv = MakeCSVData({"a,\"b\" \"c\",d\n", "e\n"}); auto lengths = {12, 2}; - AssertChunking(chunker, csv, lengths); + AssertChunking(*chunker_, csv, lengths); } } @@ -211,13 +219,13 @@ TEST_P(BaseChunkerTest, Escaping) { auto lengths = {6, 2}; { options_.escaping = false; - Chunker chunker(options_); - AssertChunking(chunker, csv, lengths); + MakeChunker(); + AssertChunking(*chunker_, csv, lengths); } { options_.escaping = true; - Chunker chunker(options_); - AssertChunking(chunker, csv, lengths); + MakeChunker(); + AssertChunking(*chunker_, csv, lengths); } } { @@ -225,13 +233,13 @@ TEST_P(BaseChunkerTest, Escaping) { auto lengths = {7, 2}; { options_.escaping = false; - Chunker chunker(options_); - AssertChunking(chunker, csv, lengths); + MakeChunker(); + AssertChunking(*chunker_, csv, lengths); } { options_.escaping = true; - Chunker chunker(options_); - AssertChunking(chunker, csv, lengths); + MakeChunker(); + AssertChunking(*chunker_, csv, lengths); } } } @@ -241,14 +249,14 @@ TEST_P(BaseChunkerTest, EscapingNewline) { auto csv = MakeCSVData({"a\\\nb\n", "c\n"}); { auto lengths = {3, 2, 2}; - Chunker chunker(options_); - AssertChunking(chunker, csv, lengths); + MakeChunker(); + AssertChunking(*chunker_, csv, lengths); } options_.escaping = true; { auto lengths = {5, 2}; - Chunker chunker(options_); - AssertChunking(chunker, csv, lengths); + MakeChunker(); + AssertChunking(*chunker_, csv, lengths); } } } diff --git a/cpp/src/arrow/csv/column_builder_test.cc b/cpp/src/arrow/csv/column_builder_test.cc index 540b1bc4882..27b242f8bfb 100644 --- a/cpp/src/arrow/csv/column_builder_test.cc +++ b/cpp/src/arrow/csv/column_builder_test.cc @@ -121,6 +121,33 @@ TEST(NullColumnBuilder, InsertTyped) { AssertChunkedEqual(*actual, *expected); } +TEST(NullColumnBuilder, EmptyChunks) { + std::shared_ptr type = int16(); + auto tg = TaskGroup::MakeSerial(); + + std::shared_ptr builder; + ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder)); + + std::shared_ptr parser; + std::shared_ptr actual, expected; + // Those values are indifferent, only the number of rows is used + MakeColumnParser({}, &parser); + builder->Insert(0, parser); + MakeColumnParser({"abc", "def"}, &parser); + builder->Insert(1, parser); + MakeColumnParser({}, &parser); + builder->Insert(2, parser); + ASSERT_OK(builder->task_group()->Finish()); + ASSERT_OK(builder->Finish(&actual)); + ASSERT_OK(actual->Validate()); + + auto chunks = + ArrayVector{ArrayFromJSON(type, "[]"), ArrayFromJSON(type, "[null, null]"), + ArrayFromJSON(type, "[]")}; + expected = std::make_shared(chunks); + AssertChunkedEqual(*actual, *expected); +} + ////////////////////////////////////////////////////////////////////////// // Tests for fixed-type column builder @@ -205,6 +232,21 @@ TEST(ColumnBuilder, MultipleChunksParallel) { AssertChunkedEqual(*actual, *expected); } +TEST(ColumnBuilder, EmptyChunks) { + auto options = ConvertOptions::Defaults(); + auto tg = TaskGroup::MakeSerial(); + std::shared_ptr builder; + ASSERT_OK( + ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); + + std::shared_ptr actual; + AssertBuilding(builder, {{}, {"1", "2"}, {}}, &actual); + + std::shared_ptr expected; + ChunkedArrayFromVector({{}, {1, 2}, {}}, &expected); + AssertChunkedEqual(*actual, *expected); +} + ////////////////////////////////////////////////////////////////////////// // Tests for type-inferring column builder diff --git a/cpp/src/arrow/csv/parser.cc b/cpp/src/arrow/csv/parser.cc index 8d085dc21ae..7b06bce5ad0 100644 --- a/cpp/src/arrow/csv/parser.cc +++ b/cpp/src/arrow/csv/parser.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "arrow/memory_pool.h" @@ -405,8 +406,8 @@ Status BlockParser::ParseChunk(ValuesWriter* values_writer, ParsedWriter* parsed } template -Status BlockParser::DoParseSpecialized(const char* start, uint32_t size, bool is_final, - uint32_t* out_size) { +Status BlockParser::DoParseSpecialized(const std::vector& views, + bool is_final, uint32_t* out_size) { num_rows_ = 0; values_size_ = 0; parsed_size_ = 0; @@ -414,47 +415,66 @@ Status BlockParser::DoParseSpecialized(const char* start, uint32_t size, bool is parsed_buffer_.reset(); parsed_ = nullptr; - const char* data = start; - const char* data_end = start + size; - bool finished_parsing = false; + size_t total_view_length = 0; + for (const auto& view : views) { + total_view_length += view.length(); + } + if (total_view_length > std::numeric_limits::max()) { + return Status::Invalid("CSV block too large"); + } - PresizedParsedWriter parsed_writer(pool_, size); + PresizedParsedWriter parsed_writer(pool_, static_cast(total_view_length)); + uint32_t total_parsed_length = 0; + + for (const auto& view : views) { + const char* data = view.data(); + const char* data_end = view.data() + view.length(); + bool finished_parsing = false; - if (num_cols_ == -1) { - // Can't presize values when the number of columns is not known, first parse - // a single line - const int32_t rows_in_chunk = 1; - ResizableValuesWriter values_writer(pool_); - values_writer.Start(parsed_writer); - - RETURN_NOT_OK(ParseChunk(&values_writer, &parsed_writer, data, - data_end, is_final, rows_in_chunk, &data, - &finished_parsing)); if (num_cols_ == -1) { - return ParseError("Empty CSV file or block: cannot infer number of columns"); + // Can't presize values when the number of columns is not known, first parse + // a single line + const int32_t rows_in_chunk = 1; + ResizableValuesWriter values_writer(pool_); + values_writer.Start(parsed_writer); + + RETURN_NOT_OK(ParseChunk(&values_writer, &parsed_writer, data, + data_end, is_final, rows_in_chunk, + &data, &finished_parsing)); + if (num_cols_ == -1) { + return ParseError("Empty CSV file or block: cannot infer number of columns"); + } } - } - while (!finished_parsing && data < data_end && num_rows_ < max_num_rows_) { - // We know the number of columns, so can presize a values array for - // a given number of rows - DCHECK_GE(num_cols_, 0); + while (!finished_parsing && data < data_end && num_rows_ < max_num_rows_) { + // We know the number of columns, so can presize a values array for + // a given number of rows + DCHECK_GE(num_cols_, 0); + + int32_t rows_in_chunk; + constexpr int32_t kTargetChunkSize = 32768; + if (num_cols_ > 0) { + rows_in_chunk = std::min(std::max(kTargetChunkSize / num_cols_, 512), + max_num_rows_ - num_rows_); + } else { + rows_in_chunk = std::min(kTargetChunkSize, max_num_rows_ - num_rows_); + } - int32_t rows_in_chunk; - constexpr int32_t kTargetChunkSize = 32768; - if (num_cols_ > 0) { - rows_in_chunk = std::min(std::max(kTargetChunkSize / num_cols_, 512), - max_num_rows_ - num_rows_); - } else { - rows_in_chunk = std::min(kTargetChunkSize, max_num_rows_ - num_rows_); - } + PresizedValuesWriter values_writer(pool_, rows_in_chunk, num_cols_); + values_writer.Start(parsed_writer); - PresizedValuesWriter values_writer(pool_, rows_in_chunk, num_cols_); - values_writer.Start(parsed_writer); + RETURN_NOT_OK(ParseChunk(&values_writer, &parsed_writer, data, + data_end, is_final, rows_in_chunk, + &data, &finished_parsing)); + } + DCHECK_GE(data, view.data()); + DCHECK_LE(data, data_end); + total_parsed_length += static_cast(data - view.data()); - RETURN_NOT_OK(ParseChunk(&values_writer, &parsed_writer, data, - data_end, is_final, rows_in_chunk, &data, - &finished_parsing)); + if (data < data_end) { + // Stopped early, for some reason + break; + } } parsed_writer.Finish(&parsed_buffer_); @@ -478,37 +498,46 @@ Status BlockParser::DoParseSpecialized(const char* start, uint32_t size, bool is DCHECK_EQ(parsed_size_, 0); } #endif - *out_size = static_cast(data - start); + *out_size = static_cast(total_parsed_length); return Status::OK(); } -Status BlockParser::DoParse(const char* start, uint32_t size, bool is_final, +Status BlockParser::DoParse(const std::vector& data, bool is_final, uint32_t* out_size) { if (options_.quoting) { if (options_.escaping) { - return DoParseSpecialized>(start, size, is_final, - out_size); + return DoParseSpecialized>(data, is_final, out_size); } else { - return DoParseSpecialized>(start, size, is_final, + return DoParseSpecialized>(data, is_final, out_size); } } else { if (options_.escaping) { - return DoParseSpecialized>(start, size, is_final, + return DoParseSpecialized>(data, is_final, out_size); } else { - return DoParseSpecialized>(start, size, is_final, + return DoParseSpecialized>(data, is_final, out_size); } } } -Status BlockParser::Parse(const char* data, uint32_t size, uint32_t* out_size) { - return DoParse(data, size, false /* is_final */, out_size); +Status BlockParser::Parse(const std::vector& data, + uint32_t* out_size) { + return DoParse(data, false /* is_final */, out_size); +} + +Status BlockParser::ParseFinal(const std::vector& data, + uint32_t* out_size) { + return DoParse(data, true /* is_final */, out_size); +} + +Status BlockParser::Parse(util::string_view data, uint32_t* out_size) { + return DoParse({data}, false /* is_final */, out_size); } -Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_size) { - return DoParse(data, size, true /* is_final */, out_size); +Status BlockParser::ParseFinal(util::string_view data, uint32_t* out_size) { + return DoParse({data}, true /* is_final */, out_size); } BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols, diff --git a/cpp/src/arrow/csv/parser.h b/cpp/src/arrow/csv/parser.h index 60ad4c2f1ec..f7c9e9fb579 100644 --- a/cpp/src/arrow/csv/parser.h +++ b/cpp/src/arrow/csv/parser.h @@ -27,6 +27,7 @@ #include "arrow/csv/options.h" #include "arrow/status.h" #include "arrow/util/macros.h" +#include "arrow/util/string_view.h" #include "arrow/util/visibility.h" namespace arrow { @@ -67,13 +68,23 @@ class ARROW_EXPORT BlockParser { /// /// Parse a block of CSV data, ingesting up to max_num_rows rows. /// The number of bytes actually parsed is returned in out_size. - Status Parse(const char* data, uint32_t size, uint32_t* out_size); + Status Parse(util::string_view data, uint32_t* out_size); + + /// \brief Parse sequential blocks of data + /// + /// Only the last block is allowed to be truncated. + Status Parse(const std::vector& data, uint32_t* out_size); /// \brief Parse the final block of data /// /// Like Parse(), but called with the final block in a file. /// The last row may lack a trailing line separator. - Status ParseFinal(const char* data, uint32_t size, uint32_t* out_size); + Status ParseFinal(util::string_view data, uint32_t* out_size); + + /// \brief Parse the final sequential blocks of data + /// + /// Only the last block is allowed to be truncated. + Status ParseFinal(const std::vector& data, uint32_t* out_size); /// \brief Return the number of parsed rows int32_t num_rows() const { return num_rows_; } @@ -121,9 +132,10 @@ class ARROW_EXPORT BlockParser { protected: ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser); - Status DoParse(const char* data, uint32_t size, bool is_final, uint32_t* out_size); + Status DoParse(const std::vector& data, bool is_final, + uint32_t* out_size); template - Status DoParseSpecialized(const char* data, uint32_t size, bool is_final, + Status DoParseSpecialized(const std::vector& data, bool is_final, uint32_t* out_size); template diff --git a/cpp/src/arrow/csv/parser_benchmark.cc b/cpp/src/arrow/csv/parser_benchmark.cc index bb84f8eb59c..3012754ee0b 100644 --- a/cpp/src/arrow/csv/parser_benchmark.cc +++ b/cpp/src/arrow/csv/parser_benchmark.cc @@ -17,6 +17,7 @@ #include "benchmark/benchmark.h" +#include #include #include @@ -24,6 +25,7 @@ #include "arrow/csv/options.h" #include "arrow/csv/parser.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/string_view.h" namespace arrow { namespace csv { @@ -45,16 +47,16 @@ static std::string BuildCSVData(const std::string& row, int32_t repeat) { static void BenchmarkCSVChunking(benchmark::State& state, // NOLINT non-const reference const std::string& csv, ParseOptions options) { - Chunker chunker(options); - const uint32_t csv_size = static_cast(csv.size()); + auto chunker = MakeChunker(options); + auto block = std::make_shared(util::string_view(csv)); while (state.KeepRunning()) { - uint32_t chunk_size = 0; - ABORT_NOT_OK(chunker.Process(csv.data(), csv_size, &chunk_size)); - benchmark::DoNotOptimize(chunk_size); + std::shared_ptr whole, partial; + ABORT_NOT_OK(chunker->Process(block, &whole, &partial)); + benchmark::DoNotOptimize(whole->size()); } - state.SetBytesProcessed(state.iterations() * csv_size); + state.SetBytesProcessed(state.iterations() * csv.length()); } static void ChunkCSVQuotedBlock(benchmark::State& state) { // NOLINT non-const reference @@ -95,11 +97,10 @@ static void BenchmarkCSVParsing(benchmark::State& state, // NOLINT non-const re const std::string& csv, int32_t rows, ParseOptions options) { BlockParser parser(options, -1, rows + 1); - const uint32_t csv_size = static_cast(csv.size()); while (state.KeepRunning()) { uint32_t parsed_size = 0; - ABORT_NOT_OK(parser.Parse(csv.data(), csv_size, &parsed_size)); + ABORT_NOT_OK(parser.Parse(util::string_view(csv), &parsed_size)); // Include performance of visiting the parsed values, as that might // vary depending on the parser's internal data structures. @@ -117,7 +118,7 @@ static void BenchmarkCSVParsing(benchmark::State& state, // NOLINT non-const re } } - state.SetBytesProcessed(state.iterations() * csv_size); + state.SetBytesProcessed(state.iterations() * csv.size()); } static void ParseCSVQuotedBlock(benchmark::State& state) { // NOLINT non-const reference diff --git a/cpp/src/arrow/csv/parser_test.cc b/cpp/src/arrow/csv/parser_test.cc index c0c7befd6fe..f988f3ce21a 100644 --- a/cpp/src/arrow/csv/parser_test.cc +++ b/cpp/src/arrow/csv/parser_test.cc @@ -119,16 +119,20 @@ void GetLastRow(const BlockParser& parser, std::vector* out, } } +size_t TotalViewLength(const std::vector& views) { + size_t total_view_length = 0; + for (const auto& view : views) { + total_view_length += view.length(); + } + return total_view_length; +} + Status Parse(BlockParser& parser, const std::string& str, uint32_t* out_size) { - const char* data = str.data(); - uint32_t size = static_cast(str.length()); - return parser.Parse(data, size, out_size); + return parser.Parse(util::string_view(str), out_size); } Status ParseFinal(BlockParser& parser, const std::string& str, uint32_t* out_size) { - const char* data = str.data(); - uint32_t size = static_cast(str.length()); - return parser.ParseFinal(data, size, out_size); + return parser.ParseFinal(util::string_view(str), out_size); } void AssertParseOk(BlockParser& parser, const std::string& str) { @@ -137,12 +141,24 @@ void AssertParseOk(BlockParser& parser, const std::string& str) { ASSERT_EQ(parsed_size, str.size()); } +void AssertParseOk(BlockParser& parser, const std::vector& data) { + uint32_t parsed_size = static_cast(-1); + ASSERT_OK(parser.Parse(data, &parsed_size)); + ASSERT_EQ(parsed_size, TotalViewLength(data)); +} + void AssertParseFinal(BlockParser& parser, const std::string& str) { uint32_t parsed_size = static_cast(-1); ASSERT_OK(ParseFinal(parser, str, &parsed_size)); ASSERT_EQ(parsed_size, str.size()); } +void AssertParseFinal(BlockParser& parser, const std::vector& data) { + uint32_t parsed_size = static_cast(-1); + ASSERT_OK(parser.ParseFinal(data, &parsed_size)); + ASSERT_EQ(parsed_size, TotalViewLength(data)); +} + void AssertParsePartial(BlockParser& parser, const std::string& str, uint32_t expected_size) { uint32_t parsed_size = static_cast(-1); @@ -211,11 +227,21 @@ void AssertColumnsEq(const BlockParser& parser, } TEST(BlockParser, Basics) { - auto csv = MakeCSVData({"ab,cd,\n", "ef,,gh\n", ",ij,kl\n"}); - BlockParser parser(ParseOptions::Defaults()); - AssertParseOk(parser, csv); - AssertColumnsEq(parser, {{"ab", "ef", ""}, {"cd", "", "ij"}, {"", "gh", "kl"}}); - AssertLastRowEq(parser, {"", "ij", "kl"}, {false, false, false}); + { + auto csv = MakeCSVData({"ab,cd,\n", "ef,,gh\n", ",ij,kl\n"}); + BlockParser parser(ParseOptions::Defaults()); + AssertParseOk(parser, csv); + AssertColumnsEq(parser, {{"ab", "ef", ""}, {"cd", "", "ij"}, {"", "gh", "kl"}}); + AssertLastRowEq(parser, {"", "ij", "kl"}, {false, false, false}); + } + { + auto csv1 = MakeCSVData({"ab,cd,\n", "ef,,gh\n"}); + auto csv2 = MakeCSVData({",ij,kl\n"}); + BlockParser parser(ParseOptions::Defaults()); + AssertParseOk(parser, {csv1, csv2}); + AssertColumnsEq(parser, {{"ab", "ef", ""}, {"cd", "", "ij"}, {"", "gh", "kl"}}); + AssertLastRowEq(parser, {"", "ij", "kl"}, {false, false, false}); + } } TEST(BlockParser, EmptyHeader) { @@ -360,6 +386,12 @@ TEST(BlockParser, Final) { csv = MakeCSVData({"ab,cd"}); AssertParseFinal(parser, csv); AssertColumnsEq(parser, {{"ab"}, {"cd"}}); + + // Two blocks + auto csv1 = MakeCSVData({"ab,cd\n"}); + auto csv2 = MakeCSVData({"ef,"}); + AssertParseFinal(parser, {csv1, csv2}); + AssertColumnsEq(parser, {{"ab", "ef"}, {"cd", ""}}); } TEST(BlockParser, FinalTruncatedData) { diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index da6d1eea20a..5d90f6620a5 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -32,10 +32,11 @@ #include "arrow/csv/column_builder.h" #include "arrow/csv/options.h" #include "arrow/csv/parser.h" -#include "arrow/io/readahead.h" +#include "arrow/io/interfaces.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" +#include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/task_group.h" @@ -56,103 +57,69 @@ namespace csv { using internal::GetCpuThreadPool; using internal::ThreadPool; -using io::internal::ReadaheadBuffer; -using io::internal::ReadaheadSpooler; - -static constexpr int64_t kDefaultLeftPadding = 2048; // 2 kB -static constexpr int64_t kDefaultRightPadding = 16; ///////////////////////////////////////////////////////////////////////// // Base class for common functionality class BaseTableReader : public csv::TableReader { public: - BaseTableReader(MemoryPool* pool, const ReadOptions& read_options, - const ParseOptions& parse_options, + BaseTableReader(MemoryPool* pool, std::shared_ptr input, + const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options) : pool_(pool), read_options_(read_options), parse_options_(parse_options), - convert_options_(convert_options) {} - - protected: - Status ReadFirstBlock() { - RETURN_NOT_OK(ReadNextBlock()); - const uint8_t* data; - RETURN_NOT_OK(util::SkipUTF8BOM(cur_data_, cur_size_, &data)); - cur_size_ -= data - cur_data_; - cur_data_ = data; - return Status::OK(); - } - - // Read a next data block, stitch it to trailing data - Status ReadNextBlock() { - bool trailing_data = cur_size_ > 0; - ReadaheadBuffer rh; + convert_options_(convert_options), + input_(std::move(input)) {} - if (trailing_data) { - if (readahead_->GetLeftPadding() < cur_size_) { - // Growth heuristic to try and ensure sufficient left padding - // in subsequent reads - readahead_->SetLeftPadding(cur_size_ * 3 / 2); - } - } + virtual Status Init() = 0; - RETURN_NOT_OK(readahead_->Read(&rh)); - if (!rh.buffer) { - // EOF, let caller finish with existing data - eof_ = true; + protected: + Status ReadNextBlock(bool first_block, std::shared_ptr* out) { + std::shared_ptr buf; + RETURN_NOT_OK(block_iterator_.Next(&buf)); + if (buf == nullptr) { + // EOF + out->reset(); return Status::OK(); } - - std::shared_ptr new_block = rh.buffer; - uint8_t* new_data = rh.buffer->mutable_data() + rh.left_padding; - int64_t new_size = rh.buffer->size() - rh.left_padding - rh.right_padding; - DCHECK_GT(new_size, 0); // ensured by ReadaheadSpooler - - if (trailing_cr_ && new_data[0] == '\n') { + int64_t offset = 0; + if (first_block) { + const uint8_t* data; + RETURN_NOT_OK(util::SkipUTF8BOM(buf->data(), buf->size(), &data)); + offset += data - buf->data(); + DCHECK_GE(offset, 0); + } + if (trailing_cr_ && buf->data()[offset] == '\n') { // Skip '\r\n' line separator that started at the end of previous block - ++new_data; - --new_size; + ++offset; } - trailing_cr_ = (new_data[new_size - 1] == '\r'); - - if (trailing_data) { - // Try to copy trailing data at the beginning of new block - if (cur_size_ <= rh.left_padding) { - // Can left-extend new block inside padding area - new_data -= cur_size_; - new_size += cur_size_; - std::memcpy(new_data, cur_data_, cur_size_); - } else { - // Need to allocate bigger block and concatenate trailing + present data - RETURN_NOT_OK( - AllocateBuffer(pool_, cur_size_ + new_size + rh.right_padding, &new_block)); - std::memcpy(new_block->mutable_data(), cur_data_, cur_size_); - std::memcpy(new_block->mutable_data() + cur_size_, new_data, new_size); - std::memset(new_block->mutable_data() + cur_size_ + new_size, 0, - rh.right_padding); - new_data = new_block->mutable_data(); - new_size = cur_size_ + new_size; - } + trailing_cr_ = (buf->data()[buf->size() - 1] == '\r'); + buf = SliceBuffer(buf, offset); + if (buf->size() == 0) { + // EOF + out->reset(); + } else { + *out = std::move(buf); } - cur_block_ = new_block; - cur_data_ = new_data; - cur_size_ = new_size; return Status::OK(); } - // Read header and column names from current block, create column builders - Status ProcessHeader() { - DCHECK_GT(cur_size_, 0); + Status ReadNextBlock(std::shared_ptr* out) { return ReadNextBlock(false, out); } + + Status ReadFirstBlock(std::shared_ptr* out) { return ReadNextBlock(true, out); } + + // Read header and column names from buffer, create column builders + Status ProcessHeader(const std::shared_ptr& buf, + std::shared_ptr* rest) { + const uint8_t* data = buf->data(); + const auto data_end = data + buf->size(); + DCHECK_GT(data_end - data, 0); if (read_options_.skip_rows) { // Skip initial rows (potentially invalid CSV data) - auto data = cur_data_; - auto num_skipped_rows = SkipRows(cur_data_, static_cast(cur_size_), + auto num_skipped_rows = SkipRows(data, static_cast(data_end - data), read_options_.skip_rows, &data); - cur_size_ -= data - cur_data_; - cur_data_ = data; if (num_skipped_rows < read_options_.skip_rows) { return Status::Invalid( "Could not skip initial ", read_options_.skip_rows, @@ -165,8 +132,9 @@ class BaseTableReader : public csv::TableReader { // Parse one row (either to read column names or to know the number of columns) BlockParser parser(pool_, parse_options_, num_csv_cols_, 1); uint32_t parsed_size = 0; - RETURN_NOT_OK(parser.Parse(reinterpret_cast(cur_data_), - static_cast(cur_size_), &parsed_size)); + RETURN_NOT_OK(parser.Parse( + util::string_view(reinterpret_cast(data), data_end - data), + &parsed_size)); if (parser.num_rows() != 1) { return Status::Invalid( "Could not read first row from CSV file, either " @@ -187,12 +155,12 @@ class BaseTableReader : public csv::TableReader { RETURN_NOT_OK(parser.VisitLastRow(visit)); DCHECK_EQ(static_cast(parser.num_cols()), column_names_.size()); // Skip parsed header row - cur_data_ += parsed_size; - cur_size_ -= parsed_size; + data += parsed_size; } } else { column_names_ = read_options_.column_names; } + *rest = SliceBuffer(buf, data - buf->data()); num_csv_cols_ = static_cast(column_names_.size()); DCHECK_GT(num_csv_cols_, 0); @@ -287,6 +255,40 @@ class BaseTableReader : public csv::TableReader { return res; } + Status ParseAndInsert(const std::shared_ptr& partial, + const std::shared_ptr& completion, + const std::shared_ptr& block, int64_t block_index, + bool is_final, uint32_t* out_parsed_size = nullptr) { + static constexpr int32_t max_num_rows = std::numeric_limits::max(); + auto parser = + std::make_shared(pool_, parse_options_, num_csv_cols_, max_num_rows); + + std::shared_ptr straddling; + std::vector views; + if (partial->size() != 0 || completion->size() != 0) { + if (partial->size() == 0) { + straddling = completion; + } else if (completion->size() == 0) { + straddling = partial; + } else { + RETURN_NOT_OK(ConcatenateBuffers({partial, completion}, pool_, &straddling)); + } + views = {util::string_view(*straddling), util::string_view(*block)}; + } else { + views = {util::string_view(*block)}; + } + uint32_t parsed_size; + if (is_final) { + RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); + } else { + RETURN_NOT_OK(parser->Parse(views, &parsed_size)); + } + if (out_parsed_size) { + *out_parsed_size = parsed_size; + } + return ProcessData(parser, block_index); + } + // Trigger conversion of parsed block data Status ProcessData(const std::shared_ptr& parser, int64_t block_index) { for (auto& builder : column_builders_) { @@ -325,20 +327,12 @@ class BaseTableReader : public csv::TableReader { // Names of columns, in same order as column_builders_ std::vector builder_names_; - std::shared_ptr readahead_; + std::shared_ptr input_; + Iterator> block_iterator_; std::shared_ptr task_group_; - // Current block and data pointer - std::shared_ptr cur_block_; - const uint8_t* cur_data_ = nullptr; - int64_t cur_size_ = 0; - // Index of current block inside data stream - int64_t cur_block_index_ = 0; // Whether there was a trailing CR at the end of last parsed line bool trailing_cr_ = false; - // Whether we reached input stream EOF. There may still be data left to - // process in current block. - bool eof_ = false; }; ///////////////////////////////////////////////////////////////////////// @@ -346,57 +340,56 @@ class BaseTableReader : public csv::TableReader { class SerialTableReader : public BaseTableReader { public: - SerialTableReader(MemoryPool* pool, std::shared_ptr input, - const ReadOptions& read_options, const ParseOptions& parse_options, - const ConvertOptions& convert_options) - : BaseTableReader(pool, read_options, parse_options, convert_options) { + using BaseTableReader::BaseTableReader; + + Status Init() override { + RETURN_NOT_OK( + io::MakeInputStreamIterator(input_, read_options_.block_size, &block_iterator_)); // Since we're converting serially, no need to readahead more than one block - int32_t block_queue_size = 1; - readahead_ = std::make_shared( - pool_, input, read_options_.block_size, block_queue_size, kDefaultLeftPadding, - kDefaultRightPadding); + int block_queue_size = 1; + return MakeReadaheadIterator(std::move(block_iterator_), block_queue_size, + &block_iterator_); } - Status Read(std::shared_ptr* out) { + Status Read(std::shared_ptr
* out) override { task_group_ = internal::TaskGroup::MakeSerial(); // First block - RETURN_NOT_OK(ReadFirstBlock()); - if (eof_) { + std::shared_ptr block; + RETURN_NOT_OK(ReadFirstBlock(&block)); + if (!block) { return Status::Invalid("Empty CSV file"); } - RETURN_NOT_OK(ProcessHeader()); + RETURN_NOT_OK(ProcessHeader(block, &block)); - static constexpr int32_t max_num_rows = std::numeric_limits::max(); - auto parser = - std::make_shared(pool_, parse_options_, num_csv_cols_, max_num_rows); - while (!eof_) { - // Consume current block - uint32_t parsed_size = 0; - RETURN_NOT_OK(parser->Parse(reinterpret_cast(cur_data_), - static_cast(cur_size_), &parsed_size)); - if (parser->num_rows() > 0) { - // Got some data - RETURN_NOT_OK(ProcessData(parser, cur_block_index_++)); - cur_data_ += parsed_size; - cur_size_ -= parsed_size; - if (!task_group_->ok()) { - // Conversion error => early exit - break; - } + auto chunker = MakeChunker(parse_options_); + auto empty = std::make_shared(""); + auto partial = empty; + int64_t block_index = 0; + + while (block) { + std::shared_ptr next_block, completion; + RETURN_NOT_OK(block_iterator_.Next(&next_block)); + bool is_final = (next_block == nullptr); + + if (is_final) { + // End of file reached => compute completion from penultimate block + RETURN_NOT_OK(chunker->ProcessFinal(partial, block, &completion, &block)); } else { - // Need to fetch more data to get at least one row - RETURN_NOT_OK(ReadNextBlock()); - } - } - if (eof_ && cur_size_ > 0) { - // Parse remaining data - uint32_t parsed_size = 0; - RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast(cur_data_), - static_cast(cur_size_), &parsed_size)); - if (parser->num_rows() > 0) { - RETURN_NOT_OK(ProcessData(parser, cur_block_index_++)); + // Get completion of partial from previous block. + RETURN_NOT_OK(chunker->ProcessWithPartial(partial, block, &completion, &block)); } + + uint32_t parsed_size; + RETURN_NOT_OK(ParseAndInsert(partial, completion, block, block_index, is_final, + &parsed_size)); + ++block_index; + + auto offset = + static_cast(parsed_size) - partial->size() - completion->size(); + DCHECK_GE(offset, 0); // Ensured by chunker + partial = SliceBuffer(block, offset); + block = next_block; } // Finish conversion, create schema and table @@ -410,20 +403,15 @@ class SerialTableReader : public BaseTableReader { class ThreadedTableReader : public BaseTableReader { public: + using BaseTableReader::BaseTableReader; + ThreadedTableReader(MemoryPool* pool, std::shared_ptr input, - ThreadPool* thread_pool, const ReadOptions& read_options, - const ParseOptions& parse_options, - const ConvertOptions& convert_options) - : BaseTableReader(pool, read_options, parse_options, convert_options), - thread_pool_(thread_pool) { - // Readahead one block per worker thread - int32_t block_queue_size = thread_pool->GetCapacity(); - readahead_ = std::make_shared( - pool_, input, read_options_.block_size, block_queue_size, kDefaultLeftPadding, - kDefaultRightPadding); - } + const ReadOptions& read_options, const ParseOptions& parse_options, + const ConvertOptions& convert_options, ThreadPool* thread_pool) + : BaseTableReader(pool, input, read_options, parse_options, convert_options), + thread_pool_(thread_pool) {} - ~ThreadedTableReader() { + ~ThreadedTableReader() override { if (task_group_) { // In case of error, make sure all pending tasks are finished before // we start destroying BaseTableReader members @@ -431,76 +419,61 @@ class ThreadedTableReader : public BaseTableReader { } } - Status Read(std::shared_ptr
* out) { + Status Init() override { + RETURN_NOT_OK( + io::MakeInputStreamIterator(input_, read_options_.block_size, &block_iterator_)); + int32_t block_queue_size = thread_pool_->GetCapacity(); + return MakeReadaheadIterator(std::move(block_iterator_), block_queue_size, + &block_iterator_); + } + + Status Read(std::shared_ptr
* out) override { task_group_ = internal::TaskGroup::MakeThreaded(thread_pool_); - static constexpr int32_t max_num_rows = std::numeric_limits::max(); - Chunker chunker(parse_options_); - // Get first block and process header serially - RETURN_NOT_OK(ReadFirstBlock()); - if (eof_) { + // Read first block and process header serially + std::shared_ptr block; + RETURN_NOT_OK(ReadFirstBlock(&block)); + if (!block) { return Status::Invalid("Empty CSV file"); } - RETURN_NOT_OK(ProcessHeader()); - - while (!eof_ && task_group_->ok()) { - // Consume current chunk - uint32_t chunk_size = 0; - RETURN_NOT_OK(chunker.Process(reinterpret_cast(cur_data_), - static_cast(cur_size_), &chunk_size)); - if (chunk_size > 0) { - // Got a chunk of rows - const uint8_t* chunk_data = cur_data_; - std::shared_ptr chunk_buffer = cur_block_; - int64_t chunk_index = cur_block_index_; - - // "mutable" allows to modify captured by-copy chunk_buffer - task_group_->Append([=]() mutable -> Status { - auto parser = std::make_shared(pool_, parse_options_, - num_csv_cols_, max_num_rows); - uint32_t parsed_size = 0; - RETURN_NOT_OK(parser->Parse(reinterpret_cast(chunk_data), - chunk_size, &parsed_size)); - if (parsed_size != chunk_size) { - DCHECK_EQ(parsed_size, chunk_size); - return Status::Invalid("Chunker and parser disagree on block size: ", - chunk_size, " vs ", parsed_size); - } - RETURN_NOT_OK(ProcessData(parser, chunk_index)); - // Keep chunk buffer alive within closure and release it at the end - chunk_buffer.reset(); - return Status::OK(); - }); - cur_data_ += chunk_size; - cur_size_ -= chunk_size; - cur_block_index_++; + RETURN_NOT_OK(ProcessHeader(block, &block)); + + auto chunker = MakeChunker(parse_options_); + auto empty = std::make_shared(""); + auto partial = empty; + int64_t block_index = 0; + + while (block) { + std::shared_ptr next_block, whole, completion, next_partial; + RETURN_NOT_OK(block_iterator_.Next(&next_block)); + bool is_final = (next_block == nullptr); + + if (is_final) { + // End of file reached => compute completion from penultimate block + RETURN_NOT_OK(chunker->ProcessFinal(partial, block, &completion, &whole)); } else { - // Need to fetch more data to get at least one row - RETURN_NOT_OK(ReadNextBlock()); + std::shared_ptr starts_with_whole; + // Get completion of partial from previous block. + RETURN_NOT_OK( + chunker->ProcessWithPartial(partial, block, &completion, &starts_with_whole)); + + // Get a complete CSV block inside `partial + block`, and keep + // the rest for the next iteration. + RETURN_NOT_OK(chunker->Process(starts_with_whole, &whole, &next_partial)); } - } - // Finish all pending parallel tasks - RETURN_NOT_OK(task_group_->Finish()); + // Launch parse task + task_group_->Append([this, partial, completion, whole, block_index, is_final] { + return ParseAndInsert(partial, completion, whole, block_index, is_final); + }); + block_index++; - if (eof_ && cur_size_ > 0) { - // Parse remaining data (serial) - task_group_ = internal::TaskGroup::MakeSerial(); - for (auto& builder : column_builders_) { - builder->SetTaskGroup(task_group_); - } - auto parser = std::make_shared(pool_, parse_options_, num_csv_cols_, - max_num_rows); - uint32_t parsed_size = 0; - RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast(cur_data_), - static_cast(cur_size_), &parsed_size)); - if (parser->num_rows() > 0) { - RETURN_NOT_OK(ProcessData(parser, cur_block_index_++)); - } - RETURN_NOT_OK(task_group_->Finish()); + partial = next_partial; + block = next_block; } - // Create schema and table + // Finish conversion, create schema and table + RETURN_NOT_OK(task_group_->Finish()); return MakeTable(out); } @@ -516,18 +489,17 @@ Status TableReader::Make(MemoryPool* pool, std::shared_ptr inpu const ParseOptions& parse_options, const ConvertOptions& convert_options, std::shared_ptr* out) { - std::shared_ptr result; + std::shared_ptr result; if (read_options.use_threads) { result = std::make_shared( - pool, input, GetCpuThreadPool(), read_options, parse_options, convert_options); - *out = result; - return Status::OK(); + pool, input, read_options, parse_options, convert_options, GetCpuThreadPool()); } else { result = std::make_shared(pool, input, read_options, parse_options, convert_options); - *out = result; - return Status::OK(); } + RETURN_NOT_OK(result->Init()); + *out = result; + return Status::OK(); } } // namespace csv diff --git a/cpp/src/arrow/csv/test_common.h b/cpp/src/arrow/csv/test_common.h index 624023f6037..b616791c2c2 100644 --- a/cpp/src/arrow/csv/test_common.h +++ b/cpp/src/arrow/csv/test_common.h @@ -37,16 +37,21 @@ std::string MakeCSVData(std::vector lines) { } // Make a BlockParser from a vector of lines representing a CSV file -void MakeCSVParser(std::vector lines, ParseOptions options, +void MakeCSVParser(std::vector lines, ParseOptions options, int32_t num_cols, std::shared_ptr* out) { auto csv = MakeCSVData(lines); - auto parser = std::make_shared(options); + auto parser = std::make_shared(options, num_cols); uint32_t out_size; - ASSERT_OK(parser->Parse(csv.data(), static_cast(csv.size()), &out_size)); + ASSERT_OK(parser->Parse(util::string_view(csv), &out_size)); ASSERT_EQ(out_size, csv.size()) << "trailing CSV data not parsed"; *out = parser; } +void MakeCSVParser(std::vector lines, ParseOptions options, + std::shared_ptr* out) { + return MakeCSVParser(lines, options, -1, out); +} + void MakeCSVParser(std::vector lines, std::shared_ptr* out) { MakeCSVParser(lines, ParseOptions::Defaults(), out); } @@ -60,7 +65,7 @@ void MakeColumnParser(std::vector items, std::shared_ptrnum_cols(), 1) << "Should have seen only 1 CSV column"; ASSERT_EQ((*out)->num_rows(), items.size()); } diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index f84c79930f5..ab28aad0ecd 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -27,7 +27,6 @@ if(ARROW_HDFS) endif() add_arrow_test(memory_test PREFIX "arrow-io") -add_arrow_test(readahead_test PREFIX "arrow-io") add_arrow_benchmark(file_benchmark PREFIX "arrow-io") add_arrow_benchmark(memory_benchmark PREFIX "arrow-io") diff --git a/cpp/src/arrow/io/readahead.cc b/cpp/src/arrow/io/readahead.cc deleted file mode 100644 index fd004146b03..00000000000 --- a/cpp/src/arrow/io/readahead.cc +++ /dev/null @@ -1,226 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/io/readahead.h" - -#include -#include -#include -#include -#include -#include -#include - -#include "arrow/buffer.h" -#include "arrow/io/interfaces.h" -#include "arrow/memory_pool.h" -#include "arrow/status.h" -#include "arrow/util/logging.h" -#include "arrow/util/macros.h" - -namespace arrow { -namespace io { -namespace internal { - -// ---------------------------------------------------------------------- -// ReadaheadSpooler implementation - -class ReadaheadSpooler::Impl { - public: - Impl(MemoryPool* pool, std::shared_ptr raw, int64_t read_size, - int32_t readahead_queue_size, int64_t left_padding, int64_t right_padding) - : pool_(pool), - raw_(raw), - read_size_(read_size), - readahead_queue_size_(readahead_queue_size), - left_padding_(left_padding), - right_padding_(right_padding) { - DCHECK_NE(raw, nullptr); - DCHECK_GT(read_size, 0); - DCHECK_GT(readahead_queue_size, 0); - io_worker_ = std::thread([&]() { WorkerLoop(); }); - } - - ~Impl() { ARROW_UNUSED(Close()); } - - Status Close() { - std::unique_lock lock(mutex_); - please_close_ = true; - io_wakeup_.notify_one(); - // Wait for IO thread to finish - if (io_worker_.joinable()) { - lock.unlock(); - io_worker_.join(); - lock.lock(); - } - return Status::OK(); - } - - Status Read(ReadaheadBuffer* out) { - std::unique_lock lock(mutex_); - while (true) { - // Drain queue before querying other flags - if (buffer_queue_.size() > 0) { - *out = std::move(buffer_queue_.front()); - DCHECK_NE(out->buffer, nullptr); - buffer_queue_.pop_front(); - // Need to fill up queue again - io_wakeup_.notify_one(); - return Status::OK(); - } - if (!read_status_.ok()) { - // Got a read error, bail out - return read_status_; - } - if (eof_) { - out->buffer.reset(); - return Status::OK(); - } - // Readahead queue is empty and we're not closed yet, wait for more I/O - io_progress_.wait(lock); - } - } - - int64_t left_padding() { - std::unique_lock lock(mutex_); - return left_padding_; - } - - void left_padding(int64_t size) { - std::unique_lock lock(mutex_); - left_padding_ = size; - } - - int64_t right_padding() { - std::unique_lock lock(mutex_); - return right_padding_; - } - - void right_padding(int64_t size) { - std::unique_lock lock(mutex_); - right_padding_ = size; - } - - protected: - // The background thread's main function - void WorkerLoop() { - std::unique_lock lock(mutex_); - Status st; - - while (true) { - if (please_close_) { - goto eof; - } - // Fill up readahead queue until desired size - while (buffer_queue_.size() < static_cast(readahead_queue_size_)) { - ReadaheadBuffer buf = {nullptr, left_padding_, right_padding_}; - lock.unlock(); - Status st = ReadOneBufferUnlocked(&buf); - lock.lock(); - if (!st.ok()) { - read_status_ = st; - goto error; - } - // Close() could have been called while unlocked above - if (please_close_) { - goto eof; - } - // Got empty read? - if (buf.buffer->size() == buf.left_padding + buf.right_padding) { - goto eof; - } - buffer_queue_.push_back(std::move(buf)); - io_progress_.notify_one(); - } - // Wait for Close() or Read() call - io_wakeup_.wait(lock); - } - eof: - eof_ = true; - error: - // Make sure any pending Read() doesn't block indefinitely - io_progress_.notify_one(); - } - - Status ReadOneBufferUnlocked(ReadaheadBuffer* buf) { - // Note that left_padding_ and right_padding_ may be modified while unlocked - std::shared_ptr buffer; - int64_t bytes_read; - RETURN_NOT_OK(AllocateResizableBuffer( - pool_, read_size_ + buf->left_padding + buf->right_padding, &buffer)); - DCHECK_NE(buffer->mutable_data(), nullptr); - RETURN_NOT_OK( - raw_->Read(read_size_, &bytes_read, buffer->mutable_data() + buf->left_padding)); - if (bytes_read < read_size_) { - // Got a short read - RETURN_NOT_OK(buffer->Resize(bytes_read + buf->left_padding + buf->right_padding)); - DCHECK_NE(buffer->mutable_data(), nullptr); - } - // Zero padding areas - memset(buffer->mutable_data(), 0, buf->left_padding); - memset(buffer->mutable_data() + bytes_read + buf->left_padding, 0, - buf->right_padding); - buf->buffer = std::move(buffer); - return Status::OK(); - } - - MemoryPool* pool_; - std::shared_ptr raw_; - int64_t read_size_; - int32_t readahead_queue_size_; - int64_t left_padding_ = 0; - int64_t right_padding_ = 0; - - std::mutex mutex_; - std::condition_variable io_wakeup_; - std::condition_variable io_progress_; - std::thread io_worker_; - bool please_close_ = false; - bool eof_ = false; - std::deque buffer_queue_; - Status read_status_; -}; - -ReadaheadSpooler::ReadaheadSpooler(MemoryPool* pool, std::shared_ptr raw, - int64_t read_size, int32_t readahead_queue_size, - int64_t left_padding, int64_t right_padding) - : impl_(new ReadaheadSpooler::Impl(pool, raw, read_size, readahead_queue_size, - left_padding, right_padding)) {} - -ReadaheadSpooler::ReadaheadSpooler(std::shared_ptr raw, int64_t read_size, - int32_t readahead_queue_size, int64_t left_padding, - int64_t right_padding) - : ReadaheadSpooler(default_memory_pool(), raw, read_size, readahead_queue_size, - left_padding, right_padding) {} - -int64_t ReadaheadSpooler::GetLeftPadding() { return impl_->left_padding(); } - -void ReadaheadSpooler::SetLeftPadding(int64_t size) { impl_->left_padding(size); } - -int64_t ReadaheadSpooler::GetRightPadding() { return impl_->right_padding(); } - -void ReadaheadSpooler::SetRightPadding(int64_t size) { impl_->right_padding(size); } - -Status ReadaheadSpooler::Close() { return impl_->Close(); } - -Status ReadaheadSpooler::Read(ReadaheadBuffer* out) { return impl_->Read(out); } - -ReadaheadSpooler::~ReadaheadSpooler() {} - -} // namespace internal -} // namespace io -} // namespace arrow diff --git a/cpp/src/arrow/io/readahead.h b/cpp/src/arrow/io/readahead.h deleted file mode 100644 index 950520ba597..00000000000 --- a/cpp/src/arrow/io/readahead.h +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef ARROW_IO_READAHEAD_H -#define ARROW_IO_READAHEAD_H - -#include -#include - -#include "arrow/util/visibility.h" - -namespace arrow { - -class MemoryPool; -class ResizableBuffer; -class Status; - -namespace io { - -class InputStream; - -namespace internal { - -struct ARROW_EXPORT ReadaheadBuffer { - std::shared_ptr buffer; - int64_t left_padding; - int64_t right_padding; -}; - -class ARROW_EXPORT ReadaheadSpooler { - public: - /// \brief EXPERIMENTAL: Create a readahead spooler wrapping the given input stream. - /// - /// The spooler launches a background thread that reads up to a given number - /// of fixed-size blocks in advance from the underlying stream. - /// The buffers returned by Read() will be padded at the beginning and the end - /// with the configured amount of (zeroed) bytes. - ReadaheadSpooler(MemoryPool* pool, std::shared_ptr raw, - int64_t read_size = kDefaultReadSize, int32_t readahead_queue_size = 1, - int64_t left_padding = 0, int64_t right_padding = 0); - - explicit ReadaheadSpooler(std::shared_ptr raw, - int64_t read_size = kDefaultReadSize, - int32_t readahead_queue_size = 1, int64_t left_padding = 0, - int64_t right_padding = 0); - - ~ReadaheadSpooler(); - - /// Configure zero-padding at beginning and end of buffers (default 0 bytes). - /// The buffers returned by Read() will be padded at the beginning and the end - /// with the configured amount of (zeroed) bytes. - /// Note that, as reading happens in background and in advance, changing the - /// configured values might not affect Read() results immediately. - int64_t GetLeftPadding(); - void SetLeftPadding(int64_t size); - - int64_t GetRightPadding(); - void SetRightPadding(int64_t size); - - /// \brief Close the spooler. This implicitly closes the underlying input stream. - Status Close(); - - /// \brief Read a buffer from the queue. - /// - /// If the buffer pointer in the ReadaheadBuffer is null, then EOF was - /// reached and/or the spooler was explicitly closed. - /// Otherwise, the buffer will contain at most read_size bytes in addition - /// to the configured padding (short reads are possible at the end of a file). - // How do we allow reusing the buffer in ReadaheadBuffer? perhaps by using - // a caching memory pool? - Status Read(ReadaheadBuffer* out); - - private: - static constexpr int64_t kDefaultReadSize = 1 << 20; // 1 MB - - class ARROW_NO_EXPORT Impl; - std::unique_ptr impl_; -}; - -} // namespace internal -} // namespace io -} // namespace arrow - -#endif // ARROW_IO_READAHEAD_H diff --git a/cpp/src/arrow/io/readahead_test.cc b/cpp/src/arrow/io/readahead_test.cc deleted file mode 100644 index fadd4dfc635..00000000000 --- a/cpp/src/arrow/io/readahead_test.cc +++ /dev/null @@ -1,277 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "arrow/buffer.h" -#include "arrow/io/interfaces.h" -#include "arrow/io/memory.h" -#include "arrow/io/readahead.h" -#include "arrow/memory_pool.h" -#include "arrow/status.h" -#include "arrow/testing/gtest_util.h" -#include "arrow/testing/util.h" -#include "arrow/util/checked_cast.h" - -namespace arrow { - -using internal::checked_cast; - -namespace io { -namespace internal { - -class LockedInputStream : public InputStream { - public: - explicit LockedInputStream(const std::shared_ptr& stream) - : stream_(stream) {} - - Status Close() override { - std::lock_guard lock(mutex_); - return stream_->Close(); - } - - bool closed() const override { - std::lock_guard lock(mutex_); - return stream_->closed(); - } - - Status Tell(int64_t* position) const override { - std::lock_guard lock(mutex_); - return stream_->Tell(position); - } - - Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) override { - std::lock_guard lock(mutex_); - return stream_->Read(nbytes, bytes_read, buffer); - } - - Status Read(int64_t nbytes, std::shared_ptr* out) override { - std::lock_guard lock(mutex_); - return stream_->Read(nbytes, out); - } - - bool supports_zero_copy() const override { - std::lock_guard lock(mutex_); - return stream_->supports_zero_copy(); - } - - Status Peek(int64_t nbytes, util::string_view* out) override { - std::lock_guard lock(mutex_); - return stream_->Peek(nbytes, out); - } - - protected: - std::shared_ptr stream_; - mutable std::mutex mutex_; -}; - -static void sleep_for(double seconds) { - std::this_thread::sleep_for( - std::chrono::nanoseconds(static_cast(seconds * 1e9))); -} - -static void busy_wait(double seconds, std::function predicate) { - const double period = 0.001; - for (int i = 0; !predicate() && i * period < seconds; ++i) { - sleep_for(period); - } -} - -std::shared_ptr DataReader(const std::string& data) { - std::shared_ptr buffer; - ABORT_NOT_OK(Buffer::FromString(data, &buffer)); - return std::make_shared(std::make_shared(buffer)); -} - -static int64_t WaitForPosition(const FileInterface& file, int64_t expected, - double seconds = 0.2) { - int64_t pos = -1; - busy_wait(seconds, [&]() -> bool { - ABORT_NOT_OK(file.Tell(&pos)); - return pos >= expected; - }); - return pos; -} - -static void AssertEventualPosition(const FileInterface& file, int64_t expected) { - int64_t pos = WaitForPosition(file, expected); - ASSERT_EQ(pos, expected) << "File didn't reach expected position"; -} - -template -static void AssertReadaheadBuffer(const ReadaheadBuffer& buf, - std::set left_paddings, - std::set right_paddings, - const Expected& expected_data) { - ASSERT_TRUE(left_paddings.count(buf.left_padding)) - << "Left padding (" << buf.left_padding << ") not amongst expected values"; - ASSERT_TRUE(right_paddings.count(buf.right_padding)) - << "Right padding (" << buf.right_padding << ") not amongst expected values"; - auto actual_data = - SliceBuffer(buf.buffer, buf.left_padding, - buf.buffer->size() - buf.left_padding - buf.right_padding); - AssertBufferEqual(*actual_data, expected_data); -} - -static void AssertReadaheadBufferEOF(const ReadaheadBuffer& buf) { - ASSERT_EQ(buf.buffer.get(), nullptr) << "Expected EOF signalled by null buffer pointer"; -} - -TEST(ReadaheadSpooler, BasicReads) { - // Test basic reads - auto data_reader = DataReader("0123456789"); - ReadaheadSpooler spooler(data_reader, 2, 3); - ReadaheadBuffer buf; - - AssertEventualPosition(*data_reader, 3 * 2); - - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {0}, {0}, "01"); - AssertEventualPosition(*data_reader, 4 * 2); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {0}, {0}, "23"); - AssertEventualPosition(*data_reader, 5 * 2); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {0}, {0}, "45"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {0}, {0}, "67"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {0}, {0}, "89"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBufferEOF(buf); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBufferEOF(buf); -} - -TEST(ReadaheadSpooler, ShortReadAtEnd) { - auto data_reader = DataReader("01234"); - ReadaheadSpooler spooler(data_reader, 3, 2); - ReadaheadBuffer buf; - - AssertEventualPosition(*data_reader, 5); - - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {0}, {0}, "012"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {0}, {0}, "34"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBufferEOF(buf); -} - -TEST(ReadaheadSpooler, Close) { - // Closing should stop reads - auto data_reader = DataReader("0123456789"); - ReadaheadSpooler spooler(data_reader, 2, 2); - ReadaheadBuffer buf; - - AssertEventualPosition(*data_reader, 2 * 2); - ASSERT_OK(spooler.Close()); - - // Idempotency - ASSERT_OK(spooler.Close()); - - // ARROW-4823: does not close raw - ASSERT_FALSE(data_reader->closed()); -} - -TEST(ReadaheadSpooler, Paddings) { - auto data_reader = DataReader("0123456789"); - ReadaheadSpooler spooler(data_reader, 2, 2, 1 /* left_padding */, - 4 /* right_padding */); - ReadaheadBuffer buf; - - AssertEventualPosition(*data_reader, 2 * 2); - ASSERT_EQ(spooler.GetLeftPadding(), 1); - ASSERT_EQ(spooler.GetRightPadding(), 4); - spooler.SetLeftPadding(3); - spooler.SetRightPadding(2); - ASSERT_EQ(spooler.GetLeftPadding(), 3); - ASSERT_EQ(spooler.GetRightPadding(), 2); - - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {1}, {4}, "01"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {1}, {4}, "23"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {3}, {2}, "45"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {3}, {2}, "67"); - spooler.SetLeftPadding(4); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBuffer(buf, {3, 4}, {2}, "89"); - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBufferEOF(buf); -} - -TEST(ReadaheadSpooler, StressReads) { - // NBYTES % READ_SIZE != 0 ensures a short read at end -#if defined(ARROW_VALGRIND) - const int64_t NBYTES = 101; -#else - const int64_t NBYTES = 50001; -#endif - const int64_t READ_SIZE = 2; - - std::shared_ptr data; - ASSERT_OK(MakeRandomByteBuffer(NBYTES, default_memory_pool(), &data)); - auto data_reader = std::make_shared(data); - - ReadaheadSpooler spooler(data_reader, READ_SIZE, 7); - int64_t pos = 0; - std::vector readahead_buffers; - - // Stress Read() calls while the background thread is reading ahead - while (pos < NBYTES) { - ReadaheadBuffer buf; - ASSERT_OK(spooler.Read(&buf)); - ASSERT_NE(buf.buffer.get(), nullptr) << "Got premature EOF at index " << pos; - pos += buf.buffer->size() - buf.left_padding - buf.right_padding; - readahead_buffers.push_back(std::move(buf)); - } - // At EOF - { - ReadaheadBuffer buf; - ASSERT_OK(spooler.Read(&buf)); - AssertReadaheadBufferEOF(buf); - } - - pos = 0; - for (const auto& buf : readahead_buffers) { - auto expected_data = SliceBuffer(data, pos, std::min(READ_SIZE, NBYTES - pos)); - AssertReadaheadBuffer(buf, {0}, {0}, *expected_data); - pos += expected_data->size(); - } - // Got exactly the total bytes length - ASSERT_EQ(pos, NBYTES); -} - -} // namespace internal -} // namespace io -} // namespace arrow diff --git a/cpp/src/arrow/json/chunker.cc b/cpp/src/arrow/json/chunker.cc index cd21ca10c74..d96b8ed80bc 100644 --- a/cpp/src/arrow/json/chunker.cc +++ b/cpp/src/arrow/json/chunker.cc @@ -37,90 +37,21 @@ namespace rj = arrow::rapidjson; using internal::make_unique; using util::string_view; -static Status StraddlingTooLarge() { - return Status::Invalid( - "straddling object straddles two block boundaries (try to increase block size?)"); -} - -static size_t ConsumeWhitespace(std::shared_ptr* buf) { +static size_t ConsumeWhitespace(string_view view) { #if defined(ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD) - auto data = reinterpret_cast((*buf)->data()); - auto nonws_begin = rj::SkipWhitespace_SIMD(data, data + (*buf)->size()); - auto ws_count = nonws_begin - data; - *buf = SliceBuffer(*buf, ws_count); - return static_cast(ws_count); + auto data = view.data(); + auto nonws_begin = rj::SkipWhitespace_SIMD(data, data + view.size()); + return nonws_begin - data; #else - auto ws_count = string_view(**buf).find_first_not_of(" \t\r\n"); + auto ws_count = view.find_first_not_of(" \t\r\n"); if (ws_count == string_view::npos) { - ws_count = (*buf)->size(); + return 0; + } else { + return ws_count; } - *buf = SliceBuffer(*buf, ws_count); - return ws_count; #endif } -// A chunker implementation that assumes JSON objects don't contain raw newlines. -// This allows fast chunk delimitation using a simple newline search. -class NewlinesStrictlyDelimitChunker : public Chunker { - public: - Status Process(std::shared_ptr block, std::shared_ptr* whole, - std::shared_ptr* partial) override { - auto last_newline = string_view(*block).find_last_of("\n\r"); - if (last_newline == string_view::npos) { - // no newlines in this block, return empty chunk - *whole = SliceBuffer(block, 0, 0); - *partial = block; - } else { - *whole = SliceBuffer(block, 0, last_newline + 1); - *partial = SliceBuffer(block, last_newline + 1); - } - return Status::OK(); - } - - Status ProcessWithPartial(std::shared_ptr partial_original, - std::shared_ptr block, - std::shared_ptr* completion, - std::shared_ptr* rest) override { - return DoProcessWithPartial(partial_original, block, false, completion, rest); - } - - Status ProcessFinal(std::shared_ptr partial_original, - std::shared_ptr block, std::shared_ptr* completion, - std::shared_ptr* rest) override { - return DoProcessWithPartial(partial_original, block, true, completion, rest); - } - - protected: - Status DoProcessWithPartial(std::shared_ptr partial, - std::shared_ptr block, bool is_final, - std::shared_ptr* completion, - std::shared_ptr* rest) { - ConsumeWhitespace(&partial); - if (partial->size() == 0) { - // if partial is empty, don't bother looking for completion - *completion = SliceBuffer(block, 0, 0); - *rest = block; - return Status::OK(); - } - auto first_newline = string_view(*block).find_first_of("\n\r"); - if (first_newline == string_view::npos) { - // no newlines in this block - if (is_final) { - // => it's entirely a completion of partial - *completion = block; - *rest = SliceBuffer(block, 0, 0); - return Status::OK(); - } else { - // => the current object is too large for block size - return StraddlingTooLarge(); - } - } - *completion = SliceBuffer(block, 0, first_newline + 1); - *rest = SliceBuffer(block, first_newline + 1); - return Status::OK(); - } -}; - /// RapidJson custom stream for reading JSON stored in multiple buffers /// http://rapidjson.org/md_doc_stream.html#CustomStream class MultiStringStream { @@ -187,88 +118,61 @@ static size_t ConsumeWholeObject(Stream&& stream) { } } -// A chunker implementation that assumes JSON objects can contain raw newlines, -// and uses actual JSON parsing to delimit chunks. -class ParsingChunker : public Chunker { +namespace { + +// A BoundaryFinder implementation that assumes JSON objects can contain raw newlines, +// and uses actual JSON parsing to delimit them. +class ParsingBoundaryFinder : public BoundaryFinder { public: - Status Process(std::shared_ptr block, std::shared_ptr* whole, - std::shared_ptr* partial) override { - if (block->size() == 0) { - *whole = SliceBuffer(block, 0, 0); - *partial = block; - return Status::OK(); + Status FindFirst(string_view partial, string_view block, int64_t* out_pos) override { + // NOTE: We could bubble up JSON parse errors here, but the actual parsing + // step will detect them later anyway. + auto length = ConsumeWholeObject(MultiStringStream({partial, block})); + if (length == string_view::npos) { + *out_pos = -1; + } else { + DCHECK_GE(length, partial.size()); + DCHECK_LE(length, partial.size() + block.size()); + *out_pos = static_cast(length - partial.size()); } - size_t total_length = 0; - for (auto consumed = block;; consumed = SliceBuffer(block, total_length)) { - rj::MemoryStream ms(reinterpret_cast(consumed->data()), - consumed->size()); + return Status::OK(); + } + + Status FindLast(util::string_view block, int64_t* out_pos) override { + const size_t block_length = block.size(); + size_t consumed_length = 0; + while (consumed_length < block_length) { + rj::MemoryStream ms(reinterpret_cast(block.data()), block.size()); using InputStream = rj::EncodedInputStream, rj::MemoryStream>; auto length = ConsumeWholeObject(InputStream(ms)); if (length == string_view::npos || length == 0) { - // found incomplete object or consumed is empty + // found incomplete object or block is empty break; } - if (static_cast(length) > consumed->size()) { - total_length += consumed->size(); - break; - } - total_length += length; + consumed_length += length; + block = block.substr(length); } - *whole = SliceBuffer(block, 0, total_length); - *partial = SliceBuffer(block, total_length); - return Status::OK(); - } - - Status ProcessWithPartial(std::shared_ptr partial_original, - std::shared_ptr block, - std::shared_ptr* completion, - std::shared_ptr* rest) override { - return DoProcessWithPartial(partial_original, block, false, completion, rest); - } - - Status ProcessFinal(std::shared_ptr partial_original, - std::shared_ptr block, std::shared_ptr* completion, - std::shared_ptr* rest) override { - return DoProcessWithPartial(partial_original, block, true, completion, rest); - } - - protected: - Status DoProcessWithPartial(std::shared_ptr partial, - std::shared_ptr block, bool is_final, - std::shared_ptr* completion, - std::shared_ptr* rest) { - ConsumeWhitespace(&partial); - if (partial->size() == 0) { - // if partial is empty, don't bother looking for completion - *completion = SliceBuffer(block, 0, 0); - *rest = block; - return Status::OK(); - } - auto length = ConsumeWholeObject(MultiStringStream({partial, block})); - if (length == string_view::npos) { - // no newlines in this block - if (is_final) { - // => it's entirely a completion of partial - *completion = block; - *rest = SliceBuffer(block, 0, 0); - return Status::OK(); - } else { - // => the current object is too large for block size - return StraddlingTooLarge(); - } + if (consumed_length == 0) { + *out_pos = -1; + } else { + consumed_length += ConsumeWhitespace(block); + DCHECK_LE(consumed_length, block_length); + *out_pos = static_cast(consumed_length); } - auto completion_length = length - partial->size(); - *completion = SliceBuffer(block, 0, completion_length); - *rest = SliceBuffer(block, completion_length); return Status::OK(); } }; -std::unique_ptr Chunker::Make(const ParseOptions& options) { - if (!options.newlines_in_values) { - return make_unique(); +} // namespace + +std::unique_ptr MakeChunker(const ParseOptions& options) { + std::shared_ptr delimiter; + if (options.newlines_in_values) { + delimiter = std::make_shared(); + } else { + delimiter = MakeNewlineBoundaryFinder(); } - return make_unique(); + return std::unique_ptr(new Chunker(std::move(delimiter))); } } // namespace json diff --git a/cpp/src/arrow/json/chunker.h b/cpp/src/arrow/json/chunker.h index 7df1b60a4e7..9ed85126da1 100644 --- a/cpp/src/arrow/json/chunker.h +++ b/cpp/src/arrow/json/chunker.h @@ -19,76 +19,17 @@ #include -#include "arrow/status.h" +#include "arrow/util/delimiting.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" namespace arrow { - -class Buffer; - namespace json { struct ParseOptions; -/// \class Chunker -/// \brief A reusable block-based chunker for JSON data -/// -/// The chunker takes a block of JSON data and finds a suitable place -/// to cut it up without splitting an object. -class ARROW_EXPORT Chunker { - public: - virtual ~Chunker() = default; - - /// \brief Carve up a chunk in a block of data to contain only whole objects - /// - /// Post-conditions: - /// - block == whole + partial - /// - `whole` is a valid block of JSON data - /// - `partial` doesn't contain an entire JSON object - /// - /// \param[in] block json data to be chunked - /// \param[out] whole subrange of block containing whole json objects - /// \param[out] partial subrange of block a partial json object - virtual Status Process(std::shared_ptr block, std::shared_ptr* whole, - std::shared_ptr* partial) = 0; - - /// \brief Carve the completion of a partial object out of a block - /// - /// Post-conditions: - /// - block == completion + rest - /// - `partial + completion` is a valid block of JSON data - /// - `completion` doesn't contain an entire JSON object - /// - /// \param[in] partial incomplete json object - /// \param[in] block json data - /// \param[out] completion subrange of block containing the completion of partial - /// \param[out] rest subrange of block containing what completion does not cover - virtual Status ProcessWithPartial(std::shared_ptr partial, - std::shared_ptr block, - std::shared_ptr* completion, - std::shared_ptr* rest) = 0; - - /// \brief Like ProcessWithPartial, but for the lastblock of a file - /// - /// This method allows for a final JSON object without a trailing newline - /// (ProcessWithPartial would return an error in that case). - /// - /// Post-conditions: - /// - block == completion + rest - /// - `partial + completion` is a valid block of JSON data - /// - `completion` doesn't contain an entire JSON object - virtual Status ProcessFinal(std::shared_ptr partial, - std::shared_ptr block, - std::shared_ptr* completion, - std::shared_ptr* rest) = 0; - - static std::unique_ptr Make(const ParseOptions& options); - - protected: - Chunker() = default; - ARROW_DISALLOW_COPY_AND_ASSIGN(Chunker); -}; +ARROW_EXPORT +std::unique_ptr MakeChunker(const ParseOptions& options); } // namespace json } // namespace arrow diff --git a/cpp/src/arrow/json/chunker_test.cc b/cpp/src/arrow/json/chunker_test.cc index 70bca68426c..d6b2c9fd5f6 100644 --- a/cpp/src/arrow/json/chunker_test.cc +++ b/cpp/src/arrow/json/chunker_test.cc @@ -175,7 +175,7 @@ void AssertStraddledChunking(Chunker& chunker, const std::shared_ptr& bu std::unique_ptr MakeChunker(bool newlines_in_values) { auto options = ParseOptions::Defaults(); options.newlines_in_values = newlines_in_values; - return Chunker::Make(options); + return MakeChunker(options); } class BaseChunkerTest : public ::testing::TestWithParam { diff --git a/cpp/src/arrow/json/parser_benchmark.cc b/cpp/src/arrow/json/parser_benchmark.cc index d0e0af635f5..28b7123bbc9 100644 --- a/cpp/src/arrow/json/parser_benchmark.cc +++ b/cpp/src/arrow/json/parser_benchmark.cc @@ -53,7 +53,7 @@ std::string TestJsonData(int num_rows, bool pretty = false) { static void BenchmarkJSONChunking(benchmark::State& state, const std::shared_ptr& json, ParseOptions options) { // NOLINT non-const reference - auto chunker = Chunker::Make(options); + auto chunker = MakeChunker(options); for (auto _ : state) { std::shared_ptr chunked, partial; diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc index 3e0e22464e2..624c7879a0d 100644 --- a/cpp/src/arrow/json/reader.cc +++ b/cpp/src/arrow/json/reader.cc @@ -54,7 +54,7 @@ class TableReaderImpl : public TableReader, : pool_(pool), read_options_(read_options), parse_options_(parse_options), - chunker_(Chunker::Make(parse_options_)), + chunker_(MakeChunker(parse_options_)), task_group_(std::move(task_group)) {} Status Init(std::shared_ptr input) { diff --git a/cpp/src/arrow/util/delimiting.cc b/cpp/src/arrow/util/delimiting.cc new file mode 100644 index 00000000000..1b23c377052 --- /dev/null +++ b/cpp/src/arrow/util/delimiting.cc @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/delimiting.h" +#include "arrow/buffer.h" + +namespace arrow { + +BoundaryFinder::~BoundaryFinder() {} + +namespace { + +Status StraddlingTooLarge() { + return Status::Invalid( + "straddling object straddles two block boundaries (try to increase block size?)"); +} + +class NewlineBoundaryFinder : public BoundaryFinder { + public: + Status FindFirst(util::string_view partial, util::string_view block, + int64_t* out_pos) override { + auto pos = block.find_first_of(newline_delimiters); + if (pos == util::string_view::npos) { + *out_pos = kNoDelimiterFound; + } else { + auto end = block.find_first_not_of(newline_delimiters, pos); + if (end == util::string_view::npos) { + end = block.length(); + } + *out_pos = static_cast(end); + } + return Status::OK(); + } + + Status FindLast(util::string_view block, int64_t* out_pos) override { + auto pos = block.find_last_of(newline_delimiters); + if (pos == util::string_view::npos) { + *out_pos = kNoDelimiterFound; + } else { + auto end = block.find_first_not_of(newline_delimiters, pos); + if (end == util::string_view::npos) { + end = block.length(); + } + *out_pos = static_cast(end); + } + return Status::OK(); + } + + protected: + static constexpr const char* newline_delimiters = "\r\n"; +}; + +} // namespace + +std::shared_ptr MakeNewlineBoundaryFinder() { + return std::make_shared(); +} + +Chunker::~Chunker() {} + +Chunker::Chunker(std::shared_ptr delimiter) + : boundary_finder_(delimiter) {} + +Status Chunker::Process(std::shared_ptr block, std::shared_ptr* whole, + std::shared_ptr* partial) { + int64_t last_pos = -1; + RETURN_NOT_OK(boundary_finder_->FindLast(util::string_view(*block), &last_pos)); + if (last_pos == BoundaryFinder::kNoDelimiterFound) { + // No delimiter found + *whole = SliceBuffer(block, 0, 0); + *partial = block; + return Status::OK(); + } else { + *whole = SliceBuffer(block, 0, last_pos); + *partial = SliceBuffer(block, last_pos); + } + return Status::OK(); +} + +Status Chunker::ProcessWithPartial(std::shared_ptr partial, + std::shared_ptr block, + std::shared_ptr* completion, + std::shared_ptr* rest) { + if (partial->size() == 0) { + // If partial is empty, don't bother looking for completion + *completion = SliceBuffer(block, 0, 0); + *rest = block; + return Status::OK(); + } + int64_t first_pos = -1; + RETURN_NOT_OK(boundary_finder_->FindFirst(util::string_view(*partial), + util::string_view(*block), &first_pos)); + if (first_pos == BoundaryFinder::kNoDelimiterFound) { + // No delimiter in block => the current object is too large for block size + return StraddlingTooLarge(); + } else { + *completion = SliceBuffer(block, 0, first_pos); + *rest = SliceBuffer(block, first_pos); + return Status::OK(); + } +} + +Status Chunker::ProcessFinal(std::shared_ptr partial, + std::shared_ptr block, + std::shared_ptr* completion, + std::shared_ptr* rest) { + if (partial->size() == 0) { + // If partial is empty, don't bother looking for completion + *completion = SliceBuffer(block, 0, 0); + *rest = block; + return Status::OK(); + } + int64_t first_pos = -1; + RETURN_NOT_OK(boundary_finder_->FindFirst(util::string_view(*partial), + util::string_view(*block), &first_pos)); + if (first_pos == BoundaryFinder::kNoDelimiterFound) { + // No delimiter in block => it's entirely a completion of partial + *completion = block; + *rest = SliceBuffer(block, 0, 0); + } else { + *completion = SliceBuffer(block, 0, first_pos); + *rest = SliceBuffer(block, first_pos); + } + return Status::OK(); +} + +} // namespace arrow diff --git a/cpp/src/arrow/util/delimiting.h b/cpp/src/arrow/util/delimiting.h new file mode 100644 index 00000000000..061050df42f --- /dev/null +++ b/cpp/src/arrow/util/delimiting.h @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/status.h" +#include "arrow/util/macros.h" +#include "arrow/util/string_view.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; + +class ARROW_EXPORT BoundaryFinder { + public: + BoundaryFinder() = default; + + virtual ~BoundaryFinder(); + + /// \brief Find the position of the first delimiter inside block + /// + /// `partial` is taken to be the beginning of the block, and `block` + /// its continuation. Also, `partial` doesn't contain a delimiter. + /// + /// The returned `out_pos` is relative to `block`'s start and should point + /// to the first character after the first delimiter. + /// `out_pos` will be -1 if no delimiter is found. + virtual Status FindFirst(util::string_view partial, util::string_view block, + int64_t* out_pos) = 0; + + /// \brief Find the position of the last delimiter inside block + /// + /// The returned `out_pos` is relative to `block`'s start and should point + /// to the first character after the last delimiter. + /// `out_pos` will be -1 if no delimiter is found. + virtual Status FindLast(util::string_view block, int64_t* out_pos) = 0; + + static constexpr int64_t kNoDelimiterFound = -1; + + protected: + ARROW_DISALLOW_COPY_AND_ASSIGN(BoundaryFinder); +}; + +ARROW_EXPORT +std::shared_ptr MakeNewlineBoundaryFinder(); + +/// \brief A reusable block-based chunker for delimited data +/// +/// The chunker takes a block of delimited data and helps carve a sub-block +/// which begins and ends on delimiters (suitable for consumption by parsers +/// which can only parse whole objects). +class ARROW_EXPORT Chunker { + public: + explicit Chunker(std::shared_ptr delimiter); + ~Chunker(); + + /// \brief Carve up a chunk in a block of data to contain only whole objects + /// + /// Pre-conditions: + /// - `block` is the start of a valid block of delimited data + /// (i.e. starts just after a delimiter) + /// + /// Post-conditions: + /// - block == whole + partial + /// - `whole` is a valid block of delimited data + /// (i.e. starts just after a delimiter and ends with a delimiter) + /// - `partial` doesn't contain an entire delimited object + /// (IOW: `partial` is generally small) + /// + /// This method will look for the last delimiter in `block` and may + /// therefore be costly. + /// + /// \param[in] block data to be chunked + /// \param[out] whole subrange of block containing whole delimited objects + /// \param[out] partial subrange of block starting with a partial delimited object + Status Process(std::shared_ptr block, std::shared_ptr* whole, + std::shared_ptr* partial); + + /// \brief Carve the completion of a partial object out of a block + /// + /// Pre-conditions: + /// - `partial` is the start of a valid block of delimited data + /// (i.e. starts just after a delimiter) + /// - `block` follows `partial` in file order + /// + /// Post-conditions: + /// - block == completion + rest + /// - `partial + completion` is a valid block of delimited data + /// (i.e. starts just after a delimiter and ends with a delimiter) + /// - `completion` doesn't contain an entire delimited object + /// (IOW: `completion` is generally small) + /// + /// This method will look for the first delimiter in `block` and should + /// therefore be reasonably cheap. + /// + /// \param[in] partial incomplete delimited data + /// \param[in] block delimited data following partial + /// \param[out] completion subrange of block containing the completion of partial + /// \param[out] rest subrange of block containing what completion does not cover + Status ProcessWithPartial(std::shared_ptr partial, + std::shared_ptr block, + std::shared_ptr* completion, + std::shared_ptr* rest); + + /// \brief Like ProcessWithPartial, but for the last block of a file + /// + /// This method allows for a final delimited object without a trailing delimiter + /// (ProcessWithPartial would return an error in that case). + /// + /// Pre-conditions: + /// - `partial` is the start of a valid block of delimited data + /// - `block` follows `partial` in file order and is the last data block + /// + /// Post-conditions: + /// - block == completion + rest + /// - `partial + completion` is a valid block of delimited data + /// - `completion` doesn't contain an entire delimited object + /// (IOW: `completion` is generally small) + /// + Status ProcessFinal(std::shared_ptr partial, std::shared_ptr block, + std::shared_ptr* completion, std::shared_ptr* rest); + + protected: + ARROW_DISALLOW_COPY_AND_ASSIGN(Chunker); + + std::shared_ptr boundary_finder_; +}; + +} // namespace arrow