Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable arrow S3 support to support reading and writing file with S3/OSS #125

Merged
merged 17 commits into from
Apr 18, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt-get update -y
sudo apt-get install -y libboost-graph-dev ccache
sudo apt-get install -y libboost-graph-dev ccache libcurl4-openssl-dev

- name: CMake
run: |
Expand Down
11 changes: 11 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ endmacro()
find_package(Threads REQUIRED)
find_yaml_cpp()
find_package(OpenSSL QUIET)
if (APPLE)
find_package(curl REQUIRED)
else()
find_package(CURL REQUIRED)
endif()
if(OPENSSL_FOUND)
if(OPENSSL_VERSION LESS "1.1.0")
message(ERROR "The OpenSSL must be greater than or equal to 1.1.0, current version is ${OPENSSL_VERSION}")
Expand Down Expand Up @@ -204,6 +209,12 @@ macro(build_gar)
if(OPENSSL_FOUND)
target_link_libraries(gar PRIVATE OpenSSL::SSL)
endif()
if (CURL_FOUND)
target_link_libraries(gar PUBLIC CURL::libcurl)
endif()
if (APPLE)
target_link_libraries(gar "-framework CoreFoundation")
endif()
endmacro()

build_gar()
Expand Down
1 change: 1 addition & 0 deletions cpp/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Building requires:
sufficient. For MacOS, at least clang 5 is required
* CMake 3.5 or higher
* On Linux and macOS, ``make`` build utilities
* curl-devel (Linux) or curl (macOS), for s3 filesystem support

Dependencies for optional features:

Expand Down
3 changes: 2 additions & 1 deletion cpp/cmake/apache-arrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ function(build_arrow)
"-DARROW_WITH_ZSTD=ON"
"-DARROW_WITH_ZLIB=OFF"
"-DARROW_WITH_BROTLI=OFF"
"-DARROW_WITH_BZ2=OFF")
"-DARROW_WITH_BZ2=OFF"
"-DARROW_S3=ON")

set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory")
set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}")
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/gar/graph_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ namespace GAR_NAMESPACE_INTERNAL {

class Yaml;

class FileSystem;

/**
* Property is a struct to store the property information for a group.
*/
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class VertexPropertyArrowChunkReader {
vertex_info.GetPathPrefix(property_group));
std::string base_dir = prefix_ + pg_path_prefix;
GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
utils::GetVertexChunkNum(prefix_, vertex_info_));
utils::GetVertexChunkNum(prefix_, vertex_info));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/gar/utils/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class FileSystem {
/**
* @brief Create a new FileSystem by URI
*
* wrapper of arrow::fs::FileSystemFromUri
* wrapper of arrow::fs::FileSystemFromUriOrPath
*
* Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
* "gs" and "gcs".
Expand Down
14 changes: 10 additions & 4 deletions cpp/src/arrow_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ Status VertexPropertyWriter::WriteChunk(
auto schema = input_table->schema();
for (auto& property : property_group.GetProperties()) {
int indice = schema->GetFieldIndex(property.name);
if (indice == -1)
return Status::InvalidOperation("invalid property");
if (indice == -1) {
std::string msg = "invalid property: " + property.name +
" vertex: " + vertex_info_.GetLabel();
return Status::InvalidOperation(msg);
}
indices.push_back(indice);
}
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table,
Expand Down Expand Up @@ -274,8 +277,11 @@ Status EdgeChunkWriter::WritePropertyChunk(
auto schema = input_table->schema();
for (auto& property : property_group.GetProperties()) {
int indice = schema->GetFieldIndex(property.name);
if (indice == -1)
return Status::InvalidOperation("invalid property");
if (indice == -1) {
std::string msg = "invalid property: " + property.name +
" edge: " + edge_info_.GetEdgeLabel();
return Status::InvalidOperation(msg);
}
indices.push_back(indice);
}
auto in_table = input_table->SelectColumns(indices).ValueOrDie();
Expand Down
47 changes: 38 additions & 9 deletions cpp/src/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ static Status CastToLargeOffsetArray(
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
return Status::OK();
}

Result<arrow::internal::Uri> ParseFileSystemUri(const std::string& uri_string) {
arrow::internal::Uri uri;
RETURN_NOT_ARROW_OK(uri.Parse(uri_string));
return std::move(uri);
}
} // namespace detail

Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
Expand Down Expand Up @@ -173,8 +179,8 @@ Result<std::string> FileSystem::ReadFileToValue(const std::string& path) const
template <typename T>
Status FileSystem::WriteValueToFile(const T& value,
const std::string& path) const noexcept {
RETURN_NOT_ARROW_OK(
arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
arrow_fs_->OpenOutputStream(path));
RETURN_NOT_ARROW_OK(ofstream->Write(&value, sizeof(T)));
Expand All @@ -185,8 +191,8 @@ Status FileSystem::WriteValueToFile(const T& value,
template <>
Status FileSystem::WriteValueToFile(const std::string& value,
const std::string& path) const noexcept {
RETURN_NOT_ARROW_OK(
arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
arrow_fs_->OpenOutputStream(path));
RETURN_NOT_ARROW_OK(ofstream->Write(value.c_str(), value.size()));
Expand All @@ -197,8 +203,8 @@ Status FileSystem::WriteValueToFile(const std::string& value,
Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
FileType file_type,
const std::string& path) const noexcept {
RETURN_NOT_ARROW_OK(
arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
arrow_fs_->OpenOutputStream(path));
switch (file_type) {
Expand Down Expand Up @@ -242,7 +248,8 @@ Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,

Status FileSystem::CopyFile(const std::string& src_path,
const std::string& dst_path) const noexcept {
RETURN_NOT_ARROW_OK(
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(
arrow_fs_->CreateDir(dst_path.substr(0, dst_path.find_last_of("/"))));
RETURN_NOT_ARROW_OK(arrow_fs_->CopyFile(src_path, dst_path));
return Status::OK();
Expand All @@ -261,9 +268,31 @@ Result<IdType> FileSystem::GetFileNumOfDir(const std::string& dir_path,
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
const std::string& uri, std::string* out_path) {
const std::string& uri_string, std::string* out_path) {
if (arrow::fs::internal::DetectAbsolutePath(uri_string)) {
// if the uri_string is an absolute path, we need to create a local file
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto arrow_fs,
arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
return std::make_shared<FileSystem>(arrow_fs);
}

GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri, out_path));
auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri_string));
GAR_ASSIGN_OR_RAISE(auto uri, detail::ParseFileSystemUri(uri_string));
if (out_path != nullptr) {
if (uri.scheme() == "file" || uri.scheme() == "hdfs" ||
uri.scheme().empty()) {
*out_path = uri.path();
} else if (uri.scheme() == "s3" || uri.scheme() == "gs") {
// bucket name is the host, path is the path
// the arrow parser would delete the trailing slash which we don't want to
*out_path = uri.host() + uri.path();
} else {
return Status::Invalid("Unrecognized filesystem type in URI: " +
uri_string);
}
}
return std::make_shared<FileSystem>(arrow_fs);
}

Expand Down
70 changes: 49 additions & 21 deletions cpp/src/graph_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,29 +227,32 @@ Status EdgeInfo::Save(const std::string& path) const {
return fs->WriteValueToFile(yaml_content, path);
}

namespace {

static std::string PathToDirectory(const std::string& path) {
const size_t last_slash_idx = path.rfind('/');
if (std::string::npos != last_slash_idx) {
return path.substr(0, last_slash_idx + 1); // +1 to include the slash
if (path.rfind("s3://", 0) == 0) {
int t = path.find_last_of('?');
std::string prefix = path.substr(0, t);
std::string suffix = path.substr(t);
const size_t last_slash_idx = prefix.rfind('/');
if (std::string::npos != last_slash_idx) {
return prefix.substr(0, last_slash_idx + 1) + suffix;
}
} else {
const size_t last_slash_idx = path.rfind('/');
if (std::string::npos != last_slash_idx) {
return path.substr(0, last_slash_idx + 1); // +1 to include the slash
}
}
return path;
}

Result<GraphInfo> GraphInfo::Load(const std::string& path) {
std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(path, &no_url_path));
GAR_ASSIGN_OR_RAISE(auto yaml_content,
fs->ReadFileToValue<std::string>(no_url_path));
std::string path_dir = PathToDirectory(path);
return GraphInfo::Load(yaml_content, path_dir);
}

Result<GraphInfo> GraphInfo::Load(const std::string& input,
const std::string& relative_location) {
GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(input));
std::string name = "graph";
std::string prefix =
relative_location; // default chunk file prefix is relative location
static Result<GraphInfo> ConstructGraphInfo(
std::shared_ptr<Yaml> graph_meta, const std::string& default_name,
const std::string& default_prefix, const std::shared_ptr<FileSystem> fs,
const std::string& no_url_path) {
std::string name = default_name;
std::string prefix = default_prefix;
if (graph_meta->operator[]("name")) {
name = graph_meta->operator[]("name").as<std::string>();
}
Expand All @@ -264,9 +267,6 @@ Result<GraphInfo> GraphInfo::Load(const std::string& input,
}
GraphInfo graph_info(name, version, prefix);

std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs,
FileSystemFromUriOrPath(relative_location, &no_url_path));
const auto& vertices = graph_meta->operator[]("vertices");
if (vertices) {
for (YAML::const_iterator it = vertices.begin(); it != vertices.end();
Expand All @@ -293,6 +293,34 @@ Result<GraphInfo> GraphInfo::Load(const std::string& input,
return graph_info;
}

} // namespace

Result<GraphInfo> GraphInfo::Load(const std::string& path) {
std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(path, &no_url_path));
GAR_ASSIGN_OR_RAISE(auto yaml_content,
fs->ReadFileToValue<std::string>(no_url_path));
GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(yaml_content));
std::string default_name = "graph";
std::string default_prefix = PathToDirectory(path);
no_url_path = PathToDirectory(no_url_path);
return ConstructGraphInfo(graph_meta, default_name, default_prefix, fs,
no_url_path);
}

Result<GraphInfo> GraphInfo::Load(const std::string& input,
const std::string& relative_location) {
GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(input));
std::string default_name = "graph";
std::string default_prefix =
relative_location; // default chunk file prefix is relative location
std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs,
FileSystemFromUriOrPath(relative_location, &no_url_path));
return ConstructGraphInfo(graph_meta, default_name, default_prefix, fs,
no_url_path);
}

Result<std::string> GraphInfo::Dump() const noexcept {
if (!IsValidated()) {
return Status::Invalid();
Expand Down
8 changes: 4 additions & 4 deletions cpp/test/test_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TEST_CASE("test_edges_collection", "[Slow]") {
auto& edges = std::get<GAR_NAMESPACE::EdgesCollection<
GAR_NAMESPACE::AdjListType::ordered_by_source>>(expect.value());
auto end = edges.end();
GAR_NAMESPACE::IdType count = 0;
size_t count = 0;
for (auto it = edges.begin(); it != end; ++it) {
// access data through iterator directly
std::cout << "src=" << it.source() << ", dst=" << it.destination()
Expand All @@ -95,7 +95,7 @@ TEST_CASE("test_edges_collection", "[Slow]") {
auto& edges1 = std::get<GAR_NAMESPACE::EdgesCollection<
GAR_NAMESPACE::AdjListType::ordered_by_dest>>(expect1.value());
auto end1 = edges1.end();
GAR_NAMESPACE::IdType count1 = 0;
size_t count1 = 0;
for (auto it = edges1.begin(); it != end1; ++it) {
count1++;
}
Expand All @@ -110,7 +110,7 @@ TEST_CASE("test_edges_collection", "[Slow]") {
auto& edges2 = std::get<GAR_NAMESPACE::EdgesCollection<
GAR_NAMESPACE::AdjListType::ordered_by_source>>(expect2.value());
auto end2 = edges2.end();
GAR_NAMESPACE::IdType count2 = 0;
size_t count2 = 0;
for (auto it = edges2.begin(); it != end2; ++it) {
auto edge = *it;
std::cout << "src=" << edge.source() << ", dst=" << edge.destination()
Expand All @@ -128,7 +128,7 @@ TEST_CASE("test_edges_collection", "[Slow]") {
auto& edges3 = std::get<GAR_NAMESPACE::EdgesCollection<
GAR_NAMESPACE::AdjListType::unordered_by_source>>(expect3.value());
auto end3 = edges3.end();
GAR_NAMESPACE::IdType count3 = 0;
size_t count3 = 0;
for (auto it = edges3.begin(); it != end3; ++it) {
count3++;
}
Expand Down
18 changes: 18 additions & 0 deletions cpp/test/test_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.
#include "./util.h"

#include "gar/graph_info.h"
#include "gar/utils/filesystem.h"
#include "gar/utils/version_parser.h"

#define CATCH_CONFIG_MAIN
Expand Down Expand Up @@ -336,6 +337,9 @@ TEST_CASE("test_graph_info_load_from_file") {

std::string path = root + "/ldbc_sample/csv/ldbc_sample.graph.yml";
auto graph_info_result = GAR_NAMESPACE::GraphInfo::Load(path);
if (graph_info_result.has_error()) {
std::cout << graph_info_result.status().message() << std::endl;
}
REQUIRE(!graph_info_result.has_error());
auto graph_info = graph_info_result.value();
REQUIRE(graph_info.GetName() == "ldbc_sample");
Expand All @@ -345,3 +349,17 @@ TEST_CASE("test_graph_info_load_from_file") {
REQUIRE(vertex_infos.size() == 1);
REQUIRE(edge_infos.size() == 1);
}

TEST_CASE("test_graph_info_load_from_s3") {
std::string path =
"s3://graphar/ldbc/ldbc.graph.yml"
"?endpoint_override=graphscope.oss-cn-beijing.aliyuncs.com";
auto graph_info_result = GAR_NAMESPACE::GraphInfo::Load(path);
REQUIRE(!graph_info_result.has_error());
auto graph_info = graph_info_result.value();
REQUIRE(graph_info.GetName() == "ldbc");
const auto& vertex_infos = graph_info.GetVertexInfos();
const auto& edge_infos = graph_info.GetEdgeInfos();
REQUIRE(vertex_infos.size() == 8);
REQUIRE(edge_infos.size() == 23);
}