Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_OP_COST_ESTIMATE_KEY_H
#define _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_OP_COST_ESTIMATE_KEY_H

#include "compiler/cost_estimator/op_cost_estimate_key.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"
#include "pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h"

namespace FlexFlow {

OpCostEstimateKey get_mapped_op_cost_estimate_key_for_layer(
ParallelComputationGraph const &pcg,
parallel_layer_guid_t const &layer,
MachineView const &machine_view);

} // namespace FlexFlow

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace = "FlexFlow"
name = "TaskGraph"
features = [
]

includes = [
"utils/graph/digraph/digraph_view.h",
"<unordered_map>",
"<functional>",
"compiler/cost_estimator/tasks_state_tracker.dtg.h"
]

src_includes = [
"utils/hash/unordered_map.h",
"utils/fmt/unordered_map.h",
]

[[fields]]
name = "graph"
type = "::FlexFlow::DiGraphView"

[[fields]]
name = "cost_map"
type = "std::unordered_map<::FlexFlow::Node, float>"

[[fields]]
name = "is_allowed_to_run"
type = "std::function<bool(::FlexFlow::Node, ::FlexFlow::TasksStateTracker)>"
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace = "FlexFlow"
name = "TaskGraphProfile"

features = [
"hash",
"fmt",
"eq"
]

includes = [
"compiler/cost_estimator/task_profile.dtg.h",
"<unordered_set>"
]

src_includes = [
"utils/fmt/unordered_set.h",
"utils/hash/unordered_set.h"
]


[[fields]]
name = "task_profiles"
type = "std::unordered_set<::FlexFlow::TaskProfile>"

[[fields]]
name = "end_time"
type = "float"
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#ifndef _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_TASK_GRAPH_TRAVERSAL_H
#define _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_TASK_GRAPH_TRAVERSAL_H

#include "compiler/cost_estimator/task_graph.dtg.h"
#include "compiler/cost_estimator/task_graph_profile.dtg.h"
namespace FlexFlow {

TaskGraphProfile simulate_forward_pass(TaskGraph const &task_graph);

} // namespace FlexFlow

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace = "FlexFlow"
name = "TaskProfile"

features = [
"eq",
"hash",
"fmt",
"ord"
]

includes = [
"utils/graph/node/node.dtg.h"
]


[[fields]]
name = "node"
type = "::FlexFlow::Node"

[[fields]]
name = "start_time"
type = "float"

[[fields]]
name = "end_time"
type = "float"
18 changes: 18 additions & 0 deletions lib/compiler/include/compiler/cost_estimator/task_simulator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_TASK_SIMULATOR_H
#define _FLEXFLOW_LIB_COMPILER_INCLUDE_COMPILER_COST_ESTIMATOR_TASK_SIMULATOR_H

#include "compiler/cost_estimator/cost_estimator.h"
#include "compiler/machine_mapping/machine_mapping.dtg.h"
#include "pcg/machine_specification.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"

namespace FlexFlow {
float task_simulator_estimate_forward_pass_time(
ParallelComputationGraph const &pcg,
CostEstimator const &estimator,
MachineMapping const &machine_mapping,
MachineSpecification const &machine_spec);

} // namespace FlexFlow

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace = "FlexFlow"
name = "TasksStateTracker"

features = [
]

includes = [
"utils/deduplicated_priority_queue.h",
"utils/graph/node/node.dtg.h",
"compiler/cost_estimator/timed_task.dtg.h",
"<unordered_set>",
"<functional>"
]

src_includes = [
"utils/hash/unordered_set.h",
"utils/fmt/unordered_set.h",
"utils/fmt/vector.h",
"utils/hash/vector.h"
]

[[fields]]
name = "ready_tasks"
type = "std::unordered_set<::FlexFlow::Node>"

[[fields]]
name = "tasks_processing"
type = "::FlexFlow::DeduplicatedPriorityQueue<::FlexFlow::TimedTask, std::vector<::FlexFlow::TimedTask>, std::greater<::FlexFlow::TimedTask>>"

[[fields]]
name = "processed_tasks"
type = "std::unordered_set<::FlexFlow::Node>"

[[fields]]
name = "current_time"
type = "float"
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace = "FlexFlow"
name = "TimedTask"

features = [
"eq",
"hash",
"fmt",
"ord"
]

includes = [
"utils/graph/node/node.dtg.h"
]


[[fields]]
name = "endtime"
type = "float"

[[fields]]
name = "node"
type = "::FlexFlow::Node"
28 changes: 28 additions & 0 deletions lib/compiler/include/compiler/machine_mapping/device_mapping.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

#include "compiler/machine_mapping/device_mapping.h"
#include "compiler/machine_mapping/device_mapping.dtg.h"
#include "pcg/machine_specification.h"
#include "pcg/machine_view.h"
#include "pcg/operator_task_space.dtg.h"
#include "pcg/operator_task_space.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.h"
#include "utils/containers/keys.h"
#include "utils/containers/map_values.h"

namespace FlexFlow {

DeviceMapping get_device_mapping(MachineMapping const &machine_mapping,
MachineSpecification const &machine_spec,
ParallelComputationGraph const &pcg) {
std::unordered_map<parallel_layer_guid_t, std::unordered_set<device_id_t>>
device_mapping;
for (auto const &[layer, machine_view] : machine_mapping.machine_views) {
OperatorTaskSpace op =
get_operator_task_space(pcg, layer));
device_mapping.insert(
{layer, get_device_ids(op, machine_view, machine_spec)});
}
return DeviceMapping{device_mapping};
}

} // namespace FlexFlow
17 changes: 17 additions & 0 deletions lib/compiler/include/compiler/machine_mapping/device_mapping.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef _FLEXFLOW_COMPILER_DEVICE_MAPPING_H
#define _FLEXFLOW_COMPILER_DEVICE_MAPPING_H

#include "compiler/machine_mapping/device_mapping.dtg.h"
#include "compiler/machine_mapping/machine_mapping.dtg.h"
#include "pcg/machine_specification.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"

namespace FlexFlow {

DeviceMapping get_device_mapping(MachineMapping const &machine_mapping,
MachineSpecification const &machine_spec,
ParallelComputationGraph const &pcg);

} // namespace FlexFlow

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace = "FlexFlow"
name = "DeviceMapping"
features = [
"eq",
# "ord",
"hash",
# "json",
# "rapidcheck",
"fmt",
]

includes = [
"pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h",
"pcg/device_id_t.dtg.h"
]

src_includes = [
"utils/hash/unordered_map.h",
"utils/fmt/unordered_map.h",
"utils/hash/unordered_set.h",
"utils/fmt/unordered_set.h"
]

[[fields]]
name = "raw_device_map"
type = "std::unordered_map<::FlexFlow::parallel_layer_guid_t, std::unordered_set<::FlexFlow::device_id_t>>"
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
#define _FLEXFLOW_COMPILER_MACHINE_MAPPING_H

#include "compiler/machine_mapping/machine_mapping.dtg.h"
#include "pcg/device_id_t.dtg.h"
#include "pcg/machine_specification.dtg.h"
#include "pcg/operator_task_space.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.dtg.h"

namespace FlexFlow {

Expand Down
4 changes: 4 additions & 0 deletions lib/compiler/src/compiler/allowed_machine_views.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ namespace FlexFlow {
bool is_valid_machine_view(MachineView const &mv,
OperatorTaskSpace const &task,
MachineSpecification const &ms) {
if (num_dims(mv) != num_dims(task)) {
return false;
}

std::optional<MachineSpaceCoordinate> maximum_device_coord =
get_machine_space_coordinate(
task, mv, get_task_space_maximum_coordinate(task), ms);
Expand Down
14 changes: 14 additions & 0 deletions lib/compiler/src/compiler/cost_estimator/op_cost_estimate_key.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "compiler/cost_estimator/op_cost_estimate_key.h"
#include "compiler/machine_mapping/machine_mapping_problem_tree/unmapped_op_cost_estimate_key.h"

namespace FlexFlow {

OpCostEstimateKey get_mapped_op_cost_estimate_key_for_layer(
ParallelComputationGraph const &pcg,
parallel_layer_guid_t const &layer,
MachineView const &machine_view) {
return map_unmapped_op_cost_estimate_key(
get_unmapped_op_cost_estimate_key_for_layer(pcg, layer), machine_view);
}

} // namespace FlexFlow
88 changes: 88 additions & 0 deletions lib/compiler/src/compiler/cost_estimator/task_graph_traversal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "compiler/cost_estimator/task_graph.dtg.h"
#include "compiler/cost_estimator/task_graph_profile.dtg.h"
#include "compiler/cost_estimator/tasks_state_tracker.dtg.h"
#include "compiler/cost_estimator/timed_task.dtg.h"
#include "pcg/parallel_computation_graph/parallel_computation_graph.h"
#include "utils/containers/filtrans.h"
#include "utils/containers/is_subseteq_of.h"
#include "utils/containers/sorted.h"
#include "utils/exception.h"
#include "utils/graph/digraph/algorithms.h"
#include "utils/graph/digraph/algorithms/get_predecessors.h"
#include "utils/graph/digraph/algorithms/get_successors.h"
#include "utils/overload.h"

namespace FlexFlow {

static void start_task_processing(TasksStateTracker &state_tracker,
TaskGraph const &task_graph,
Node const &task) {
float cost = task_graph.cost_map.at(task);
state_tracker.tasks_processing.push(
TimedTask{state_tracker.current_time + cost, task});
state_tracker.ready_tasks.erase(task);
}

static bool dependencies_are_satisfied(TasksStateTracker const &state_tracker,
TaskGraph const &task_graph,
Node const &task) {
std::unordered_set<Node> incoming_dependencies =
get_predecessors(task_graph.graph, task);
return is_subseteq_of(incoming_dependencies, state_tracker.processed_tasks);
}

static void
finish_task_processing(TasksStateTracker &state_tracker,
TaskGraph const &task_graph,
TimedTask const &timed_task,
std::unordered_set<TaskProfile> &task_profiles) {
state_tracker.processed_tasks.insert(timed_task.node);
for (Node const &task : get_successors(task_graph.graph, timed_task.node)) {
if (dependencies_are_satisfied(state_tracker, task_graph, task)) {
state_tracker.ready_tasks.insert(task);
}
}
task_profiles.insert(
TaskProfile{timed_task.node,
timed_task.endtime - task_graph.cost_map.at(timed_task.node),
timed_task.endtime});
state_tracker.current_time = timed_task.endtime;
}

static bool is_processing_done(TasksStateTracker const &state_tracker) {
return state_tracker.ready_tasks.empty() &&
state_tracker.tasks_processing.empty();
}

static TimedTask get_next_task(TasksStateTracker &state_tracker) {
TimedTask task = state_tracker.tasks_processing.top();
state_tracker.tasks_processing.pop();
return task;
}

TaskGraphProfile simulate_forward_pass(TaskGraph const &task_graph) {
TasksStateTracker state_tracker =
TasksStateTracker{get_sources(task_graph.graph), {}, {}, 0.0};

std::unordered_set<TaskProfile> task_profiles;

while (!is_processing_done(state_tracker)) {
auto ready_tasks_copy = state_tracker.ready_tasks;
for (Node const &task : sorted(ready_tasks_copy)) {
if (task_graph.is_allowed_to_run(task, state_tracker)) {
start_task_processing(state_tracker, task_graph, task);
}
}
if (!state_tracker.tasks_processing.contents().empty()) {
TimedTask next_task = get_next_task(state_tracker);
finish_task_processing(
state_tracker, task_graph, next_task, task_profiles);
} else {
throw mk_runtime_error("Constraints cannot be satisfied");
}
}

return TaskGraphProfile{task_profiles, state_tracker.current_time};
}

} // namespace FlexFlow
Loading