diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 38091db06..98a682ada 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,7 +47,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update -y - sudo apt-get install -y libboost-graph-dev ccache + sudo apt-get install -y libboost-graph-dev ccache libcurl4-openssl-dev - name: CMake run: | diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index afd5542c7..81fce3529 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -149,6 +149,11 @@ endmacro() find_package(Threads REQUIRED) find_yaml_cpp() find_package(OpenSSL QUIET) +if (APPLE) + find_package(curl REQUIRED) +else() + find_package(CURL REQUIRED) +endif() if(OPENSSL_FOUND) if(OPENSSL_VERSION LESS "1.1.0") message(ERROR "The OpenSSL must be greater than or equal to 1.1.0, current version is ${OPENSSL_VERSION}") @@ -204,6 +209,12 @@ macro(build_gar) if(OPENSSL_FOUND) target_link_libraries(gar PRIVATE OpenSSL::SSL) endif() + if (CURL_FOUND) + target_link_libraries(gar PUBLIC CURL::libcurl) + endif() + if (APPLE) + target_link_libraries(gar "-framework CoreFoundation") + endif() endmacro() build_gar() diff --git a/cpp/README.rst b/cpp/README.rst index 950e86b32..b0ee374a2 100644 --- a/cpp/README.rst +++ b/cpp/README.rst @@ -26,6 +26,7 @@ Building requires: sufficient. For MacOS, at least clang 5 is required * CMake 3.5 or higher * On Linux and macOS, ``make`` build utilities +* curl-devel (Linux) or curl (macOS), for s3 filesystem support Dependencies for optional features: diff --git a/cpp/cmake/apache-arrow.cmake b/cpp/cmake/apache-arrow.cmake index e995db38a..2566e1fe1 100644 --- a/cpp/cmake/apache-arrow.cmake +++ b/cpp/cmake/apache-arrow.cmake @@ -78,7 +78,8 @@ function(build_arrow) "-DARROW_WITH_ZSTD=ON" "-DARROW_WITH_ZLIB=OFF" "-DARROW_WITH_BROTLI=OFF" - "-DARROW_WITH_BZ2=OFF") + "-DARROW_WITH_BZ2=OFF" + "-DARROW_S3=ON") set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory") set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}") diff --git a/cpp/include/gar/graph_info.h b/cpp/include/gar/graph_info.h index 92c0b763b..c6e1d7b7d 100644 --- a/cpp/include/gar/graph_info.h +++ b/cpp/include/gar/graph_info.h @@ -34,6 +34,8 @@ namespace GAR_NAMESPACE_INTERNAL { class Yaml; +class FileSystem; + /** * Property is a struct to store the property information for a group. */ diff --git a/cpp/include/gar/reader/arrow_chunk_reader.h b/cpp/include/gar/reader/arrow_chunk_reader.h index a4da2359e..1e3c9ad8e 100644 --- a/cpp/include/gar/reader/arrow_chunk_reader.h +++ b/cpp/include/gar/reader/arrow_chunk_reader.h @@ -63,7 +63,7 @@ class VertexPropertyArrowChunkReader { vertex_info.GetPathPrefix(property_group)); std::string base_dir = prefix_ + pg_path_prefix; GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_, - utils::GetVertexChunkNum(prefix_, vertex_info_)); + utils::GetVertexChunkNum(prefix_, vertex_info)); } /** diff --git a/cpp/include/gar/utils/filesystem.h b/cpp/include/gar/utils/filesystem.h index 3c8779a3d..5313a4dd2 100644 --- a/cpp/include/gar/utils/filesystem.h +++ b/cpp/include/gar/utils/filesystem.h @@ -123,7 +123,7 @@ class FileSystem { /** * @brief Create a new FileSystem by URI * - * wrapper of arrow::fs::FileSystemFromUri + * wrapper of arrow::fs::FileSystemFromUriOrPath * * Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", * "gs" and "gcs". diff --git a/cpp/src/arrow_chunk_writer.cc b/cpp/src/arrow_chunk_writer.cc index 7db321f62..eed4b4885 100644 --- a/cpp/src/arrow_chunk_writer.cc +++ b/cpp/src/arrow_chunk_writer.cc @@ -122,8 +122,11 @@ Status VertexPropertyWriter::WriteChunk( auto schema = input_table->schema(); for (auto& property : property_group.GetProperties()) { int indice = schema->GetFieldIndex(property.name); - if (indice == -1) - return Status::InvalidOperation("invalid property"); + if (indice == -1) { + std::string msg = "invalid property: " + property.name + + " vertex: " + vertex_info_.GetLabel(); + return Status::InvalidOperation(msg); + } indices.push_back(indice); } GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table, @@ -274,8 +277,11 @@ Status EdgeChunkWriter::WritePropertyChunk( auto schema = input_table->schema(); for (auto& property : property_group.GetProperties()) { int indice = schema->GetFieldIndex(property.name); - if (indice == -1) - return Status::InvalidOperation("invalid property"); + if (indice == -1) { + std::string msg = "invalid property: " + property.name + + " edge: " + edge_info_.GetEdgeLabel(); + return Status::InvalidOperation(msg); + } indices.push_back(indice); } auto in_table = input_table->SelectColumns(indices).ValueOrDie(); diff --git a/cpp/src/filesystem.cc b/cpp/src/filesystem.cc index d190f4e86..07b0c82ef 100644 --- a/cpp/src/filesystem.cc +++ b/cpp/src/filesystem.cc @@ -73,6 +73,12 @@ static Status CastToLargeOffsetArray( GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks)); return Status::OK(); } + +Result ParseFileSystemUri(const std::string& uri_string) { + arrow::internal::Uri uri; + RETURN_NOT_ARROW_OK(uri.Parse(uri_string)); + return std::move(uri); +} } // namespace detail Result> FileSystem::ReadFileToTable( @@ -173,8 +179,8 @@ Result FileSystem::ReadFileToValue(const std::string& path) const template Status FileSystem::WriteValueToFile(const T& value, const std::string& path) const noexcept { - RETURN_NOT_ARROW_OK( - arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/")))); + // try to create the directory, oss filesystem may not support this, ignore + ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/")))); GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream, arrow_fs_->OpenOutputStream(path)); RETURN_NOT_ARROW_OK(ofstream->Write(&value, sizeof(T))); @@ -185,8 +191,8 @@ Status FileSystem::WriteValueToFile(const T& value, template <> Status FileSystem::WriteValueToFile(const std::string& value, const std::string& path) const noexcept { - RETURN_NOT_ARROW_OK( - arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/")))); + // try to create the directory, oss filesystem may not support this, ignore + ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/")))); GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream, arrow_fs_->OpenOutputStream(path)); RETURN_NOT_ARROW_OK(ofstream->Write(value.c_str(), value.size())); @@ -197,8 +203,8 @@ Status FileSystem::WriteValueToFile(const std::string& value, Status FileSystem::WriteTableToFile(const std::shared_ptr& table, FileType file_type, const std::string& path) const noexcept { - RETURN_NOT_ARROW_OK( - arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/")))); + // try to create the directory, oss filesystem may not support this, ignore + ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/")))); GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream, arrow_fs_->OpenOutputStream(path)); switch (file_type) { @@ -242,7 +248,8 @@ Status FileSystem::WriteTableToFile(const std::shared_ptr& table, Status FileSystem::CopyFile(const std::string& src_path, const std::string& dst_path) const noexcept { - RETURN_NOT_ARROW_OK( + // try to create the directory, oss filesystem may not support this, ignore + ARROW_UNUSED( arrow_fs_->CreateDir(dst_path.substr(0, dst_path.find_last_of("/")))); RETURN_NOT_ARROW_OK(arrow_fs_->CopyFile(src_path, dst_path)); return Status::OK(); @@ -261,9 +268,31 @@ Result FileSystem::GetFileNumOfDir(const std::string& dir_path, } Result> FileSystemFromUriOrPath( - const std::string& uri, std::string* out_path) { + const std::string& uri_string, std::string* out_path) { + if (arrow::fs::internal::DetectAbsolutePath(uri_string)) { + // if the uri_string is an absolute path, we need to create a local file + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( + auto arrow_fs, + arrow::fs::FileSystemFromUriOrPath(uri_string, out_path)); + return std::make_shared(arrow_fs); + } + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( - auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri, out_path)); + auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri_string)); + GAR_ASSIGN_OR_RAISE(auto uri, detail::ParseFileSystemUri(uri_string)); + if (out_path != nullptr) { + if (uri.scheme() == "file" || uri.scheme() == "hdfs" || + uri.scheme().empty()) { + *out_path = uri.path(); + } else if (uri.scheme() == "s3" || uri.scheme() == "gs") { + // bucket name is the host, path is the path + // the arrow parser would delete the trailing slash which we don't want to + *out_path = uri.host() + uri.path(); + } else { + return Status::Invalid("Unrecognized filesystem type in URI: " + + uri_string); + } + } return std::make_shared(arrow_fs); } diff --git a/cpp/src/graph_info.cc b/cpp/src/graph_info.cc index 833b8372f..05e8c850c 100644 --- a/cpp/src/graph_info.cc +++ b/cpp/src/graph_info.cc @@ -227,29 +227,32 @@ Status EdgeInfo::Save(const std::string& path) const { return fs->WriteValueToFile(yaml_content, path); } +namespace { + static std::string PathToDirectory(const std::string& path) { - const size_t last_slash_idx = path.rfind('/'); - if (std::string::npos != last_slash_idx) { - return path.substr(0, last_slash_idx + 1); // +1 to include the slash + if (path.rfind("s3://", 0) == 0) { + int t = path.find_last_of('?'); + std::string prefix = path.substr(0, t); + std::string suffix = path.substr(t); + const size_t last_slash_idx = prefix.rfind('/'); + if (std::string::npos != last_slash_idx) { + return prefix.substr(0, last_slash_idx + 1) + suffix; + } + } else { + const size_t last_slash_idx = path.rfind('/'); + if (std::string::npos != last_slash_idx) { + return path.substr(0, last_slash_idx + 1); // +1 to include the slash + } } return path; } -Result GraphInfo::Load(const std::string& path) { - std::string no_url_path; - GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(path, &no_url_path)); - GAR_ASSIGN_OR_RAISE(auto yaml_content, - fs->ReadFileToValue(no_url_path)); - std::string path_dir = PathToDirectory(path); - return GraphInfo::Load(yaml_content, path_dir); -} - -Result GraphInfo::Load(const std::string& input, - const std::string& relative_location) { - GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(input)); - std::string name = "graph"; - std::string prefix = - relative_location; // default chunk file prefix is relative location +static Result ConstructGraphInfo( + std::shared_ptr graph_meta, const std::string& default_name, + const std::string& default_prefix, const std::shared_ptr fs, + const std::string& no_url_path) { + std::string name = default_name; + std::string prefix = default_prefix; if (graph_meta->operator[]("name")) { name = graph_meta->operator[]("name").as(); } @@ -264,9 +267,6 @@ Result GraphInfo::Load(const std::string& input, } GraphInfo graph_info(name, version, prefix); - std::string no_url_path; - GAR_ASSIGN_OR_RAISE(auto fs, - FileSystemFromUriOrPath(relative_location, &no_url_path)); const auto& vertices = graph_meta->operator[]("vertices"); if (vertices) { for (YAML::const_iterator it = vertices.begin(); it != vertices.end(); @@ -293,6 +293,34 @@ Result GraphInfo::Load(const std::string& input, return graph_info; } +} // namespace + +Result GraphInfo::Load(const std::string& path) { + std::string no_url_path; + GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(path, &no_url_path)); + GAR_ASSIGN_OR_RAISE(auto yaml_content, + fs->ReadFileToValue(no_url_path)); + GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(yaml_content)); + std::string default_name = "graph"; + std::string default_prefix = PathToDirectory(path); + no_url_path = PathToDirectory(no_url_path); + return ConstructGraphInfo(graph_meta, default_name, default_prefix, fs, + no_url_path); +} + +Result GraphInfo::Load(const std::string& input, + const std::string& relative_location) { + GAR_ASSIGN_OR_RAISE(auto graph_meta, Yaml::Load(input)); + std::string default_name = "graph"; + std::string default_prefix = + relative_location; // default chunk file prefix is relative location + std::string no_url_path; + GAR_ASSIGN_OR_RAISE(auto fs, + FileSystemFromUriOrPath(relative_location, &no_url_path)); + return ConstructGraphInfo(graph_meta, default_name, default_prefix, fs, + no_url_path); +} + Result GraphInfo::Dump() const noexcept { if (!IsValidated()) { return Status::Invalid(); diff --git a/cpp/test/test_graph.cc b/cpp/test/test_graph.cc index a433c8547..55832c810 100644 --- a/cpp/test/test_graph.cc +++ b/cpp/test/test_graph.cc @@ -74,7 +74,7 @@ TEST_CASE("test_edges_collection", "[Slow]") { auto& edges = std::get>(expect.value()); auto end = edges.end(); - GAR_NAMESPACE::IdType count = 0; + size_t count = 0; for (auto it = edges.begin(); it != end; ++it) { // access data through iterator directly std::cout << "src=" << it.source() << ", dst=" << it.destination() @@ -95,7 +95,7 @@ TEST_CASE("test_edges_collection", "[Slow]") { auto& edges1 = std::get>(expect1.value()); auto end1 = edges1.end(); - GAR_NAMESPACE::IdType count1 = 0; + size_t count1 = 0; for (auto it = edges1.begin(); it != end1; ++it) { count1++; } @@ -110,7 +110,7 @@ TEST_CASE("test_edges_collection", "[Slow]") { auto& edges2 = std::get>(expect2.value()); auto end2 = edges2.end(); - GAR_NAMESPACE::IdType count2 = 0; + size_t count2 = 0; for (auto it = edges2.begin(); it != end2; ++it) { auto edge = *it; std::cout << "src=" << edge.source() << ", dst=" << edge.destination() @@ -128,7 +128,7 @@ TEST_CASE("test_edges_collection", "[Slow]") { auto& edges3 = std::get>(expect3.value()); auto end3 = edges3.end(); - GAR_NAMESPACE::IdType count3 = 0; + size_t count3 = 0; for (auto it = edges3.begin(); it != end3; ++it) { count3++; } diff --git a/cpp/test/test_info.cc b/cpp/test/test_info.cc index 59b3881ef..4ffb07a16 100644 --- a/cpp/test/test_info.cc +++ b/cpp/test/test_info.cc @@ -21,6 +21,7 @@ limitations under the License. #include "./util.h" #include "gar/graph_info.h" +#include "gar/utils/filesystem.h" #include "gar/utils/version_parser.h" #define CATCH_CONFIG_MAIN @@ -336,6 +337,9 @@ TEST_CASE("test_graph_info_load_from_file") { std::string path = root + "/ldbc_sample/csv/ldbc_sample.graph.yml"; auto graph_info_result = GAR_NAMESPACE::GraphInfo::Load(path); + if (graph_info_result.has_error()) { + std::cout << graph_info_result.status().message() << std::endl; + } REQUIRE(!graph_info_result.has_error()); auto graph_info = graph_info_result.value(); REQUIRE(graph_info.GetName() == "ldbc_sample"); @@ -345,3 +349,17 @@ TEST_CASE("test_graph_info_load_from_file") { REQUIRE(vertex_infos.size() == 1); REQUIRE(edge_infos.size() == 1); } + +TEST_CASE("test_graph_info_load_from_s3") { + std::string path = + "s3://graphar/ldbc/ldbc.graph.yml" + "?endpoint_override=graphscope.oss-cn-beijing.aliyuncs.com"; + auto graph_info_result = GAR_NAMESPACE::GraphInfo::Load(path); + REQUIRE(!graph_info_result.has_error()); + auto graph_info = graph_info_result.value(); + REQUIRE(graph_info.GetName() == "ldbc"); + const auto& vertex_infos = graph_info.GetVertexInfos(); + const auto& edge_infos = graph_info.GetEdgeInfos(); + REQUIRE(vertex_infos.size() == 8); + REQUIRE(edge_infos.size() == 23); +}