Skip to content

Commit

Permalink
change sample result's structure to fit training (PaddlePaddle#42426)
Browse files Browse the repository at this point in the history
* enable graph-engine to return all id

* change vector's dimension

* change vector's dimension

* enlarge returned ids dimensions

* add actual_val

* change vlog

* fix bug

* bug fix

* bug fix

* fix display test

* singleton of gpu_graph_wrapper

* change sample result's structure to fit training

* recover sample code

* fix

* secondary sample

* add graph partition

* fix pybind

Co-authored-by: DesmonDay <[email protected]>
  • Loading branch information
seemingwang and DesmonDay committed May 4, 2022
1 parent 878c38f commit b35c82d
Show file tree
Hide file tree
Showing 11 changed files with 781 additions and 95 deletions.
401 changes: 379 additions & 22 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc

Large diffs are not rendered by default.

24 changes: 23 additions & 1 deletion paddle/fluid/distributed/ps/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@ class GraphTable : public Table {
use_cache = false;
shard_num = 0;
rw_lock.reset(new pthread_rwlock_t());
#ifdef PADDLE_WITH_HETERPS
next_partition = 0;
total_memory_cost = 0;
#endif
}
virtual ~GraphTable();

Expand Down Expand Up @@ -521,7 +525,7 @@ class GraphTable : public Table {
const std::vector<std::vector<std::string>> &res);

size_t get_server_num() { return server_num; }

void clear_graph(int idx);
virtual int32_t make_neighbor_sample_cache(size_t size_limit, size_t ttl) {
{
std::unique_lock<std::mutex> lock(mutex_);
Expand All @@ -546,15 +550,33 @@ class GraphTable : public Table {
// graph_sampler->set_graph_sample_callback(callback);
// return 0;
// }
virtual void make_partitions(int idx, int64_t gb_size, int device_len);
virtual char *random_sample_neighbor_from_ssd(
int idx, int64_t id, int sample_size,
const std::shared_ptr<std::mt19937_64> rng, int &actual_size);
virtual int32_t add_node_to_ssd(int type_id, int idx, int64_t src_id,
char *data, int len);
virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
int idx, std::vector<int64_t> ids);
int32_t Load_to_ssd(const std::string &path, const std::string &param);
int64_t load_graph_to_memory_from_ssd(int idx, std::vector<int64_t> &ids);
int32_t make_complementary_graph(int idx, int64_t byte_size);
int32_t dump_edges_to_ssd(int idx);
int32_t get_partition_num(int idx) { return partitions[idx].size(); }
std::vector<int64_t> get_partition(int idx, int index) {
if (idx >= partitions.size() || index >= partitions[idx].size())
return std::vector<int64_t>();
return partitions[idx][index];
}
int32_t load_edges_to_ssd(const std::string &path, bool reverse_edge,
const std::string &edge_type);
int32_t load_next_partition(int idx);
void set_search_level(int search_level) { this->search_level = search_level; }
// virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
int search_level;
int64_t total_memory_cost;
std::vector<std::vector<std::vector<int64_t>>> partitions;
int next_partition;
#endif
virtual int32_t add_comm_edge(int idx, int64_t src_id, int64_t dst_id);
virtual int32_t build_sampler(int idx, std::string sample_type = "random");
Expand Down
48 changes: 38 additions & 10 deletions paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,46 @@ namespace paddle {
namespace framework {
struct GpuPsGraphNode {
int64_t node_id;
int neighbor_size, neighbor_offset;
unsigned int neighbor_size, neighbor_offset;
// this node's neighbor is stored on [neighbor_offset,neighbor_offset +
// neighbor_size) of int64_t *neighbor_list;
};

struct GpuPsCommGraph {
int64_t *neighbor_list;
GpuPsGraphNode *node_list;
int neighbor_size, node_size;
unsigned int neighbor_size, node_size;
// the size of neighbor array and graph_node_list array
GpuPsCommGraph()
: neighbor_list(NULL), node_list(NULL), neighbor_size(0), node_size(0) {}
GpuPsCommGraph(int64_t *neighbor_list_, GpuPsGraphNode *node_list_,
int neighbor_size_, int node_size_)
unsigned int neighbor_size_, unsigned int node_size_)
: neighbor_list(neighbor_list_),
node_list(node_list_),
neighbor_size(neighbor_size_),
node_size(node_size_) {}
void init_on_cpu(unsigned int neighbor_size, unsigned int node_size) {
this->neighbor_size = neighbor_size;
this->node_size = node_size;
this->neighbor_list = new int64_t[neighbor_size];
this->node_list = new paddle::framework::GpuPsGraphNode[node_size];
}
void release_on_cpu() {
delete[] neighbor_list;
delete[] node_list;
}
void display_on_cpu() {
VLOG(0) << "neighbor_size = " << neighbor_size;
VLOG(0) << "node_size = " << node_size;
for (int i = 0; i < neighbor_size; i++) {
for (size_t i = 0; i < neighbor_size; i++) {
VLOG(0) << "neighbor " << i << " " << neighbor_list[i];
}
for (int i = 0; i < node_size; i++) {
for (size_t i = 0; i < node_size; i++) {
VLOG(0) << "node i " << node_list[i].node_id
<< " neighbor_size = " << node_list[i].neighbor_size;
std::string str;
int offset = node_list[i].neighbor_offset;
for (int j = 0; j < node_list[i].neighbor_size; j++) {
for (size_t j = 0; j < node_list[i].neighbor_size; j++) {
if (j > 0) str += ",";
str += std::to_string(neighbor_list[j + offset]);
}
Expand Down Expand Up @@ -139,12 +149,18 @@ struct NeighborSampleQuery {
};
struct NeighborSampleResult {
int64_t *val;
int64_t *actual_val;
int *actual_sample_size, sample_size, key_size;
int total_sample_size;
std::shared_ptr<memory::Allocation> val_mem, actual_sample_size_mem;
std::shared_ptr<memory::Allocation> actual_val_mem;
int64_t *get_val() { return val; }
int64_t get_actual_val() { return (int64_t)actual_val; }
int *get_actual_sample_size() { return actual_sample_size; }
int get_sample_size() { return sample_size; }
int get_key_size() { return key_size; }
void set_total_sample_size(int s) { total_sample_size = s; }
int get_len() { return total_sample_size; }
void initialize(int _sample_size, int _key_size, int dev_id) {
sample_size = _sample_size;
key_size = _key_size;
Expand All @@ -165,18 +181,30 @@ struct NeighborSampleResult {
int *ac_size = new int[key_size];
cudaMemcpy(ac_size, actual_sample_size, key_size * sizeof(int),
cudaMemcpyDeviceToHost); // 3, 1, 3
int total_sample_size = 0;
for (int i = 0; i < key_size; i++) {
total_sample_size += ac_size[i];
}
int64_t *res2 = new int64_t[total_sample_size]; // r
cudaMemcpy(res2, actual_val, total_sample_size * sizeof(int64_t),
cudaMemcpyDeviceToHost); // r

int start = 0;
for (int i = 0; i < key_size; i++) {
VLOG(0) << "actual sample size for " << i << "th key is " << ac_size[i];
VLOG(0) << "sampled neighbors are ";
std::string neighbor;
std::string neighbor, neighbor2;
for (int j = 0; j < ac_size[i]; j++) {
if (neighbor.size() > 0) neighbor += ";";
neighbor += std::to_string(res[i * sample_size + j]);
// if (neighbor.size() > 0) neighbor += ";";
if (neighbor2.size() > 0) neighbor2 += ";"; // r
// neighbor += std::to_string(res[i * sample_size + j]);
neighbor2 += std::to_string(res2[start + j]); // r
}
VLOG(0) << neighbor;
VLOG(0) << neighbor << " " << neighbor2;
start += ac_size[i]; // r
}
delete[] res;
delete[] res2; // r
delete[] ac_size;
VLOG(0) << " ------------------";
}
Expand Down
11 changes: 9 additions & 2 deletions paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> {
public:
GpuPsGraphTable(std::shared_ptr<HeterPsResource> resource, int topo_aware)
: HeterComm<int64_t, int, int>(1, resource) {
: HeterComm<int64_t, unsigned int, int>(1, resource) {
load_factor_ = 0.25;
rw_lock.reset(new pthread_rwlock_t());
gpu_num = resource_->total_device();
for (int i = 0; i < gpu_num; i++) {
gpu_graph_list.push_back(GpuPsCommGraph());
sample_status.push_back(NULL);
tables_.push_back(NULL);
}
cpu_table_status = -1;
if (topo_aware) {
int total_gpu = resource_->total_device();
Expand Down Expand Up @@ -82,6 +87,8 @@ class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
// end_graph_sampling();
// }
}
void build_graph_on_single_gpu(GpuPsCommGraph &g, int gpu_id);
void clear_graph_info(int gpu_id);
void build_graph_from_cpu(std::vector<GpuPsCommGraph> &cpu_node_list);
NodeQueryResult graph_node_sample(int gpu_id, int sample_size);
NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,
Expand Down
Loading

0 comments on commit b35c82d

Please sign in to comment.