73 commits
0ca274c
start
lroberts36 Apr 16, 2025
8231df5
seemingly works
lroberts36 Mar 4, 2025
53baa5b
format
lroberts36 Mar 4, 2025
fa704d5
remove intermediate prints
lroberts36 Mar 4, 2025
bd7e649
print some solutions
lroberts36 Mar 4, 2025
1b5ca18
small
lroberts36 Mar 26, 2025
9568945
remove comments
lroberts36 Mar 26, 2025
257cede
Increase MPI timeout counter by 10x (#1243)
pgrete Apr 17, 2025
f2b9185
cleanup output and add option of outputting a specific stage
jonahm-LANL Apr 23, 2025
58e8d43
docs
jonahm-LANL Apr 23, 2025
f89802c
changelog and formatting
jonahm-LANL Apr 23, 2025
78ce0e3
oops swarms are only in Base
jonahm-LANL Apr 23, 2025
c41148b
formatting
jonahm-LANL Apr 23, 2025
28bcb6d
lukes nicer cleanup
jonahm-LANL Apr 24, 2025
09d8a87
add compute asymmetry script
jonahm-LANL Apr 24, 2025
7ed5b05
Add shebang to movie2d
jonahm-LANL Apr 24, 2025
e77d876
document how to change output cadence from restart
jonahm-LANL Apr 24, 2025
80c67b8
changelog
jonahm-LANL Apr 24, 2025
95bddb8
CC
jonahm-LANL Apr 24, 2025
4a8242e
CC
jonahm-LANL Apr 24, 2025
3325440
add discussion of SMR to AMR doc
jonahm-LANL Apr 22, 2025
b5cefbf
changelog
jonahm-LANL Apr 22, 2025
dc8df19
typo thanks patrick
jonahm-LANL Apr 22, 2025
44c4be7
Try to move all LogicalLocation related logic to class methods
lroberts36 Apr 24, 2025
2c367ba
fix bugs
lroberts36 Apr 24, 2025
d1c65fe
make sure colorbar label generated
jonahm-LANL Apr 28, 2025
0348a2a
add resport_asymmetry
jonahm-LANL Apr 30, 2025
94dfbd5
Update doc/sphinx/src/outputs.rst
Yurlungur Apr 30, 2025
924fa91
Update doc/sphinx/src/outputs.rst
Yurlungur Apr 30, 2025
1140020
Add checks for `parthenon/mesh/multigrid = true` when using multigrid…
lroberts36 May 16, 2025
cd4148a
[Bugfix]: Propagate flux `Metadata` correctly in `SparsePool` (#1249)
lroberts36 May 16, 2025
4fb0d94
[Trivial] Fix leftover reference to old coords interface (#1251)
pgrete May 17, 2025
eabf2d0
Release 25.05 (#1252)
pgrete May 21, 2025
7ed5abf
Add support for uint64 swarm vars and add default ids (#1253)
pgrete Jun 6, 2025
cdd399e
New CUDA CI+development Docker container (#1162)
BenWibking Jun 9, 2025
8899cb7
Fix -m (#1257)
adamdempsey90 Jun 10, 2025
6afe50d
remove kokkos warning
jonahm-LANL Jun 12, 2025
1113d3b
add core dump output format
jonahm-LANL Jun 12, 2025
aad682d
check that only one core dump file is requested
jonahm-LANL Jun 12, 2025
460fc37
changelog
jonahm-LANL Jun 12, 2025
fde392f
update docs and delete accidentally added files
jonahm-LANL Jun 12, 2025
46b8ce6
output postfix -> .chdf
jonahm-LANL Jun 12, 2025
e4cfcdc
make sure to slurp in fluxes
jonahm-LANL Jun 12, 2025
63c513d
add a 5
jonahm-LANL Jun 12, 2025
701a17d
CC
jonahm-LANL Jun 12, 2025
9d36b5e
more CC
jonahm-LANL Jun 12, 2025
12e82dc
example CC
jonahm-LANL Jun 12, 2025
0ec2686
comment that ghost zones exist
jonahm-LANL Jun 12, 2025
bb9881a
seemingly working timeout capability
lroberts36 Jun 18, 2025
4ad1d93
Merge branch 'develop' into lroberts36/add-taskregion-timer
lroberts36 Jun 18, 2025
94aff1e
seemingly working
lroberts36 Jun 18, 2025
0513b75
add timing option
lroberts36 Jun 19, 2025
3ae45ff
format
lroberts36 Jun 19, 2025
6498ef7
changelog
lroberts36 Jun 19, 2025
739fede
Remove old receive try test
lroberts36 Jun 19, 2025
35aee2f
remove comm buffer hang check
lroberts36 Jun 19, 2025
93d9721
use task collection for internal mesh communication
lroberts36 Jun 19, 2025
8b81c94
Merge branch 'develop' into lroberts36/add-taskregion-timer
lroberts36 Jun 23, 2025
b851966
Merge branch 'develop' into lroberts36/add-taskregion-timer
lroberts36 Jun 26, 2025
8917b82
Add timeout parameter to doc
lroberts36 Jul 7, 2025
87136a8
Merge branch 'develop' into lroberts36/add-taskregion-timer
lroberts36 Jul 14, 2025
02666f7
format
lroberts36 Jul 14, 2025
b9adcba
Merge branch 'develop' into lroberts36/add-taskregion-timer
Yurlungur Jul 24, 2025
61c85ff
Update src/mesh/mesh.cpp
lroberts36 Aug 14, 2025
9dfcb58
Update src/tasks/tasks.hpp
lroberts36 Aug 14, 2025
db3d4e3
Merge branch 'develop' into lroberts36/add-taskregion-timer
lroberts36 Aug 15, 2025
d87b251
Merge branch 'develop' into lroberts36/add-taskregion-timer
pgrete Aug 17, 2025
4e4b8b2
Merge branch 'develop' into lroberts36/add-taskregion-timer
pgrete Sep 2, 2025
39cf680
Fix auto-docs and typo
pgrete Sep 2, 2025
d45dff0
respond to Philipp's comments
lroberts36 Sep 3, 2025
6ba1c44
check the task collection return status
lroberts36 Sep 3, 2025
7a82f07
Merge branch 'develop' into lroberts36/add-taskregion-timer
pgrete Sep 5, 2025
63f033b
Merge branch 'develop' into lroberts36/add-taskregion-timer
pgrete Sep 10, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## Current develop

### Added (new features/APIs/variables/...)
- [[PR 1244]](https://github.com/parthenon-hpc-lab/parthenon/pull/1244) Add TaskCollection timeout capability
- [[PR 1311]](https://github.com/parthenon-hpc-lab/parthenon/pull/1311) Magnitude refinement criteria, per-package PostInitialize function hooks, mergescv and prettyparams utilities
- [[PR 1142]](https://github.com/parthenon-hpc-lab/parthenon/pull/1142) Unify par_dispatch, par_for_outer & par_for_inner overloads
- [[PR 1255]](https://github.com/parthenon-hpc-lab/parthenon/pull/1255) RK34 low storage 3rd order 4 stage SSP integrator with CFL <= 2 from Spiteri & Ruuth 2002, SIAM Journal on Numerical Analysis, 40(2):469–491
4 changes: 3 additions & 1 deletion doc/sphinx/src/driver.rst
@@ -47,7 +47,9 @@ More details on integrators can be found on the :ref:`integrators` page.
Second, it defines a ``Step()`` function, which is responsible for taking
a timestep by looping over stages and calling the
``ConstructAndExecuteTaskLists`` function which builds and executes the
tasks for each stage. Applications that derive from ``MultiStageDriver``
tasks for each stage. The task list called here has a timeout set by the
parameter ``parthenon/mesh/task_collection_timeout_in_seconds``, which
defaults to five minutes. Applications that derive from ``MultiStageDriver``
are responsible for defining a ``MakeTaskCollection`` function that
makes a ``TaskCollection`` given a ``BlockList_t &`` and an integer
stage. The advection example
1 change: 1 addition & 0 deletions doc/sphinx/src/generated/diffusion-parth-table.csv
@@ -23,6 +23,7 @@ block,parameters,type,default,description
"parthenon/mesh","packs_per_rank","int","1","number of meshblockpacks per rank, overrides pack_size"
"parthenon/mesh","refinement","string","none","mesh refinement mode; Allowed values: adaptive, none, static"
"parthenon/mesh","refinement_in_one_min_nbufs","int","64",""
"parthenon/mesh","task_collection_timeout_in_seconds","int","300",""
"parthenon/mesh","x1max","Real","","maximum x1 value of domain"
"parthenon/mesh","x1min","Real","","minimum x1 value of domain"
"parthenon/mesh","x1rat","Real","1","unused"
4 changes: 2 additions & 2 deletions src/driver/driver.hpp
@@ -161,15 +161,15 @@ TaskListStatus ConstructAndExecuteBlockTasks(T *driver, Args... args) {
for (auto &pmb : driver->pmesh->block_list) {
tr[i++] = driver->MakeTaskList(pmb.get(), std::forward<Args>(args)...);
}
TaskListStatus status = tc.Execute();
TaskListStatus status = tc.Execute(driver->pmesh->task_collection_timeout_in_seconds);
return status;
}

template <typename T, class... Args>
TaskListStatus ConstructAndExecuteTaskLists(T *driver, Args... args) {
TaskCollection tc =
driver->MakeTaskCollection(driver->pmesh->block_list, std::forward<Args>(args)...);
TaskListStatus status = tc.Execute();
TaskListStatus status = tc.Execute(driver->pmesh->task_collection_timeout_in_seconds);
return status;
}

71 changes: 10 additions & 61 deletions src/mesh/mesh.cpp
@@ -78,6 +78,8 @@ Mesh::Mesh(ParameterInput *pin, ApplicationInput *app_in, Packages_t &packages,
"enable a multigrid mesh")),
nbnew(), nbdel(), step_since_lb(), gflag(), packages(packages),
resolved_packages(ResolvePackages(packages)),
task_collection_timeout_in_seconds(pin->GetOrAddInteger(
"parthenon/mesh", "task_collection_timeout_in_seconds", 60 * 5)),
// private members:
num_mesh_threads_(
pin->GetOrAddInteger("parthenon/mesh", "num_threads", 1,
@@ -653,70 +655,17 @@ void Mesh::BuildTagMapAndBoundaryBuffers() {
void Mesh::CommunicateBoundaries(std::string md_name,
const std::vector<std::string> &fields) {
const int num_partitions = DefaultNumPartitions();
const int nmb = GetNumMeshBlocksThisRank(Globals::my_rank);
constexpr std::int64_t max_it = 1e10;
std::vector<bool> sent(num_partitions, false);
bool all_sent;
std::int64_t send_iters = 0;

TaskCollection tc;
TaskRegion &region = tc.AddRegion(num_partitions);
auto partitions = GetDefaultBlockPartitions();
do {
all_sent = true;
for (int i = 0; i < partitions.size(); ++i) {
auto &md = mesh_data.Add(md_name, partitions[i], fields);
if (!sent[i]) {
if (SendBoundaryBuffers(md) != TaskStatus::complete) {
all_sent = false;
} else {
sent[i] = true;
}
}
}
send_iters++;
} while (!all_sent && send_iters < max_it);
PARTHENON_REQUIRE(
send_iters < max_it,
"Too many iterations waiting to send boundary communication buffers.");

// wait to receive FillGhost variables
// TODO(someone) evaluate if ReceiveWithWait kind of logic is better, also related to
// https://github.com/lanl/parthenon/issues/418
std::vector<bool> received(num_partitions, false);
bool all_received;
std::int64_t receive_iters = 0;
do {
all_received = true;
for (int i = 0; i < partitions.size(); ++i) {
auto &md = mesh_data.Add(md_name, partitions[i], fields);
if (!received[i]) {
if (ReceiveBoundaryBuffers(md) != TaskStatus::complete) {
all_received = false;
} else {
received[i] = true;
}
}
}
receive_iters++;
} while (!all_received && receive_iters < max_it);
PARTHENON_REQUIRE(
receive_iters < max_it,
"Too many iterations waiting to receive boundary communication buffers.");

for (auto &partition : partitions) {
auto &md = mesh_data.Add(md_name, partition, fields);
// unpack FillGhost variables
SetBoundaries(md);
}

// Now do prolongation, compute primitives, apply BCs
for (auto &partition : partitions) {
auto &md = mesh_data.Add(md_name, partition, fields);
if (multilevel) {
ApplyBoundaryConditionsOnCoarseOrFineMD(md, true);
ProlongateBoundaries(md);
}
ApplyBoundaryConditionsOnCoarseOrFineMD(md, false);
for (int i = 0; i < num_partitions; i++) {
auto &md = mesh_data.Add(md_name, partitions[i], fields);
auto bound = AddBoundaryExchangeTasks(TaskID(0), region[i], md, multilevel);
}
TaskListStatus status = tc.Execute(task_collection_timeout_in_seconds);
PARTHENON_REQUIRE(status == TaskListStatus::complete,
"Boundary communication called internal by mesh failed.");
}

void Mesh::PreCommFillDerived() {
1 change: 1 addition & 0 deletions src/mesh/mesh.hpp
@@ -136,6 +136,7 @@ class Mesh {

int step_since_lb;
int gflag;
int task_collection_timeout_in_seconds;

BlockList_t block_list;
Packages_t packages;
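The new `task_collection_timeout_in_seconds` member is filled in the `Mesh` constructor from the input block `parthenon/mesh`, with `60 * 5` (the `300` shown in the generated parameter table) as the fallback. A minimal sketch of that lookup-with-default pattern follows. `FakeInput`, `get_or_add_integer`, and `resolve_timeout` are hypothetical stand-ins written for illustration and are not Parthenon's `ParameterInput` API; the sketch also assumes the default is recorded in the store when the key is missing, as the name `GetOrAddInteger` suggests.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for a parameter store; NOT Parthenon's ParameterInput.
using FakeInput = std::map<std::string, int>;

// Return the stored value if the key exists; otherwise record the default
// in the store and return it (assumed "get or add" semantics).
int get_or_add_integer(FakeInput &input, const std::string &key, int default_value) {
  // insert() leaves an existing entry untouched, so user-provided values win.
  return input.insert({key, default_value}).first->second;
}

// Resolve the timeout: 300 seconds unless the input overrides it.
int resolve_timeout(FakeInput &input) {
  return get_or_add_integer(
      input, "parthenon/mesh/task_collection_timeout_in_seconds", 60 * 5);
}
```

With an empty store, `resolve_timeout` yields 300; with the key preset to 600, it yields 600.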
5 changes: 2 additions & 3 deletions src/tasks/tasks.cpp
@@ -156,12 +156,11 @@ TaskListStatus TaskRegion::Execute(Pool_t &pool) {
}

// then wait until everything is done
pool.wait();
pool.check_task_returns();
const TaskStatus status = pool.wait();

// Check the results, so as to fire any exceptions from threads
// Return failure if a task failed
if (task_failed.load(std::memory_order_acquire)) {
if (status != TaskStatus::complete || task_failed.load(std::memory_order_acquire)) {
return TaskListStatus::fail;
} else {
return TaskListStatus::complete;
4 changes: 2 additions & 2 deletions src/tasks/tasks.hpp
@@ -493,8 +493,8 @@ class TaskCollection {
regions.emplace_back(num_lists);
return regions.back();
}
TaskListStatus Execute() {
static Pool_t pool(1);
TaskListStatus Execute(std::size_t timeout_in_seconds = 60 * 5) {
static Pool_t pool(timeout_in_seconds, 1);
return Execute(pool);
}
TaskListStatus Execute(Pool_t &pool) {
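One C++ detail may be worth keeping in mind when reading this hunk: `pool` is a function-local `static`, and such objects are constructed only the first time control passes through their declaration. The sketch below illustrates that language rule in isolation. `FakePool` and `effective_timeout` are hypothetical names and do not reproduce `Pool_t` or `TaskCollection::Execute`; whether the rule matters in practice depends on how `Execute` is called.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a pool type: it only remembers the timeout
// it was constructed with. NOT Parthenon's Pool_t.
struct FakePool {
  explicit FakePool(std::size_t timeout_in_seconds) : timeout(timeout_in_seconds) {}
  std::size_t timeout;
};

// Same shape as an Execute(timeout) overload that owns a static pool.
std::size_t effective_timeout(std::size_t timeout_in_seconds) {
  // Constructed on the first call only; later arguments do not rebuild it.
  static FakePool pool(timeout_in_seconds);
  return pool.timeout;
}
```

In this sketch, a first call with `4` followed by a call with `300` returns `4` both times, because the static object keeps the value from its one-time construction.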
54 changes: 36 additions & 18 deletions src/tasks/thread_pool.hpp
@@ -14,6 +14,7 @@
#ifndef TASKS_THREAD_POOL_HPP_
#define TASKS_THREAD_POOL_HPP_

#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
@@ -66,17 +67,22 @@ class ThreadQueue {
complete = true;
cv.notify_all();
}
void wait_for_complete() {
std::unique_lock<std::mutex> lock(mutex);
waiting = true;
if (queue.empty() && nwaiting == nworkers) {
bool wait_for_complete(std::chrono::seconds max_time) {
bool timeout{false};
{
std::unique_lock<std::mutex> lock(mutex);
waiting = true;
if (queue.empty() && nwaiting == nworkers) {
complete = false;
waiting = false;
return timeout;
}
timeout = !complete_cv.wait_for(lock, max_time, [this]() { return complete; });
complete = false;
waiting = false;
return;
}
complete_cv.wait(lock, [this]() { return complete; });
complete = false;
waiting = false;
if (timeout) signal_kill();
return timeout;
}

private:
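The timed wait in `wait_for_complete` relies on the predicate overload of `std::condition_variable::wait_for`, which returns the predicate's value: `false` means the time limit passed while the predicate was still false. A minimal, single-threaded sketch of that return convention follows. `CompletionFlag` and `demo_wait_timed_out` are hypothetical names for illustration; the sketch leaves out the queue, the worker bookkeeping, and the `signal_kill()` call from the hunk above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper, not part of Parthenon.
struct CompletionFlag {
  std::mutex mutex;
  std::condition_variable cv;
  bool complete = false;

  void signal_complete() {
    {
      std::lock_guard<std::mutex> lock(mutex);
      complete = true;
    }
    cv.notify_all();
  }

  // Returns true if max_time elapsed before `complete` became true.
  bool wait_timed_out(std::chrono::milliseconds max_time) {
    std::unique_lock<std::mutex> lock(mutex);
    // wait_for(lock, time, pred) returns pred(); negate it to get "timed out".
    return !cv.wait_for(lock, max_time, [this]() { return complete; });
  }
};

// Signal before waiting (no timeout) or never signal (timeout after 50 ms).
bool demo_wait_timed_out(bool signal_first) {
  CompletionFlag flag;
  if (signal_first) flag.signal_complete();
  return flag.wait_timed_out(std::chrono::milliseconds(50));
}
```

Because the predicate is re-checked on every wakeup, spurious wakeups do not produce a false "complete" result.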
@@ -124,8 +130,9 @@ class ThreadVector {

class ThreadPool {
public:
explicit ThreadPool(const int numthreads = std::thread::hardware_concurrency())
: nthreads(numthreads), queue(nthreads) {
explicit ThreadPool(const std::size_t timeout_in_seconds,
const int numthreads = std::thread::hardware_concurrency())
: nthreads(numthreads), queue(nthreads), timeout_duration(timeout_in_seconds) {
for (int i = 0; i < nthreads; i++) {
auto worker = [&]() {
while (true) {
@@ -145,8 +152,6 @@ class ThreadPool {
}
}

void wait() { queue.wait_for_complete(); }

void kill() { queue.signal_kill(); }

template <typename F, class... Args>
@@ -164,29 +169,32 @@ class ThreadPool {
// Mostly this exists to throw any exceptions,
// but we can check returns too.
// Would need changes for >1 failure mode
TaskStatus check_task_returns() {
queue.wait_for_complete();
TaskStatus wait() {
const bool timeout = queue.wait_for_complete(timeout_duration);
TaskStatus overall = TaskStatus::complete;
for (auto &task : run_tasks) {
TaskStatus task_return = task->get_future().get();
if (task_return == TaskStatus::fail) overall = TaskStatus::fail;
}
run_tasks.clear();

return overall;
return timeout ? TaskStatus::fail : overall;
}

private:
const int nthreads;
std::vector<std::thread> threads;
ThreadQueue<std::function<void()>> queue;
ThreadVector<std::shared_ptr<std::packaged_task<TaskStatus()>>> run_tasks;
std::chrono::seconds timeout_duration;
};

template <typename return_t = TaskStatus>
class SerialPool {
public:
explicit SerialPool([[maybe_unused]] const int numthreads = 1) {}
explicit SerialPool(const std::size_t timeout_in_seconds,
[[maybe_unused]] const int numthreads = 1)
: timeout_duration(timeout_in_seconds) {}

template <typename F, class... Args>
void enqueue(F &&f, Args &&...args) {
@@ -197,23 +205,33 @@ class SerialPool {
}

int size() const { return 1; }
void wait() {}

TaskStatus check_task_returns() {
TaskStatus wait() {
TaskStatus overall = TaskStatus::complete;

const auto start_time = std::chrono::high_resolution_clock::now();
auto timeout_check = [start_time, timeout_duration = timeout_duration]() {
const auto end_time = std::chrono::high_resolution_clock::now();
const auto duration =
std::chrono::duration_cast<std::chrono::seconds>(end_time - start_time);
return duration >= timeout_duration;
};

while (!queue.empty()) {
auto f = queue.front();
auto ret = f();
if constexpr (std::is_same<return_t, TaskStatus>::value) {
if (ret == TaskStatus::fail) overall = TaskStatus::fail;
}
queue.pop();
if (timeout_check()) return TaskStatus::fail;
}
return overall;
}

private:
std::queue<std::function<return_t()>> queue;
std::chrono::seconds timeout_duration;
};

#ifdef PARTHENON_USE_SERIAL_POOL
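In `SerialPool::wait()` the timeout is an elapsed-time check made after each queued callable returns, so it is cooperative: a callable that never returns would not be interrupted. A minimal sketch of that pattern follows, with two deliberate differences from the hunk above: it uses `std::chrono::steady_clock` rather than `high_resolution_clock`, and it compares in milliseconds rather than whole seconds. `Status`, `TaskQueue`, `run_with_budget`, and `demo_budget` are hypothetical names, not Parthenon types.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <queue>

// Hypothetical stand-ins for illustration; NOT Parthenon's TaskStatus or pool.
enum class Status { complete, fail };
using TaskQueue = std::queue<std::function<Status()>>;

// Run queued tasks in order. Report fail if any task fails or if the
// time budget is used up by the time a task has finished.
Status run_with_budget(TaskQueue &tasks, std::chrono::milliseconds budget) {
  const auto start = std::chrono::steady_clock::now();
  Status overall = Status::complete;
  while (!tasks.empty()) {
    if (tasks.front()() == Status::fail) overall = Status::fail;
    tasks.pop();
    // Checked only between tasks, so a running task is never preempted.
    if (std::chrono::steady_clock::now() - start >= budget) return Status::fail;
  }
  return overall;
}

// Two trivial tasks executed under the given budget.
Status demo_budget(std::chrono::milliseconds budget) {
  TaskQueue tasks;
  tasks.push([]() { return Status::complete; });
  tasks.push([]() { return Status::complete; });
  return run_with_budget(tasks, budget);
}
```

A zero budget makes the first check fail immediately, while a generous budget lets both tasks finish and report `Status::complete`.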
11 changes: 11 additions & 0 deletions tst/unit/test_tasklist.cpp
@@ -48,3 +48,14 @@ TEST_CASE("Task Object Lifecycle", "[TaskList][AddTask]") {
REQUIRE(track_destruction.expired());
}
}

TEST_CASE("TaskCollection timeout", "[TaskList][AddTask]") {
GIVEN("A TaskCollection") {
parthenon::TaskCollection tc;
parthenon::TaskRegion &region = tc.AddRegion(1);
region[0].AddTask(TaskID(0), []() { return TaskStatus::incomplete; });
const std::size_t timeout_in_seconds = 4;
parthenon::TaskListStatus status = tc.Execute(timeout_in_seconds);
REQUIRE(status == parthenon::TaskListStatus::fail);
}
}