Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_OP_COST_ESTIMATE_KEY_H
#define _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_OP_COST_ESTIMATE_KEY_H

#include "compiler/cost_estimator/op_cost_estimate_key.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"
#include "pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h"

namespace FlexFlow {

/**
 * @brief Builds the cost-estimation key for a single layer of a PCG once a
 * machine view has been chosen for it.
 *
 * @param pcg The parallel computation graph that contains \p layer.
 * @param layer The layer whose cost-estimate key is requested.
 * @param machine_view The machine view the layer is mapped onto.
 * @return The OpCostEstimateKey for \p layer under \p machine_view.
 */
OpCostEstimateKey get_mapped_op_cost_estimate_key_for_layer(
ParallelComputationGraph const &pcg,
parallel_layer_guid_t const &layer,
MachineView const &machine_view);

} // namespace FlexFlow

#endif
18 changes: 18 additions & 0 deletions lib/compiler/include/compiler/cost_estimator/task_simulator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_TASK_SIMULATOR_H
#define _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_TASK_SIMULATOR_H

#include "compiler/cost_estimator/cost_estimator.h"
#include "compiler/machine_mapping/machine_mapping.dtg.h"
#include "pcg/machine_specification.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"

namespace FlexFlow {
/**
 * @brief Estimates the forward-pass time of a PCG under a given machine
 * mapping by simulating the execution of its layers and of the tensor
 * movements along its edges.
 *
 * @param pcg The parallel computation graph to simulate.
 * @param estimator Supplies the cost of each layer and of each tensor
 * movement.
 * @param machine_mapping The machine view assigned to each layer.
 * @param machine_spec The machine used to resolve machine views to devices.
 * @return The simulated time at which the last component finishes.
 */
float task_simulator_estimate_forward_pass_time(
ParallelComputationGraph const &pcg,
CostEstimator const &estimator,
MachineMapping const &machine_mapping,
MachineSpecification const &machine_spec);

} // namespace FlexFlow

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Type spec for ::FlexFlow::TimedComponent: a sum type whose value is either a
# TimedLayer or a TimedDependency (see the [[values]] entries below).
# Presumably consumed by the project's type generator to produce
# timed_component.dtg.h -- TODO confirm.
namespace = "FlexFlow"
name = "TimedComponent"
# Capabilities requested for the generated type.
features = [
"eq",
"hash",
"fmt",
"ord"
]

# Headers that declare the alternative types listed under [[values]].
includes = [
"compiler/cost_estimator/timed_dependency.dtg.h",
"compiler/cost_estimator/timed_layer.dtg.h",
]

# Alternative 1: a layer together with the time at which it ends.
[[values]]
type = "::FlexFlow::TimedLayer"

# Alternative 2: a PCG edge together with the time at which it ends.
[[values]]
type = "::FlexFlow::TimedDependency"
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Type spec for ::FlexFlow::TimedDependency: a PCG edge paired with an end
# time. Presumably consumed by the project's type generator to produce
# timed_dependency.dtg.h -- TODO confirm.
namespace = "FlexFlow"
name = "TimedDependency"
# Capabilities requested for the generated type.
features = [
"eq",
"ord",
"hash",
"fmt",
]

# Header that declares the type of the raw_edge field.
includes = [
"pcg/parallel_computation_graph/parallel_computation_graph_edge.dtg.h",
]

# Time at which processing of this dependency ends. The task simulator sets it
# to the dependency's start time plus its estimated cost.
[[fields]]
name = "endtime"
type = "float"

# The PCG edge this dependency corresponds to.
[[fields]]
name = "raw_edge"
type = "::FlexFlow::ParallelComputationGraphEdge"
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Type spec for ::FlexFlow::TimedLayer: a PCG layer paired with an end time.
# Presumably consumed by the project's type generator to produce
# timed_layer.dtg.h -- TODO confirm.
namespace = "FlexFlow"
name = "TimedLayer"
# Capabilities requested for the generated type.
features = [
"eq",
"ord",
"hash",
"fmt",
]

# Header that declares the type of the layer field.
includes = [
"pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h",
]

# Time at which processing of this layer ends. The task simulator sets it to
# the time the layer was started plus its estimated cost.
[[fields]]
name = "endtime"
type = "float"

# The PCG layer being timed.
[[fields]]
name = "layer"
type = "::FlexFlow::parallel_layer_guid_t"
28 changes: 28 additions & 0 deletions lib/compiler/include/compiler/machine_mapping/device_mapping.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

#include "compiler/machine_mapping/device_mapping.h"
#include "compiler/machine_mapping/device_mapping.dtg.h"
#include "pcg/machine_specification.h"
#include "pcg/machine_view.h"
#include "pcg/operator_task_space.dtg.h"
#include "pcg/operator_task_space.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.h"
#include "utils/containers/keys.h"
#include "utils/containers/map_values.h"

namespace FlexFlow {

/**
 * @brief Computes, for every layer that has a machine view in
 * \p machine_mapping, the set of devices that layer occupies.
 *
 * For each (layer, machine view) pair the layer's operator task space is
 * looked up in \p pcg, and the device ids are obtained from that task space,
 * the machine view and \p machine_spec.
 *
 * @param machine_mapping Machine view assigned to each layer.
 * @param machine_spec Machine used to resolve machine views to device ids.
 * @param pcg Parallel computation graph containing the mapped layers.
 * @return A DeviceMapping with one entry per layer in \p machine_mapping.
 */
DeviceMapping get_device_mapping(MachineMapping const &machine_mapping,
                                 MachineSpecification const &machine_spec,
                                 ParallelComputationGraph const &pcg) {
  std::unordered_map<parallel_layer_guid_t, std::unordered_set<device_id_t>>
      device_mapping;
  for (auto const &[layer, machine_view] : machine_mapping.machine_views) {
    // The original statement had a stray closing parenthesis here, which
    // made the translation unit fail to compile.
    OperatorTaskSpace op = get_operator_task_space(pcg, layer);
    device_mapping.insert(
        {layer, get_device_ids(op, machine_view, machine_spec)});
  }
  return DeviceMapping{device_mapping};
}

} // namespace FlexFlow
17 changes: 17 additions & 0 deletions lib/compiler/include/compiler/machine_mapping/device_mapping.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef _FLEXFLOW_COMPILER_DEVICE_MAPPING_H
#define _FLEXFLOW_COMPILER_DEVICE_MAPPING_H

#include "compiler/machine_mapping/device_mapping.dtg.h"
#include "compiler/machine_mapping/machine_mapping.dtg.h"
#include "pcg/machine_specification.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"

namespace FlexFlow {

/**
 * @brief Computes the set of devices used by each layer that has a machine
 * view in \p machine_mapping.
 *
 * @param machine_mapping Machine view assigned to each layer.
 * @param machine_spec Machine used to resolve machine views to device ids.
 * @param pcg Parallel computation graph containing the mapped layers.
 * @return A DeviceMapping from each mapped layer to its device ids.
 */
DeviceMapping get_device_mapping(MachineMapping const &machine_mapping,
MachineSpecification const &machine_spec,
ParallelComputationGraph const &pcg);

} // namespace FlexFlow

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Type spec for ::FlexFlow::DeviceMapping: a wrapper around a map from PCG
# layer to the set of device ids that layer uses. Presumably consumed by the
# project's type generator to produce device_mapping.dtg.h -- TODO confirm.
namespace = "FlexFlow"
name = "DeviceMapping"
# Capabilities requested for the generated type; the commented-out entries are
# currently not requested.
features = [
"eq",
# "ord",
"hash",
# "json",
# "rapidcheck",
"fmt",
]

# Headers that declare the key and element types of raw_device_map.
includes = [
"pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h",
"pcg/device_id_t.dtg.h"
]

# Hash/fmt support headers for the unordered containers used by the field
# below; presumably only needed by the generated source file -- TODO confirm.
src_includes = [
"utils/hash/unordered_map.h",
"utils/fmt/unordered_map.h",
"utils/hash/unordered_set.h",
"utils/fmt/unordered_set.h"
]

# Layer -> set of devices the layer occupies.
[[fields]]
name = "raw_device_map"
type = "std::unordered_map<::FlexFlow::parallel_layer_guid_t, std::unordered_set<::FlexFlow::device_id_t>>"
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
#define _FLEXFLOW_COMPILER_MACHINE_MAPPING_H

#include "compiler/machine_mapping/machine_mapping.dtg.h"
#include "pcg/device_id_t.dtg.h"
#include "pcg/machine_specification.dtg.h"
#include "pcg/operator_task_space.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"

namespace FlexFlow {

Expand Down
4 changes: 4 additions & 0 deletions lib/compiler/src/compiler/allowed_machine_views.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ namespace FlexFlow {
bool is_valid_machine_view(MachineView const &mv,
OperatorTaskSpace const &task,
MachineSpecification const &ms) {
if (num_dims(mv) != num_dims(task)) {
return false;
}

std::optional<MachineSpaceCoordinate> maximum_device_coord =
get_machine_space_coordinate(
task, mv, get_task_space_maximum_coordinate(task), ms);
Expand Down
14 changes: 14 additions & 0 deletions lib/compiler/src/compiler/cost_estimator/op_cost_estimate_key.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "compiler/cost_estimator/op_cost_estimate_key.h"
#include "compiler/machine_mapping/machine_mapping_problem_tree/unmapped_op_cost_estimate_key.h"

namespace FlexFlow {

/**
 * @brief Builds the mapped cost-estimate key for \p layer.
 *
 * The unmapped key for the layer is looked up in \p pcg first and is then
 * combined with \p machine_view to produce the mapped key.
 */
OpCostEstimateKey get_mapped_op_cost_estimate_key_for_layer(
    ParallelComputationGraph const &pcg,
    parallel_layer_guid_t const &layer,
    MachineView const &machine_view) {
  auto const unmapped_key =
      get_unmapped_op_cost_estimate_key_for_layer(pcg, layer);
  return map_unmapped_op_cost_estimate_key(unmapped_key, machine_view);
}

} // namespace FlexFlow
181 changes: 181 additions & 0 deletions lib/compiler/src/compiler/cost_estimator/task_simulator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#include "compiler/cost_estimator/task_simulator.h"
#include "compiler/cost_estimator/cost_estimator.h"
#include "compiler/cost_estimator/op_cost_estimate_key.h"
#include "compiler/cost_estimator/single_tensor_movement.dtg.h"
#include "compiler/cost_estimator/tensor_set_movement.dtg.h"
#include "compiler/cost_estimator/timed_component.dtg.h"
#include "compiler/cost_estimator/timed_dependency.dtg.h"
#include "compiler/cost_estimator/timed_layer.dtg.h"
#include "compiler/machine_mapping/device_mapping.h"
#include "compiler/machine_mapping/machine_mapping.h"
#include "op-attrs/parallel_tensor_shape.dtg.h"
#include "op-attrs/parallel_tensor_shape.h"
#include "pcg/device_id.h"
#include "pcg/device_id_t.dtg.h"
#include "pcg/machine_specification.h"
#include "pcg/machine_view.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph_edge.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph_edge.h"
#include "pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h"
#include "pcg/parallel_computation_graph/parallel_tensor_guid_t.h"
#include "substitutions/sub_parallel_computation_graph.h"
#include "substitutions/sub_parallel_computation_graph_edge.dtg.h"
#include "utils/containers/all_of.h"
#include "utils/containers/filtrans.h"
#include "utils/containers/generate_map.h"
#include "utils/containers/get_one_of.h"
#include "utils/containers/is_subseteq_of.h"
#include "utils/containers/keys.h"
#include "utils/containers/set_union.h"
#include "utils/containers/transform.h"
#include "utils/containers/unordered_set_of.h"
#include "utils/containers/values.h"
#include "utils/deduplicated_priority_queue.h"
#include "utils/graph/dataflow_graph/algorithms/get_outgoing_edges.h"
#include "utils/graph/open_dataflow_graph/algorithms/get_open_dataflow_graph_inputs.h"
#include "utils/graph/open_dataflow_graph/algorithms/get_source_nodes.h"
#include "utils/hash/unordered_set.h"
#include "utils/overload.h"
#include <optional>
#include <unordered_set>

namespace FlexFlow {

/**
 * @brief Estimated cost of running \p layer of \p pcg under machine view
 * \p mv, as reported by \p estimator.
 */
static float
    single_parallel_layer_cost_estimator(parallel_layer_guid_t const &layer,
                                         ParallelComputationGraph const &pcg,
                                         CostEstimator const &estimator,
                                         MachineView const &mv) {
  OpCostEstimateKey const cost_key =
      get_mapped_op_cost_estimate_key_for_layer(pcg, layer, mv);
  return estimator.estimate_cost(cost_key);
}

/**
 * @brief Estimated cost of moving the tensor carried by \p dependency from
 * the machine view of its source layer to the machine view of its
 * destination layer, as reported by \p estimator.
 *
 * Both endpoint layers must have an entry in
 * \p machine_mapping.machine_views; the lookup uses `.at()`.
 */
static float single_dependency_cost_estimator(
    ParallelComputationGraphEdge const &dependency,
    ParallelComputationGraph const &pcg,
    MachineMapping const &machine_mapping,
    CostEstimator const &estimator) {
  // Endpoints of the edge and the machine views they are mapped to.
  parallel_layer_guid_t src_layer = get_src_layer(dependency);
  parallel_layer_guid_t dst_layer = get_dst_layer(dependency);
  MachineView src_view = machine_mapping.machine_views.at(src_layer);
  MachineView dst_view = machine_mapping.machine_views.at(dst_layer);

  // Shape of the tensor produced at the edge's source.
  parallel_tensor_guid_t moved_tensor =
      parallel_tensor_guid_t{dependency.raw_edge.src};
  ParallelTensorShape shape = get_parallel_tensor_shape(pcg, moved_tensor);

  // A movement set containing just this one tensor.
  return estimator.estimate_cost(TensorSetMovement{
      {SingleTensorMovement{shape, {src_view}, {dst_view}}}});
}

/**
 * @brief Estimates the forward-pass time of \p pcg under \p machine_mapping
 * by simulating the execution of layers and of the tensor movements along
 * PCG edges.
 *
 * Layers and edges are both treated as timed components. A layer is started
 * once it is ready and every device it is mapped to is idle; when it
 * finishes, all of its outgoing edges are started. A layer becomes ready
 * once all of its incoming edges have finished.
 *
 * @param pcg The parallel computation graph to simulate.
 * @param estimator Supplies the cost of each layer and tensor movement.
 * @param machine_mapping Machine view per layer; looked up with `.at()`.
 * @param machine_spec Machine used to resolve machine views to device ids.
 * @return The end time of the last component taken from the queue.
 */
float task_simulator_estimate_forward_pass_time(
ParallelComputationGraph const &pcg,
CostEstimator const &estimator,
MachineMapping const &machine_mapping,
MachineSpecification const &machine_spec) {

// Simulated clock; set to the end time of the most recently finished
// component.
float current_time = 0.0f;

// Layers whose incoming dependencies are all done but which have not been
// started yet.
std::unordered_set<parallel_layer_guid_t> ready_layers;
// Components (layers and dependencies) currently in flight.
// NOTE(review): the simulation relies on top() yielding the component that
// finishes first. The comparator of DeduplicatedPriorityQueue and the "ord"
// of TimedComponent are not visible here -- confirm the queue is ordered by
// smallest endtime.
DeduplicatedPriorityQueue<TimedComponent, std::vector<TimedComponent>>
component_processing;
// Every component that has finished so far.
std::unordered_set<TimedComponent> processed_components;

DeviceMapping device_mapping =
get_device_mapping(machine_mapping, machine_spec, pcg);

// Busy flag for every device used by any layer; all devices start idle.
std::unordered_map<device_id_t, bool> devices =
generate_map(set_union(values(device_mapping.raw_device_map)),
[](device_id_t const &d) { return false; });

// Starts a layer at current_time: enqueues its completion event, marks its
// devices busy, and removes it from the ready set.
auto start_layer_processing = [&](parallel_layer_guid_t const &layer) {
float cost = single_parallel_layer_cost_estimator(
layer, pcg, estimator, machine_mapping.machine_views.at(layer));
component_processing.push(
TimedComponent{TimedLayer{current_time + cost, layer}});
for (device_id_t d : device_mapping.raw_device_map.at(layer)) {
devices.at(d) = true;
}
ready_layers.erase(layer);
};

// Starts a tensor movement along `dependency` at `start_time` by enqueueing
// its completion event. Dependencies do not mark any device busy.
auto start_dependency_processing =
[&](ParallelComputationGraphEdge const &dependency, float start_time) {
float cost = single_dependency_cost_estimator(
dependency, pcg, machine_mapping, estimator);
component_processing.push(
TimedComponent{TimedDependency{start_time + cost, dependency}});
};

// Completes a layer: frees its devices, records it as processed, advances
// the clock, and starts every outgoing dependency at the layer's end time.
auto finish_layer_processing = [&](TimedLayer const &timed_layer) {
for (device_id_t d : device_mapping.raw_device_map.at(timed_layer.layer)) {
devices.at(d) = false;
}
processed_components.insert(TimedComponent{timed_layer});
current_time = timed_layer.endtime;
std::unordered_set<ParallelComputationGraphEdge> outgoing_dependencies =
get_outgoing_edges(pcg, timed_layer.layer);
for (ParallelComputationGraphEdge const &dep : outgoing_dependencies) {
start_dependency_processing(dep, timed_layer.endtime);
}
};

// Completes a dependency: records it as processed, marks its destination
// layer ready if all of that layer's incoming edges are now processed, and
// advances the clock.
auto finish_dependency_processing =
[&](TimedDependency const &timed_dependency) {
processed_components.insert(TimedComponent{timed_dependency});
parallel_layer_guid_t destination_layer =
get_dst_layer(timed_dependency.raw_edge);
std::unordered_set<ParallelComputationGraphEdge> incoming_dependencies =
get_incoming_edges(pcg, destination_layer);
// Edges of all processed dependencies, with the timing stripped; processed
// layers map to nullopt (presumably dropped by filtrans -- TODO confirm).
std::unordered_set<ParallelComputationGraphEdge>
non_timed_processed_dependencies = filtrans(
processed_components, [](TimedComponent const &component) {
return component
.visit<std::optional<ParallelComputationGraphEdge>>(
overload{[&](TimedLayer const &layer) {
return std::nullopt;
},
[&](TimedDependency const &dep) {
return dep.raw_edge;
}});
});
// start processing a new node if all dependencies have been processed
// already
if (is_subseteq_of(incoming_dependencies,
non_timed_processed_dependencies)) {
ready_layers.insert(destination_layer);
}
current_time = timed_dependency.endtime;
};

// Seed the simulation with the PCG's initial layers.
for (parallel_layer_guid_t const &layer : get_initial_layers(pcg)) {
ready_layers.insert(layer);
}

// Each iteration starts every ready layer whose devices are idle, then
// completes one in-flight component.
while (!ready_layers.empty() || !component_processing.empty()) {

// Iterate over a snapshot because start_layer_processing erases from
// ready_layers.
auto frontier_copy = ready_layers;
for (parallel_layer_guid_t const &layer : frontier_copy) {
auto layer_devices = device_mapping.raw_device_map.at(layer);
if (all_of(layer_devices,
[&](device_id_t d) { return devices.at(d) == false; })) {
start_layer_processing(layer);
}
}

if (!component_processing.empty()) {
TimedComponent component = component_processing.top();
component_processing.pop();

if (component.has<TimedDependency>()) {
finish_dependency_processing(component.get<TimedDependency>());
} else if (component.has<TimedLayer>()) {
finish_layer_processing(component.get<TimedLayer>());
}
}
}

return current_time;
}

} // namespace FlexFlow
Loading