171 changes: 79 additions & 92 deletions ggml/src/ggml-backend-meta.cpp
@@ -899,18 +899,20 @@ static enum ggml_status ggml_backend_meta_graph_compute(ggml_backend_t backend,
if (max_tmp_size > backend_ctx->max_tmp_size) {
for (size_t j = 0; j < n_backends; j++) {
auto & bcj = backend_ctx->backend_configs[j];
bcj.buf.reset(ggml_backend_alloc_buffer(bcj.backend, max_tmp_size + n_backends*n_backends*sizeof(float)));
bcj.buf.reset(ggml_backend_alloc_buffer(bcj.backend, max_tmp_size));
}
backend_ctx->max_tmp_size = max_tmp_size;
}


if (max_nnodes_raised || n_subgraphs > backend_ctx->max_subgraphs) {
backend_ctx->max_subgraphs = std::max(backend_ctx->max_subgraphs, n_subgraphs);
const size_t n_nodes_red = 2 + 2*n_backends-1;
const size_t n_reduce_steps = backend_ctx->n_reduce_steps();
const size_t n_nodes_per_device = 2 * n_reduce_steps; // tmp + ADD per step
const size_t n_cgraphs_per_device = n_reduce_steps; // 1 ADD graph per step
const size_t mem_per_device_graphs_main = backend_ctx->max_subgraphs*ggml_graph_overhead_custom(backend_ctx->max_nnodes, cgraph->grads);
const size_t mem_per_device_graphs_aux = backend_ctx->max_subgraphs*ggml_graph_overhead_custom(n_nodes_red, cgraph->grads);
const size_t mem_per_device_nodes_aux = backend_ctx->max_subgraphs*n_nodes_red*ggml_tensor_overhead();
const size_t mem_per_device_graphs_aux = n_cgraphs_per_device*backend_ctx->max_subgraphs*ggml_graph_overhead_custom(1, cgraph->grads);
const size_t mem_per_device_nodes_aux = n_nodes_per_device*backend_ctx->max_subgraphs*ggml_tensor_overhead();
ggml_init_params params = {
/*.mem_size =*/ n_backends * (mem_per_device_graphs_main + mem_per_device_graphs_aux + mem_per_device_nodes_aux),
/*.mem_buffer =*/ nullptr,
@@ -923,11 +925,11 @@ static enum ggml_status ggml_backend_meta_graph_compute(ggml_backend_t backend,
bcj.cgraphs[i].cgraph_main = ggml_new_graph_custom(backend_ctx->ctx.get(), cgraph->n_nodes, /*grads =*/ false);
}
}
backend_ctx->cgraphs_aux.resize(n_backends*backend_ctx->max_subgraphs);
backend_ctx->cgraphs_aux.resize(n_backends*n_cgraphs_per_device*backend_ctx->max_subgraphs);
for (size_t k = 0; k < backend_ctx->cgraphs_aux.size(); k++) {
backend_ctx->cgraphs_aux[k] = ggml_new_graph_custom(backend_ctx->ctx.get(), n_nodes_red, cgraph->grads);
backend_ctx->cgraphs_aux[k] = ggml_new_graph_custom(backend_ctx->ctx.get(), 1, cgraph->grads);
}
backend_ctx->nodes_aux.resize(n_backends*backend_ctx->max_subgraphs*n_nodes_red);
backend_ctx->nodes_aux.resize(n_backends*n_nodes_per_device*backend_ctx->max_subgraphs);
for (size_t k = 0; k < backend_ctx->nodes_aux.size(); k++) {
backend_ctx->nodes_aux[k] = ggml_new_tensor_1d(backend_ctx->ctx.get(), GGML_TYPE_F32, 1);
}
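
For context on the sizing above, here is a minimal sketch, assuming that backend_ctx->n_reduce_steps() returns the number of exchange steps of a recursive-doubling (butterfly) allreduce, i.e. ceil(log2(n_backends)). The helper below is hypothetical and only illustrates how the per-device node and cgraph counts used for the aux allocations would scale with the backend count; it is not part of the patch.

#include <cstddef>
#include <cstdio>

// Hypothetical stand-in for backend_ctx->n_reduce_steps(): one exchange step
// per doubling of the partner offset, i.e. ceil(log2(n_backends)).
static size_t n_reduce_steps(size_t n_backends) {
    size_t steps = 0;
    for (size_t offset = 1; offset < n_backends; offset *= 2) {
        steps++;
    }
    return steps;
}

int main() {
    const size_t device_counts[] = {2, 3, 4, 8};
    for (size_t n_backends : device_counts) {
        const size_t steps              = n_reduce_steps(n_backends);
        const size_t nodes_per_device   = 2*steps; // tmp tensor + ADD node per step
        const size_t cgraphs_per_device = steps;   // one 1-node ADD graph per step
        printf("n_backends=%zu steps=%zu nodes/device=%zu cgraphs/device=%zu\n",
               n_backends, steps, nodes_per_device, cgraphs_per_device);
    }
    return 0;
}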
@@ -973,95 +975,80 @@ static enum ggml_status ggml_backend_meta_graph_compute(ggml_backend_t backend,

// Preferentially use backend-specific allreduce_tensor_async (e.g. NCCL for CUDA), use a generic fallback if unavailable:
auto allreduce_fallback = [&](size_t i) -> ggml_status {
const int64_t ne = ggml_nelements(
backend_ctx->backend_configs[0].cgraphs[i].cgraph_main->nodes[backend_ctx->backend_configs[0].cgraphs[i].cgraph_main->n_nodes - 1]);
std::vector<int64_t> ne_bounds;
ne_bounds.reserve(n_backends + 1);
for (size_t j = 0; j < n_backends + 1; j++) {
ne_bounds.push_back(ne * j/n_backends);
}
std::vector<ggml_tensor *> dst_views;
dst_views.reserve(n_backends*n_backends);
std::vector<ggml_tensor *> src_views;
src_views.reserve(n_backends*n_backends);
for (size_t j_dst = 0; j_dst < n_backends; j_dst++) {
const int64_t ne_low = ne_bounds[j_dst];
const int64_t ne_high = ne_bounds[j_dst + 1];
const int64_t ne_diff = ne_high - ne_low;

auto & bcj_dst = backend_ctx->backend_configs[j_dst];
ggml_tensor * node_dst = bcj_dst.cgraphs[i].cgraph_main->nodes[bcj_dst.cgraphs[i].cgraph_main->n_nodes-1];
GGML_ASSERT(ggml_is_contiguously_allocated(node_dst));

for (size_t j_src_0 = 0; j_src_0 < n_backends; j_src_0++) {
// Backends need to copy to themselves first to ensure correct synchronization:
const size_t j_src = j_src_0 == 0 ? j_dst : (j_src_0 - (j_src_0 <= j_dst ? 1 : 0));

auto & bcj_src = backend_ctx->backend_configs[j_src];
ggml_tensor * node_src = bcj_src.cgraphs[i].cgraph_main->nodes[bcj_src.cgraphs[i].cgraph_main->n_nodes-1];

ggml_tensor * view_src = get_node_aux(node_src);
view_src->ne[0] = ne_diff;
for (size_t k = 1; k < GGML_MAX_DIMS; k++) {
view_src->ne[k] = 1;
}
view_src->nb[0] = ggml_element_size(view_src);
for (size_t k = 1; k < GGML_MAX_DIMS; k++) {
view_src->nb[k] = view_src->ne[k - 1] * view_src->nb[k - 1];
}
view_src->data = (char *) node_src->data + ggml_row_size(view_src->type, ne_low);
view_src->buffer = node_src->buffer;
src_views.push_back(view_src);

ggml_tensor * view_dst = get_node_aux(node_dst);
view_dst->ne[0] = ne_diff;
for (size_t k = 1; k < GGML_MAX_DIMS; k++) {
view_dst->ne[k] = 1;
}
view_dst->nb[0] = ggml_element_size(view_dst);
for (size_t k = 1; k < GGML_MAX_DIMS; k++) {
view_dst->nb[k] = view_dst->ne[k - 1] * view_dst->nb[k - 1];
}
view_dst->data = (char *) ggml_backend_buffer_get_base(bcj_dst.buf.get()) + ggml_row_size(view_dst->type, j_src_0*ne_diff);
view_dst->buffer = node_dst->buffer;
dst_views.push_back(view_dst);

ggml_backend_tensor_copy_async(bcj_src.backend, bcj_dst.backend, view_src, view_dst);
}
}
for (size_t j_dst = 0; j_dst < n_backends; j_dst++) {
auto & bcj_dst = backend_ctx->backend_configs[j_dst];
const size_t ina_0 = ina;
for (size_t j_src_0 = 1; j_src_0 < n_backends; j_src_0++) {
ggml_tensor * add = get_node_aux(dst_views[j_dst*n_backends]);
add->op = GGML_OP_ADD;
add->src[0] = dst_views[j_dst*n_backends];
add->src[1] = dst_views[j_dst*n_backends + j_src_0];
add->flags |= GGML_TENSOR_FLAG_COMPUTE;

add->data = dst_views[j_dst*n_backends]->data;
add->view_src = dst_views[j_dst*n_backends];
add->buffer = dst_views[j_dst*n_backends]->buffer;
}
ggml_cgraph * cgraph_aux = get_cgraph_aux();
cgraph_aux->nodes = backend_ctx->nodes_aux.data() + ina_0;
cgraph_aux->n_nodes = ina - ina_0;
std::vector<ggml_cgraph *> step_cgraphs(n_backends, nullptr);

const ggml_status status = ggml_backend_graph_compute_async(bcj_dst.backend, cgraph_aux);
if (status != GGML_STATUS_SUCCESS) {
return status;
}
}
for (size_t offset_j = 1; offset_j < n_backends; offset_j *= 2) {
std::fill(step_cgraphs.begin(), step_cgraphs.end(), nullptr);

// Write back the reduced data, "dst" and "src" here still refer to the original copy direction.
for (size_t j_dst = 0; j_dst < n_backends; j_dst++) {
auto & bcj_dst = backend_ctx->backend_configs[j_dst];
for (size_t j = 0; j < n_backends; j++) {
const size_t j_other = j ^ offset_j;
if (j_other > j) {
continue;
}

for (size_t j_src_0 = 0; j_src_0 < n_backends; j_src_0++) {
const size_t j_src = j_src_0 == 0 ? j_dst : (j_src_0 - (j_src_0 <= j_dst ? 1 : 0));
auto & bcj_src = backend_ctx->backend_configs[j_src];
auto & bcj1 = backend_ctx->backend_configs[j];
auto & bcj2 = backend_ctx->backend_configs[j_other];

ggml_tensor * node1 = bcj1.cgraphs[i].cgraph_main->nodes[bcj1.cgraphs[i].cgraph_main->n_nodes - 1];
ggml_tensor * node2 = bcj2.cgraphs[i].cgraph_main->nodes[bcj2.cgraphs[i].cgraph_main->n_nodes - 1];
GGML_ASSERT(ggml_is_contiguous(node1));
GGML_ASSERT(ggml_is_contiguous(node2));

// Tmp tensors to receive P2P copies
ggml_tensor * node_tmp_1 = get_node_aux(node1);
node_tmp_1->buffer = bcj1.buf.get();
node_tmp_1->data = ggml_backend_buffer_get_base(bcj1.buf.get());

ggml_tensor * node_tmp_2 = get_node_aux(node2);
node_tmp_2->buffer = bcj2.buf.get();
node_tmp_2->data = ggml_backend_buffer_get_base(bcj2.buf.get());

// 2 P2P copies: exchange full buffers
ggml_backend_tensor_copy_async(bcj1.backend, bcj2.backend, node1, node_tmp_2);
ggml_backend_tensor_copy_async(bcj2.backend, bcj1.backend, node2, node_tmp_1);

// Local ADD: node1 += tmp1 (in-place via view)
ggml_tensor * node_red_1 = get_node_aux(node1);
node_red_1->view_src = node1->view_src == nullptr ? node1 : node1->view_src;
node_red_1->view_offs = node1->view_offs;
node_red_1->op = GGML_OP_ADD;
node_red_1->src[0] = node1;
node_red_1->src[1] = node_tmp_1;
node_red_1->flags |= GGML_TENSOR_FLAG_COMPUTE;
ggml_backend_view_init(node_red_1);

// Local ADD: node2 += tmp2 (in-place via view)
ggml_tensor * node_red_2 = get_node_aux(node2);
node_red_2->view_src = node2->view_src == nullptr ? node2 : node2->view_src;
node_red_2->view_offs = node2->view_offs;
node_red_2->op = GGML_OP_ADD;
node_red_2->src[0] = node2;
node_red_2->src[1] = node_tmp_2;
node_red_2->flags |= GGML_TENSOR_FLAG_COMPUTE;
ggml_backend_view_init(node_red_2);

// Build 1-node cgraphs for the ADD ops
ggml_cgraph * cgraph_aux_1 = get_cgraph_aux();
cgraph_aux_1->nodes[0] = node_red_1;
cgraph_aux_1->n_nodes = 1;
step_cgraphs[j] = cgraph_aux_1;

ggml_cgraph * cgraph_aux_2 = get_cgraph_aux();
cgraph_aux_2->nodes[0] = node_red_2;
cgraph_aux_2->n_nodes = 1;
step_cgraphs[j_other] = cgraph_aux_2;
}

ggml_backend_tensor_copy_async(bcj_dst.backend, bcj_src.backend, dst_views[j_dst*n_backends], src_views[j_dst*n_backends + j_src_0]);
// Execute local ADDs for this step
for (size_t j = 0; j < n_backends; j++) {
if (step_cgraphs[j] == nullptr) {
continue;
}
auto & bcj = backend_ctx->backend_configs[j];
const ggml_status status = ggml_backend_graph_compute_async(bcj.backend, step_cgraphs[j]);
if (status != GGML_STATUS_SUCCESS) {
return status;
}
}
}
return GGML_STATUS_SUCCESS;
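
As a reading aid, here is a minimal sketch of the recursive-doubling (butterfly) exchange pattern the new fallback implements, on plain floats instead of ggml tensors. It assumes a power-of-two n_backends and collapses the asynchronous P2P copies and the 1-node ADD cgraphs into ordinary array operations; it is not part of the patch.

#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
    const size_t n_backends = 4;
    std::vector<float> partial = {1.0f, 2.0f, 3.0f, 4.0f}; // per-device partial sums

    for (size_t offset_j = 1; offset_j < n_backends; offset_j *= 2) {
        std::vector<float> tmp(n_backends);     // stands in for the per-device tmp buffers
        for (size_t j = 0; j < n_backends; j++) {
            tmp[j] = partial[j ^ offset_j];     // "P2P copy" from this step's partner
        }
        for (size_t j = 0; j < n_backends; j++) {
            partial[j] += tmp[j];               // local ADD, analogous to the 1-node cgraphs
        }
    }

    for (size_t j = 0; j < n_backends; j++) {
        printf("device %zu: %f\n", j, partial[j]); // every device ends with 1+2+3+4 = 10
    }
    return 0;
}

After ceil(log2(n_backends)) such steps every device holds the full sum, which is why the allocation code above provisions one tmp tensor, one ADD node, and one 1-node cgraph per device per reduce step.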