SKYHOOK-254: [Cls] Multiple updates to Arrow CLS
JayjeetAtGithub authored Aug 2, 2021
1 parent 31586a7 commit dfa7f84
Showing 4 changed files with 58 additions and 50 deletions.
ci/scripts/integration_skyhook.sh
@@ -22,7 +22,7 @@ set -x
set -u

ARROW_BUILD_DIR=${1}/cpp
DIR=/tmp/
DIR=/tmp/integration-skyhook

# reset
pkill ceph || true
@@ -32,7 +32,7 @@ MON_DATA=${DIR}/mon
MDS_DATA=${DIR}/mds
MOUNTPT=${MDS_DATA}/mnt
OSD_DATA=${DIR}/osd
mkdir ${LOG_DIR} ${MON_DATA} ${OSD_DATA} ${MDS_DATA} ${MOUNTPT}
mkdir -p ${LOG_DIR} ${MON_DATA} ${OSD_DATA} ${MDS_DATA} ${MOUNTPT}
MDS_NAME="Z"
MON_NAME="a"
MGR_NAME="x"
98 changes: 52 additions & 46 deletions cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc
@@ -41,6 +41,8 @@ CLS_NAME(arrow)
cls_handle_t h_class;
cls_method_handle_t h_scan_op;

void LogArrowError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); }

/// \class RandomAccessObject
/// \brief An interface to provide a file-like view over RADOS objects.
class RandomAccessObject : public arrow::io::RandomAccessFile {
@@ -70,9 +72,12 @@ class RandomAccessObject : public arrow::io::RandomAccessFile {
return arrow::Status::OK();
}

arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; }
arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
return arrow::Status::NotImplemented(
"ReadAt has not been implemented in RandomAccessObject");
}

/// Read at a specified number of bytes from a specified position.
/// Read a specified number of bytes from a specified position.
arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckPosition(position, "read"));
@@ -145,6 +150,30 @@ class RandomAccessObject : public arrow::io::RandomAccessFile {
std::vector<ceph::bufferlist*> chunks_;
};

arrow::Result<std::shared_ptr<arrow::Table>> GetResultTableFromScanner(
arrow::dataset::FileSource source, arrow::compute::Expression filter,
arrow::compute::Expression partition_expression,
std::shared_ptr<arrow::Schema> projection_schema,
std::shared_ptr<arrow::Schema> dataset_schema,
std::shared_ptr<arrow::dataset::FileFormat> format,
std::shared_ptr<arrow::dataset::FragmentScanOptions> fragment_scan_options) {
ARROW_ASSIGN_OR_RAISE(auto fragment,
format->MakeFragment(source, partition_expression));
auto options = std::make_shared<arrow::dataset::ScanOptions>();
auto builder =
std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options);

ARROW_RETURN_NOT_OK(builder->Filter(filter));
ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names()));
ARROW_RETURN_NOT_OK(builder->UseThreads(true));
ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options));

ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());

return table;
}

/// \brief Scan RADOS objects containing Arrow IPC data.
/// \param[in] hctx RADOS object context.
/// \param[in] filter The filter expression to apply.
@@ -159,27 +188,19 @@ static arrow::Status ScanIpcObject(cls_method_context_t hctx,
arrow::compute::Expression partition_expression,
std::shared_ptr<arrow::Schema> projection_schema,
std::shared_ptr<arrow::Schema> dataset_schema,
std::shared_ptr<arrow::Table>& result_table,
std::shared_ptr<arrow::Table>* result_table,
int64_t object_size) {
auto file = std::make_shared<RandomAccessObject>(hctx, object_size);
arrow::dataset::FileSource source(file, arrow::Compression::LZ4_FRAME);
auto source =
std::make_shared<arrow::dataset::FileSource>(file, arrow::Compression::LZ4_FRAME);

auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
ARROW_ASSIGN_OR_RAISE(auto fragment,
format->MakeFragment(source, partition_expression));
auto fragment_scan_options = std::make_shared<arrow::dataset::IpcFragmentScanOptions>();

auto options = std::make_shared<arrow::dataset::ScanOptions>();
auto builder =
std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options);

ARROW_RETURN_NOT_OK(builder->Filter(filter));
ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names()));
ARROW_RETURN_NOT_OK(builder->UseThreads(false));

ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());

result_table = table;
ARROW_ASSIGN_OR_RAISE(
*result_table,
GetResultTableFromScanner(*source, filter, partition_expression, projection_schema,
dataset_schema, format, fragment_scan_options));

ARROW_RETURN_NOT_OK(file->Close());
return arrow::Status::OK();
@@ -199,31 +220,19 @@ static arrow::Status ScanParquetObject(cls_method_context_t hctx,
arrow::compute::Expression partition_expression,
std::shared_ptr<arrow::Schema> projection_schema,
std::shared_ptr<arrow::Schema> dataset_schema,
std::shared_ptr<arrow::Table>& result_table,
std::shared_ptr<arrow::Table>* result_table,
int64_t object_size) {
auto file = std::make_shared<RandomAccessObject>(hctx, object_size);
arrow::dataset::FileSource source(file);
auto source = std::make_shared<arrow::dataset::FileSource>(file);

auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

auto fragment_scan_options =
std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();

ARROW_ASSIGN_OR_RAISE(auto fragment,
format->MakeFragment(source, partition_expression));
auto options = std::make_shared<arrow::dataset::ScanOptions>();
auto builder =
std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options);

ARROW_RETURN_NOT_OK(builder->Filter(filter));
ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names()));
ARROW_RETURN_NOT_OK(builder->UseThreads(false));
ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options));

ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());

result_table = table;
ARROW_ASSIGN_OR_RAISE(
*result_table,
GetResultTableFromScanner(*source, filter, partition_expression, projection_schema,
dataset_schema, format, fragment_scan_options));

ARROW_RETURN_NOT_OK(file->Close());
return arrow::Status::OK();
@@ -252,30 +261,30 @@ static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in,
&projection_schema, &dataset_schema,
file_size, file_format, *in))
.ok()) {
CLS_LOG(0, "error: %s", s.message().c_str());
LogArrowError(s.message());
return SCAN_REQ_DESER_ERR_CODE;
}

// Scan the object
std::shared_ptr<arrow::Table> table;
if (file_format == 0) {
if (file_format == arrow::dataset::SkyhookFileType::PARQUET) {
s = ScanParquetObject(hctx, filter, partition_expression, projection_schema,
dataset_schema, table, file_size);
} else if (file_format == 1) {
dataset_schema, &table, file_size);
} else if (file_format == arrow::dataset::SkyhookFileType::IPC) {
s = ScanIpcObject(hctx, filter, partition_expression, projection_schema,
dataset_schema, table, file_size);
dataset_schema, &table, file_size);
} else {
s = arrow::Status::Invalid("Unsupported file format");
}
if (!s.ok()) {
CLS_LOG(0, "error: %s", s.message().c_str());
LogArrowError(s.message());
return SCAN_ERR_CODE;
}

// Serialize the resultant table to send back to the client
ceph::bufferlist bl;
if (!(s = arrow::dataset::SerializeTable(table, bl)).ok()) {
CLS_LOG(0, "error: %s", s.message().c_str());
LogArrowError(s.message());
return SCAN_RES_SER_ERR_CODE;
}

@@ -284,9 +293,6 @@
}

void __cls_init() {
CLS_LOG(0, "info: loading cls_arrow");

cls_register("arrow", &h_class);

cls_register_cxx_method(h_class, "scan_op", CLS_METHOD_RD, scan_op, &h_scan_op);
}
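
For context (this client-side code is not part of the commit): once __cls_init has registered the "arrow" class and its "scan_op" method, a client reaches it through librados object-class execution. Below is a minimal sketch of such a call; the pool name, object id, and the contents of the input bufferlist are placeholder assumptions, and the actual Skyhook client serializes the scan request through Arrow's Skyhook dataset layer before issuing the call.

// Sketch of a direct librados invocation of the CLS method registered above.
// "skyhook-data" (pool) and "obj.0" (object id) are placeholders; `in` would
// normally carry the serialized scan request (filter, projection, schemas,
// file format, object size) that scan_op deserializes.
#include <rados/librados.hpp>

#include <iostream>

int main() {
  librados::Rados cluster;
  if (cluster.init2("client.admin", "ceph", 0) < 0) return 1;
  cluster.conf_read_file("/etc/ceph/ceph.conf");
  if (cluster.connect() < 0) return 1;

  librados::IoCtx ioctx;
  if (cluster.ioctx_create("skyhook-data", ioctx) < 0) return 1;

  ceph::bufferlist in, out;
  // An empty request bufferlist will make scan_op fail deserialization and
  // return SCAN_REQ_DESER_ERR_CODE.
  int ret = ioctx.exec("obj.0", "arrow", "scan_op", in, out);
  std::cout << "scan_op returned " << ret << ", " << out.length()
            << " bytes of serialized table" << std::endl;

  ioctx.close();
  cluster.shutdown();
  return ret < 0 ? 1 : 0;
}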
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_skyhook.h
@@ -62,6 +62,8 @@
namespace arrow {
namespace dataset {

enum SkyhookFileType { PARQUET, IPC };

/// \addtogroup dataset-file-formats
///
/// @{
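The new enum gives names to the bare literals that scan_op previously compared file_format against (0 for Parquet, 1 for IPC). A small sketch of that assumed correspondence, relying on the default enumerator values and the header's path in the Arrow source tree:

// Sketch: with the default enumerator values, the new names line up with the
// old magic numbers (0 = Parquet, 1 = IPC) used by scan_op before this change.
#include "arrow/dataset/file_skyhook.h"

static_assert(arrow::dataset::SkyhookFileType::PARQUET == 0,
              "PARQUET replaces the old literal 0");
static_assert(arrow::dataset::SkyhookFileType::IPC == 1,
              "IPC replaces the old literal 1");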
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -905,7 +905,7 @@ services:
command: &cpp-skyhook-command >
/bin/bash -c "
/arrow/ci/scripts/cpp_build.sh /arrow /build &&
/arrow/ci/scripts/integration_ceph.sh /build &&
/arrow/ci/scripts/integration_skyhook.sh /build &&
/arrow/ci/scripts/cpp_test.sh /arrow /build"

ubuntu-python-skyhook:
@@ -928,7 +928,7 @@
/bin/bash -c "
/arrow/ci/scripts/cpp_build.sh /arrow /build &&
/arrow/ci/scripts/python_build.sh /arrow /build &&
/arrow/ci/scripts/integration_ceph.sh /build &&
/arrow/ci/scripts/integration_skyhook.sh /build &&
/arrow/ci/scripts/python_test.sh /arrow"

conda-python-pandas:
