Skip to content

Commit

Permalink
Enable read data from s3
Browse files Browse the repository at this point in the history
  • Loading branch information
acezen committed Mar 28, 2023
1 parent 1f2516a commit afabb48
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 27 deletions.
23 changes: 17 additions & 6 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ endmacro()
find_package(Threads REQUIRED)
find_yaml_cpp()
find_package(OpenSSL QUIET)
if (APPLE)
find_package(curl REQUIRED)
else()
find_package(CURL REQUIRED)
endif()
if(OPENSSL_FOUND)
if(OPENSSL_VERSION LESS "1.1.0")
message(ERROR "The OpenSSL must be greater than or equal to 1.1.0, current version is ${OPENSSL_VERSION}")
Expand Down Expand Up @@ -202,7 +207,13 @@ macro(build_gar)
# if OpenSSL library exists, link the OpenSSL library.
# OpenSSL has to be linked after GAR_ARROW_BUNDLED_DEPS_STATIC_LIB
if(OPENSSL_FOUND)
target_link_libraries(gar PRIVATE OpenSSL::SSL)
target_link_libraries(gar PUBLIC OpenSSL::SSL)
endif()
if (CURL_FOUND)
target_link_libraries(gar PUBLIC CURL::libcurl)
endif()
if (APPLE)
target_link_libraries(gar "-framework CoreFoundation")
endif()
endmacro()

Expand Down Expand Up @@ -311,11 +322,11 @@ if (BUILD_TESTS)
endmacro()

add_test(test_info SRCS test/test_info.cc)
add_test(test_arrow_chunk_writer SRCS test/test_arrow_chunk_writer.cc)
add_test(test_builder SRCS test/test_builder.cc)
add_test(test_chunk_info_reader SRCS test/test_chunk_info_reader.cc)
add_test(test_arrow_chunk_reader SRCS test/test_arrow_chunk_reader.cc)
add_test(test_graph SRCS test/test_graph.cc)
# add_test(test_arrow_chunk_writer SRCS test/test_arrow_chunk_writer.cc)
# add_test(test_builder SRCS test/test_builder.cc)
# add_test(test_chunk_info_reader SRCS test/test_chunk_info_reader.cc)
# add_test(test_arrow_chunk_reader SRCS test/test_arrow_chunk_reader.cc)
# add_test(test_graph SRCS test/test_graph.cc)

# enable_testing()
endif()
Expand Down
3 changes: 2 additions & 1 deletion cpp/cmake/apache-arrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ function(build_arrow)
"-DARROW_WITH_ZSTD=ON"
"-DARROW_WITH_ZLIB=OFF"
"-DARROW_WITH_BROTLI=OFF"
"-DARROW_WITH_BZ2=OFF")
"-DARROW_WITH_BZ2=OFF"
"-DARROW_S3=ON")

set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory")
set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}")
Expand Down
5 changes: 4 additions & 1 deletion cpp/include/gar/graph_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ namespace GAR_NAMESPACE_INTERNAL {

class Yaml;

class FileSystem;

/**
* Property is a struct to store the property information for a group.
*/
Expand Down Expand Up @@ -1017,7 +1019,8 @@ class GraphInfo {
* object indicating an error.
*/
static Result<GraphInfo> Load(const std::string& input,
const std::string& relative_path);
const std::string& relative_path,
std::shared_ptr<FileSystem> fs = nullptr);

/**
* @brief Adds a vertex info to the GraphInfo instance.
Expand Down
3 changes: 2 additions & 1 deletion cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class VertexPropertyArrowChunkReader {
GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix,
vertex_info.GetPathPrefix(property_group));
std::string base_dir = prefix_ + pg_path_prefix;
GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_, fs_->GetFileNumOfDir(base_dir));
GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
utils::GetVertexChunkNum(prefix_, vertex_info, fs_));
}

/**
Expand Down
3 changes: 2 additions & 1 deletion cpp/include/gar/utils/reader_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Result<std::pair<IdType, IdType>> GetAdjListOffsetOfVertex(
AdjListType adj_list_type, IdType vid) noexcept;

Result<IdType> GetVertexChunkNum(const std::string& prefix,
const VertexInfo& vertex_info) noexcept;
const VertexInfo& vertex_info,
std::shared_ptr<FileSystem> fs = nullptr) noexcept;

Result<IdType> GetEdgeChunkNum(const std::string& prefix,
const EdgeInfo& edge_info,
Expand Down
3 changes: 3 additions & 0 deletions cpp/include/gar/writer/arrow_chunk_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
#ifndef GAR_WRITER_ARROW_CHUNK_WRITER_H_
#define GAR_WRITER_ARROW_CHUNK_WRITER_H_

#include <iostream>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -64,6 +65,7 @@ class VertexPropertyWriter {
prefix_(prefix),
validate_level_(validate_level) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
prefix_ += "/";
}

/**
Expand Down Expand Up @@ -193,6 +195,7 @@ class EdgeChunkWriter {
adj_list_type_(adj_list_type),
validate_level_(validate_level) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
prefix_ += "/";
chunk_size_ = edge_info_.GetChunkSize();
switch (adj_list_type) {
case AdjListType::unordered_by_source:
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
arrow_fs_->OpenInputStream(path));
auto read_options = arrow::csv::ReadOptions::Defaults();
auto parse_options = arrow::csv::ParseOptions::Defaults();
parse_options.delimiter = ' ';
auto convert_options = arrow::csv::ConvertOptions::Defaults();
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto reader, arrow::csv::TableReader::Make(
Expand Down Expand Up @@ -172,8 +173,8 @@ Result<std::string> FileSystem::ReadFileToValue(const std::string& path) const
template <typename T>
Status FileSystem::WriteValueToFile(const T& value,
const std::string& path) const noexcept {
RETURN_NOT_ARROW_OK(
arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
// RETURN_NOT_ARROW_OK(
// arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
arrow_fs_->OpenOutputStream(path));
RETURN_NOT_ARROW_OK(ofstream->Write(&value, sizeof(T)));
Expand All @@ -196,8 +197,8 @@ Status FileSystem::WriteValueToFile(const std::string& value,
Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
FileType file_type,
const std::string& path) const noexcept {
RETURN_NOT_ARROW_OK(
arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
// RETURN_NOT_ARROW_OK(
// arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
arrow_fs_->OpenOutputStream(path));
switch (file_type) {
Expand Down Expand Up @@ -253,10 +254,11 @@ Result<IdType> FileSystem::GetFileNumOfDir(const std::string& dir_path,
file_selector.base_dir = dir_path;
file_selector.allow_not_found = false; // if dir_path not exist, return error
file_selector.recursive = recursive;
arrow::fs::FileInfoVector file_infos;
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(file_infos,
arrow_fs_->GetFileInfo(file_selector));
return static_cast<IdType>(file_infos.size());
// arrow::fs::FileInfoVector file_infos;
arrow::fs::FileInfo file_info;
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(file_info,
arrow_fs_->GetFileInfo(dir_path));
return static_cast<IdType>(file_info.size());
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
Expand Down
32 changes: 25 additions & 7 deletions cpp/src/graph_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,19 @@ Status EdgeInfo::Save(const std::string& path) const {
}

static std::string PathToDirectory(const std::string& path) {
const size_t last_slash_idx = path.rfind('/');
if (std::string::npos != last_slash_idx) {
return path.substr(0, last_slash_idx + 1); // +1 to include the slash
if (path.rfind("s3://", 0) != 0) {
const size_t last_slash_idx = path.rfind('/');
if (std::string::npos != last_slash_idx) {
return path.substr(0, last_slash_idx + 1); // +1 to include the slash
}
} else {
int t = path.find_last_of('?');
std::string prefix = path.substr(0, t);
std::string suffix = path.substr(t);
const size_t last_slash_idx = prefix.rfind('/');
if (std::string::npos != last_slash_idx) {
return prefix.substr(0, last_slash_idx + 1) + suffix; // +1 to include the slash
}
}
return path;
}
Expand All @@ -245,7 +255,9 @@ Result<GraphInfo> GraphInfo::Load(const std::string& path) {
}

Result<GraphInfo> GraphInfo::Load(const std::string& input,
const std::string& relative_location) {
const std::string& relative_location,
std::shared_ptr<FileSystem> fs) {
std::cout << "relative_location: " << relative_location << "\n";
GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(input));
std::string name = "graph";
std::string prefix =
Expand All @@ -265,13 +277,18 @@ Result<GraphInfo> GraphInfo::Load(const std::string& input,
GraphInfo graph_info(name, version, prefix);

std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs,
if (fs == nullptr) {
GAR_ASSIGN_OR_RAISE(fs,
FileSystemFromUriOrPath(relative_location, &no_url_path));
} else {
no_url_path = relative_location;
}
const auto& vertices = graph_meta->operator[]("vertices");
if (vertices) {
for (YAML::const_iterator it = vertices.begin(); it != vertices.end();
++it) {
std::string vertex_meta_file = no_url_path + it->as<std::string>();
std::string vertex_meta_file = no_url_path + "/" + it->as<std::string>();
std::cout << "vertex_meta_file: " << vertex_meta_file << "\n";
GAR_ASSIGN_OR_RAISE(auto input,
fs->ReadFileToValue<std::string>(vertex_meta_file));
GAR_ASSIGN_OR_RAISE(auto vertex_meta, Yaml::Load(input));
Expand All @@ -282,7 +299,8 @@ Result<GraphInfo> GraphInfo::Load(const std::string& input,
const auto& edges = graph_meta->operator[]("edges");
if (edges) {
for (YAML::const_iterator it = edges.begin(); it != edges.end(); ++it) {
std::string edge_meta_file = no_url_path + it->as<std::string>();
std::string edge_meta_file = no_url_path + "/" + it->as<std::string>();
std::cout << "edge_meta_file: " << edge_meta_file << "\n";
GAR_ASSIGN_OR_RAISE(auto input,
fs->ReadFileToValue<std::string>(edge_meta_file));
GAR_ASSIGN_OR_RAISE(auto edge_meta, Yaml::Load(input));
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/reader_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,15 @@ Result<std::pair<IdType, IdType>> GetAdjListOffsetOfVertex(
}

Result<IdType> GetVertexChunkNum(const std::string& prefix,
const VertexInfo& vertex_info) noexcept {
const VertexInfo& vertex_info,
std::shared_ptr<FileSystem> fs) noexcept {
std::string out_prefix;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(prefix, &out_prefix));
if (fs == nullptr) {
GAR_ASSIGN_OR_RAISE(fs, FileSystemFromUriOrPath(prefix, &out_prefix));
out_prefix += "/";
} else {
out_prefix = prefix;
}
GAR_ASSIGN_OR_RAISE(auto vertex_num_file_suffix,
vertex_info.GetVerticesNumFilePath());
std::string vertex_num_file_path = out_prefix + vertex_num_file_suffix;
Expand Down

0 comments on commit afabb48

Please sign in to comment.