diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc
index 85e527c8bda..dae06d5bf61 100644
--- a/cpp/src/arrow/json/reader.cc
+++ b/cpp/src/arrow/json/reader.cc
@@ -42,11 +42,198 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
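+
+// Illustrative sketch of how `SetStrictSchema` is meant to be used: it freezes an
+// inferred schema once the first batch has been decoded. Assuming `schema` holds
+// that first batch's schema:
+//
+//   DecodeContext context(ParseOptions::Defaults());  // defaults to InferType
+//   context.SetStrictSchema(schema);
+//   // context.parse_options().unexpected_field_behavior is now
+//   // UnexpectedFieldBehavior::Error, so later blocks fail on unexpected fields
+//   // instead of widening the schema.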
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size = nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) {
+    return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        DCHECK_EQ(partial_, nullptr) << "Logic error: non-null partial with null buffer";
+        return TransformFinish();
+      }
+      partial_ = std::make_shared<Buffer>("");
+      buffer_ = std::move(next_buffer);
+      return TransformSkip();
+    }
+    DCHECK_NE(partial_, nullptr);
+
+    std::shared_ptr<Buffer> whole, completion, next_partial;
+    if (!next_buffer) {
+      // End of file reached => compute completion from penultimate block
+      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      // Get completion of partial from previous block.
+      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial_, buffer_, &completion,
+                                                 &starts_with_whole));
+      // Get all whole objects entirely inside the current buffer
+      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+    }
+
+    buffer_ = std::move(next_buffer);
+    return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),
+                                       std::move(completion), std::move(whole),
+                                       index_++});
+  }
+
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<Buffer> partial_;
+  std::shared_ptr<Buffer> buffer_;
+  int64_t index_ = 0;
+};
+
+template <typename... Args>
+Iterator<ChunkedBlock> MakeChunkingIterator(Iterator<std::shared_ptr<Buffer>> source,
+                                            Args&&... args) {
+  return MakeTransformedIterator(std::move(source),
+                                 ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
+
+// NOTE: Not reentrant. Incoming buffers are processed sequentially and the transformer's
+// internal state gets updated on each call.
+template <typename... Args>
+AsyncGenerator<ChunkedBlock> MakeChunkingGenerator(
+    AsyncGenerator<std::shared_ptr<Buffer>> source, Args&&... args) {
+  return MakeTransformedGenerator(std::move(source),
+                                  ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
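+
+// Illustrative sketch of driving the chunker by hand, mirroring what
+// TableReaderImpl::Read does below. `input` is an assumed, already-opened
+// InputStream; everything else names this file's helpers:
+//
+//   ARROW_ASSIGN_OR_RAISE(auto buffer_it,
+//                         io::MakeInputStreamIterator(input, /*block_size=*/1 << 20));
+//   auto block_it = MakeChunkingIterator(std::move(buffer_it),
+//                                        MakeChunker(ParseOptions::Defaults()));
+//   while (true) {
+//     ARROW_ASSIGN_OR_RAISE(auto block, block_it.Next());
+//     if (IsIterationEnd(block)) break;
+//     // `block.partial` + `block.completion` straddle the previous block boundary;
+//     // `block.whole` holds the objects fully contained in the current buffer.
+//     ARROW_ASSIGN_OR_RAISE(auto parsed, ParseBlock(block, ParseOptions::Defaults(),
+//                                                   default_memory_pool()));
+//   }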
 
 class TableReaderImpl : public TableReader,
                         public std::enable_shared_from_this<TableReaderImpl> {
@@ -54,120 +241,262 @@ class TableReaderImpl : public TableReader,
   TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
                   const ParseOptions& parse_options,
                   std::shared_ptr<TaskGroup> task_group)
-      : pool_(pool),
+      : decode_context_(parse_options, pool),
         read_options_(read_options),
-        parse_options_(parse_options),
-        chunker_(MakeChunker(parse_options_)),
         task_group_(std::move(task_group)) {}
 
   Status Init(std::shared_ptr<io::InputStream> input) {
     ARROW_ASSIGN_OR_RAISE(auto it,
                           io::MakeInputStreamIterator(input, read_options_.block_size));
     return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
-        .Value(&block_iterator_);
+        .Value(&buffer_iterator_);
   }
 
   Result<std::shared_ptr<Table>> Read() override {
-    RETURN_NOT_OK(MakeBuilder());
-
-    ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
-    if (block == nullptr) {
+    auto block_it = MakeChunkingIterator(std::move(buffer_iterator_),
+                                         MakeChunker(decode_context_.parse_options()));
+
+    bool did_read = false;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(auto block, block_it.Next());
+      if (IsIterationEnd(block)) break;
+      if (!did_read) {
+        did_read = true;
+        RETURN_NOT_OK(MakeBuilder());
+      }
+      task_group_->Append(
+          [self = shared_from_this(), block] { return self->ParseAndInsert(block); });
+    }
+    if (!did_read) {
       return Status::Invalid("Empty JSON file");
     }
 
-    auto self = shared_from_this();
-    auto empty = std::make_shared<Buffer>("");
+    std::shared_ptr<ChunkedArray> array;
+    RETURN_NOT_OK(builder_->Finish(&array));
+    return Table::FromChunkedStructArray(array);
+  }
 
-    int64_t block_index = 0;
-    std::shared_ptr<Buffer> partial = empty;
+ private:
+  Status MakeBuilder() {
+    return MakeChunkedArrayBuilder(task_group_, decode_context_.pool(),
+                                   decode_context_.promotion_graph(),
+                                   decode_context_.conversion_type(), &builder_);
+  }
 
-    while (block != nullptr) {
-      std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+  Status ParseAndInsert(const ChunkedBlock& block) {
+    ARROW_ASSIGN_OR_RAISE(auto parsed, ParseBlock(block, decode_context_.parse_options(),
+                                                  decode_context_.pool()));
+    builder_->Insert(block.index, field("", parsed->type()), parsed);
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+  DecodeContext decode_context_;
+  ReadOptions read_options_;
+  std::shared_ptr<TaskGroup> task_group_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
 
-      if (next_block == nullptr) {
-        // End of file reached => compute completion from penultimate block
-        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
-      } else {
-        std::shared_ptr<Buffer> starts_with_whole;
-        // Get completion of partial from previous block.
-        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
-                                                   &starts_with_whole));
+// Callable object for parsing/converting individual JSON blocks. The class itself can be
+// called concurrently but reads from the `DecodeContext` aren't synchronized
+class DecodingOperator {
+ public:
+  explicit DecodingOperator(std::shared_ptr<const DecodeContext> context)
+      : context_(std::move(context)) {}
 
-        // Get all whole objects entirely inside the current buffer
-        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
-      }
+  Result<DecodedBlock> operator()(const ChunkedBlock& block) const {
+    int64_t num_bytes;
+    ARROW_ASSIGN_OR_RAISE(auto unconverted, ParseBlock(block, context_->parse_options(),
+                                                       context_->pool(), &num_bytes));
 
-      // Launch parse task
-      task_group_->Append([self, partial, completion, whole, block_index] {
-        return self->ParseAndInsert(partial, completion, whole, block_index);
-      });
-      block_index++;
+    std::shared_ptr<ChunkedArrayBuilder> builder;
+    RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), context_->pool(),
+                                          context_->promotion_graph(),
+                                          context_->conversion_type(), &builder));
+    builder->Insert(0, field("", unconverted->type()), unconverted);
 
-      partial = next_partial;
-      block = next_block;
-    }
+    std::shared_ptr<ChunkedArray> chunked;
+    RETURN_NOT_OK(builder->Finish(&chunked));
+    ARROW_ASSIGN_OR_RAISE(auto batch, RecordBatch::FromStructArray(chunked->chunk(0)));
 
-    std::shared_ptr<ChunkedArray> array;
-    RETURN_NOT_OK(builder_->Finish(&array));
-    return Table::FromChunkedStructArray(array);
+    return DecodedBlock{std::move(batch), num_bytes};
   }
 
  private:
-  Status MakeBuilder() {
-    auto type = parse_options_.explicit_schema
-                    ? struct_(parse_options_.explicit_schema->fields())
-                    : struct_({});
+  std::shared_ptr<const DecodeContext> context_;
-
-    auto promotion_graph =
-        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
-            ? GetPromotionGraph()
-            : nullptr;
+};
 
+// Reads from a source iterator, completes the subsequent decode task on the calling
+// thread. This is only really used for compatibility with the async pipeline when CPU
+// threading is disabled
+AsyncGenerator<DecodedBlock> MakeDecodingGenerator(
+    Iterator<ChunkedBlock> source,
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder) {
+  struct State {
+    Iterator<ChunkedBlock> source;
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder;
+  } state{std::move(source), std::move(decoder)};
+  return [state = std::make_shared<State>(std::move(state))] {
+    auto maybe_block = state->source.Next();
+    if (!maybe_block.ok()) {
+      return Future<DecodedBlock>::MakeFinished(maybe_block.status());
+    }
+    const auto& block = maybe_block.ValueUnsafe();
+    if (IsIterationEnd(block)) {
+      return ToFuture(IterationEnd<DecodedBlock>());
+    }
+    return ToFuture(state->decoder(block));
+  };
+}
+
+class StreamingReaderImpl : public StreamingReader {
+ public:
+  StreamingReaderImpl(DecodedBlock first_block, AsyncGenerator<DecodedBlock> source,
+                      const std::shared_ptr<DecodeContext>& context, int max_readahead)
+      : first_block_(std::move(first_block)),
+        schema_(first_block_->record_batch->schema()),
+        bytes_processed_(std::make_shared<std::atomic<int64_t>>(0)) {
+    // Set the final schema for future invocations of the source generator
+    context->SetStrictSchema(schema_);
+    if (max_readahead > 0) {
+      source = MakeReadaheadGenerator(std::move(source), max_readahead);
+    }
+    generator_ = MakeMappedGenerator(
+        std::move(source), [counter = bytes_processed_](const DecodedBlock& out) {
+          counter->fetch_add(out.num_bytes);
+          return out.record_batch;
+        });
+  }
 
-    return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
-  }
-
-  Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                        const std::shared_ptr<Buffer>& completion,
-                        const std::shared_ptr<Buffer>& whole, int64_t block_index) {
-    std::unique_ptr<BlockParser> parser;
-    RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
-    RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
-                                               whole->size()));
-
-    if (partial->size() != 0 || completion->size() != 0) {
-      std::shared_ptr<Buffer> straddling;
-      if (partial->size() == 0) {
-        straddling = completion;
-      } else if (completion->size() == 0) {
-        straddling = partial;
-      } else {
-        ARROW_ASSIGN_OR_RAISE(straddling,
-                              ConcatenateBuffers({partial, completion}, pool_));
+  static Future<std::shared_ptr<StreamingReaderImpl>> MakeAsync(
+      std::shared_ptr<DecodeContext> context, std::shared_ptr<io::InputStream> stream,
+      io::IOContext io_context, Executor* cpu_executor, const ReadOptions& read_options) {
+    ARROW_ASSIGN_OR_RAISE(
+        auto buffer_it,
+        io::MakeInputStreamIterator(std::move(stream), read_options.block_size));
+    ARROW_ASSIGN_OR_RAISE(
+        auto buffer_gen,
+        MakeBackgroundGenerator(std::move(buffer_it), io_context.executor()));
+
+    AsyncGenerator<DecodedBlock> decoding_gen;
+    int max_readahead = 0;
+    if (read_options.use_threads) {
+      // Prepare a source generator capable of async-reentrancy and parallel execution
+      if (!cpu_executor) {
+        cpu_executor = GetCpuThreadPool();
       }
-      RETURN_NOT_OK(parser->Parse(straddling));
+      max_readahead = cpu_executor->GetCapacity();
+
+      // Since the chunking/decoding steps are heavy we want to schedule them as a
+      // separate task so as to maximize task distribution across CPU cores
+      //
+      // TODO: Add an `always_transfer` parameter to `MakeTransferredGenerator`?
+      buffer_gen = [source = std::move(buffer_gen), cpu_executor] {
+        return cpu_executor->TransferAlways(source());
+      };
+      auto chunking_gen = MakeChunkingGenerator(std::move(buffer_gen),
+                                                MakeChunker(context->parse_options()));
+
+      // At this stage, we want to allow the decoding tasks for each chunked block to run
+      // in parallel on the CPU executor. However:
+      //  - Chunking is inherently serial and not thread-safe
+      //  - The chunking generator is not async-reentrant and won't play well with
+      //    readahead
+      //
+      // Fortunately, `MappingGenerator` queues pending jobs and keeps only one future
+      // from its source active at a time - which takes care of those concerns. In
+      // addition, it will start the next job within the continuation of the previous one,
+      // but before invoking its map function (in our case, `DecodingOperator`). This
+      // allows for decoding tasks to gradually saturate multiple CPU cores over multiple
+      // iterations. At a high level, this is how the full pipeline would operate in cases
+      // where decoding tasks are disproportionately expensive:
+      //
+      // --------------------------------------------------------------------------
+      // Reading: IoThread(?) --> Chunking: CpuThread(0) ... Decoding: CpuThread(0)
+      // --------------------------------------------------------------------------
+      //                                                     Decoding: CpuThread(0)
+      // Reading: IoThread(?) --> Chunking: CpuThread(1) ... Decoding: CpuThread(1)
+      // --------------------------------------------------------------------------
+      //                                                     Decoding: CpuThread(0)
+      //                                                     Decoding: CpuThread(1)
+      // Reading: IoThread(?) --> Chunking: CpuThread(2) ... Decoding: CpuThread(2)
+      // --------------------------------------------------------------------------
+      //
+      // Remember that we should already be on the CPU executor following chunking, so the
+      // decoding task simply continues to use that thread rather than spawning a new one.
+      decoding_gen =
+          MakeMappedGenerator(std::move(chunking_gen), DecodingOperator(context));
+    } else {
+      buffer_gen = MakeTransferredGenerator(std::move(buffer_gen), io_context.executor());
+      // We convert the background generator back to an iterator so its work can remain on
+      // the IO pool while we process its buffers on the calling thread
+      auto chunking_it =
+          MakeChunkingIterator(MakeGeneratorIterator(std::move(buffer_gen)),
+                               MakeChunker(context->parse_options()));
+      decoding_gen =
+          MakeDecodingGenerator(std::move(chunking_it), DecodingOperator(context));
     }
-    if (whole->size() != 0) {
-      RETURN_NOT_OK(parser->Parse(whole));
+    return FirstBlock(decoding_gen)
+        .Then([source = std::move(decoding_gen), context = std::move(context),
+               max_readahead](const DecodedBlock& block) {
+          return std::make_shared<StreamingReaderImpl>(block, std::move(source), context,
+                                                       max_readahead);
+        });
+  }
+
+  [[nodiscard]] std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    auto result = ReadNextAsync().result();
+    return std::move(result).Value(out);
+  }
+
+  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+    // On the first call, return the batch we used for initialization
+    if (ARROW_PREDICT_FALSE(first_block_)) {
+      bytes_processed_->fetch_add(first_block_->num_bytes);
+      auto batch = std::exchange(first_block_, std::nullopt)->record_batch;
+      return ToFuture(std::move(batch));
     }
+    return generator_();
+  }
 
-    std::shared_ptr<Array> parsed;
-    RETURN_NOT_OK(parser->Finish(&parsed));
-    builder_->Insert(block_index, field("", parsed->type()), parsed);
-    return Status::OK();
+  [[nodiscard]] int64_t bytes_processed() const override {
+    return bytes_processed_->load();
   }
 
-  MemoryPool* pool_;
-  ReadOptions read_options_;
-  ParseOptions parse_options_;
-  std::unique_ptr<Chunker> chunker_;
-  std::shared_ptr<TaskGroup> task_group_;
-  Iterator<std::shared_ptr<Buffer>> block_iterator_;
-  std::shared_ptr<ChunkedArrayBuilder> builder_;
+ private:
+  static Future<DecodedBlock> FirstBlock(AsyncGenerator<DecodedBlock> gen) {
+    // Read from the stream until we get a non-empty record batch that we can use to
+    // declare the schema. Along the way, accumulate the bytes read so they can be
+    // recorded on the first `ReadNextAsync`
+    auto loop_body =
+        [gen = std::move(gen),
+         out = std::make_shared<DecodedBlock>()]() -> Future<ControlFlow<DecodedBlock>> {
+      return gen().Then(
+          [out](const DecodedBlock& block) -> Result<ControlFlow<DecodedBlock>> {
+            if (IsIterationEnd(block)) {
+              return Status::Invalid("Empty JSON stream");
+            }
+            out->num_bytes += block.num_bytes;
+            if (block.record_batch->num_rows() == 0) {
+              return Continue();
+            }
+            out->record_batch = block.record_batch;
+            return Break(*out);
+          });
+    };
+    return Loop(std::move(loop_body));
+  }
+
+  std::optional<DecodedBlock> first_block_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<std::atomic<int64_t>> bytes_processed_;
+  AsyncGenerator<std::shared_ptr<RecordBatch>> generator_;
 };
 
+}  // namespace
+
 Result<std::shared_ptr<TableReader>> TableReader::Make(
     MemoryPool* pool, std::shared_ptr<io::InputStream> input,
     const ReadOptions& read_options, const ParseOptions& parse_options) {
@@ -183,35 +512,47 @@ Result<std::shared_ptr<TableReader>> TableReader::Make(
   return ptr;
 }
 
+Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
+    std::shared_ptr<io::InputStream> stream, const ReadOptions& read_options,
+    const ParseOptions& parse_options, const io::IOContext& io_context,
+    Executor* cpu_executor) {
+  auto future = StreamingReaderImpl::MakeAsync(
+      std::make_shared<DecodeContext>(parse_options, io_context.pool()),
+      std::move(stream), io_context, cpu_executor, read_options);
+  return future.Then([](const std::shared_ptr<StreamingReaderImpl>& reader) {
+    return std::static_pointer_cast<StreamingReader>(reader);
+  });
+}
+
+Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
+    std::shared_ptr<io::InputStream> stream, const ReadOptions& read_options,
+    const ParseOptions& parse_options, const io::IOContext& io_context,
+    Executor* cpu_executor) {
+  auto future =
+      MakeAsync(std::move(stream), read_options, parse_options, io_context, cpu_executor);
+  return future.result();
+}
+
 Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
                                               std::shared_ptr<Buffer> json) {
+  DecodeContext context(std::move(options));
+
   std::unique_ptr<BlockParser> parser;
-  RETURN_NOT_OK(BlockParser::Make(options, &parser));
+  RETURN_NOT_OK(BlockParser::Make(context.parse_options(), &parser));
   RETURN_NOT_OK(parser->Parse(json));
   std::shared_ptr<Array> parsed;
   RETURN_NOT_OK(parser->Finish(&parsed));
 
-  auto type =
-      options.explicit_schema ? struct_(options.explicit_schema->fields()) : struct_({});
-  auto promotion_graph =
-      options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
-          ? GetPromotionGraph()
-          : nullptr;
   std::shared_ptr<ChunkedArrayBuilder> builder;
-  RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), default_memory_pool(),
-                                        promotion_graph, type, &builder));
+  RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), context.pool(),
+                                        context.promotion_graph(),
+                                        context.conversion_type(), &builder));
 
-  builder->Insert(0, field("", type), parsed);
+  builder->Insert(0, field("", context.conversion_type()), parsed);
   std::shared_ptr<ChunkedArray> converted_chunked;
   RETURN_NOT_OK(builder->Finish(&converted_chunked));
 
-  const auto& converted = checked_cast<const StructArray&>(*converted_chunked->chunk(0));
-  std::vector<std::shared_ptr<Array>> columns(converted.num_fields());
-  for (int i = 0; i < converted.num_fields(); ++i) {
-    columns[i] = converted.field(i);
-  }
-  return RecordBatch::Make(schema(converted.type()->fields()), converted.length(),
-                           std::move(columns));
+  return RecordBatch::FromStructArray(converted_chunked->chunk(0));
 }
 
 }  // namespace json
diff --git a/cpp/src/arrow/json/reader.h b/cpp/src/arrow/json/reader.h
index 3374931a043..7776cb0b7d8 100644
--- a/cpp/src/arrow/json/reader.h
+++ b/cpp/src/arrow/json/reader.h
@@ -19,25 +19,16 @@
 
 #include <memory>
 
+#include "arrow/io/type_fwd.h"
 #include "arrow/json/options.h"
+#include "arrow/record_batch.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/type_fwd.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
-
-class Buffer;
-class MemoryPool;
-class Table;
-class RecordBatch;
-class Array;
-class DataType;
-
-namespace io {
-class InputStream;
-}  // namespace io
-
 namespace json {
 
 /// A class that reads an entire JSON file into a Arrow Table
@@ -60,5 +51,68 @@ class ARROW_EXPORT TableReader {
 ARROW_EXPORT Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
                                                            std::shared_ptr<Buffer> json);
 
+/// \brief A class that reads a JSON file incrementally
+///
+/// JSON data is read from a stream in fixed-size blocks (configurable with
+/// `ReadOptions::block_size`). Each block is converted to a `RecordBatch`. Yielded
+/// batches have a consistent schema but may differ in row count.
+///
+/// The supplied `ParseOptions` are used to determine a schema, based either on a
+/// provided explicit schema or one inferred from the first non-empty block.
+/// Afterwards, the target schema is frozen. If `UnexpectedFieldBehavior::InferType` is
+/// specified, unexpected fields will only be inferred for the first block. Afterwards
+/// they'll be treated as errors.
+///
+/// If `ReadOptions::use_threads` is `true`, each block's parsing/decoding task will be
+/// parallelized on the given `cpu_executor` (with readahead corresponding to the
+/// executor's capacity). If an executor isn't provided, the global thread pool will be
+/// used.
+///
+/// If `ReadOptions::use_threads` is `false`, computations will be run on the calling
+/// thread and `cpu_executor` will be ignored.
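+///
+/// A minimal consumption loop, for illustration only (assumes `stream` is an
+/// already-opened `arrow::io::InputStream`):
+///
+/// \code{.cpp}
+/// ARROW_ASSIGN_OR_RAISE(auto reader,
+///                       StreamingReader::Make(stream, ReadOptions::Defaults(),
+///                                             ParseOptions::Defaults()));
+/// std::shared_ptr<RecordBatch> batch;
+/// while (true) {
+///   ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
+///   if (!batch) break;  // end of stream
+///   // ... use batch ...
+/// }
+/// \endcode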
+class ARROW_EXPORT StreamingReader : public RecordBatchReader {
+ public:
+  virtual ~StreamingReader() = default;
+
+  /// \brief Read the next `RecordBatch` asynchronously
+  /// This function is async-reentrant (but not synchronously reentrant). However, if
+  /// threading is disabled, this will block until completion.
+  virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() = 0;
+
+  /// Get the number of bytes which have been successfully converted to record batches
+  /// and consumed
+  [[nodiscard]] virtual int64_t bytes_processed() const = 0;
+
+  /// \brief Create a `StreamingReader` from an `InputStream`
+  /// Blocks until the initial batch is loaded
+  ///
+  /// \param[in] stream JSON source stream
+  /// \param[in] read_options Options for reading
+  /// \param[in] parse_options Options for chunking, parsing, and conversion
+  /// \param[in] io_context Context for IO operations (optional)
+  /// \param[in] cpu_executor Executor for computation tasks (optional)
+  /// \return The initialized reader
+  static Result<std::shared_ptr<StreamingReader>> Make(
+      std::shared_ptr<io::InputStream> stream, const ReadOptions& read_options,
+      const ParseOptions& parse_options,
+      const io::IOContext& io_context = io::default_io_context(),
+      ::arrow::internal::Executor* cpu_executor = NULLPTR);
+
+  /// \brief Create a `StreamingReader` from an `InputStream` asynchronously
+  /// The returned future completes after loading the first batch
+  ///
+  /// \param[in] stream JSON source stream
+  /// \param[in] read_options Options for reading
+  /// \param[in] parse_options Options for chunking, parsing, and conversion
+  /// \param[in] io_context Context for IO operations (optional)
+  /// \param[in] cpu_executor Executor for computation tasks (optional)
+  /// \return Future for the initialized reader
+  static Future<std::shared_ptr<StreamingReader>> MakeAsync(
+      std::shared_ptr<io::InputStream> stream, const ReadOptions& read_options,
+      const ParseOptions& parse_options,
+      const io::IOContext& io_context = io::default_io_context(),
+      ::arrow::internal::Executor* cpu_executor = NULLPTR);
+};
+
 }  // namespace json
 }  // namespace arrow
diff --git a/cpp/src/arrow/json/reader_test.cc b/cpp/src/arrow/json/reader_test.cc
index 83f5956a64c..7758da29a52 100644
--- a/cpp/src/arrow/json/reader_test.cc
+++ b/cpp/src/arrow/json/reader_test.cc
@@ -19,15 +19,20 @@
 
 #include <gtest/gtest.h>
 
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "arrow/io/interfaces.h"
+#include "arrow/io/slow.h"
 #include "arrow/json/options.h"
 #include "arrow/json/reader.h"
 #include "arrow/json/test_common.h"
 #include "arrow/table.h"
+#include "arrow/testing/async_test_util.h"
+#include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 namespace json {
@@ -320,5 +325,581 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTestBase {
+ public:
+  virtual ~StreamingReaderTestBase() = default;
+
+ protected:
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+    ASSERT_OK((**out).ValidateFull());
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  static void AssertBatchSequenceEquals(const RecordBatchVector& expected_batches,
+                                        const RecordBatchVector& sequence) {
+    ASSERT_OK_AND_ASSIGN(auto expected_table, Table::FromRecordBatches(expected_batches));
+    ASSERT_OK(expected_table->ValidateFull());
+
+    auto first_null = std::find(sequence.cbegin(), sequence.cend(), nullptr);
+    for (auto it = first_null; it != sequence.cend(); ++it) {
+      ASSERT_EQ(*it, nullptr);
+    }
+
+    RecordBatchVector batches(sequence.cbegin(), first_null);
+    EXPECT_EQ(batches.size(), expected_batches.size());
+    ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+    ASSERT_OK(table->ValidateFull());
+    ASSERT_TABLES_EQUAL(*expected_table, *table);
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size, i.e. a higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+class AsyncStreamingReaderTest : public StreamingReaderTestBase, public ::testing::Test {
+ protected:
+  void SetUp() override { read_options_.use_threads = true; }
+};
+
+class StreamingReaderTest : public StreamingReaderTestBase,
+                            public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::StartsWith("Invalid: Empty JSON stream"), MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid, ::testing::StartsWith("Invalid: Empty JSON stream"), MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0 })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i": 1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::StartsWith("Invalid: straddling object straddles two block boundaries"),
+      MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::StartsWith("Invalid: straddling object straddles two block boundaries"),
+      reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"( )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
30000})", + }, + "\n"); + + read_options_.block_size = 16; + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, ::testing::StartsWith("Invalid: JSON parse error: Invalid value"), + MakeReader(bad_first_block)); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, ::testing::StartsWith("Invalid: JSON parse error: Invalid value"), + MakeReader(bad_first_block_after_empty)); + + std::shared_ptr batch; + ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block)); + EXPECT_EQ(reader->bytes_processed(), 0); + AssertSchemaEqual(reader->schema(), test_schema); + + AssertReadNext(reader, &batch); + EXPECT_EQ(reader->bytes_processed(), 13); + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch); + + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + ::testing::StartsWith( + "Invalid: JSON parse error: Missing a comma or '}' after an object member"), + reader->ReadNext(&batch)); + EXPECT_EQ(reader->bytes_processed(), 13); + AssertReadEnd(reader); + EXPECT_EQ(reader->bytes_processed(), 13); +} + +TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) { + auto test_schema = schema({field("i", int64())}); + auto bad_first_block = Join( + { + R"({"i":0}{1})", + R"({"i":2})", + }, + "\n"); + auto bad_middle_blocks = Join( + { + R"({"i": 0})", + R"({"i": 1})", + R"({}"i":2})", + R"({"i": 3})", + }, + "\n"); + + std::shared_ptr batch; + std::shared_ptr reader; + Status status; + read_options_.block_size = 10; + parse_options_.newlines_in_values = true; + + ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block)); + AssertReadNext(reader, &batch); + EXPECT_EQ(reader->bytes_processed(), 7); + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch); + + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + ::testing::StartsWith("Invalid: JSON parse error"), + reader->ReadNext(&batch)); + EXPECT_EQ(reader->bytes_processed(), 7); + AssertReadEnd(reader); + + ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks)); + AssertReadNext(reader, &batch); + EXPECT_EQ(reader->bytes_processed(), 9); + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch); + // Chunker doesn't require newline delimiters, so this should be valid + AssertReadNext(reader, &batch); + EXPECT_EQ(reader->bytes_processed(), 20); + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), *batch); + + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + ::testing::StartsWith("Invalid: JSON parse error"), + reader->ReadNext(&batch)); + EXPECT_EQ(reader->bytes_processed(), 20); + // Incoming chunker error from ":2}" shouldn't leak through after the first failure, + // which is a possibility if async tasks are still outstanding due to readahead. 
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "bar", "t": "2022-01-02"})",
+            R"({"s": "baz", "t": "2022-01-03", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::StartsWith("Invalid: JSON parse error: unexpected field"),
+      MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  ASSERT_BATCHES_EQUAL(
+      *RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])"), *batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  AssertReadNext(reader, &batch);
+  ASSERT_BATCHES_EQUAL(
+      *RecordBatchFromJSON(expected_schema, R"([{"s":"bar","t":"2022-01-02"}])"), *batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::StartsWith("Invalid: JSON parse error: unexpected field"),
+      reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "bar", "t": "2022-01-02"})",
+            R"({"s": "baz", "t": "2022-01-03", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":null}])"),
+                       *batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  AssertReadNext(reader, &batch);
+  ASSERT_BATCHES_EQUAL(
+      *RecordBatchFromJSON(expected_schema, R"([{"s":"bar","t":"2022-01-02"}])"), *batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  AssertReadNext(reader, &batch);
+  ASSERT_BATCHES_EQUAL(
+      *RecordBatchFromJSON(expected_schema, R"([{"s":"baz","t":"2022-01-03"}])"), *batch);
+  EXPECT_EQ(reader->bytes_processed(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": "foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::StartsWith("Invalid: JSON parse error: unexpected field"),
+      reader->ReadNext(&actual_batch));
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b": null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::StartsWith("Invalid: JSON parse error: unexpected field"),
+      reader->ReadNext(&actual_batch));
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d": null},
+    {"a": 1, "b": null, "c": true, "d": null},
+    {"a": 2, "b": null, "c": null, "d": "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_F(AsyncStreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 2);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_processed(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
+  AssertBatchSequenceEquals(expected.batches, batches);
+}
+
+TEST_F(AsyncStreamingReaderTest, FuturesOutliveReader) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  auto stream = MakeTestStream(expected.json, kIoLatency);
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 2);
+  {
+    ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(stream));
+    EXPECT_EQ(reader->bytes_processed(), 0);
+    for (auto& future : futures) {
+      future = reader->ReadNextAsync();
+    }
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
+  AssertBatchSequenceEquals(expected.batches, batches);
+}
+
+TEST_F(AsyncStreamingReaderTest, StressBufferedReads) {
+  constexpr int kNumRows = 500;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 2);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(results));
+  AssertBatchSequenceEquals(expected.batches, batches);
+}
+
+TEST_F(AsyncStreamingReaderTest, StressSharedIoAndCpuExecutor) {
+  constexpr int kNumRows = 500;
+  constexpr double kIoLatency = 1e-4;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  // Force the serial -> parallel pipeline to contend for a single thread
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  io_context_ = io::IOContext(thread_pool.get());
+  executor_ = thread_pool.get();
+
+  ASSERT_OK_AND_ASSIGN(auto generator, MakeGenerator(expected.json, kIoLatency));
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto batches, CollectAsyncGenerator(generator));
+  AssertBatchSequenceEquals(expected.batches, batches);
+}
+
 }  // namespace json
 }  // namespace arrow
diff --git a/docs/source/cpp/api/formats.rst b/docs/source/cpp/api/formats.rst
index 49fa5645f8b..264b9e4e7c6 100644
--- a/docs/source/cpp/api/formats.rst
+++ b/docs/source/cpp/api/formats.rst
@@ -67,6 +67,9 @@ Line-separated JSON
 .. doxygenclass:: arrow::json::TableReader
    :members:
 
+.. doxygenclass:: arrow::json::StreamingReader
+   :members:
+
 .. _cpp-api-parquet:
 
 Parquet reader
diff --git a/docs/source/cpp/json.rst b/docs/source/cpp/json.rst
index cdb742e6ce1..e2c55d00d10 100644
--- a/docs/source/cpp/json.rst
+++ b/docs/source/cpp/json.rst
@@ -24,17 +24,24 @@
 Reading JSON files
 ==================
 
-Arrow allows reading line-separated JSON files as Arrow tables. Each
-independent JSON object in the input file is converted to a row in
-the target Arrow table.
+Line-separated JSON files can either be read as a single Arrow Table
+with a :class:`~TableReader` or streamed as RecordBatches with a
+:class:`~StreamingReader`.
+
+Both of these readers require an :class:`arrow::io::InputStream` instance
+representing the input file. Their behavior can be customized using a
+combination of :class:`~ReadOptions`, :class:`~ParseOptions`, and
+other parameters.
 
 .. seealso::
    :ref:`JSON reader API reference <cpp-api-json>`.
 
-Basic usage
+TableReader
 ===========
 
-A JSON file is read from a :class:`~arrow::io::InputStream`.
+:class:`~TableReader` reads an entire file in one shot as a
+:class:`~arrow::Table`. Each independent JSON object in the input file
+is converted to a row in the output table.
 
 .. code-block:: cpp
 
@@ -66,6 +73,44 @@ A JSON file is read from a :class:`~arrow::io::InputStream`.
    }
 }
 
+StreamingReader
+===============
+
+:class:`~StreamingReader` reads a file incrementally in blocks of roughly
+equal byte size, each of which yields a :class:`~arrow::RecordBatch`. Each
+independent JSON object in a block is converted to a row in the output batch.
+
+All batches adhere to a consistent :class:`~arrow::Schema`, which is
+derived from the first loaded batch. Alternatively, an explicit schema
+may be passed via :class:`~ParseOptions`.
+
+.. code-block:: cpp
+
+   #include "arrow/json/api.h"
+
+   {
+      // ...
+      auto read_options = arrow::json::ReadOptions::Defaults();
+      auto parse_options = arrow::json::ParseOptions::Defaults();
+
+      std::shared_ptr<arrow::io::InputStream> stream;
+      auto result = arrow::json::StreamingReader::Make(stream,
+                                                       read_options,
+                                                       parse_options);
+      if (!result.ok()) {
+         // Handle instantiation error
+      }
+      std::shared_ptr<arrow::json::StreamingReader> reader = *result;
+
+      for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *reader) {
+         if (!maybe_batch.ok()) {
+            // Handle read/parse error
+         }
+         std::shared_ptr<arrow::RecordBatch> batch = *maybe_batch;
+         // Operate on each batch...
+      }
+   }
+
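+If blocking on each batch is undesirable, ``ReadNextAsync()`` returns a
+future instead. The following is a minimal sketch (``reader`` is the
+instance created above; error handling is elided):
+
+.. code-block:: cpp
+
+   arrow::Future<std::shared_ptr<arrow::RecordBatch>> future =
+       reader->ReadNextAsync();
+   future.Then([](const std::shared_ptr<arrow::RecordBatch>& batch) {
+      if (!batch) {
+         // End of stream reached
+         return;
+      }
+      // Operate on the batch without blocking the calling thread...
+   });
+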
 Data types
 ==========
 
@@ -75,7 +120,7 @@ objects. The fields of top-level objects are taken to represent columns
 in the Arrow data. For each name/value pair in a JSON object, there are
 two possible modes of deciding the output data type:
 
-* if the name is in :class:`ConvertOptions::explicit_schema`,
+* if the name is in :member:`ParseOptions::explicit_schema`,
   conversion of the JSON value to the corresponding Arrow data type is
   attempted;