Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ownership #8

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4fd7010
works with chibios step1
SMarbaise Jul 19, 2021
63e163f
undo some changes
SMarbaise Aug 25, 2021
9ee29bd
changed time function
SMarbaise Aug 25, 2021
bf07eed
changed structure
SMarbaise Sep 1, 2021
60b36d9
Test again, boards reader and writer dont match
SMarbaise Sep 7, 2021
fca9539
clean up 1
SMarbaise Sep 13, 2021
dacf029
minor changes
SMarbaise Sep 23, 2021
f4faa7a
announce QoS
SMarbaise Sep 27, 2021
ff96f23
matching process doesnt really work
SMarbaise Sep 28, 2021
10a2846
Some changes
SMarbaise Oct 2, 2021
85a3f68
clean up
SMarbaise Oct 3, 2021
a9a8359
Increased Pbuf size , Fast dds sends to much stuff
SMarbaise Oct 4, 2021
65c728a
discovery writer
SMarbaise Oct 5, 2021
5e5af1e
discovery writer fix for fast dds
SMarbaise Oct 5, 2021
3ad8b3e
Reader discovery works
SMarbaise Oct 6, 2021
9e09d0d
messages at fast dds dont throw exception anymore
SMarbaise Oct 6, 2021
602fd24
reader handles ownership now
SMarbaise Oct 6, 2021
5bb8a80
ownership loop back, seems to not work 100%
SMarbaise Oct 7, 2021
0cd92f4
found ownership error
SMarbaise Oct 7, 2021
db46874
best effort reader ownership
SMarbaise Oct 12, 2021
567947d
best effort reader ownership
SMarbaise Oct 12, 2021
fa48b75
solved cdr alignment issue
SMarbaise Oct 17, 2021
77fb576
prepare rtt
SMarbaise Oct 20, 2021
91c1df9
fixed an ownership bug, rtps example dedicated to ownership test
SMarbaise Oct 23, 2021
a0eab28
merge
SMarbaise Oct 25, 2021
6e9a35d
Merge branch 'embedded-software-laboratory-master'
SMarbaise Oct 25, 2021
f83f2ed
Merge branch 'master' of github.com:SMarbaise/embeddedRTPS
SMarbaise Oct 25, 2021
d60b3bc
merge part one
SMarbaise Oct 25, 2021
9c21365
chibios
SMarbaise Oct 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions include/rtps/common/MD5.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Created by Sven on 14.10.21.
//

#ifndef RTPS_MD5_H
#define RTPS_MD5_H

#include "rtps/config.h"

namespace rtps{
// MD5 context.
typedef struct {
uint32_t state[4]; // state (ABCD)
uint32_t count[2]; // number of bits, modulo 2^64 (lsb first)
uint8_t buffer[64]; // input buffer
} MD5_CTX;

void MD5Init (MD5_CTX *);
void MD5Update (MD5_CTX *, const uint8_t *, uint32_t);
void MD5Final (uint8_t[16], MD5_CTX *);

}
#endif //EMBEDDEDRTPS_EXAMPLE_MD5_H
28 changes: 26 additions & 2 deletions include/rtps/common/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ enum class DurabilityKind_t : uint32_t {
PERSISTENT = 3
};

enum class OwnershipKind_t : uint32_t{
SHARED = 0,
EXCLUSIVE = 1
};

typedef uint32_t OwnershipStrength_t;

struct GuidPrefix_t {
std::array<uint8_t, 12> id;

Expand Down Expand Up @@ -225,8 +232,16 @@ enum class ChangeForReaderStatusKind {

enum class ChangeFromWriterStatusKind { LOST, MISSING, RECEIVED, UNKNOWN };

struct InstanceHandle_t { // TODO
uint64_t value;
struct InstanceHandle_t {
uint8_t key[16];
bool operator==(const InstanceHandle_t &other) const {
for (int i = 0; i < 16; i++) {
if(key[i] != other.key[i]) {
return false;
}
}
return true;
}
};

struct ParticipantMessageData { // TODO
Expand Down Expand Up @@ -269,7 +284,16 @@ const SequenceNumber_t SEQUENCENUMBER_UNKNOWN = {-1, 0};
const Time_t TIME_ZERO = {};
const Time_t TIME_INVALID = {-1, 0xFFFFFFFF};
const Time_t TIME_INFINITY = {0x7FFFFFFF, 0xFFFFFFFF};
class WriterProxy;

struct Instance_t{
InstanceHandle_t handle;
WriterProxy *owner = nullptr;
};

#ifndef CHIBIOS //has its own TIME_INFINITE
const Time_t TIME_INFINITE = {0x7FFFFFFF, 0xFFFFFFFF};
#endif
const VendorId_t VENDOR_UNKNOWN = {};
} // namespace rtps

Expand Down
1 change: 1 addition & 0 deletions include/rtps/communication/UdpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace rtps {

struct UdpConnection {
udp_pcb *pcb = nullptr;

uint16_t port = 0;

UdpConnection() = default; // Required for static allocation
Expand Down
16 changes: 13 additions & 3 deletions include/rtps/discovery/ParticipantProxyData.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ Author: i11 - Embedded Software, RWTH Aachen University
#include "rtps/common/Locator.h"
#include "rtps/config.h"
#include "rtps/messages/MessageTypes.h"

#include "ucdr/microcdr.h"

#if defined(unix) || defined(__unix__)
#include <chrono>
#elif defined(CHIBIOS)
#include "chtime.h"
#endif
#include <array>

Expand Down Expand Up @@ -63,8 +67,10 @@ class ParticipantProxyData {
#if defined(unix) || defined(__unix__)
std::chrono::time_point<std::chrono::high_resolution_clock>
m_lastLivelinessReceivedTimestamp;
#else
#elif !defined(CHIBIOS)
TickType_t m_lastLivelinessReceivedTickCount = 0;
#else
sysinterval_t m_lastLivelinessReceivedTickCount = 0;
#endif
void reset();

Expand Down Expand Up @@ -146,8 +152,10 @@ bool ParticipantProxyData::hasSubscriptionReader() {
void ParticipantProxyData::onAliveSignal() {
#if defined(unix) || defined(__unix__)
m_lastLivelinessReceivedTimestamp = std::chrono::high_resolution_clock::now();
#else
#elif !defined(CHIBIOS)
m_lastLivelinessReceivedTickCount = xTaskGetTickCount();
#else
m_lastLivelinessReceivedTickCount = chVTGetSystemTimeX();
#endif
}

Expand All @@ -157,9 +165,11 @@ uint32_t ParticipantProxyData::getAliveSignalAgeInMilliseconds() {
std::chrono::duration<double, std::milli> duration =
now - m_lastLivelinessReceivedTimestamp;
return duration.count();
#else
#elif !defined(CHIBIOS)
return (xTaskGetTickCount() - m_lastLivelinessReceivedTickCount) *
(1000 / configTICK_RATE_HZ);
#else
return chTimeI2MS(chVTGetSystemTimeX() - m_lastLivelinessReceivedTickCount);
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion include/rtps/discovery/SEDPAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SEDPAgent {
Participant *m_part;
sys_mutex_t m_mutex;
uint8_t m_buffer[600]; // TODO check size, currently changed from 300 to 600
// (FastDDS gives too many options)
// (FastDDS gives too much options)
BuiltInEndpoints m_endpoints;
/*
* If we add readers later on, remote participants will not send matching
Expand Down
7 changes: 5 additions & 2 deletions include/rtps/discovery/TopicData.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ struct TopicData {
DurabilityKind_t durabilityKind;
Locator unicastLocator;
Locator multicastLocator;

OwnershipKind_t ownership_Kind;
OwnershipStrength_t ownership_strenght;
TopicData()
: endpointGuid(GUID_UNKNOWN), typeName{'\0'}, topicName{'\0'},
reliabilityKind(ReliabilityKind_t::BEST_EFFORT),
Expand All @@ -57,13 +58,15 @@ struct TopicData {
192, 168, 0, 42, rtps::getUserUnicastPort(0));
unicastLocator = someLocator;
multicastLocator = Locator();
ownership_Kind = OwnershipKind_t::SHARED;
ownership_strenght = 0;
};

TopicData(Guid_t guid, ReliabilityKind_t reliability, Locator loc)
: endpointGuid(guid), typeName{'\0'}, topicName{'\0'},
reliabilityKind(reliability),
durabilityKind(DurabilityKind_t::TRANSIENT_LOCAL), unicastLocator(loc) {
}
};

bool matchesTopicOf(const TopicData &other);

Expand Down
11 changes: 11 additions & 0 deletions include/rtps/entities/Domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,24 @@ class Domain {
void stop();

Participant *createParticipant();

Writer *createWriter(Participant &part, const char *topicName,
const char *typeName, bool reliable,
bool enforceUnicast = false);


Writer *createWriter(Participant &part, const char *topicName, const char *typeName, bool topichasKey, OwnershipKind_t ownership_kind, OwnershipStrength_t ownershipStrength, bool reliable , bool enforceUnicast);


Reader *createReader(Participant &part, const char *topicName, bool topichasKey,
const char *typeName, bool reliable, OwnershipKind_t ownershipKind,
ip4_addr_t mcastaddress = {0});

Reader *createReader(Participant &part, const char *topicName,
const char *typeName, bool reliable,
ip4_addr_t mcastaddress = {0});


Writer *writerExists(Participant &part, const char *topicName,
const char *typeName, bool reliable);
Reader *readerExists(Participant &part, const char *topicName,
Expand Down
3 changes: 2 additions & 1 deletion include/rtps/entities/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ class ReaderCacheChange {

typedef void (*ddsReaderCallback_fp)(void *callee,
const ReaderCacheChange &cacheChange);

typedef void (*ddsGetKey_Callback_fp)(const uint8_t *data, uint32_t data_len, InstanceHandle_t &key);
class Reader {
public:
TopicData m_attributes;
virtual void newChange(const ReaderCacheChange &cacheChange) = 0;
virtual void registerCallback(ddsReaderCallback_fp cb, void *callee) = 0;
virtual void registerKeyCallback(ddsGetKey_Callback_fp cb) = 0;
virtual bool onNewHeartbeat(const SubmessageHeartbeat &msg,
const GuidPrefix_t &remotePrefix) = 0;
virtual bool addNewMatchedWriter(const WriterProxy &newProxy) = 0;
Expand Down
6 changes: 5 additions & 1 deletion include/rtps/entities/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ template <class NetworkDriver> class StatefulReaderT final : public Reader {
void init(const TopicData &attributes, NetworkDriver &driver);
void newChange(const ReaderCacheChange &cacheChange) override;
void registerCallback(ddsReaderCallback_fp cb, void *callee) override;
void registerKeyCallback(ddsGetKey_Callback_fp cb) override;
bool addNewMatchedWriter(const WriterProxy &newProxy) override;
void removeWriter(const Guid_t &guid) override;
void removeWriterOfParticipant(const GuidPrefix_t &guidPrefix) override;
Expand All @@ -51,10 +52,13 @@ template <class NetworkDriver> class StatefulReaderT final : public Reader {
PacketInfo
m_packetInfo; // TODO intended for reuse but buffer not used as such
NetworkDriver *m_transport;

TopicKind_t m_kind = TopicKind_t::NO_KEY;
ddsReaderCallback_fp m_callback = nullptr;
ddsGetKey_Callback_fp m_KeyCallback = nullptr;
void *m_callee = nullptr;
sys_mutex_t m_mutex;
bool isOwner(InstanceHandle_t &handle, WriterProxy *proxy);
MemoryPool<Instance_t, rtps::Config::MAX_NUMBER_INSTANCE> m_instances;
};

using StatefulReader = StatefulReaderT<UdpDriver>;
Expand Down
74 changes: 73 additions & 1 deletion include/rtps/entities/StatefulReader.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,49 @@ void StatefulReaderT<NetworkDriver>::init(const TopicData &attributes,

return;
}

if(attributes.endpointGuid.entityId.entityKind == EntityKind_t::USER_DEFINED_READER_WITH_KEY){
m_kind = TopicKind_t::WITH_KEY;
}

m_attributes = attributes;
m_transport = &driver;
m_packetInfo.srcPort = attributes.unicastLocator.port;
m_is_initialized_ = true;
}

template <class NetworkDriver>
bool StatefulReaderT<NetworkDriver>::isOwner(InstanceHandle_t &handle, WriterProxy *proxy){
for(auto &instance : m_instances){
if(instance.handle == handle){
if(instance.owner == nullptr){
instance.owner = proxy;
return true;
}
if(instance.owner == proxy){
return true;
}
else{
if(proxy->ownershipStrength < instance.owner->ownershipStrength){
return false;
}
else if(proxy->ownershipStrength > instance.owner->ownershipStrength){
instance.owner = proxy;
return true;
}
else{//equal strength , just pick the first one
return false;
}
}
}
}
Instance_t instance;
instance.owner = proxy;
instance.handle = handle;
m_instances.add(instance);
return true;
}

template <class NetworkDriver>
void StatefulReaderT<NetworkDriver>::newChange(
const ReaderCacheChange &cacheChange) {
Expand All @@ -75,8 +112,17 @@ void StatefulReaderT<NetworkDriver>::newChange(
for (auto &proxy : m_proxies) {
if (proxy.remoteWriterGuid == cacheChange.writerGuid) {
if (proxy.expectedSN == cacheChange.sn) {
m_callback(m_callee, cacheChange);
++proxy.expectedSN;
if(m_attributes.ownership_Kind == OwnershipKind_t::EXCLUSIVE) {
InstanceHandle_t handle;
m_KeyCallback(cacheChange.getData(), cacheChange.getDataSize(), handle);
if (isOwner(handle, &proxy)) { //
m_callback(m_callee, cacheChange);
}
}
else{
m_callback(m_callee, cacheChange);
}
return;
}
}
Expand All @@ -95,6 +141,16 @@ void StatefulReaderT<NetworkDriver>::registerCallback(ddsReaderCallback_fp cb,
}
}

template <class NetworkDriver>
void StatefulReaderT<NetworkDriver>::registerKeyCallback(ddsGetKey_Callback_fp cb){
if(cb != nullptr){
m_KeyCallback = cb;
}
else{
SFR_LOG("Passed Key callback is nullptr\n");
}
}

template <class NetworkDriver>
bool StatefulReaderT<NetworkDriver>::addNewMatchedWriter(
const WriterProxy &newProxy) {
Expand All @@ -109,6 +165,14 @@ bool StatefulReaderT<NetworkDriver>::addNewMatchedWriter(
template <class NetworkDriver>
void StatefulReaderT<NetworkDriver>::removeWriter(const Guid_t &guid) {
Lock lock(m_mutex);
auto isElementToRemove_Instance = [&](const Instance_t &instance){
return instance.owner->remoteWriterGuid == guid;
};
auto thunk_instance = [](void *arg, const Instance_t &value) {
return (*static_cast<decltype(isElementToRemove_Instance) *>(arg))(value);
};
m_instances.remove(thunk_instance, &isElementToRemove_Instance);

auto isElementToRemove = [&](const WriterProxy &proxy) {
return proxy.remoteWriterGuid == guid;
};
Expand All @@ -123,6 +187,14 @@ template <class NetworkDriver>
void StatefulReaderT<NetworkDriver>::removeWriterOfParticipant(
const GuidPrefix_t &guidPrefix) {
Lock lock(m_mutex);
auto isElementToRemove_Instance = [&](const Instance_t &instance){
return instance.owner->remoteWriterGuid.prefix == guidPrefix;
};
auto thunk_instance = [](void *arg, const Instance_t &value) {
return (*static_cast<decltype(isElementToRemove_Instance) *>(arg))(value);
};
m_instances.remove(thunk_instance, &isElementToRemove_Instance);

auto isElementToRemove = [&](const WriterProxy &proxy) {
return proxy.remoteWriterGuid.prefix == guidPrefix;
};
Expand Down
10 changes: 8 additions & 2 deletions include/rtps/entities/StatefulWriter.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ bool StatefulWriterT<NetworkDriver>::sendData(
info.destAddr = locator.getIp4Address();
info.destPort = (Ip4Port_t)locator.port;


{
Lock lock(m_mutex);
const CacheChange *next = m_history.getChangeBySN(snMissing);
Expand All @@ -358,9 +359,14 @@ bool StatefulWriterT<NetworkDriver>::sendData(

return false;
}
bool inlineQoS = false;
// if(m_attributes.ownership_Kind == OwnershipKind_t::EXCLUSIVE){
// inlineQoS = true;
//}
MessageFactory::addSubMessageData(
info.buffer, next->data, false, next->sequenceNumber,
m_attributes.endpointGuid.entityId, reader.remoteReaderGuid.entityId);
info.buffer, next->data, inlineQoS, next->sequenceNumber,
m_attributes.endpointGuid.entityId, reader.remoteReaderGuid.entityId, m_attributes.ownership_Kind, m_attributes.ownership_strenght
);
}

m_transport->sendPacket(info);
Expand Down
6 changes: 5 additions & 1 deletion include/rtps/entities/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ class StatelessReader final : public Reader {
bool addNewMatchedWriter(const WriterProxy &newProxy) override;
void removeWriter(const Guid_t &guid) override;
void removeWriterOfParticipant(const GuidPrefix_t &guidPrefix) override;

void registerKeyCallback(ddsGetKey_Callback_fp cb) override;
private:
sys_mutex_t m_mutex;
ddsReaderCallback_fp m_callback = nullptr;
ddsGetKey_Callback_fp m_keyCallback = nullptr;
void *m_callee = nullptr;
MemoryPool<Instance_t, rtps::Config::MAX_NUMBER_INSTANCE> m_instances;

bool isOwner(InstanceHandle_t &handle, WriterProxy *proxy);
};

} // namespace rtps
Expand Down
1 change: 0 additions & 1 deletion include/rtps/entities/StatelessWriter.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ Author: i11 - Embedded Software, RWTH Aachen University
#include "rtps/utils/udpUtils.h"

using rtps::CacheChange;
using rtps::SequenceNumber_t;
using rtps::StatelessWriterT;

#if SLW_VERBOSE && RTPS_GLOBAL_VERBOSE
Expand Down
Loading