Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ set(ARROW_SRCS
io/hdfs_internal.cc
io/interfaces.cc
io/memory.cc
io/readahead.cc
io/slow.cc
testing/util.cc
util/basic_decimal.cc
util/bit_util.cc
util/compression.cc
util/cpu_info.cc
util/decimal.cc
util/delimiting.cc
util/formatting.cc
util/int_util.cc
util/io_util.cc
Expand Down
300 changes: 190 additions & 110 deletions cpp/src/arrow/csv/chunker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,168 +18,248 @@
#include "arrow/csv/chunker.h"

#include <cstdint>
#include <memory>
#include <utility>

#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/stl.h"
#include "arrow/util/string_view.h"

namespace arrow {
namespace csv {

namespace {

// Find the last newline character in the given data block.
// nullptr is returned if not found (like memchr()).
const char* FindNewlineReverse(const char* data, uint32_t size) {
if (size == 0) {
return nullptr;
}
const char* s = data + size - 1;
while (size > 0) {
if (*s == '\r' || *s == '\n') {
return s;
}
--s;
--size;
}
return nullptr;
}
// NOTE: csvmonkey (https://github.com/dw/csvmonkey) has optimization ideas

} // namespace
template <bool quoting, bool escaping>
class Lexer {
public:
enum State {
FIELD_START,
IN_FIELD,
AT_ESCAPE,
IN_QUOTED_FIELD,
AT_QUOTED_QUOTE,
AT_QUOTED_ESCAPE
};

Chunker::Chunker(ParseOptions options) : options_(options) {}
explicit Lexer(const ParseOptions& options) : options_(options) {
DCHECK_EQ(quoting, options_.quoting);
DCHECK_EQ(escaping, options_.escaping);
}

// NOTE: cvsmonkey (https://github.com/dw/csvmonkey) has optimization ideas
const char* ReadLine(const char* data, const char* data_end) {
// The parsing state machine
char c;
DCHECK_GT(data_end - data, 0);
if (ARROW_PREDICT_TRUE(state_ == FIELD_START)) {
goto FieldStart;
}
switch (state_) {
case FIELD_START:
goto FieldStart;
case IN_FIELD:
goto InField;
case AT_ESCAPE:
goto AtEscape;
case IN_QUOTED_FIELD:
goto InQuotedField;
case AT_QUOTED_QUOTE:
goto AtQuotedQuote;
case AT_QUOTED_ESCAPE:
goto AtQuotedEscape;
}

template <bool quoting, bool escaping>
inline const char* Chunker::ReadLine(const char* data, const char* data_end) {
DCHECK_EQ(quoting, options_.quoting);
DCHECK_EQ(escaping, options_.escaping);
FieldStart:
// At the start of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = FIELD_START;
goto AbortLine;
}
// Quoting is only recognized at start of field
if (quoting && *data == options_.quote_char) {
data++;
goto InQuotedField;
} else {
goto InField;
}

// The parsing state machine
char c;
InField:
// Inside a non-quoted part of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = IN_FIELD;
goto AbortLine;
}
c = *data++;
if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = AT_ESCAPE;
goto AbortLine;
}
data++;
goto InField;
}
if (ARROW_PREDICT_FALSE(c == '\r')) {
if (ARROW_PREDICT_TRUE(data != data_end) && *data == '\n') {
data++;
}
goto LineEnd;
}
if (ARROW_PREDICT_FALSE(c == '\n')) {
goto LineEnd;
}
if (ARROW_PREDICT_FALSE(c == options_.delimiter)) {
goto FieldEnd;
}
goto InField;

FieldStart:
// At the start of a field
// Quoting is only recognized at start of field
if (quoting && ARROW_PREDICT_TRUE(data != data_end) && *data == options_.quote_char) {
AtEscape:
// Coming here if last block ended on a non-quoted escape
data++;
goto InQuotedField;
} else {
goto InField;
}

InField:
// Inside a non-quoted part of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
goto AbortLine;
}
c = *data++;
if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
InQuotedField:
// Inside a quoted part of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = IN_QUOTED_FIELD;
goto AbortLine;
}
data++;
goto InField;
}
if (ARROW_PREDICT_FALSE(c == '\r')) {
if (ARROW_PREDICT_TRUE(data != data_end) && *data == '\n') {
c = *data++;
if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = AT_QUOTED_ESCAPE;
goto AbortLine;
}
data++;
goto InQuotedField;
}
goto LineEnd;
}
if (ARROW_PREDICT_FALSE(c == '\n')) {
goto LineEnd;
}
if (ARROW_PREDICT_FALSE(c == options_.delimiter)) {
goto FieldEnd;
}
goto InField;

InQuotedField:
// Inside a quoted part of a field
if (ARROW_PREDICT_FALSE(data == data_end)) {
goto AbortLine;
}
c = *data++;
if (escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
if (data == data_end) {
goto AbortLine;
if (ARROW_PREDICT_FALSE(c == options_.quote_char)) {
if (ARROW_PREDICT_FALSE(data == data_end)) {
state_ = AT_QUOTED_QUOTE;
goto AbortLine;
}
if (options_.double_quote && *data == options_.quote_char) {
// Double-quoting
data++;
} else {
// End of single-quoting
goto InField;
}
}
goto InQuotedField;

AtQuotedEscape:
// Coming here if last block ended on a quoted escape
data++;
goto InQuotedField;
}
if (ARROW_PREDICT_FALSE(c == options_.quote_char)) {
if (options_.double_quote && data != data_end && *data == options_.quote_char) {

AtQuotedQuote:
// Coming here if last block ended on a quoted quote
if (options_.double_quote && *data == options_.quote_char) {
// Double-quoting
data++;
goto InQuotedField;
} else {
// End of single-quoting
goto InField;
}
}
goto InQuotedField;

FieldEnd:
// At the end of a field
goto FieldStart;
FieldEnd:
// At the end of a field
goto FieldStart;

LineEnd:
return data;
LineEnd:
return data;

AbortLine:
// Truncated line at end of block
return nullptr;
}
AbortLine:
// Truncated line
return nullptr;
}

protected:
const ParseOptions& options_;
State state_ = FIELD_START;
};

// A BoundaryFinder implementation that assumes CSV cells can contain raw newlines,
// and uses actual CSV lexing to delimit them.
template <bool quoting, bool escaping>
Status Chunker::ProcessSpecialized(const char* start, uint32_t size, uint32_t* out_size) {
DCHECK_EQ(quoting, options_.quoting);
DCHECK_EQ(escaping, options_.escaping);
class LexingBoundaryFinder : public BoundaryFinder {
public:
explicit LexingBoundaryFinder(ParseOptions options) : options_(std::move(options)) {}

Status FindFirst(util::string_view partial, util::string_view block,
int64_t* out_pos) override {
Lexer<quoting, escaping> lexer(options_);

const char* data = start;
const char* data_end = start + size;
const char* line_end =
lexer.ReadLine(partial.data(), partial.data() + partial.size());
DCHECK_EQ(line_end, nullptr); // Otherwise `partial` is a whole CSV line
line_end = lexer.ReadLine(block.data(), block.data() + block.size());

while (data < data_end) {
const char* line_end = ReadLine<quoting, escaping>(data, data_end);
if (line_end == nullptr) {
// Cannot read any further
break;
// No complete CSV line
*out_pos = -1;
} else {
*out_pos = static_cast<int64_t>(line_end - block.data());
DCHECK_GT(*out_pos, 0);
}
data = line_end;
return Status::OK();
}
*out_size = static_cast<uint32_t>(data - start);
return Status::OK();
}

Status Chunker::Process(const char* start, uint32_t size, uint32_t* out_size) {
if (!options_.newlines_in_values) {
// In newlines are not accepted in CSV values, we can simply search for
// the last newline character.
// For common block sizes and CSV row sizes, this avoids reading
// most of the data block, making the chunker extremely fast compared
// to the rest of the CSV reading pipeline.
const char* nl = FindNewlineReverse(start, size);
if (nl == nullptr) {
*out_size = 0;
Status FindLast(util::string_view block, int64_t* out_pos) override {
Lexer<quoting, escaping> lexer(options_);

const char* data = block.data();
const char* const data_end = block.data() + block.size();

while (data < data_end) {
const char* line_end = lexer.ReadLine(data, data_end);
if (line_end == nullptr) {
// Cannot read any further
break;
}
DCHECK_GT(line_end, data);
data = line_end;
}
if (data == block.data()) {
// No complete CSV line
*out_pos = -1;
} else {
*out_size = static_cast<uint32_t>(nl - start + 1);
*out_pos = static_cast<int64_t>(data - block.data());
DCHECK_GT(*out_pos, 0);
}
return Status::OK();
}

if (options_.quoting) {
if (options_.escaping) {
return ProcessSpecialized<true, true>(start, size, out_size);
} else {
return ProcessSpecialized<true, false>(start, size, out_size);
}
protected:
ParseOptions options_;
};

} // namespace

std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) {
std::shared_ptr<BoundaryFinder> delimiter;
if (!options.newlines_in_values) {
delimiter = MakeNewlineBoundaryFinder();
} else {
if (options_.escaping) {
return ProcessSpecialized<false, true>(start, size, out_size);
if (options.quoting) {
if (options.escaping) {
delimiter = std::make_shared<LexingBoundaryFinder<true, true>>(options);
} else {
delimiter = std::make_shared<LexingBoundaryFinder<true, false>>(options);
}
} else {
return ProcessSpecialized<false, false>(start, size, out_size);
if (options.escaping) {
delimiter = std::make_shared<LexingBoundaryFinder<false, true>>(options);
} else {
delimiter = std::make_shared<LexingBoundaryFinder<false, false>>(options);
}
}
}
return internal::make_unique<Chunker>(std::move(delimiter));
}

} // namespace csv
Expand Down
Loading