diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.cc b/paddle/fluid/distributed/ps/table/common_graph_table.cc index da583254010e2..430d8ef3ed06a 100644 --- a/paddle/fluid/distributed/ps/table/common_graph_table.cc +++ b/paddle/fluid/distributed/ps/table/common_graph_table.cc @@ -18,8 +18,12 @@ #include #include #include +#include + #include "gflags/gflags.h" +#include "paddle/fluid/platform/timer.h" #include "paddle/fluid/distributed/common/utils.h" +#include "paddle/fluid/framework/io/fs.h" #include "paddle/fluid/distributed/ps/table/graph/graph_node.h" #include "paddle/fluid/framework/generator.h" #include "paddle/fluid/string/printf.h" @@ -1022,6 +1026,75 @@ int32_t GraphTable::Load(const std::string &path, const std::string ¶m) { return 0; } +std::string GraphTable::get_inverse_etype(std::string &etype) { + auto etype_split = paddle::string::split_string(etype, "2"); + std::string res; + if ((int)etype_split.size() == 3) { + res = etype_split[2] + "2" + etype_split[1] + "2" + etype_split[0]; + } else { + res = etype_split[1] + "2" + etype_split[0]; + } + return res; +} + +int32_t GraphTable::load_node_and_edge_file(std::string etype, std::string ntype, std::string epath, + std::string npath, int part_num, bool reverse) { + auto etypes = paddle::string::split_string(etype, ","); + auto ntypes = paddle::string::split_string(ntype, ","); + VLOG(0) << "etypes size: " << etypes.size(); + VLOG(0) << "whether reverse: " << reverse; + std::string delim = ";"; + size_t total_len = etypes.size() + 1; // 1 is for node + + std::vector> tasks; + for (size_t i = 0; i < total_len; i++) { + tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue( + [&, i, this]() ->int { + if (i < etypes.size()) { + std::string etype_path = epath + "/" + etypes[i]; + auto etype_path_list = paddle::framework::localfs_list(etype_path); + std::string etype_path_str; + if (part_num > 0 && part_num < (int)etype_path_list.size()) { + std::vector sub_etype_path_list(etype_path_list.begin(), etype_path_list.begin() + part_num); + etype_path_str = boost::algorithm::join(sub_etype_path_list, delim); + } else { + etype_path_str = boost::algorithm::join(etype_path_list, delim); + } + this->load_edges(etype_path_str, false, etypes[i]); + if (reverse) { + std::string r_etype = get_inverse_etype(etypes[i]); + this->load_edges(etype_path_str, true, r_etype); + } + } else { + auto npath_list = paddle::framework::localfs_list(npath); + std::string npath_str; + if (part_num > 0 && part_num < (int)npath_list.size()) { + std::vector sub_npath_list(npath_list.begin(), npath_list.begin() + part_num); + npath_str = boost::algorithm::join(sub_npath_list, delim); + } else { + npath_str = boost::algorithm::join(npath_list, delim); + } + + if (ntypes.size() == 0) { + VLOG(0) << "node_type not specified, nothing will be loaded "; + return 0; + } else { + for (size_t i = 0; i < ntypes.size(); i++) { + if (feature_to_id.find(ntypes[i]) == feature_to_id.end()) { + VLOG(0) << "node_type " << ntypes[i] << "is not defined, will not load"; + return 0; + } + } + } + this->load_nodes(npath_str, ""); + } + return 0; + })); + } + for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); + return 0; +} + int32_t GraphTable::get_nodes_ids_by_ranges( int type_id, int idx, std::vector> ranges, std::vector &res) { @@ -1061,19 +1134,20 @@ int32_t GraphTable::get_nodes_ids_by_ranges( return 0; } -int32_t GraphTable::parse_node_file(const std::string &path, const std::string &node_type, int idx, uint64_t &count, uint64_t &valid_count) { +std::pair GraphTable::parse_node_file(const std::string &path, const std::string &node_type, int idx) { std::ifstream file(path); std::string line; uint64_t local_count = 0; uint64_t local_valid_count = 0; while (std::getline(file, line)) { - auto values = paddle::string::split_string(line, "\t"); - if (values.size() < 2) continue; - if (values[0] != node_type) { + size_t start = line.find_first_of('\t'); + if (start == std::string::npos) continue; + std::string parse_node_type = line.substr(0, start); + if (parse_node_type != node_type) { continue; } - - auto id = std::stoull(values[1]); + size_t end = line.find_first_of('\t', start + 1); + uint64_t id = std::stoull(line.substr(start +1, end - start - 1)); size_t shard_id = id % shard_num; if (shard_id >= shard_end || shard_id < shard_start) { VLOG(4) << "will not load " << id << " from " << path @@ -1086,18 +1160,64 @@ int32_t GraphTable::parse_node_file(const std::string &path, const std::string & auto node = feature_shards[idx][index]->add_feature_node(id, false); if (node != NULL) { node->set_feature_size(feat_name[idx].size()); - for (size_t slice = 2; slice < values.size(); slice++) { - parse_feature(idx, values[slice], node); + while (end != std::string::npos) { + start = end; + end = line.find_first_of('\t', start + 1); + std::string tmp_str = line.substr(start + 1, end - start - 1); + parse_feature(idx, tmp_str, node); } } local_valid_count++; } - mutex_.lock(); - count += local_count; - valid_count += local_valid_count; - mutex_.unlock(); VLOG(0) << "node_type[" << node_type << "] loads " << local_count << " nodes from filepath->" << path; - return 0; + return {local_count, local_valid_count}; +} + +std::pair GraphTable::parse_node_file(const std::string &path) { + std::ifstream file(path); + std::string line; + uint64_t local_count = 0; + uint64_t local_valid_count = 0; + int idx = 0; + + auto path_split = paddle::string::split_string(path, "/"); + auto path_name = path_split[path_split.size() - 1]; + + while (std::getline(file, line)) { + size_t start = line.find_first_of('\t'); + if (start == std::string::npos) continue; + std::string parse_node_type = line.substr(0, start); + auto it = feature_to_id.find(parse_node_type); + if (it == feature_to_id.end()) { + VLOG(0) << parse_node_type << "type error, please check"; + continue; + } + idx = it->second; + size_t end = line.find_first_of('\t', start + 1); + uint64_t id = std::stoull(line.substr(start +1, end - start - 1)); + size_t shard_id = id % shard_num; + if (shard_id >= shard_end || shard_id < shard_start) { + VLOG(4) << "will not load " << id << " from " << path + << ", please check id distribution"; + continue; + } + local_count++; + + size_t index = shard_id - shard_start; + auto node = feature_shards[idx][index]->add_feature_node(id, false); + if (node != NULL) { + while (end != std::string::npos) { + start = end; + end = line.find_first_of('\t', start + 1); + std::string tmp_str = line.substr(start + 1, end - start - 1); + parse_feature(idx, tmp_str, node); + } + } + + local_valid_count++; + } + VLOG(0) << local_valid_count << "/" << local_count << " nodes from filepath->" << path; + return {local_count, local_valid_count}; } // TODO opt load all node_types in once reading @@ -1106,33 +1226,40 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) { uint64_t count = 0; uint64_t valid_count = 0; int idx = 0; - if (node_type == "") { - VLOG(0) << "node_type not specified, loading edges to " << id_to_feature[0] - << " part"; - } else { - if (feature_to_id.find(node_type) == feature_to_id.end()) { - VLOG(0) << "node_type " << node_type - << " is not defined, nothing will be loaded"; - return 0; - } - idx = feature_to_id[node_type]; - } - - VLOG(0) << "Begin GraphTable::load_nodes() node_type[" << node_type << "]"; if (FLAGS_graph_load_in_parallel) { - std::vector> tasks; + if (node_type == "") { + VLOG(0) << "Begin GraphTable::load_nodes(), will load all node_type once"; + } + std::vector>> tasks; for (size_t i = 0; i < paths.size(); i++) { tasks.push_back(load_node_edge_task_pool->enqueue( - [&, i, idx, this]() -> int { - parse_node_file(paths[i], node_type, idx, count, valid_count); - return 0; + [&, i, this]() -> std::pair { + return parse_node_file(paths[i]); })); } - for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); + for (int i = 0; i < (int)tasks.size(); i++) { + auto res = tasks[i].get(); + count += res.first; + valid_count += res.second; + } } else { + VLOG(0) << "Begin GraphTable::load_nodes() node_type[" << node_type << "]"; + if (node_type == "") { + VLOG(0) << "node_type not specified, loading edges to " << id_to_feature[0] + << " part"; + } else { + if (feature_to_id.find(node_type) == feature_to_id.end()) { + VLOG(0) << "node_type " << node_type + << " is not defined, nothing will be loaded"; + return 0; + } + idx = feature_to_id[node_type]; + } for (auto path : paths) { VLOG(2) << "Begin GraphTable::load_nodes(), path[" << path << "]"; - parse_node_file(path, node_type, idx, count, valid_count); + auto res = parse_node_file(path, node_type, idx); + count += res.first; + valid_count += res.second; } } @@ -1151,7 +1278,7 @@ int32_t GraphTable::build_sampler(int idx, std::string sample_type) { return 0; } -int32_t GraphTable::parse_edge_file(const std::string &path, int idx, bool reverse, uint64_t &count, uint64_t &valid_count) { +std::pair GraphTable::parse_edge_file(const std::string &path, int idx, bool reverse) { std::string sample_type = "random"; bool is_weighted = false; std::ifstream file(path); @@ -1164,13 +1291,13 @@ int32_t GraphTable::parse_edge_file(const std::string &path, int idx, bool rever auto part_name_split = paddle::string::split_string(path_split[path_split.size() - 1], "-"); part_num = std::stoull(part_name_split[part_name_split.size() - 1]); } - + while (std::getline(file, line)) { - auto values = paddle::string::split_string(line, "\t"); + size_t start = line.find_first_of('\t'); + if (start == std::string::npos) continue; local_count++; - if (values.size() < 2) continue; - auto src_id = std::stoull(values[0]); - auto dst_id = std::stoull(values[1]); + uint64_t src_id = std::stoull(line.substr(0, start)); + uint64_t dst_id = std::stoull(line.substr(start + 1)); if (reverse) { std::swap(src_id, dst_id); } @@ -1182,8 +1309,9 @@ int32_t GraphTable::parse_edge_file(const std::string &path, int idx, bool rever } float weight = 1; - if (values.size() == 3) { - weight = std::stof(values[2]); + size_t last = line.find_last_of('\t'); + if (start != last) { + weight = std::stof(line.substr(last + 1)); sample_type = "weighted"; is_weighted = true; } @@ -1193,34 +1321,26 @@ int32_t GraphTable::parse_edge_file(const std::string &path, int idx, bool rever << ", please check id distribution"; continue; } - size_t index = src_shard_id - shard_start; - edge_shards[idx][index]->add_graph_node(src_id)->build_edges(is_weighted); - edge_shards[idx][index]->add_neighbor(src_id, dst_id, weight); + auto node = edge_shards[idx][index]->add_graph_node(src_id); + if (node != NULL) { + node->build_edges(is_weighted); + node->add_edge(dst_id, weight); + } + local_valid_count++; + } - mutex_.lock(); - count += local_count; - valid_count += local_valid_count; -#ifdef PADDLE_WITH_HETERPS - const uint64_t fixed_load_edges = 1000000; - if (count > fixed_load_edges && search_level == 2) { - dump_edges_to_ssd(idx); - VLOG(0) << "dumping edges to ssd, edge count is reset to 0"; - clear_graph(idx); - count = 0; - } -#endif - mutex_.unlock(); VLOG(0) << local_count << " edges are loaded from filepath->" << path; - return 0; + return {local_count, local_valid_count}; + } int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge, const std::string &edge_type) { #ifdef PADDLE_WITH_HETERPS if (search_level == 2) total_memory_cost = 0; - //const uint64_t fixed_load_edges = 1000000; + const uint64_t fixed_load_edges = 1000000; #endif int idx = 0; if (edge_type == "") { @@ -1241,18 +1361,23 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge, VLOG(0) << "Begin GraphTable::load_edges() edge_type[" << edge_type << "]"; if (FLAGS_graph_load_in_parallel) { - std::vector> tasks; + std::vector>> tasks; for (int i = 0; i < paths.size(); i++) { - tasks.push_back(load_node_edge_task_pool->enqueue( - [&, i, idx, this]() -> int { - parse_edge_file(paths[i], idx, reverse_edge, count, valid_count); - return 0; + tasks.push_back(load_node_edge_task_pool->enqueue( + [&, i, idx, this]() -> std::pair { + return parse_edge_file(paths[i], idx, reverse_edge); })); } - for (int j = 0; j < (int)tasks.size(); j++) tasks[j].get(); + for (int j = 0; j < (int)tasks.size(); j++) { + auto res = tasks[j].get(); + count += res.first; + valid_count += res.second; + } } else { for (auto path : paths) { - parse_edge_file(path, idx, reverse_edge, count, valid_count); + auto res = parse_edge_file(path, idx, reverse_edge); + count += res.first; + valid_count += res.second; } } VLOG(0) << valid_count << "/" << count << " edge_type[" << edge_type << "] edges are loaded successfully"; @@ -1581,6 +1706,7 @@ int GraphTable::parse_feature(int idx, const std::string& feat_str, // "") std::vector fields = paddle::string::split_string(feat_str, feature_separator_); + auto it = feat_id_map[idx].find(fields[0]); if (it != feat_id_map[idx].end()) { int32_t id = it->second; @@ -1604,10 +1730,10 @@ int GraphTable::parse_feature(int idx, const std::string& feat_str, } else if (dtype == "int64") { FeatureNode::parse_value_to_bytes(fields.begin() + 1, fields.end(), fea_ptr); return 0; - } + } } else { - VLOG(2) << "feature_name[" << fields[0] - << "] is not in feat_id_map, ntype_id[" << idx + VLOG(2) << "feature_name[" << fields[0] + << "] is not in feat_id_map, ntype_id[" << idx << "] feat_id_map_size[" << feat_id_map.size() << "]"; } diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.h b/paddle/fluid/distributed/ps/table/common_graph_table.h index 2b542bada63be..a6bfbec34b755 100644 --- a/paddle/fluid/distributed/ps/table/common_graph_table.h +++ b/paddle/fluid/distributed/ps/table/common_graph_table.h @@ -494,6 +494,11 @@ class GraphTable : public Table { const FsClientParameter &fs_config); virtual int32_t Initialize(const GraphParameter &config); int32_t Load(const std::string &path, const std::string ¶m); + + int32_t load_node_and_edge_file(std::string etype, std::string ntype, std::string epath, + std::string npath, int part_num, bool reverse); + + std::string get_inverse_etype(std::string &etype); int32_t load_edges(const std::string &path, bool reverse, const std::string &edge_type); @@ -506,11 +511,10 @@ class GraphTable : public Table { int slice_num); int get_all_feature_ids(int type, int idx, int slice_num, std::vector>* output); - int32_t load_nodes(const std::string &path, std::string node_type); - int32_t parse_edge_file(const std::string &path, int idx, bool reverse, - uint64_t &count, uint64_t &valid_count); - int32_t parse_node_file(const std::string &path, const std::string &node_type, - int idx, uint64_t &count, uint64_t &valid_count); + int32_t load_nodes(const std::string &path, std::string node_type = std::string()); + std::pair parse_edge_file(const std::string &path, int idx, bool reverse); + std::pair parse_node_file(const std::string &path, const std::string &node_type, int idx); + std::pair parse_node_file(const std::string &path); int32_t add_graph_node(int idx, std::vector &id_list, std::vector &is_weight_list); @@ -620,7 +624,8 @@ class GraphTable : public Table { std::vector> edge_shards, feature_shards; size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num; int task_pool_size_ = 24; - int load_thread_num = 150; + int load_thread_num = 160; + const int random_sample_nodes_ranges = 3; std::vector>> node_weight; diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu index db562e635c769..17b6905b8a5e4 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu @@ -141,6 +141,12 @@ void GraphGpuWrapper::load_node_file(std::string name, std::string filepath) { } } +void GraphGpuWrapper::load_node_and_edge(std::string etype, std::string ntype, std::string epath, + std::string npath, int part_num, bool reverse) { + ((GpuPsGraphTable *)graph_table) + ->cpu_graph_table_->load_node_and_edge_file(etype, ntype, epath, npath, part_num, reverse); +} + void GraphGpuWrapper::add_table_feat_conf(std::string table_name, std::string feat_name, std::string feat_dtype, diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h index d5fdc5e5cddc4..d5527e235c0e7 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h @@ -43,6 +43,8 @@ class GraphGpuWrapper { std::string feat_dtype, int feat_shape); void load_edge_file(std::string name, std::string filepath, bool reverse); void load_node_file(std::string name, std::string filepath); + void load_node_and_edge(std::string etype, std::string ntype, std::string epath, + std::string npath, int part_num, bool reverse); int32_t load_next_partition(int idx); int32_t get_partition_num(int idx); void load_node_weight(int type_id, int idx, std::string path); diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu index ee41d8ac49260..ffaddf9965336 100644 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu @@ -159,7 +159,7 @@ __global__ void dy_mf_update_kernel(Table* table, float* cur = (float*)(grads + i * grad_value_size); sgd.dy_mf_update_value(optimizer_config, (it.getter())->second, cur); } else { - printf("warning: push miss key: %d", keys[i]); + printf("warning: push miss key: %lu", keys[i]); } } } diff --git a/paddle/fluid/framework/io/fs.cc b/paddle/fluid/framework/io/fs.cc index b8aca886e7d60..3dfc35d149af9 100644 --- a/paddle/fluid/framework/io/fs.cc +++ b/paddle/fluid/framework/io/fs.cc @@ -155,7 +155,7 @@ std::vector localfs_list(const std::string& path) { std::shared_ptr pipe; int err_no = 0; pipe = shell_popen( - string::format_string("find %s -type f -maxdepth 1", path.c_str()), "r", + string::format_string("find %s -type f -maxdepth 1 | sort", path.c_str()), "r", &err_no); string::LineFileReader reader; std::vector list; diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index 76a9b16a75d6d..27068183c9c02 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -349,6 +349,7 @@ void BindGraphGpuWrapper(py::module* m) { .def("query_node_list", &GraphGpuWrapper::query_node_list) .def("add_table_feat_conf", &GraphGpuWrapper::add_table_feat_conf) .def("load_edge_file", &GraphGpuWrapper::load_edge_file) + .def("load_node_and_edge", &GraphGpuWrapper::load_node_and_edge) .def("upload_batch", py::overload_cast>&>( &GraphGpuWrapper::upload_batch))