Skip to content
This repository has been archived by the owner on Feb 17, 2023. It is now read-only.

Commit

Permalink
Change Micro OSD script to use Bluestore and update Skyhook-CI workflow (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
JayjeetAtGithub committed Feb 15, 2021
1 parent d10e1ee commit 8419ba1
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 43 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/skyhook-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ jobs:
- name: push development image
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/rados-dataset-dev' }}
run: |
docker tag $DOCKER_ORG/skyhookdm-arrow:latest $DOCKER_ORG/skyhookdm-arrow:dev
docker push $DOCKER_ORG/skyhookdm-arrow:dev
docker tag $DOCKER_ORG/skyhookdm-arrow:latest $DOCKER_ORG/skyhookdm-arrow:dev-$GITHUB_SHA
docker push $DOCKER_ORG/skyhookdm-arrow:dev-$GITHUB_SHA
- name: push production image
if: ${{ github.event_name == 'release' && startsWith(github.ref, 'refs/tags/v') }}
Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/integration_ceph.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ chdir = ""
osd data = ${OSD_DATA}
osd journal = ${OSD_DATA}.journal
osd journal size = 100
osd objectstore = memstore
osd class load list = lock log numops refcount replica_log statelog timeindex user version arrow
osd objectstore = bluestore
osd class load list = *
EOF

OSD_ID=$(ceph osd create)
Expand Down
15 changes: 5 additions & 10 deletions cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ class RandomAccessObject : public arrow::io::RandomAccessFile {
nbytes = std::min(nbytes, content_length_ - position);

if (nbytes > 0) {
ceph::bufferlist bl;
cls_cxx_read(hctx_, position, nbytes, &bl);
return std::make_shared<arrow::Buffer>((uint8_t*)bl.c_str(), bl.length());
ceph::bufferlist* bl = new ceph::bufferlist();
cls_cxx_read(hctx_, position, nbytes, bl);
return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length());
}
return std::make_shared<arrow::Buffer>("");
}
Expand Down Expand Up @@ -139,10 +139,6 @@ static arrow::Status ScanParquetObject(cls_method_context_t hctx,

arrow::dataset::FileSource source(file);

std::unique_ptr<parquet::arrow::FileReader> reader;
ARROW_RETURN_NOT_OK(
parquet::arrow::OpenFile(file, arrow::default_memory_pool(), &reader));

auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
ARROW_ASSIGN_OR_RAISE(auto fragment,
format->MakeFragment(source, partition_expression));
Expand Down Expand Up @@ -183,14 +179,13 @@ static int read_schema(cls_method_context_t hctx, ceph::bufferlist* in,
return 0;
}

static int scan(cls_method_context_t hctx, ceph::bufferlist* in,
ceph::bufferlist* out) {
static int scan(cls_method_context_t hctx, ceph::bufferlist* in, ceph::bufferlist* out) {
// the components required to construct a ParquetFragment.
arrow::dataset::Expression filter;
arrow::dataset::Expression partition_expression;
std::shared_ptr<arrow::Schema> projection_schema;
std::shared_ptr<arrow::Schema> dataset_schema;

// deserialize the scan request
if (!arrow::dataset::DeserializeScanRequestFromBufferlist(
&filter, &partition_expression, &projection_schema, &dataset_schema, *in)
Expand Down
4 changes: 1 addition & 3 deletions cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,13 @@
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"


std::shared_ptr<arrow::dataset::RadosParquetFileFormat> GetFormat() {
std::string ceph_config_path = "/etc/ceph/ceph.conf";
std::string data_pool = "cephfs_data";
std::string user_name = "client.admin";
std::string cluster_name = "ceph";
return std::make_shared<arrow::dataset::RadosParquetFileFormat>(
ceph_config_path, data_pool, user_name, cluster_name
);
ceph_config_path, data_pool, user_name, cluster_name);
}

std::shared_ptr<arrow::dataset::Dataset> GetDatasetFromDirectory(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/adapters/arrow-rados-cls/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM jcnitdgp25/skyhookdm-arrow:stable-0.1
FROM uccross/skyhookdm-arrow:stable-0.1

RUN yum -y update && \
yum -y install python3 \
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/arrow/dataset/file_rados_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class RadosParquetScanTask : public ScanTask {
doa_(std::move(doa)) {}

Result<RecordBatchIterator> Execute() override {
ceph::bufferlist *in = new ceph::bufferlist();
ceph::bufferlist *out = new ceph::bufferlist();
ceph::bufferlist* in = new ceph::bufferlist();
ceph::bufferlist* out = new ceph::bufferlist();

ARROW_RETURN_NOT_OK(SerializeScanRequestToBufferlist(
options_->filter, options_->partition_expression, options_->projector.schema(),
Expand All @@ -66,10 +66,12 @@ class RadosParquetScanTask : public ScanTask {
std::shared_ptr<DirectObjectAccess> doa_;
};

RadosParquetFileFormat::RadosParquetFileFormat(
const std::string& ceph_config_path, const std::string& data_pool, const std::string& user_name, const std::string& cluster_name) {
auto cluster = std::make_shared<RadosCluster>(
ceph_config_path, data_pool, user_name, cluster_name);
RadosParquetFileFormat::RadosParquetFileFormat(const std::string& ceph_config_path,
const std::string& data_pool,
const std::string& user_name,
const std::string& cluster_name) {
auto cluster = std::make_shared<RadosCluster>(ceph_config_path, data_pool, user_name,
cluster_name);
cluster->Connect();
auto doa = std::make_shared<arrow::dataset::DirectObjectAccess>(cluster);
doa_ = doa;
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/dataset/file_rados_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ namespace dataset {

class ARROW_DS_EXPORT RadosCluster {
public:
explicit RadosCluster(std::string ceph_config_path_, std::string data_pool_, std::string user_name_, std::string cluster_name_)
explicit RadosCluster(std::string ceph_config_path_, std::string data_pool_,
std::string user_name_, std::string cluster_name_)
: data_pool(data_pool_),
user_name(user_name_),
cluster_name(cluster_name_),
Expand Down Expand Up @@ -98,8 +99,7 @@ class ARROW_DS_EXPORT DirectObjectAccess {
explicit DirectObjectAccess(const std::shared_ptr<RadosCluster>& cluster)
: cluster_(std::move(cluster)) {}

Status Exec(const std::string& path, const std::string& fn,
ceph::bufferlist& in,
Status Exec(const std::string& path, const std::string& fn, ceph::bufferlist& in,
ceph::bufferlist& out) {
struct stat dir_st;
if (stat(path.c_str(), &dir_st) < 0)
Expand All @@ -125,8 +125,8 @@ class ARROW_DS_EXPORT DirectObjectAccess {

class ARROW_DS_EXPORT RadosParquetFileFormat : public FileFormat {
public:
explicit RadosParquetFileFormat(
const std::string&, const std::string&, const std::string&, const std::string&);
explicit RadosParquetFileFormat(const std::string&, const std::string&,
const std::string&, const std::string&);

explicit RadosParquetFileFormat(std::shared_ptr<DirectObjectAccess> doa)
: doa_(std::move(doa)) {}
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/dataset/rados_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ Status DeserializeScanRequestFromBufferlist(Expression* filter, Expression* part
return Status::OK();
}

Status SerializeTableToBufferlist(std::shared_ptr<Table>& table,
ceph::bufferlist& bl) {
Status SerializeTableToBufferlist(std::shared_ptr<Table>& table, ceph::bufferlist& bl) {
ARROW_ASSIGN_OR_RAISE(auto buffer_output_stream, io::BufferOutputStream::Create());

const auto options = ipc::IpcWriteOptions::Defaults();
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/rados_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ ARROW_DS_EXPORT Status DeserializeScanRequestFromBufferlist(
std::shared_ptr<Schema>* dataset_schema, ceph::bufferlist& bl);

/// \brief Serialize a Table to an Arrow IPC binary buffer.
ARROW_DS_EXPORT Status SerializeTableToBufferlist(
std::shared_ptr<Table>& table, ceph::bufferlist& bl);
ARROW_DS_EXPORT Status SerializeTableToBufferlist(std::shared_ptr<Table>& table,
ceph::bufferlist& bl);

} // namespace dataset
} // namespace arrow
7 changes: 6 additions & 1 deletion python/pyarrow/_rados.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ cdef extern from "arrow/dataset/file_rados_parquet.h" \
cdef cppclass CRadosParquetFileFormat \
"arrow::dataset::RadosParquetFileFormat"(
CFileFormat):
CRadosParquetFileFormat(c_string ceph_config_path, c_string data_pool, c_string user_name, c_string cluster_name)
CRadosParquetFileFormat(
c_string ceph_config_path,
c_string data_pool,
c_string user_name,
c_string cluster_name
)
18 changes: 9 additions & 9 deletions python/pyarrow/_rados.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ cdef class RadosParquetFileFormat(FileFormat):
CRadosParquetFileFormat* rados_parquet_format

def __init__(
self,
ceph_config_path="/etc/ceph/ceph.conf",
data_pool="cephfs_data",
user_name="client.admin",
cluster_name="ceph"
):
self,
ceph_config_path="/etc/ceph/ceph.conf",
data_pool="cephfs_data",
user_name="client.admin",
cluster_name="ceph"
):
self.init(shared_ptr[CFileFormat](
new CRadosParquetFileFormat(
tobytes(ceph_config_path),
tobytes(data_pool),
tobytes(user_name),
tobytes(ceph_config_path),
tobytes(data_pool),
tobytes(user_name),
tobytes(cluster_name)
)
))
Expand Down

0 comments on commit 8419ba1

Please sign in to comment.