Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cpp/src/arrow/ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ endif()

add_arrow_benchmark(read_write_benchmark PREFIX "arrow-ipc")

if(ARROW_FUZZING)
if(ARROW_FUZZING
OR (ARROW_BUILD_UTILITIES
AND ARROW_TESTING
AND ARROW_WITH_LZ4
AND ARROW_WITH_ZSTD
))
add_executable(arrow-ipc-generate-fuzz-corpus generate_fuzz_corpus.cc)
target_link_libraries(arrow-ipc-generate-fuzz-corpus ${ARROW_UTIL_LIB}
${ARROW_TEST_LINK_LIBS})
Expand Down
44 changes: 31 additions & 13 deletions cpp/src/arrow/ipc/generate_fuzz_corpus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/testing/extension_type.h"
#include "arrow/util/compression.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"

namespace arrow {
namespace ipc {
namespace arrow::ipc {

using ::arrow::internal::CreateDir;
using ::arrow::internal::PlatformFilename;
Expand Down Expand Up @@ -88,6 +88,13 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> Batches() {
batches.push_back(batch);
RETURN_NOT_OK(test::MakeFixedSizeListRecordBatch(&batch));
batches.push_back(batch);
RETURN_NOT_OK(test::MakeStringTypesRecordBatch(&batch));
batches.push_back(batch);
RETURN_NOT_OK(test::MakeUuid(&batch));
batches.push_back(batch);
RETURN_NOT_OK(test::MakeRunEndEncoded(&batch));
batches.push_back(batch);

ARROW_ASSIGN_OR_RAISE(batch, MakeExtensionBatch());
batches.push_back(batch);
ARROW_ASSIGN_OR_RAISE(batch, MakeMapBatch());
Expand All @@ -97,13 +104,14 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> Batches() {
}

Result<std::shared_ptr<Buffer>> SerializeRecordBatch(
const std::shared_ptr<RecordBatch>& batch, bool is_stream_format) {
const std::shared_ptr<RecordBatch>& batch, const IpcWriteOptions& options,
bool is_stream_format) {
ARROW_ASSIGN_OR_RAISE(auto sink, io::BufferOutputStream::Create(1024));
std::shared_ptr<RecordBatchWriter> writer;
if (is_stream_format) {
ARROW_ASSIGN_OR_RAISE(writer, MakeStreamWriter(sink, batch->schema()));
ARROW_ASSIGN_OR_RAISE(writer, MakeStreamWriter(sink, batch->schema(), options));
} else {
ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(sink, batch->schema()));
ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(sink, batch->schema(), options));
}
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
RETURN_NOT_OK(writer->Close());
Expand All @@ -119,16 +127,27 @@ Status DoMain(bool is_stream_format, const std::string& out_dir) {
return "batch-" + std::to_string(sample_num++);
};

// codec 0 is uncompressed
std::vector<std::shared_ptr<util::Codec>> codecs(3, nullptr);
ARROW_ASSIGN_OR_RAISE(codecs[1], util::Codec::Create(Compression::LZ4_FRAME));
ARROW_ASSIGN_OR_RAISE(codecs[2], util::Codec::Create(Compression::ZSTD));

ARROW_ASSIGN_OR_RAISE(auto batches, Batches());

// Emit a separate file for each (batch, codec) pair
for (const auto& batch : batches) {
RETURN_NOT_OK(batch->ValidateFull());
ARROW_ASSIGN_OR_RAISE(auto buf, SerializeRecordBatch(batch, is_stream_format));
ARROW_ASSIGN_OR_RAISE(auto sample_fn, dir_fn.Join(sample_name()));
std::cerr << sample_fn.ToString() << std::endl;
ARROW_ASSIGN_OR_RAISE(auto file, io::FileOutputStream::Open(sample_fn.ToString()));
RETURN_NOT_OK(file->Write(buf));
RETURN_NOT_OK(file->Close());
for (const auto& codec : codecs) {
IpcWriteOptions options = IpcWriteOptions::Defaults();
options.codec = codec;
ARROW_ASSIGN_OR_RAISE(auto buf,
SerializeRecordBatch(batch, options, is_stream_format));
ARROW_ASSIGN_OR_RAISE(auto sample_fn, dir_fn.Join(sample_name()));
std::cerr << sample_fn.ToString() << std::endl;
ARROW_ASSIGN_OR_RAISE(auto file, io::FileOutputStream::Open(sample_fn.ToString()));
RETURN_NOT_OK(file->Write(buf));
RETURN_NOT_OK(file->Close());
}
}
return Status::OK();
}
Expand Down Expand Up @@ -157,7 +176,6 @@ int Main(int argc, char** argv) {
return 0;
}

} // namespace ipc
} // namespace arrow
} // namespace arrow::ipc

int main(int argc, char** argv) { return arrow::ipc::Main(argc, argv); }
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/generate_tensor_fuzz_corpus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ using ::arrow::internal::PlatformFilename;
Result<PlatformFilename> PrepareDirectory(const std::string& dir) {
ARROW_ASSIGN_OR_RAISE(auto dir_fn, PlatformFilename::FromString(dir));
RETURN_NOT_OK(::arrow::internal::CreateDir(dir_fn));
return std::move(dir_fn);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid this would meet same problem ( #41025 ) in gcc 7.5.0, sigh...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. #41025 involved an implicit conversion from unique_ptr to shared_ptr, IIUC, which isn't the case here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this would not casting implicit cast PlatformFilename to Result<PlatformFilename> or did I miss something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #41025 the cast is from std::unique_ptr to Result<std::shared_ptr>, which is more complex (there are two cascaded casts).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try anyway.

return dir_fn;
}

Result<std::shared_ptr<Buffer>> MakeSerializedBuffer(
Expand Down