
graph partition #42472

Merged
422 changes: 400 additions & 22 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc

Large diffs are not rendered by default.

34 changes: 32 additions & 2 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
@@ -63,7 +63,13 @@ class GraphShard {
}
return res;
}

std::vector<int64_t> get_all_id() {
std::vector<int64_t> res;
for (int i = 0; i < (int)bucket.size(); i++) {
res.push_back(bucket[i]->get_id());
}
return res;
}
GraphNode *add_graph_node(int64_t id);
GraphNode *add_graph_node(Node *node);
FeatureNode *add_feature_node(int64_t id);
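GraphShard::get_all_id above simply walks the shard's bucket and collects every node id. As a hedged sketch (not code from this PR), a caller could hash-partition those ids across slice_num devices, which is what the table-level get_all_id overload added further down appears intended for:

// Hypothetical helper, not part of the diff: split every id held by a set
// of GraphShard objects into slice_num groups, one group per device.
std::vector<std::vector<int64_t>> slice_ids(
    const std::vector<GraphShard *> &shards, int slice_num) {
  std::vector<std::vector<int64_t>> slices(slice_num);
  for (auto *shard : shards) {
    for (int64_t id : shard->get_all_id()) {
      // cast first so a negative id cannot produce a negative index
      slices[static_cast<uint64_t>(id) % slice_num].push_back(id);
    }
  }
  return slices;
}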
@@ -420,6 +426,10 @@ class GraphTable : public Table {
use_cache = false;
shard_num = 0;
rw_lock.reset(new pthread_rwlock_t());
#ifdef PADDLE_WITH_HETERPS
next_partition = 0;
total_memory_cost = 0;
#endif
}
virtual ~GraphTable();

@@ -465,6 +475,8 @@ class GraphTable : public Table {
int32_t load_edges(const std::string &path, bool reverse,
const std::string &edge_type);

std::vector<std::vector<int64_t>> get_all_id(int type, int idx,
int slice_num);
int32_t load_nodes(const std::string &path, std::string node_type);

int32_t add_graph_node(int idx, std::vector<int64_t> &id_list,
@@ -513,7 +525,7 @@ class GraphTable : public Table {
const std::vector<std::vector<std::string>> &res);

size_t get_server_num() { return server_num; }

void clear_graph(int idx);
virtual int32_t make_neighbor_sample_cache(size_t size_limit, size_t ttl) {
{
std::unique_lock<std::mutex> lock(mutex_);
@@ -538,15 +550,33 @@ class GraphTable : public Table {
// graph_sampler->set_graph_sample_callback(callback);
// return 0;
// }
virtual void make_partitions(int idx, int64_t gb_size, int device_len);
virtual char *random_sample_neighbor_from_ssd(
int idx, int64_t id, int sample_size,
const std::shared_ptr<std::mt19937_64> rng, int &actual_size);
virtual int32_t add_node_to_ssd(int type_id, int idx, int64_t src_id,
char *data, int len);
virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
int idx, std::vector<int64_t> ids);
int32_t Load_to_ssd(const std::string &path, const std::string &param);
int64_t load_graph_to_memory_from_ssd(int idx, std::vector<int64_t> &ids);
int32_t make_complementary_graph(int idx, int64_t byte_size);
int32_t dump_edges_to_ssd(int idx);
int32_t get_partition_num(int idx) { return partitions[idx].size(); }
std::vector<int64_t> get_partition(int idx, int index) {
if (idx >= partitions.size() || index >= partitions[idx].size())
return std::vector<int64_t>();
return partitions[idx][index];
}
int32_t load_edges_to_ssd(const std::string &path, bool reverse_edge,
const std::string &edge_type);
int32_t load_next_partition(int idx);
void set_search_level(int search_level) { this->search_level = search_level; }
// virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
int search_level;
int64_t total_memory_cost;
std::vector<std::vector<std::vector<int64_t>>> partitions;
int next_partition;
#endif
virtual int32_t add_comm_edge(int idx, int64_t src_id, int64_t dst_id);
virtual int32_t build_sampler(int idx, std::string sample_type = "random");
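Taken together, the new members (partitions, next_partition, total_memory_cost) and methods (make_partitions, get_partition_num, get_partition, load_next_partition, clear_graph) suggest a streaming workflow: cut one edge index into memory-budgeted partitions on SSD, then cycle them through memory one at a time. A sketch of that flow, assuming the table is already initialized with edges loaded to SSD; the loop body and the clear_graph placement are assumptions, not PR code:

GraphTable table;  // assumed initialized, edges loaded via Load_to_ssd
table.make_partitions(/*idx=*/0, /*gb_size=*/2LL << 30, /*device_len=*/8);
for (int p = 0; p < table.get_partition_num(0); ++p) {
  table.load_next_partition(0);  // presumably consumes partitions[0][next_partition]
  // build the GPU graph for this slice and run sampling/training on it, then
  table.clear_graph(0);          // free the slice before loading the next one
}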
48 changes: 38 additions & 10 deletions paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h
@@ -24,36 +24,46 @@ namespace paddle {
namespace framework {
struct GpuPsGraphNode {
int64_t node_id;
- int neighbor_size, neighbor_offset;
+ unsigned int neighbor_size, neighbor_offset;
// this node's neighbor is stored on [neighbor_offset,neighbor_offset +
// neighbor_size) of int64_t *neighbor_list;
};

struct GpuPsCommGraph {
int64_t *neighbor_list;
GpuPsGraphNode *node_list;
- int neighbor_size, node_size;
+ unsigned int neighbor_size, node_size;
// the size of neighbor array and graph_node_list array
GpuPsCommGraph()
: neighbor_list(NULL), node_list(NULL), neighbor_size(0), node_size(0) {}
GpuPsCommGraph(int64_t *neighbor_list_, GpuPsGraphNode *node_list_,
- int neighbor_size_, int node_size_)
+ unsigned int neighbor_size_, unsigned int node_size_)
: neighbor_list(neighbor_list_),
node_list(node_list_),
neighbor_size(neighbor_size_),
node_size(node_size_) {}
void init_on_cpu(unsigned int neighbor_size, unsigned int node_size) {
this->neighbor_size = neighbor_size;
this->node_size = node_size;
this->neighbor_list = new int64_t[neighbor_size];
this->node_list = new paddle::framework::GpuPsGraphNode[node_size];
}
void release_on_cpu() {
delete[] neighbor_list;
delete[] node_list;
}
void display_on_cpu() {
VLOG(0) << "neighbor_size = " << neighbor_size;
VLOG(0) << "node_size = " << node_size;
- for (int i = 0; i < neighbor_size; i++) {
+ for (size_t i = 0; i < neighbor_size; i++) {
VLOG(0) << "neighbor " << i << " " << neighbor_list[i];
}
- for (int i = 0; i < node_size; i++) {
+ for (size_t i = 0; i < node_size; i++) {
VLOG(0) << "node i " << node_list[i].node_id
<< " neighbor_size = " << node_list[i].neighbor_size;
std::string str;
int offset = node_list[i].neighbor_offset;
- for (int j = 0; j < node_list[i].neighbor_size; j++) {
+ for (size_t j = 0; j < node_list[i].neighbor_size; j++) {
if (j > 0) str += ",";
str += std::to_string(neighbor_list[j + offset]);
}
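The init_on_cpu/release_on_cpu pair added above manages a CSR-style layout: node_list[i].neighbor_offset and node_list[i].neighbor_size index into the shared neighbor_list array. A minimal made-up example, a three-node ring where every node has one out-edge:

paddle::framework::GpuPsCommGraph g;
g.init_on_cpu(/*neighbor_size=*/3, /*node_size=*/3);
for (unsigned int i = 0; i < 3; i++) {
  g.node_list[i].node_id = 100 + i;        // arbitrary example ids
  g.node_list[i].neighbor_size = 1;        // one out-edge per node
  g.node_list[i].neighbor_offset = i;      // CSR offset into neighbor_list
  g.neighbor_list[i] = 100 + (i + 1) % 3;  // edge to the next node in the ring
}
g.display_on_cpu();
g.release_on_cpu();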
@@ -139,12 +149,18 @@ struct NeighborSampleQuery {
};
struct NeighborSampleResult {
int64_t *val;
int64_t *actual_val;
int *actual_sample_size, sample_size, key_size;
int total_sample_size;
std::shared_ptr<memory::Allocation> val_mem, actual_sample_size_mem;
std::shared_ptr<memory::Allocation> actual_val_mem;
int64_t *get_val() { return val; }
int64_t get_actual_val() { return (int64_t)actual_val; }
int *get_actual_sample_size() { return actual_sample_size; }
int get_sample_size() { return sample_size; }
int get_key_size() { return key_size; }
void set_total_sample_size(int s) { total_sample_size = s; }
int get_len() { return total_sample_size; }
void initialize(int _sample_size, int _key_size, int dev_id) {
sample_size = _sample_size;
key_size = _key_size;
@@ -165,18 +181,30 @@ struct NeighborSampleResult {
int *ac_size = new int[key_size];
cudaMemcpy(ac_size, actual_sample_size, key_size * sizeof(int),
cudaMemcpyDeviceToHost); // 3, 1, 3
int total_sample_size = 0;
for (int i = 0; i < key_size; i++) {
total_sample_size += ac_size[i];
}
int64_t *res2 = new int64_t[total_sample_size]; // r
cudaMemcpy(res2, actual_val, total_sample_size * sizeof(int64_t),
cudaMemcpyDeviceToHost); // r

int start = 0;
for (int i = 0; i < key_size; i++) {
VLOG(0) << "actual sample size for " << i << "th key is " << ac_size[i];
VLOG(0) << "sampled neighbors are ";
- std::string neighbor;
+ std::string neighbor, neighbor2;
for (int j = 0; j < ac_size[i]; j++) {
- if (neighbor.size() > 0) neighbor += ";";
- neighbor += std::to_string(res[i * sample_size + j]);
+ // if (neighbor.size() > 0) neighbor += ";";
+ if (neighbor2.size() > 0) neighbor2 += ";"; // r
+ // neighbor += std::to_string(res[i * sample_size + j]);
+ neighbor2 += std::to_string(res2[start + j]); // r
}
- VLOG(0) << neighbor;
+ VLOG(0) << neighbor << " " << neighbor2;
start += ac_size[i]; // r
}
delete[] res;
delete[] res2; // r
delete[] ac_size;
VLOG(0) << " ------------------";
}
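The debug code above implies the new compacted layout: actual_val concatenates every key's sampled neighbors back to back, actual_sample_size holds the per-key counts, and total_sample_size (returned by get_len) is the overall length. A host-side decoding sketch under those assumptions, where result is a populated NeighborSampleResult and <vector> plus the CUDA runtime headers are in scope:

std::vector<int> cnt(result.get_key_size());
cudaMemcpy(cnt.data(), result.get_actual_sample_size(),
           cnt.size() * sizeof(int), cudaMemcpyDeviceToHost);
std::vector<int64_t> vals(result.get_len());
cudaMemcpy(vals.data(), reinterpret_cast<int64_t *>(result.get_actual_val()),
           vals.size() * sizeof(int64_t), cudaMemcpyDeviceToHost);
int start = 0;
for (int i = 0; i < result.get_key_size(); ++i) {
  // the neighbors of key i live in vals[start, start + cnt[i])
  start += cnt[i];
}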
11 changes: 9 additions & 2 deletions paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h
@@ -23,13 +23,18 @@
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
- class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
+ class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> {
public:
GpuPsGraphTable(std::shared_ptr<HeterPsResource> resource, int topo_aware)
-     : HeterComm<int64_t, int, int>(1, resource) {
+     : HeterComm<int64_t, unsigned int, int>(1, resource) {
load_factor_ = 0.25;
rw_lock.reset(new pthread_rwlock_t());
gpu_num = resource_->total_device();
for (int i = 0; i < gpu_num; i++) {
gpu_graph_list.push_back(GpuPsCommGraph());
sample_status.push_back(NULL);
tables_.push_back(NULL);
}
cpu_table_status = -1;
if (topo_aware) {
int total_gpu = resource_->total_device();
@@ -82,6 +87,8 @@ class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
// end_graph_sampling();
// }
}
void build_graph_on_single_gpu(GpuPsCommGraph &g, int gpu_id);
void clear_graph_info(int gpu_id);
void build_graph_from_cpu(std::vector<GpuPsCommGraph> &cpu_node_list);
NodeQueryResult graph_node_sample(int gpu_id, int sample_size);
NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,
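build_graph_on_single_gpu and clear_graph_info make per-device builds and rebuilds possible, where previously build_graph_from_cpu populated every GPU in one call. Assumed driver code (not in this diff) combining them with GraphTable::make_gpu_ps_graph; gpu_num, cpu_table, gpu_table, and ids_for_gpu are all hypothetical names:

for (int gpu_id = 0; gpu_id < gpu_num; ++gpu_id) {
  // one id slice per device, e.g. produced by GraphTable::get_all_id
  GpuPsCommGraph sub = cpu_table->make_gpu_ps_graph(/*idx=*/0, ids_for_gpu[gpu_id]);
  gpu_table->build_graph_on_single_gpu(sub, gpu_id);
}
// a device's partition can later be swapped out and rebuilt:
gpu_table->clear_graph_info(/*gpu_id=*/0);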