Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Simple Loading for Direct Graph #1

Merged
merged 4 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions paddle/fluid/distributed/service/graph_py_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ std::vector<std::string> GraphPyService::split(std::string &str,
}

void GraphPyService::set_up(std::string ips_str, int shard_num, int rank,
int client_id, uint32_t table_id) {
int client_id, std::vector<std::string> edge_types) {
set_shard_num(shard_num);
set_client_Id(client_id);
set_rank(rank);
this->table_id = table_id;

this -> table_id_map[std::string("")] = 0;
// Table 0 are for nodes
for(size_t table_id = 0; table_id < edge_types.size(); table_id ++ ) {
this -> table_id_map[edge_types[table_id]] = int(table_id + 1);
}
server_thread = client_thread = NULL;
std::istringstream stream(ips_str);
std::string ip;
Expand All @@ -47,10 +52,10 @@ void GraphPyService::set_up(std::string ips_str, int shard_num, int rank,
host_sign_list.push_back(ph_host.serialize_to_string());
index++;
}
VLOG(0) << "IN set up rank = " << rank;
//VLOG(0) << "IN set up rank = " << rank;
start_client();
start_server(server_list[rank], std::stoul(port_list[rank]));
sleep(1);
}
}
}
}
78 changes: 55 additions & 23 deletions paddle/fluid/distributed/service/graph_py_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <thread> // NOLINT
#include <vector>
#include <unordered_map>
#include "google/protobuf/text_format.h"

#include "gtest/gtest.h"
Expand All @@ -46,7 +47,7 @@ class GraphPyService {
std::vector<int> keys;
std::vector<std::string> server_list, port_list, host_sign_list;
int server_size, shard_num, rank, client_id;
uint32_t table_id;
std::unordered_map<std::string, uint32_t > table_id_map;
std::thread *server_thread, *client_thread;

std::shared_ptr<paddle::distributed::PSServer> pserver_ptr;
Expand All @@ -67,7 +68,7 @@ class GraphPyService {
int get_shard_num() { return shard_num; }
void set_shard_num(int shard_num) { this->shard_num = shard_num; }
void GetDownpourSparseTableProto(
::paddle::distributed::TableParameter* sparse_table_proto) {
::paddle::distributed::TableParameter* sparse_table_proto, uint32_t table_id) {
sparse_table_proto->set_table_id(table_id);
sparse_table_proto->set_table_class("GraphTable");
sparse_table_proto->set_shard_num(shard_num);
Expand Down Expand Up @@ -96,10 +97,14 @@ class GraphPyService {
server_service_proto->set_start_server_port(0);
server_service_proto->set_server_thread_num(12);

::paddle::distributed::TableParameter* sparse_table_proto =
downpour_server_proto->add_downpour_table_param();
GetDownpourSparseTableProto(sparse_table_proto);
for(auto& tuple : this -> table_id_map) {
::paddle::distributed::TableParameter* sparse_table_proto =
downpour_server_proto->add_downpour_table_param();
GetDownpourSparseTableProto(sparse_table_proto, tuple.second);
}

return server_fleet_desc;

}

::paddle::distributed::PSParameter GetWorkerProto() {
Expand All @@ -111,9 +116,11 @@ class GraphPyService {
::paddle::distributed::DownpourWorkerParameter* downpour_worker_proto =
worker_proto->mutable_downpour_worker_param();

::paddle::distributed::TableParameter* worker_sparse_table_proto =
downpour_worker_proto->add_downpour_table_param();
GetDownpourSparseTableProto(worker_sparse_table_proto);
for(auto& tuple : this -> table_id_map) {
::paddle::distributed::TableParameter* worker_sparse_table_proto =
downpour_worker_proto->add_downpour_table_param();
GetDownpourSparseTableProto(worker_sparse_table_proto, tuple.second);
}

::paddle::distributed::ServerParameter* server_proto =
worker_fleet_desc.mutable_server_param();
Expand All @@ -127,34 +134,59 @@ class GraphPyService {
server_service_proto->set_start_server_port(0);
server_service_proto->set_server_thread_num(12);

::paddle::distributed::TableParameter* server_sparse_table_proto =
downpour_server_proto->add_downpour_table_param();
GetDownpourSparseTableProto(server_sparse_table_proto);
for(auto& tuple : this -> table_id_map) {
::paddle::distributed::TableParameter* sparse_table_proto =
downpour_server_proto->add_downpour_table_param();
GetDownpourSparseTableProto(sparse_table_proto, tuple.second);
}

return worker_fleet_desc;
}
void set_server_size(int server_size) { this->server_size = server_size; }
int get_server_size(int server_size) { return server_size; }
std::vector<std::string> split(std::string& str, const char pattern);

void load_file(std::string filepath) {
auto status =
get_ps_client()->load(table_id, std::string(filepath), std::string(""));
status.wait();
void load_edge_file(std::string name, std::string filepath, bool reverse) {
std::string params = "edge";
if(reverse) {
params += "|reverse";
}
if (this -> table_id_map.count(name)) {
uint32_t table_id = this -> table_id_map[name];
auto status =
get_ps_client()->load(table_id, std::string(filepath), params);
status.wait();
}
}

void load_node_file(std::string name, std::string filepath) {
std::string params = "node";
if (this -> table_id_map.count(name)) {
uint32_t table_id = this -> table_id_map[name];
auto status =
get_ps_client()->load(table_id, std::string(filepath), params);
status.wait();
}
}

std::vector<GraphNode> sample_k(uint64_t node_id, int sample_size) {
std::vector<GraphNode> sample_k(std::string name, uint64_t node_id, int sample_size) {
std::vector<GraphNode> v;
auto status = worker_ptr->sample(table_id, node_id, sample_size, v);
status.wait();
if (this -> table_id_map.count(name)) {
uint32_t table_id = this -> table_id_map[name];
auto status = worker_ptr->sample(table_id, node_id, sample_size, v);
status.wait();
}
return v;
}
std::vector<GraphNode> pull_graph_list(int server_index, int start,
std::vector<GraphNode> pull_graph_list(std::string name, int server_index, int start,
int size) {
std::vector<GraphNode> res;
auto status =
worker_ptr->pull_graph_list(table_id, server_index, start, size, res);
status.wait();
if (this -> table_id_map.count(name)) {
uint32_t table_id = this -> table_id_map[name];
auto status =
worker_ptr->pull_graph_list(table_id, server_index, start, size, res);
status.wait();
}
return res;
}
void start_server(std::string ip, uint32_t port) {
Expand Down Expand Up @@ -197,7 +229,7 @@ class GraphPyService {
worker_ptr->configure(worker_proto, dense_regions, _ps_env, client_id);
}
void set_up(std::string ips_str, int shard_num, int rank, int client_id,
uint32_t table_id);
std::vector<std::string> edge_types);
void set_keys(std::vector<int> keys) { // just for test
this->keys = keys;
}
Expand Down
73 changes: 45 additions & 28 deletions paddle/fluid/distributed/table/common_graph_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
#include "paddle/fluid/distributed/table/common_graph_table.h"
#include <algorithm>
#include <sstream>
#include <time.h>
#include <set>
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {

int GraphShard::bucket_low_bound = 11;

std::vector<GraphNode *> GraphShard::get_batch(int start, int total_size) {
if (start < 0) start = 0;
int size = 0, cur_size;
Expand Down Expand Up @@ -51,68 +55,81 @@ std::vector<GraphNode *> GraphShard::get_batch(int start, int total_size) {
}
return res;
}

size_t GraphShard::get_size() {
size_t res = 0;
for (int i = 0; i < bucket_size; i++) {
res += bucket[i].size();
}
return res;
}

std::list<GraphNode *>::iterator GraphShard::add_node(GraphNode *node) {
if (node_location.find(node->get_id()) != node_location.end())
return node_location.find(node->get_id())->second;

int index = node->get_id() % shard_num % bucket_size;

std::list<GraphNode *>::iterator iter =
bucket[index].insert(bucket[index].end(), node);

node_location[node->get_id()] = iter;
return iter;
}

void GraphShard::add_neighboor(uint64_t id, GraphEdge *edge) {
(*add_node(new GraphNode(id, std::string(""))))->add_edge(edge);
}

GraphNode *GraphShard::find_node(uint64_t id) {
if (node_location.find(id) == node_location.end()) return NULL;
return *(node_location[id]);
}

int32_t GraphTable::load(const std::string &path, const std::string &param) {
auto cmd = paddle::string::split_string<std::string>(param, "|");
std::set<std::string> cmd_set(cmd.begin(), cmd.end());
bool load_edge = cmd_set.count(std::string("edge"));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load_edge这个变量好像后续没用到

bool reverse_edge = cmd_set.count(std::string("reverse"));
VLOG(0) << "Reverse Edge " << reverse_edge;

auto paths = paddle::string::split_string<std::string>(path, ";");
VLOG(0) << paths.size();
int count = 0;

for (auto path : paths) {
std::ifstream file(path);
std::string line;
while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
count ++;
if (values.size() < 2) continue;
auto id = std::stoull(values[0]);
size_t shard_id = id % shard_num;
if (shard_id >= shard_end || shard_id < shard_start) {
VLOG(0) << "will not load " << id << " from " << path
auto src_id = std::stoull(values[0]);
auto dst_id = std::stoull(values[1]);
if(reverse_edge) {
std::swap(src_id, dst_id);
}
double weight = 0;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

float?

if (values.size() == 3) {
weight = std::stod(values[2]);
}
size_t src_shard_id = src_id % shard_num;

if (src_shard_id >= shard_end || src_shard_id < shard_start) {
VLOG(0) << "will not load " << src_id << " from " << path
<< ", please check id distribution";
continue;

}
size_t index = shard_id - shard_start;
// GraphNodeType type = GraphNode::get_graph_node_type(values[1]);
// VLOG(0)<<"shards's size = "<<shards.size()<<" values' size =
// "<<values.size();
// VLOG(0)<<"add to index "<<index<<" table rank = "<<_shard_idx;
shards[index].add_node(new GraphNode(id, values[1]));
// VLOG(0)<<"checking added of rank "<<_shard_idx<<" shard "<<index<<"
// "<<cc->get_id();
for (size_t i = 2; i < values.size(); i++) {
auto edge_arr =
paddle::string::split_string<std::string>(values[i], ";");
if (edge_arr.size() == 2) {
// VLOG(0)<<"edge content "<<edge_arr[0]<<" "<<edge_arr[1]<<"
// "<<edge_arr[2];
auto edge_id = std::stoull(edge_arr[0]);
auto weight = std::stod(edge_arr[1]);
// VLOG(0)<<"edge_id "<<edge_id<<" weight "<<weight;
GraphEdge *edge = new GraphEdge(edge_id, weight);
shards[index].add_neighboor(id, edge);
}
}
size_t index = src_shard_id - shard_start;
GraphEdge *edge = new GraphEdge(dst_id, weight);
shards[index].add_neighboor(src_id, edge);
}
for (auto &shard : shards) {
}
VLOG(0) << "Load Finished Total Edge Count " << count;

// Build Sampler j
for (auto &shard : shards) {
auto bucket = shard.get_bucket();
for (int i = 0; i < bucket.size(); i++) {
std::list<GraphNode *>::iterator iter = bucket[i].begin();
Expand All @@ -122,7 +139,6 @@ int32_t GraphTable::load(const std::string &path, const std::string &param) {
iter++;
}
}
}
}
return 0;
}
Expand All @@ -144,6 +160,7 @@ int32_t GraphTable::random_sample(uint64_t node_id, int sample_size,
char *&buffer, int &actual_size) {
return _shards_task_pool[get_thread_pool_index(node_id)]
->enqueue([&]() -> int {

GraphNode *node = find_node(node_id);
if (node == NULL) {
actual_size = 0;
Expand Down Expand Up @@ -275,4 +292,4 @@ int32_t GraphTable::initialize() {
return 0;
}
}
};
};
11 changes: 6 additions & 5 deletions paddle/fluid/distributed/test/graph_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,15 @@ void RunBrpcPushSparse() {

distributed::GraphPyService gps1, gps2;
std::string ips_str = "127.0.0.1:4211;127.0.0.1:4212";
gps1.set_up(ips_str, 127, 0, 0, 0);
gps2.set_up(ips_str, 127, 1, 1, 0);
gps1.load_file(std::string(file_name));
std::vector<std::string> edge_types = { std::string("user2item")};
gps1.set_up(ips_str, 127, 0, 0, edge_types);
gps2.set_up(ips_str, 127, 1, 1, edge_types);
gps1.load_edge_file(std::string("user2item"), std::string(file_name), 0);
v.clear();
v = gps2.pull_graph_list(0, 1, 4);
v = gps2.pull_graph_list(std::string("user2item"), 0, 1, 4);
ASSERT_EQ(v[0].get_id(), 59);
v.clear();
v = gps2.sample_k(96, 4);
v = gps2.sample_k(std::string("user2item"), 96, 4);
ASSERT_EQ(v.size(), 3);
// to test in python,try this:
// from paddle.fluid.core import GraphPyService
Expand Down
3 changes: 2 additions & 1 deletion paddle/fluid/pybind/fleet_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ void BindGraphNode(py::module* m) {
void BindGraphService(py::module* m) {
py::class_<GraphPyService>(*m, "GraphPyService")
.def(py::init<>())
.def("load_file", &GraphPyService::load_file)
.def("load_edge_file", &GraphPyService::load_edge_file)
.def("load_node_file", &GraphPyService::load_node_file)
.def("set_up", &GraphPyService::set_up)
.def("pull_graph_list", &GraphPyService::pull_graph_list)
.def("sample_k", &GraphPyService::sample_k);
Expand Down