From dfa7f843b8f628a950940ccae873545e32d40817 Mon Sep 17 00:00:00 2001 From: Jayjeet Chakraborty Date: Tue, 3 Aug 2021 01:38:33 +0530 Subject: [PATCH] SKYHOOK-254: [Cls] Multiple updates to Arrow CLS --- ...gration_ceph.sh => integration_skyhook.sh} | 4 +- .../adapters/arrow-rados-cls/cls_arrow.cc | 98 ++++++++++--------- cpp/src/arrow/dataset/file_skyhook.h | 2 + docker-compose.yml | 4 +- 4 files changed, 58 insertions(+), 50 deletions(-) rename ci/scripts/{integration_ceph.sh => integration_skyhook.sh} (97%) diff --git a/ci/scripts/integration_ceph.sh b/ci/scripts/integration_skyhook.sh similarity index 97% rename from ci/scripts/integration_ceph.sh rename to ci/scripts/integration_skyhook.sh index 1583facde1fd7..39df3d38ecd90 100755 --- a/ci/scripts/integration_ceph.sh +++ b/ci/scripts/integration_skyhook.sh @@ -22,7 +22,7 @@ set -x set -u ARROW_BUILD_DIR=${1}/cpp -DIR=/tmp/ +DIR=/tmp/integration-skyhook # reset pkill ceph || true @@ -32,7 +32,7 @@ MON_DATA=${DIR}/mon MDS_DATA=${DIR}/mds MOUNTPT=${MDS_DATA}/mnt OSD_DATA=${DIR}/osd -mkdir ${LOG_DIR} ${MON_DATA} ${OSD_DATA} ${MDS_DATA} ${MOUNTPT} +mkdir -p ${LOG_DIR} ${MON_DATA} ${OSD_DATA} ${MDS_DATA} ${MOUNTPT} MDS_NAME="Z" MON_NAME="a" MGR_NAME="x" diff --git a/cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc b/cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc index 96aeb9bb0b548..c4f4f1a305764 100644 --- a/cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc +++ b/cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc @@ -41,6 +41,8 @@ CLS_NAME(arrow) cls_handle_t h_class; cls_method_handle_t h_scan_op; +void LogArrowError(const std::string& msg) { CLS_LOG(0, "error: %s", msg.c_str()); } + /// \class RandomAccessObject /// \brief An interface to provide a file-like view over RADOS objects. class RandomAccessObject : public arrow::io::RandomAccessFile { @@ -70,9 +72,12 @@ class RandomAccessObject : public arrow::io::RandomAccessFile { return arrow::Status::OK(); } - arrow::Result ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + arrow::Result ReadAt(int64_t position, int64_t nbytes, void* out) { + return arrow::Status::NotImplemented( + "ReadAt has not been implemented in RandomAccessObject"); + } - /// Read at a specified number of bytes from a specified position. + /// Read a specified number of bytes from a specified position. arrow::Result> ReadAt(int64_t position, int64_t nbytes) { RETURN_NOT_OK(CheckClosed()); RETURN_NOT_OK(CheckPosition(position, "read")); @@ -145,6 +150,30 @@ class RandomAccessObject : public arrow::io::RandomAccessFile { std::vector chunks_; }; +arrow::Result> GetResultTableFromScanner( + arrow::dataset::FileSource source, arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr projection_schema, + std::shared_ptr dataset_schema, + std::shared_ptr format, + std::shared_ptr fragment_scan_options) { + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + auto options = std::make_shared(); + auto builder = + std::make_shared(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(true)); + ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + return table; +} + /// \brief Scan RADOS objects containing Arrow IPC data. /// \param[in] hctx RADOS object context. /// \param[in] filter The filter expression to apply. @@ -159,27 +188,19 @@ static arrow::Status ScanIpcObject(cls_method_context_t hctx, arrow::compute::Expression partition_expression, std::shared_ptr projection_schema, std::shared_ptr dataset_schema, - std::shared_ptr& result_table, + std::shared_ptr* result_table, int64_t object_size) { auto file = std::make_shared(hctx, object_size); - arrow::dataset::FileSource source(file, arrow::Compression::LZ4_FRAME); + auto source = + std::make_shared(file, arrow::Compression::LZ4_FRAME); auto format = std::make_shared(); - ARROW_ASSIGN_OR_RAISE(auto fragment, - format->MakeFragment(source, partition_expression)); + auto fragment_scan_options = std::make_shared(); - auto options = std::make_shared(); - auto builder = - std::make_shared(dataset_schema, fragment, options); - - ARROW_RETURN_NOT_OK(builder->Filter(filter)); - ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); - ARROW_RETURN_NOT_OK(builder->UseThreads(false)); - - ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); - ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); - - result_table = table; + ARROW_ASSIGN_OR_RAISE( + *result_table, + GetResultTableFromScanner(*source, filter, partition_expression, projection_schema, + dataset_schema, format, fragment_scan_options)); ARROW_RETURN_NOT_OK(file->Close()); return arrow::Status::OK(); @@ -199,31 +220,19 @@ static arrow::Status ScanParquetObject(cls_method_context_t hctx, arrow::compute::Expression partition_expression, std::shared_ptr projection_schema, std::shared_ptr dataset_schema, - std::shared_ptr& result_table, + std::shared_ptr* result_table, int64_t object_size) { auto file = std::make_shared(hctx, object_size); - arrow::dataset::FileSource source(file); + auto source = std::make_shared(file); auto format = std::make_shared(); - auto fragment_scan_options = std::make_shared(); - ARROW_ASSIGN_OR_RAISE(auto fragment, - format->MakeFragment(source, partition_expression)); - auto options = std::make_shared(); - auto builder = - std::make_shared(dataset_schema, fragment, options); - - ARROW_RETURN_NOT_OK(builder->Filter(filter)); - ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); - ARROW_RETURN_NOT_OK(builder->UseThreads(false)); - ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options)); - - ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); - ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); - - result_table = table; + ARROW_ASSIGN_OR_RAISE( + *result_table, + GetResultTableFromScanner(*source, filter, partition_expression, projection_schema, + dataset_schema, format, fragment_scan_options)); ARROW_RETURN_NOT_OK(file->Close()); return arrow::Status::OK(); @@ -252,30 +261,30 @@ static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in, &projection_schema, &dataset_schema, file_size, file_format, *in)) .ok()) { - CLS_LOG(0, "error: %s", s.message().c_str()); + LogArrowError(s.message()); return SCAN_REQ_DESER_ERR_CODE; } // Scan the object std::shared_ptr table; - if (file_format == 0) { + if (file_format == arrow::dataset::SkyhookFileType::PARQUET) { s = ScanParquetObject(hctx, filter, partition_expression, projection_schema, - dataset_schema, table, file_size); - } else if (file_format == 1) { + dataset_schema, &table, file_size); + } else if (file_format == arrow::dataset::SkyhookFileType::IPC) { s = ScanIpcObject(hctx, filter, partition_expression, projection_schema, - dataset_schema, table, file_size); + dataset_schema, &table, file_size); } else { s = arrow::Status::Invalid("Unsupported file format"); } if (!s.ok()) { - CLS_LOG(0, "error: %s", s.message().c_str()); + LogArrowError(s.message()); return SCAN_ERR_CODE; } // Serialize the resultant table to send back to the client ceph::bufferlist bl; if (!(s = arrow::dataset::SerializeTable(table, bl)).ok()) { - CLS_LOG(0, "error: %s", s.message().c_str()); + LogArrowError(s.message()); return SCAN_RES_SER_ERR_CODE; } @@ -284,9 +293,6 @@ static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in, } void __cls_init() { - CLS_LOG(0, "info: loading cls_arrow"); - cls_register("arrow", &h_class); - cls_register_cxx_method(h_class, "scan_op", CLS_METHOD_RD, scan_op, &h_scan_op); } diff --git a/cpp/src/arrow/dataset/file_skyhook.h b/cpp/src/arrow/dataset/file_skyhook.h index 837fb7834cccf..58e54db54f1e8 100644 --- a/cpp/src/arrow/dataset/file_skyhook.h +++ b/cpp/src/arrow/dataset/file_skyhook.h @@ -62,6 +62,8 @@ namespace arrow { namespace dataset { +enum SkyhookFileType { PARQUET, IPC }; + /// \addtogroup dataset-file-formats /// /// @{ diff --git a/docker-compose.yml b/docker-compose.yml index 496eea784daaa..d1de5db5e112e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -905,7 +905,7 @@ services: command: &cpp-skyhook-command > /bin/bash -c " /arrow/ci/scripts/cpp_build.sh /arrow /build && - /arrow/ci/scripts/integration_ceph.sh /build && + /arrow/ci/scripts/integration_skyhook.sh /build && /arrow/ci/scripts/cpp_test.sh /arrow /build" ubuntu-python-skyhook: @@ -928,7 +928,7 @@ services: /bin/bash -c " /arrow/ci/scripts/cpp_build.sh /arrow /build && /arrow/ci/scripts/python_build.sh /arrow /build && - /arrow/ci/scripts/integration_ceph.sh /build && + /arrow/ci/scripts/integration_skyhook.sh /build && /arrow/ci/scripts/python_test.sh /arrow" conda-python-pandas: