Skip to content
This repository has been archived by the owner on Feb 17, 2023. It is now read-only.

Commit

Permalink
SKYHOOK-195: [Cls] Add support for scanning IPC objects and return parseable error codes
Browse files Browse the repository at this point in the history
  • Loading branch information
JayjeetAtGithub committed Jul 24, 2021
1 parent bdc4854 commit 9e9d6dd
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 55 deletions.
11 changes: 4 additions & 7 deletions cpp/cmake_modules/FindArrowCls.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,10 @@ mark_as_advanced(ARROW_CLS_IMPORT_LIB
ARROW_CLS_VERSION
ARROW_CLS_VERSION_MATCH)

find_package_handle_standard_args(ArrowCls
REQUIRED_VARS
ARROW_CLS_INCLUDE_DIR
ARROW_CLS_LIB_DIR
ARROW_CLS_VERSION_MATCH
VERSION_VAR
ARROW_CLS_VERSION)
find_package_handle_standard_args(
ArrowCls
REQUIRED_VARS ARROW_CLS_INCLUDE_DIR ARROW_CLS_LIB_DIR ARROW_CLS_VERSION_MATCH
VERSION_VAR ARROW_CLS_VERSION)
set(ARROW_RADOS_CLS_FOUND ${ArrowCls_FOUND})

if(ArrowCls_FOUND AND NOT ArrowCls_FIND_QUIETLY)
Expand Down
92 changes: 75 additions & 17 deletions cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
#include "arrow/api.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/file_rados_parquet.h"
#include "arrow/io/api.h"
#include "arrow/ipc/api.h"
#include "arrow/util/compression.h"
#include "parquet/api/reader.h"
#include "parquet/arrow/reader.h"
#include "parquet/file_reader.h"

#define SCAN_ERR_CODE 25
#define SCAN_REQ_DESER_ERR_CODE 26
#define SCAN_RES_SER_ERR_CODE 27

CLS_VER(1, 0)
CLS_NAME(arrow)

Expand Down Expand Up @@ -139,13 +145,53 @@ class RandomAccessObject : public arrow::io::RandomAccessFile {
std::vector<ceph::bufferlist*> chunks_;
};

/// \brief Execute a scan over a RADOS object holding Arrow IPC data.
/// \param[in] hctx RADOS object context.
/// \param[in] filter Filter expression applied to the scanned rows.
/// \param[in] partition_expression Partition expression attached to the fragment.
/// \param[in] projection_schema Schema describing the columns to project.
/// \param[in] dataset_schema Schema of the full dataset.
/// \param[out] result_table Receives the table produced by the scan.
/// \param[in] object_size Size of the underlying object in bytes.
/// \return Status.
static arrow::Status ScanIpcObject(cls_method_context_t hctx,
                                   arrow::compute::Expression filter,
                                   arrow::compute::Expression partition_expression,
                                   std::shared_ptr<arrow::Schema> projection_schema,
                                   std::shared_ptr<arrow::Schema> dataset_schema,
                                   std::shared_ptr<arrow::Table>& result_table,
                                   int64_t object_size) {
  // Expose the RADOS object to Arrow as a random-access file; the source is
  // created with an LZ4_FRAME compression hint for the IPC payload.
  auto object_file = std::make_shared<RandomAccessObject>(hctx, object_size);
  arrow::dataset::FileSource file_source(object_file, arrow::Compression::LZ4_FRAME);

  auto ipc_format = std::make_shared<arrow::dataset::IpcFileFormat>();
  ARROW_ASSIGN_OR_RAISE(auto ipc_fragment,
                        ipc_format->MakeFragment(file_source, partition_expression));

  // Build a single-threaded scanner over the fragment, applying the caller's
  // filter and column projection.
  auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
  auto scanner_builder = std::make_shared<arrow::dataset::ScannerBuilder>(
      dataset_schema, ipc_fragment, scan_options);
  ARROW_RETURN_NOT_OK(scanner_builder->Filter(filter));
  ARROW_RETURN_NOT_OK(scanner_builder->Project(projection_schema->field_names()));
  ARROW_RETURN_NOT_OK(scanner_builder->UseThreads(false));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // Materialize the scan into the caller-provided table before releasing
  // the underlying file handle.
  ARROW_ASSIGN_OR_RAISE(result_table, scanner->ToTable());

  ARROW_RETURN_NOT_OK(object_file->Close());
  return arrow::Status::OK();
}

/// \brief Scan RADOS objects containing Parquet binary data.
/// \param[in] hctx RADOS object context.
/// \param[in] filter The filter expression to apply.
/// \param[in] partition_expression The partition expression to use.
/// \param[in] projection_schema The projection schema.
/// \param[in] dataset_schema The dataset schema.
/// \param[out] table Table to store the resultant data.
/// \param[out] result_table Table to store the resultant data.
/// \param[in] object_size The size of the object.
/// \return Status.
static arrow::Status ScanParquetObject(cls_method_context_t hctx,
Expand All @@ -156,7 +202,6 @@ static arrow::Status ScanParquetObject(cls_method_context_t hctx,
std::shared_ptr<arrow::Table>& result_table,
int64_t object_size) {
auto file = std::make_shared<RandomAccessObject>(hctx, object_size);

arrow::dataset::FileSource source(file);

auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
Expand Down Expand Up @@ -193,40 +238,53 @@ static arrow::Status ScanParquetObject(cls_method_context_t hctx,
/// \return Status code.
static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in,
ceph::bufferlist* out) {
// the components required to construct a ParquetFragment.
// Components required to construct a File fragment.
arrow::Status s;
arrow::compute::Expression filter;
arrow::compute::Expression partition_expression;
std::shared_ptr<arrow::Schema> projection_schema;
std::shared_ptr<arrow::Schema> dataset_schema;
int64_t file_size;
int64_t file_format = 0; // 0 = Parquet, 1 = Ipc

// deserialize the scan request
if (!arrow::dataset::DeserializeScanRequest(&filter, &partition_expression,
&projection_schema, &dataset_schema,
file_size, *in)
.ok())
return -1;
// Deserialize the scan request
if (!(s = arrow::dataset::DeserializeScanRequest(&filter, &partition_expression,
&projection_schema, &dataset_schema,
file_size, file_format, *in))
.ok()) {
CLS_LOG(0, "error: %s", s.message().c_str());
return SCAN_REQ_DESER_ERR_CODE;
}

// scan the parquet object
// Scan the object
std::shared_ptr<arrow::Table> table;
arrow::Status s =
ScanParquetObject(hctx, filter, partition_expression, projection_schema,
dataset_schema, table, file_size);
if (file_format == 0) {
s = ScanParquetObject(hctx, filter, partition_expression, projection_schema,
dataset_schema, table, file_size);
} else if (file_format == 1) {
s = ScanIpcObject(hctx, filter, partition_expression, projection_schema,
dataset_schema, table, file_size);
} else {
s = arrow::Status::Invalid("Invalid file format");
}
if (!s.ok()) {
CLS_LOG(0, "error: %s", s.message().c_str());
return -1;
return SCAN_ERR_CODE;
}

// serialize the resultant table to send back to the client
// Serialize the resultant table to send back to the client
ceph::bufferlist bl;
if (!arrow::dataset::SerializeTable(table, bl).ok()) return -1;
if (!(s = arrow::dataset::SerializeTable(table, bl)).ok()) {
CLS_LOG(0, "error: %s", s.message().c_str());
return SCAN_RES_SER_ERR_CODE;
}

*out = bl;
return 0;
}

void __cls_init() {
CLS_LOG(0, "loading cls_arrow");
CLS_LOG(0, "info: loading cls_arrow");

cls_register("arrow", &h_class);

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/adapters/arrow-rados-cls/scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ EOF
OSD_ID=$(ceph osd create)
ceph osd crush add osd.${OSD_ID} 1 root=default host=localhost
ceph-osd --id ${OSD_ID} --mkjournal --mkfs
ceph-osd --id ${OSD_ID}
ceph-osd --id ${OSD_ID} || ceph-osd --id ${OSD_ID} || ceph-osd --id ${OSD_ID}

# start a MDS daemon
MDS_DATA=${TEST_DIR}/mds
Expand Down Expand Up @@ -104,7 +104,7 @@ EOF
ceph -s
sleep 2

run the end-to-end C++ tests
# run the end-to-end C++ tests
TESTS=release/arrow-cls-cls-arrow-test
if [ -f "$TESTS" ]; then
release/arrow-cls-cls-arrow-test
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/arrow/dataset/file_rados_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "parquet/arrow/reader.h"
#include "parquet/file_reader.h"

#include <flatbuffers/flatbuffers.h>

#include "arrow/util/compression.h"
#include "generated/ScanRequest_generated.h"

namespace arrow {
Expand Down Expand Up @@ -124,8 +124,9 @@ Status SerializeScanRequest(std::shared_ptr<ScanOptions>& options, int64_t& file
auto dataset_schema_vec =
builder.CreateVector(dataset_schema->data(), dataset_schema->size());

auto request = flatbuf::CreateScanRequest(builder, file_size, filter_vec, partition_vec,
dataset_schema_vec, projected_schema_vec);
auto request =
flatbuf::CreateScanRequest(builder, file_size, options->file_format, filter_vec,
partition_vec, dataset_schema_vec, projected_schema_vec);
builder.Finish(request);
uint8_t* buf = builder.GetBufferPointer();
int size = builder.GetSize();
Expand All @@ -137,7 +138,7 @@ Status SerializeScanRequest(std::shared_ptr<ScanOptions>& options, int64_t& file
Status DeserializeScanRequest(compute::Expression* filter, compute::Expression* partition,
std::shared_ptr<Schema>* projected_schema,
std::shared_ptr<Schema>* dataset_schema, int64_t& file_size,
ceph::bufferlist& bl) {
int64_t& file_format, ceph::bufferlist& bl) {
auto request = flatbuf::GetScanRequest((uint8_t*)bl.c_str());

ARROW_ASSIGN_OR_RAISE(auto filter_,
Expand Down Expand Up @@ -165,6 +166,7 @@ Status DeserializeScanRequest(compute::Expression* filter, compute::Expression*
*dataset_schema = dataset_schema_;

file_size = request->file_size();
file_format = request->file_format();
return Status::OK();
}

Expand Down Expand Up @@ -194,8 +196,7 @@ Status SerializeTable(std::shared_ptr<Table>& table, ceph::bufferlist& bl,
return Status::OK();
}

Status DeserializeTable(RecordBatchVector& batches,
ceph::bufferlist& bl,
Status DeserializeTable(RecordBatchVector& batches, ceph::bufferlist& bl,
bool use_threads) {
auto buffer = std::make_shared<Buffer>((uint8_t*)bl.c_str(), bl.length());
auto buffer_reader = std::make_shared<io::BufferReader>(buffer);
Expand Down
28 changes: 19 additions & 9 deletions cpp/src/arrow/dataset/file_rados_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,18 @@
#include "arrow/ipc/api.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"

#include "parquet/arrow/writer.h"
#include "parquet/exception.h"

#define SCAN_ERR_CODE 25
#define SCAN_ERR_MSG "failed to scan file fragment"

#define SCAN_REQ_DESER_ERR_CODE 26
#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"

#define SCAN_RES_SER_ERR_CODE 27
#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"

namespace arrow {
namespace dataset {

Expand Down Expand Up @@ -108,8 +116,7 @@ class ARROW_DS_EXPORT RadosConnection : public Connection {

// Locks the mutex. Only one thread can pass here at a time.
// Another thread handled the connection already.
std::unique_lock<std::mutex> lock(
connection_mutex);
std::unique_lock<std::mutex> lock(connection_mutex);
if (connected) {
return Status::OK();
}
Expand Down Expand Up @@ -162,7 +169,7 @@ class ARROW_DS_EXPORT DirectObjectAccess {
Status Stat(const std::string& path, struct stat& st) {
struct stat file_st;
if (stat(path.c_str(), &file_st) < 0)
return Status::ExecutionError("stat returned non-zero exit code.");
return Status::Invalid("stat returned non-zero exit code.");
st = file_st;
return Status::OK();
}
Expand All @@ -186,10 +193,11 @@ class ARROW_DS_EXPORT DirectObjectAccess {
Status Exec(uint64_t inode, const std::string& fn, ceph::bufferlist& in,
ceph::bufferlist& out) {
std::string oid = ConvertFileInodeToObjectID(inode);
if (connection_->ioCtx->exec(oid.c_str(), connection_->ctx.cls_name.c_str(),
fn.c_str(), in, out)) {
return Status::ExecutionError("librados::exec returned non-zero exit code.");
}
int e = connection_->ioCtx->exec(oid.c_str(), connection_->ctx.cls_name.c_str(),
fn.c_str(), in, out);
if (e == SCAN_ERR_CODE) return Status::Invalid(SCAN_ERR_MSG);
if (e == SCAN_REQ_DESER_ERR_CODE) return Status::Invalid(SCAN_REQ_DESER_ERR_MSG);
if (e == SCAN_RES_SER_ERR_CODE) return Status::Invalid(SCAN_RES_SER_ERR_MSG);
return Status::OK();
}

Expand Down Expand Up @@ -261,13 +269,15 @@ ARROW_DS_EXPORT Status SerializeScanRequest(std::shared_ptr<ScanOptions>& option
/// \param[out] projected_schema The schema to project the filtered record batches.
/// \param[out] dataset_schema The dataset schema to use.
/// \param[out] file_size The size of the file.
/// \param[out] file_format The file format to use.
/// \param[in] bl Input Ceph bufferlist.
/// \return Status.
ARROW_DS_EXPORT Status DeserializeScanRequest(compute::Expression* filter,
compute::Expression* partition,
std::shared_ptr<Schema>* projected_schema,
std::shared_ptr<Schema>* dataset_schema,
int64_t& file_size, ceph::bufferlist& bl);
int64_t& file_size, int64_t& file_format,
ceph::bufferlist& bl);

/// \brief Serialize the result Table to a bufferlist.
/// \param[in] table The table to serialize.
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/dataset/file_rados_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ TEST(TestRadosParquetFileFormat, ScanRequestSerializeDeserialize) {
std::shared_ptr<ScanOptions> options = std::make_shared<ScanOptions>();
options->projected_schema = arrow::schema({arrow::field("a", arrow::int64())});
options->dataset_schema = arrow::schema({arrow::field("a", arrow::int64())});
options->file_format = 1;

ceph::bufferlist bl;
int64_t file_size = 1000000;
Expand All @@ -66,13 +67,16 @@ TEST(TestRadosParquetFileFormat, ScanRequestSerializeDeserialize) {
std::shared_ptr<Schema> projected_schema_;
std::shared_ptr<Schema> dataset_schema_;
int64_t file_size_;
int64_t file_format_;
DeserializeScanRequest(&filter_, &partition_expression_, &projected_schema_,
&dataset_schema_, file_size_, bl);
&dataset_schema_, file_size_, file_format_, bl);

ASSERT_EQ(options->filter.Equals(filter_), 1);
ASSERT_EQ(options->partition_expression.Equals(partition_expression_), 1);
ASSERT_EQ(options->projected_schema->Equals(projected_schema_), 1);
ASSERT_EQ(options->dataset_schema->Equals(dataset_schema_), 1);
ASSERT_EQ(file_size_, file_size);
ASSERT_EQ(file_format_, options->file_format);
}

TEST(TestRadosParquetFileFormat, SerializeDeserializeTable) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ struct ARROW_DS_EXPORT ScanOptions {
// sub-selection optimization.
std::vector<std::string> MaterializedFields() const;

int64_t file_format = 0; // 0 = Parquet, 1 = Ipc

// Return a threaded or serial TaskGroup according to use_threads.
std::shared_ptr<internal::TaskGroup> TaskGroup() const;
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/gandiva/jni/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ if(MSVC)
endif()

# Find JNI
set(JAVA_AWT_INCLUDE_PATH
NotNeeded) # We don't care about Java AWT, only about JNI functionality.
set(JAVA_AWT_INCLUDE_PATH NotNeeded
)# We don't care about Java AWT, only about JNI functionality.
find_package(JNI REQUIRED)

set(PROTO_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR})
Expand Down
Loading

0 comments on commit 9e9d6dd

Please sign in to comment.