Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 committed Aug 16, 2023
1 parent df8ce99 commit 5d3d473
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 33 deletions.
80 changes: 47 additions & 33 deletions flex/storages/rt_mutable_graph/loading_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static bool fetch_src_dst_column_mapping(const Schema& schema, YAML::Node node,
}

} else {
LOG(FATAL) << "No primary key column mapping for [" << key << "]";
LOG(ERROR) << "No primary key column mapping for [" << key << "]";
}
return true;
}
Expand Down Expand Up @@ -105,7 +105,7 @@ static bool parse_column_mappings(

// These files share the same column mapping.
static bool parse_vertex_files(
YAML::Node node, const Schema& schema, const std::string& data_location,
YAML::Node node, const Schema& schema,
std::unordered_map<label_t, std::vector<std::string>>& files,
std::unordered_map<label_t, std::vector<std::pair<size_t, std::string>>>&
vertex_mapping) {
Expand Down Expand Up @@ -161,9 +161,6 @@ static bool parse_vertex_files(
int num = files_node.size();
for (int i = 0; i < num; ++i) {
std::string file_path = files_node[i].as<std::string>();
if (!data_location.empty()) {
file_path = data_location + "/" + file_path;
}
if (!access_file(file_path)) {
LOG(ERROR) << "vertex file - " << file_path << " file not found...";
}
Expand All @@ -172,31 +169,31 @@ static bool parse_vertex_files(
}
return true;
} else {
LOG(FATAL) << "vertex [" << label_name << "] does not have input files";
LOG(ERROR) << "vertex [" << label_name << "] does not have input files";
return false;
}
}

static bool parse_vertices_files_schema(
YAML::Node node, const Schema& schema, const std::string& data_location,
YAML::Node node, const Schema& schema,
std::unordered_map<label_t, std::vector<std::string>>& files,
std::unordered_map<label_t, std::vector<std::pair<size_t, std::string>>>&
column_mappings) {
if (!node.IsSequence()) {
LOG(FATAL) << "vertex is not set properly";
LOG(ERROR) << "vertex is not set properly";
return false;
}
int num = node.size();
for (int i = 0; i < num; ++i) {
if (!parse_vertex_files(node[i], schema, data_location, files,
column_mappings)) {
if (!parse_vertex_files(node[i], schema, files, column_mappings)) {
return false;
}
}
return true;
}

static bool parse_edge_files(
YAML::Node node, const Schema& schema, const std::string& data_location,
YAML::Node node, const Schema& schema,
std::unordered_map<
std::tuple<label_t, label_t, label_t>, std::vector<std::string>,
boost::hash<std::tuple<label_t, label_t, label_t>>>& files,
Expand All @@ -209,22 +206,22 @@ static bool parse_edge_files(
boost::hash<typename LoadingConfig::edge_triplet_type>>&
edge_src_dst_col) {
if (!node["type_triplet"]) {
LOG(FATAL) << "edge [type_triplet] is not set properly";
LOG(ERROR) << "edge [type_triplet] is not set properly";
return false;
}
auto triplet_node = node["type_triplet"];
std::string src_label, dst_label, edge_label;
if (!get_scalar(triplet_node, "edge", edge_label)) {
LOG(FATAL) << "Field [edge] is not set for edge [" << triplet_node << "]";
LOG(ERROR) << "Field [edge] is not set for edge [" << triplet_node << "]";
return false;
}
if (!get_scalar(triplet_node, "source_vertex", src_label)) {
LOG(FATAL) << "Field [source_vertex] is not set for edge [" << edge_label
LOG(ERROR) << "Field [source_vertex] is not set for edge [" << edge_label
<< "]";
return false;
}
if (!get_scalar(triplet_node, "destination_vertex", dst_label)) {
LOG(FATAL) << "Field [destination_vertex] is not set for edge ["
LOG(ERROR) << "Field [destination_vertex] is not set for edge ["
<< edge_label << "]";
return false;
}
Expand Down Expand Up @@ -317,9 +314,6 @@ static bool parse_edge_files(
int num = files_node.size();
for (int i = 0; i < num; ++i) {
std::string file_path = files_node[i].as<std::string>();
if (!data_location.empty()) {
file_path = data_location + "/" + file_path;
}
if (!access_file(file_path)) {
LOG(ERROR) << "edge file - " << file_path << " file not found...";
}
Expand All @@ -330,13 +324,13 @@ static bool parse_edge_files(
std::filesystem::canonical(path));
}
} else {
LOG(FATAL) << "No edge files found for edge " << edge_label << "...";
LOG(ERROR) << "No edge files found for edge " << edge_label << "...";
}
return true;
}

static bool parse_edges_files_schema(
YAML::Node node, const Schema& schema, const std::string& data_location,
YAML::Node node, const Schema& schema,
std::unordered_map<
std::tuple<label_t, label_t, label_t>, std::vector<std::string>,
boost::hash<std::tuple<label_t, label_t, label_t>>>& files,
Expand All @@ -355,7 +349,7 @@ static bool parse_edges_files_schema(
int num = node.size();
LOG(INFO) << " Try to parse " << num << "edge configuration";
for (int i = 0; i < num; ++i) {
if (!parse_edge_files(node[i], schema, data_location, files, edge_mapping,
if (!parse_edge_files(node[i], schema, files, edge_mapping,
edge_src_dst_col)) {
return false;
}
Expand All @@ -369,6 +363,9 @@ static bool parse_bulk_load_config_file(const std::string& config_file,
YAML::Node root = YAML::LoadFile(config_file);
std::string data_location;
load_config.scheme_ = "file"; // default data source is file
load_config.method_ = "init";
load_config.delimiter_ = "|";
load_config.format_ = "csv";
if (root["loading_config"]) {
auto loading_config_node = root["loading_config"];
if (loading_config_node["data_source"]) {
Expand All @@ -378,8 +375,6 @@ static bool parse_bulk_load_config_file(const std::string& config_file,
}
get_scalar(loading_config_node, "import_option", load_config.method_);
auto format_node = loading_config_node["format"];
load_config.delimiter_ = "|";
load_config.format_ = "csv";
if (format_node) {
get_scalar(format_node, "type", load_config.format_);
if (load_config.format_ == "csv") {
Expand All @@ -388,22 +383,37 @@ static bool parse_bulk_load_config_file(const std::string& config_file,
load_config.delimiter_);
}
} else {
LOG(FATAL) << "Only support csv format now";
LOG(ERROR) << "Only support csv format now";
return false;
}
}
}
// only delimeter with | is supported now
if (load_config.delimiter_ != "|") {
LOG(FATAL) << "Only support | as delimiter now";
if (load_config.GetValidDelimiters().find(load_config.delimiter_) ==
load_config.GetValidDelimiters().end()) {
LOG(ERROR) << "Not valid delimiter: " << load_config.delimiter_
<< ", supported delimeters: '|'";
return false;
}
if (load_config.method_ != "init") {
LOG(FATAL) << "Only support init method now";
LOG(ERROR) << "Only support init method now";
return false;
}
if (data_location.empty()) {
LOG(WARNING) << "data_location is not set";
// check FLEX_DATA_DIR env variable
const char* flex_data_dir = std::getenv("FLEX_DATA_DIR");
if (!flex_data_dir) {
LOG(ERROR) << "data_location is not set, and FLEX_DATA_DIR is not set";
return false;
} else {
LOG(INFO) << "data_location is not set, use FLEX_DATA_DIR: "
<< flex_data_dir;
}
} else {
// override FLEX_DATA_DIR env variable
setenv("FLEX_DATA_DIR", data_location.c_str(), 1);
}

if (load_config.scheme_ != "file") {
LOG(ERROR) << "Only support [file] data source now";
return false;
Expand All @@ -416,18 +426,16 @@ static bool parse_bulk_load_config_file(const std::string& config_file,
if (root["vertex_mappings"]) {
VLOG(10) << "vertex_mappings is set";
if (!parse_vertices_files_schema(root["vertex_mappings"], schema,
data_location,
load_config.vertex_loading_meta_,
load_config.vertex_column_mappings_)) {
return false;
}
}
if (root["edge_mappings"]) {
VLOG(10) << "edge_mappings is set";
if (!parse_edges_files_schema(root["edge_mappings"], schema, data_location,
load_config.edge_loading_meta_,
load_config.edge_column_mappings_,
load_config.edge_src_dst_col_)) {
if (!parse_edges_files_schema(
root["edge_mappings"], schema, load_config.edge_loading_meta_,
load_config.edge_column_mappings_, load_config.edge_src_dst_col_)) {
return false;
}
}
Expand Down Expand Up @@ -541,4 +549,10 @@ LoadingConfig::GetEdgeSrcDstCol(label_t src_label_id, label_t dst_label_id,
return edge_src_dst_col_.at(key);
}

const std::unordered_set<std::string>& LoadingConfig::GetValidDelimiters() {
return LoadingConfig::valid_delimiter_;
}
// define delimeter here
const std::unordered_set<std::string> LoadingConfig::valid_delimiter_ = {"|"};

} // namespace gs
4 changes: 4 additions & 0 deletions flex/storages/rt_mutable_graph/loading_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <filesystem>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include "flex/storages/rt_mutable_graph/schema.h"
#include "flex/utils/yaml_utils.h"

Expand All @@ -41,6 +42,7 @@ class LoadingConfig {
using edge_triplet_type =
std::tuple<schema_label_type, schema_label_type,
schema_label_type>; // src_label_t, dst_label_t, edge_label_t
static const std::unordered_set<std::string> valid_delimiter_;

// Check whether loading config file is consistent with schema
static LoadingConfig ParseFromYaml(const Schema& schema,
Expand Down Expand Up @@ -91,6 +93,8 @@ class LoadingConfig {
const std::pair<std::vector<size_t>, std::vector<size_t>>& GetEdgeSrcDstCol(
label_t src_label_id, label_t dst_label_id, label_t edge_label_id) const;

static const std::unordered_set<std::string>& GetValidDelimiters();

private:
const Schema& schema_;
std::string scheme_; // "file", "hdfs", "oss", "s3"
Expand Down

0 comments on commit 5d3d473

Please sign in to comment.