Skip to content

Commit

Permalink
Merge pull request #466 from JetBrains/ab-fix-callback-lifetimes
Browse files Browse the repository at this point in the history
Pack of RD lifetimes fixes + auto-binding for task results
  • Loading branch information
mirasrael authored Jan 16, 2024
2 parents 7f3315d + 3563bee commit 2b05bd0
Show file tree
Hide file tree
Showing 19 changed files with 564 additions and 80 deletions.
1 change: 1 addition & 0 deletions rd-cpp/src/rd_core_cpp/src/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ set(RD_CORE_CPP_SOURCES
#pch
${PCH_CPP_OPT}
util/export_api_helper.h
util/lifetime_util.h
)

if (RD_STATIC)
Expand Down
13 changes: 13 additions & 0 deletions rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ Lifetime const& Lifetime::Eternal()
return ETERNAL;
}


Lifetime const& Lifetime::Terminated()
{
static Lifetime TERMINATED = []
{
Lifetime lifetime;
lifetime->terminate();
return lifetime;
}();

return TERMINATED;
}

bool operator==(Lifetime const& lw1, Lifetime const& lw2)
{
return lw1.ptr == lw2.ptr;
Expand Down
3 changes: 3 additions & 0 deletions rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ class RD_CORE_API Lifetime final
std::shared_ptr<LifetimeImpl> ptr;

public:
typedef LifetimeImpl::counter_t counter_t;

static Lifetime const& Eternal();
static Lifetime const& Terminated();

// region ctor/dtor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace rd
{
LifetimeDefinition::LifetimeDefinition(bool eternaled) : eternaled(eternaled), lifetime(eternaled)
LifetimeDefinition::LifetimeDefinition(bool eternaled) : lifetime(eternaled)
{
}

Expand All @@ -18,7 +18,7 @@ bool LifetimeDefinition::is_terminated() const
return lifetime->is_terminated();
}

void LifetimeDefinition::terminate()
void LifetimeDefinition::terminate() const
{
lifetime->terminate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ class RD_CORE_API LifetimeDefinition
{
private:
friend class SequentialLifetimes;

bool eternaled = false;

public:
Lifetime lifetime;

Expand All @@ -39,14 +36,13 @@ class RD_CORE_API LifetimeDefinition

virtual ~LifetimeDefinition();

// static std::shared_ptr<LifetimeDefinition> eternal;
static std::shared_ptr<LifetimeDefinition> get_shared_eternal();

bool is_terminated() const;

bool is_eternal() const;

void terminate();
void terminate() const;

template <typename F>
static auto use(F&& block) -> typename util::result_of_t<F(Lifetime)>
Expand Down
33 changes: 32 additions & 1 deletion rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class RD_CORE_API LifetimeImpl final
{
public:
friend class LifetimeDefinition;

friend class SequentialLifetimes;
friend class Lifetime;

using counter_t = int32_t;
Expand Down Expand Up @@ -75,6 +75,37 @@ class RD_CORE_API LifetimeImpl final
actions.erase(i);
}


// Attach pointer to lifetime. It guarantee that pointer will survive at least lifetime duration.
template <typename T, typename ...Args>
std::shared_ptr<T> make_attached(Args... args)
{
auto ptr = std::make_shared<T>(std::forward<Args>(args)...);
attach(ptr);
return ptr;
}

/// \brief Attach pointer to lifetime. Guarantees that pointer will survive at least lifetime duration.
template <typename T>
counter_t attach(std::shared_ptr<T> pointer)
{
// No-op structure wich acts as an action, but preserves pointer until lifetime terminated
struct holder
{
std::shared_ptr<T> pointer;

explicit holder(const std::shared_ptr<T>& pointer) : pointer(pointer)
{
}

void operator()() const
{
}
};

return add_action(holder(pointer));
}

#if __cplusplus >= 201703L
static inline counter_t get_id = 0;
#else
Expand Down
22 changes: 11 additions & 11 deletions rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,32 @@

namespace rd
{
SequentialLifetimes::SequentialLifetimes(Lifetime parent_lifetime) : parent_lifetime(std::move(parent_lifetime))
SequentialLifetimes::SequentialLifetimes(const Lifetime& parent_lifetime) : parent_lifetime(parent_lifetime)
{
this->parent_lifetime->add_action([this] { set_current_lifetime(LifetimeDefinition::get_shared_eternal()); });
}

Lifetime SequentialLifetimes::next()
{
std::shared_ptr<LifetimeDefinition> new_def = std::make_shared<LifetimeDefinition>(parent_lifetime);
set_current_lifetime(new_def);
return current_def->lifetime;
Lifetime new_lifetime = parent_lifetime.create_nested();
set_current_lifetime(new_lifetime);
return new_lifetime;
}

void SequentialLifetimes::terminate_current()
{
set_current_lifetime(LifetimeDefinition::get_shared_eternal());
set_current_lifetime(Lifetime::Terminated());
}

bool SequentialLifetimes::is_terminated() const
{
return current_def->is_eternal() || current_def->is_terminated();
return current_lifetime->is_terminated();
}

void SequentialLifetimes::set_current_lifetime(std::shared_ptr<LifetimeDefinition> new_def)
void SequentialLifetimes::set_current_lifetime(const Lifetime& lifetime)
{
std::shared_ptr<LifetimeDefinition> prev = current_def;
current_def = new_def;
prev->terminate();
const Lifetime prev = current_lifetime;
current_lifetime = lifetime;
if (!prev->is_terminated())
prev->terminate();
}
} // namespace rd
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace rd
{
class RD_CORE_API SequentialLifetimes
{
private:
std::shared_ptr<LifetimeDefinition> current_def = LifetimeDefinition::get_shared_eternal();
Lifetime parent_lifetime;
Lifetime current_lifetime = Lifetime::Terminated();
void set_current_lifetime(const Lifetime& lifetime);

public:
// region ctor/dtor
Expand All @@ -28,16 +28,14 @@ class RD_CORE_API SequentialLifetimes

SequentialLifetimes& operator=(SequentialLifetimes&&) = delete;

explicit SequentialLifetimes(Lifetime parent_lifetime);
explicit SequentialLifetimes(const Lifetime& parent_lifetime);
// endregion

Lifetime next();

void terminate_current();

bool is_terminated() const;

void set_current_lifetime(std::shared_ptr<LifetimeDefinition> new_def);
};
} // namespace rd

Expand Down
84 changes: 56 additions & 28 deletions rd-cpp/src/rd_core_cpp/src/main/reactive/base/SignalX.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

#include "interfaces.h"
#include "SignalCookie.h"
#include "lifetime/LifetimeDefinition.h"

#include <lifetime/Lifetime.h>
#include <util/core_util.h>

#include <utility>
#include <functional>
#include <atomic>
#include <list>

namespace rd
{
Expand All @@ -22,67 +24,93 @@ class Signal final : public ISignal<T>
private:
using WT = typename ISignal<T>::WT;

class Event
struct Event
{
using F = std::function<void(T const&)>;
private:
std::function<void(T const&)> action;
Lifetime lifetime;
F action;
std::atomic_int8_t state;

constexpr static int8_t ACTIVE = 0;
constexpr static int8_t FIRING = 1;
constexpr static int8_t TERMINATED = 2;
public:
// region ctor/dtor
Event() = delete;

template <typename F>
Event(F&& action, Lifetime lifetime) : action(std::forward<F>(action)), lifetime(lifetime)
explicit Event(const Lifetime& lifetime, F&& action) : lifetime(lifetime), action(std::forward<F>(action)), state(ACTIVE)
{
}

Event(Event&&) = default;
// endregion
Event& operator=(Event&& other) = default;

bool is_alive() const
bool operator()(T const& arg)
{
return !lifetime->is_terminated();
}
if (lifetime->is_terminated())
return false;

void execute_if_alive(T const& value) const
{
if (is_alive())
auto expected_state = ACTIVE;
// set firing flag to prevent action destruction during action firing
// skip action if it isn't active (lifetime was terminated)
if (!state.compare_exchange_strong(expected_state, FIRING))
return false;

expected_state = FIRING;
try
{
action(value);
action(arg);
return state.compare_exchange_strong(expected_state, ACTIVE);
}
catch (...)
{
if (!state.compare_exchange_strong(expected_state, ACTIVE))
action = nullptr;
throw;
}
}

void terminate()
{
const auto old_state = state.exchange(TERMINATED);
// release action immediatelly if it isn't firing right now
if (old_state == ACTIVE)
action = nullptr;
lifetime = Lifetime::Terminated();
}
};

using counter_t = int32_t;
using listeners_t = std::map<counter_t, Event>;
using listeners_t = std::vector<std::shared_ptr<Event>>;

mutable counter_t advise_id = 0;
mutable listeners_t listeners, priority_listeners;

static void cleanup(listeners_t& queue)
{
util::erase_if(queue, [](Event const& e) -> bool { return !e.is_alive(); });
}

void fire_impl(T const& value, listeners_t& queue) const
{
for (auto const& p : queue)
auto it = queue.begin();
auto end = queue.end();
auto alive_it = it;
while (it != end)
{
auto const& event = p.second;
event.execute_if_alive(value);
if (it->get()->operator()(value))
{
if (alive_it != it)
*alive_it = std::move(*it);
++alive_it;
}
++it;
}
cleanup(queue);
if (alive_it != end)
queue.erase(alive_it, end);
}

template <typename F>
void advise0(const Lifetime& lifetime, F&& handler, listeners_t& queue) const
{
if (lifetime->is_terminated())
return;
counter_t id = advise_id /*.load()*/;
queue.emplace(id, Event(std::forward<F>(handler), lifetime));
++advise_id;
auto event_ptr = std::make_shared<Event>(lifetime, std::forward<F>(handler));
lifetime->add_action([event_ptr] { event_ptr->terminate(); });
queue.push_back(std::move(event_ptr));
}

public:
Expand Down
37 changes: 37 additions & 0 deletions rd-cpp/src/rd_core_cpp/src/main/util/lifetime_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#ifndef LIFETIME_UTIL_H
#define LIFETIME_UTIL_H

#include <memory>
#include "../lifetime/LifetimeDefinition.h"

namespace rd
{
namespace util
{
/// \brief Attaches lifetime to shared_ptr. Lifetime terminates when shared_ptr destructed.
/// \param original original pointer which will be used to make a new pointer with lifetime.
/// \param lifetime_definition Lifetime definition associated with returned shared_ptr.
/// \return New shared_ptr which owns lifetime_definition and terminates that lifetime when destroyed.
template <typename T>
static std::shared_ptr<T> attach_lifetime(std::shared_ptr<T> original, LifetimeDefinition lifetime_definition)
{
struct Deleter
{
std::shared_ptr<T> ptr;
LifetimeDefinition lifetime_definition;

explicit Deleter(LifetimeDefinition&& lifetime_definition, std::shared_ptr<T> ptr) : ptr(std::move(ptr)), lifetime_definition(std::move(lifetime_definition)) { }

void operator()(T*) const
{
}
};

auto raw_ptr = original.get();
auto deleter = Deleter(std::move(lifetime_definition), std::move(original));
return std::shared_ptr<T>(raw_ptr, std::move(deleter));
}
}
}

#endif //LIFETIME_UTIL_H
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,9 @@ void MessageBroker::advise_on(Lifetime lifetime, RdReactiveBase const* entity) c
lifetime->add_action([this, key]() { subscriptions.erase(key); });
}
}

bool MessageBroker::is_subscribed(const RdId id) const
{
return subscriptions.find(id) != subscriptions.end();
}
} // namespace rd
2 changes: 2 additions & 0 deletions rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class RD_FRAMEWORK_API MessageBroker final
void dispatch(RdId id, Buffer message) const;

void advise_on(Lifetime lifetime, RdReactiveBase const* entity) const;

bool is_subscribed(const RdId id) const;
};
} // namespace rd

Expand Down
Loading

0 comments on commit 2b05bd0

Please sign in to comment.