Skip to content

Commit

Permalink
Enable arrow S3 support to support reading and writing file with S3/O…
Browse files Browse the repository at this point in the history
…SS (#125)
  • Loading branch information
acezen authored Apr 18, 2023
1 parent 551cc78 commit 5027c59
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt-get update -y
sudo apt-get install -y libboost-graph-dev ccache
sudo apt-get install -y libboost-graph-dev ccache libcurl4-openssl-dev
- name: CMake
run: |
Expand Down
11 changes: 11 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ endmacro()
find_package(Threads REQUIRED)
find_yaml_cpp()
find_package(OpenSSL QUIET)
if (APPLE)
find_package(curl REQUIRED)
else()
find_package(CURL REQUIRED)
endif()
if(OPENSSL_FOUND)
if(OPENSSL_VERSION LESS "1.1.0")
message(ERROR "The OpenSSL must be greater than or equal to 1.1.0, current version is ${OPENSSL_VERSION}")
Expand Down Expand Up @@ -204,6 +209,12 @@ macro(build_gar)
if(OPENSSL_FOUND)
target_link_libraries(gar PRIVATE OpenSSL::SSL)
endif()
if (CURL_FOUND)
target_link_libraries(gar PUBLIC CURL::libcurl)
endif()
if (APPLE)
target_link_libraries(gar "-framework CoreFoundation")
endif()
endmacro()

build_gar()
Expand Down
1 change: 1 addition & 0 deletions cpp/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Building requires:
sufficient. For MacOS, at least clang 5 is required
* CMake 3.5 or higher
* On Linux and macOS, ``make`` build utilities
* curl-devel (Linux) or curl (macOS), for s3 filesystem support

Dependencies for optional features:

Expand Down
3 changes: 2 additions & 1 deletion cpp/cmake/apache-arrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ function(build_arrow)
"-DARROW_WITH_ZSTD=ON"
"-DARROW_WITH_ZLIB=OFF"
"-DARROW_WITH_BROTLI=OFF"
"-DARROW_WITH_BZ2=OFF")
"-DARROW_WITH_BZ2=OFF"
"-DARROW_S3=ON")

set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory")
set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}")
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/gar/graph_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ namespace GAR_NAMESPACE_INTERNAL {

class Yaml;

class FileSystem;

/**
* Property is a struct to store the property information for a group.
*/
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class VertexPropertyArrowChunkReader {
vertex_info.GetPathPrefix(property_group));
std::string base_dir = prefix_ + pg_path_prefix;
GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
utils::GetVertexChunkNum(prefix_, vertex_info_));
utils::GetVertexChunkNum(prefix_, vertex_info));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/gar/utils/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class FileSystem {
/**
* @brief Create a new FileSystem by URI
*
* wrapper of arrow::fs::FileSystemFromUri
* wrapper of arrow::fs::FileSystemFromUriOrPath
*
* Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
* "gs" and "gcs".
Expand Down
14 changes: 10 additions & 4 deletions cpp/src/arrow_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ Status VertexPropertyWriter::WriteChunk(
auto schema = input_table->schema();
for (auto& property : property_group.GetProperties()) {
int indice = schema->GetFieldIndex(property.name);
if (indice == -1)
return Status::InvalidOperation("invalid property");
if (indice == -1) {
std::string msg = "invalid property: " + property.name +
" vertex: " + vertex_info_.GetLabel();
return Status::InvalidOperation(msg);
}
indices.push_back(indice);
}
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table,
Expand Down Expand Up @@ -274,8 +277,11 @@ Status EdgeChunkWriter::WritePropertyChunk(
auto schema = input_table->schema();
for (auto& property : property_group.GetProperties()) {
int indice = schema->GetFieldIndex(property.name);
if (indice == -1)
return Status::InvalidOperation("invalid property");
if (indice == -1) {
std::string msg = "invalid property: " + property.name +
" edge: " + edge_info_.GetEdgeLabel();
return Status::InvalidOperation(msg);
}
indices.push_back(indice);
}
auto in_table = input_table->SelectColumns(indices).ValueOrDie();
Expand Down
47 changes: 38 additions & 9 deletions cpp/src/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ static Status CastToLargeOffsetArray(
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
return Status::OK();
}

Result<arrow::internal::Uri> ParseFileSystemUri(const std::string& uri_string) {
arrow::internal::Uri uri;
RETURN_NOT_ARROW_OK(uri.Parse(uri_string));
return std::move(uri);
}
} // namespace detail

Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
Expand Down Expand Up @@ -173,8 +179,8 @@ Result<std::string> FileSystem::ReadFileToValue(const std::string& path) const
template <typename T>
Status FileSystem::WriteValueToFile(const T& value,
const std::string& path) const noexcept {
RETURN_NOT_ARROW_OK(
arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
arrow_fs_->OpenOutputStream(path));
RETURN_NOT_ARROW_OK(ofstream->Write(&value, sizeof(T)));
Expand All @@ -185,8 +191,8 @@ Status FileSystem::WriteValueToFile(const T& value,
template <>
Status FileSystem::WriteValueToFile(const std::string& value,
const std::string& path) const noexcept {
RETURN_NOT_ARROW_OK(
arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
arrow_fs_->OpenOutputStream(path));
RETURN_NOT_ARROW_OK(ofstream->Write(value.c_str(), value.size()));
Expand All @@ -197,8 +203,8 @@ Status FileSystem::WriteValueToFile(const std::string& value,
Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
FileType file_type,
const std::string& path) const noexcept {
RETURN_NOT_ARROW_OK(
arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
arrow_fs_->OpenOutputStream(path));
switch (file_type) {
Expand Down Expand Up @@ -242,7 +248,8 @@ Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,

Status FileSystem::CopyFile(const std::string& src_path,
const std::string& dst_path) const noexcept {
RETURN_NOT_ARROW_OK(
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(
arrow_fs_->CreateDir(dst_path.substr(0, dst_path.find_last_of("/"))));
RETURN_NOT_ARROW_OK(arrow_fs_->CopyFile(src_path, dst_path));
return Status::OK();
Expand All @@ -261,9 +268,31 @@ Result<IdType> FileSystem::GetFileNumOfDir(const std::string& dir_path,
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
const std::string& uri, std::string* out_path) {
const std::string& uri_string, std::string* out_path) {
if (arrow::fs::internal::DetectAbsolutePath(uri_string)) {
// if the uri_string is an absolute path, we need to create a local file
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto arrow_fs,
arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
return std::make_shared<FileSystem>(arrow_fs);
}

GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri, out_path));
auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri_string));
GAR_ASSIGN_OR_RAISE(auto uri, detail::ParseFileSystemUri(uri_string));
if (out_path != nullptr) {
if (uri.scheme() == "file" || uri.scheme() == "hdfs" ||
uri.scheme().empty()) {
*out_path = uri.path();
} else if (uri.scheme() == "s3" || uri.scheme() == "gs") {
// bucket name is the host, path is the path
// the arrow parser would delete the trailing slash which we don't want to
*out_path = uri.host() + uri.path();
} else {
return Status::Invalid("Unrecognized filesystem type in URI: " +
uri_string);
}
}
return std::make_shared<FileSystem>(arrow_fs);
}

Expand Down
70 changes: 49 additions & 21 deletions cpp/src/graph_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,29 +227,32 @@ Status EdgeInfo::Save(const std::string& path) const {
return fs->WriteValueToFile(yaml_content, path);
}

namespace {

static std::string PathToDirectory(const std::string& path) {
const size_t last_slash_idx = path.rfind('/');
if (std::string::npos != last_slash_idx) {
return path.substr(0, last_slash_idx + 1); // +1 to include the slash
if (path.rfind("s3://", 0) == 0) {
int t = path.find_last_of('?');
std::string prefix = path.substr(0, t);
std::string suffix = path.substr(t);
const size_t last_slash_idx = prefix.rfind('/');
if (std::string::npos != last_slash_idx) {
return prefix.substr(0, last_slash_idx + 1) + suffix;
}
} else {
const size_t last_slash_idx = path.rfind('/');
if (std::string::npos != last_slash_idx) {
return path.substr(0, last_slash_idx + 1); // +1 to include the slash
}
}
return path;
}

Result<GraphInfo> GraphInfo::Load(const std::string& path) {
std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(path, &no_url_path));
GAR_ASSIGN_OR_RAISE(auto yaml_content,
fs->ReadFileToValue<std::string>(no_url_path));
std::string path_dir = PathToDirectory(path);
return GraphInfo::Load(yaml_content, path_dir);
}

Result<GraphInfo> GraphInfo::Load(const std::string& input,
const std::string& relative_location) {
GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(input));
std::string name = "graph";
std::string prefix =
relative_location; // default chunk file prefix is relative location
static Result<GraphInfo> ConstructGraphInfo(
std::shared_ptr<Yaml> graph_meta, const std::string& default_name,
const std::string& default_prefix, const std::shared_ptr<FileSystem> fs,
const std::string& no_url_path) {
std::string name = default_name;
std::string prefix = default_prefix;
if (graph_meta->operator[]("name")) {
name = graph_meta->operator[]("name").as<std::string>();
}
Expand All @@ -264,9 +267,6 @@ Result<GraphInfo> GraphInfo::Load(const std::string& input,
}
GraphInfo graph_info(name, version, prefix);

std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs,
FileSystemFromUriOrPath(relative_location, &no_url_path));
const auto& vertices = graph_meta->operator[]("vertices");
if (vertices) {
for (YAML::const_iterator it = vertices.begin(); it != vertices.end();
Expand All @@ -293,6 +293,34 @@ Result<GraphInfo> GraphInfo::Load(const std::string& input,
return graph_info;
}

} // namespace

Result<GraphInfo> GraphInfo::Load(const std::string& path) {
std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(path, &no_url_path));
GAR_ASSIGN_OR_RAISE(auto yaml_content,
fs->ReadFileToValue<std::string>(no_url_path));
GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(yaml_content));
std::string default_name = "graph";
std::string default_prefix = PathToDirectory(path);
no_url_path = PathToDirectory(no_url_path);
return ConstructGraphInfo(graph_meta, default_name, default_prefix, fs,
no_url_path);
}

Result<GraphInfo> GraphInfo::Load(const std::string& input,
const std::string& relative_location) {
GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(input));
std::string default_name = "graph";
std::string default_prefix =
relative_location; // default chunk file prefix is relative location
std::string no_url_path;
GAR_ASSIGN_OR_RAISE(auto fs,
FileSystemFromUriOrPath(relative_location, &no_url_path));
return ConstructGraphInfo(graph_meta, default_name, default_prefix, fs,
no_url_path);
}

Result<std::string> GraphInfo::Dump() const noexcept {
if (!IsValidated()) {
return Status::Invalid();
Expand Down
8 changes: 4 additions & 4 deletions cpp/test/test_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TEST_CASE("test_edges_collection", "[Slow]") {
auto& edges = std::get<GAR_NAMESPACE::EdgesCollection<
GAR_NAMESPACE::AdjListType::ordered_by_source>>(expect.value());
auto end = edges.end();
GAR_NAMESPACE::IdType count = 0;
size_t count = 0;
for (auto it = edges.begin(); it != end; ++it) {
// access data through iterator directly
std::cout << "src=" << it.source() << ", dst=" << it.destination()
Expand All @@ -95,7 +95,7 @@ TEST_CASE("test_edges_collection", "[Slow]") {
auto& edges1 = std::get<GAR_NAMESPACE::EdgesCollection<
GAR_NAMESPACE::AdjListType::ordered_by_dest>>(expect1.value());
auto end1 = edges1.end();
GAR_NAMESPACE::IdType count1 = 0;
size_t count1 = 0;
for (auto it = edges1.begin(); it != end1; ++it) {
count1++;
}
Expand All @@ -110,7 +110,7 @@ TEST_CASE("test_edges_collection", "[Slow]") {
auto& edges2 = std::get<GAR_NAMESPACE::EdgesCollection<
GAR_NAMESPACE::AdjListType::ordered_by_source>>(expect2.value());
auto end2 = edges2.end();
GAR_NAMESPACE::IdType count2 = 0;
size_t count2 = 0;
for (auto it = edges2.begin(); it != end2; ++it) {
auto edge = *it;
std::cout << "src=" << edge.source() << ", dst=" << edge.destination()
Expand All @@ -128,7 +128,7 @@ TEST_CASE("test_edges_collection", "[Slow]") {
auto& edges3 = std::get<GAR_NAMESPACE::EdgesCollection<
GAR_NAMESPACE::AdjListType::unordered_by_source>>(expect3.value());
auto end3 = edges3.end();
GAR_NAMESPACE::IdType count3 = 0;
size_t count3 = 0;
for (auto it = edges3.begin(); it != end3; ++it) {
count3++;
}
Expand Down
18 changes: 18 additions & 0 deletions cpp/test/test_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.
#include "./util.h"

#include "gar/graph_info.h"
#include "gar/utils/filesystem.h"
#include "gar/utils/version_parser.h"

#define CATCH_CONFIG_MAIN
Expand Down Expand Up @@ -336,6 +337,9 @@ TEST_CASE("test_graph_info_load_from_file") {

std::string path = root + "/ldbc_sample/csv/ldbc_sample.graph.yml";
auto graph_info_result = GAR_NAMESPACE::GraphInfo::Load(path);
if (graph_info_result.has_error()) {
std::cout << graph_info_result.status().message() << std::endl;
}
REQUIRE(!graph_info_result.has_error());
auto graph_info = graph_info_result.value();
REQUIRE(graph_info.GetName() == "ldbc_sample");
Expand All @@ -345,3 +349,17 @@ TEST_CASE("test_graph_info_load_from_file") {
REQUIRE(vertex_infos.size() == 1);
REQUIRE(edge_infos.size() == 1);
}

TEST_CASE("test_graph_info_load_from_s3") {
std::string path =
"s3://graphar/ldbc/ldbc.graph.yml"
"?endpoint_override=graphscope.oss-cn-beijing.aliyuncs.com";
auto graph_info_result = GAR_NAMESPACE::GraphInfo::Load(path);
REQUIRE(!graph_info_result.has_error());
auto graph_info = graph_info_result.value();
REQUIRE(graph_info.GetName() == "ldbc");
const auto& vertex_infos = graph_info.GetVertexInfos();
const auto& edge_infos = graph_info.GetEdgeInfos();
REQUIRE(vertex_infos.size() == 8);
REQUIRE(edge_infos.size() == 23);
}

0 comments on commit 5027c59

Please sign in to comment.