Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntcore] Simplify local startup #4803

Merged
merged 1 commit into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions ntcore/src/main/native/cpp/LocalStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ void SubscriberData::UpdateActive() {
}

void LSImpl::NotifyTopic(TopicData* topic, unsigned int eventFlags) {
DEBUG4("NotifyTopic({}, {})\n", topic->name, eventFlags);
DEBUG4("NotifyTopic({}, {})", topic->name, eventFlags);
auto topicInfo = topic->GetTopicInfo();
if (!topic->listeners.empty()) {
m_listenerStorage.Notify(topic->listeners, eventFlags, topicInfo);
Expand Down Expand Up @@ -1380,36 +1380,37 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
}
}

void LocalStorage::StartNetwork(net::NetworkStartupInterface& startup,
net::NetworkInterface* network) {
void LocalStorage::StartNetwork(net::NetworkInterface* network) {
WPI_DEBUG4(m_impl->m_logger, "StartNetwork()");
std::scoped_lock lock{m_mutex};
m_impl->m_network = network;
// publish all active publishers to the network and send last values
// only send value once per topic
for (auto&& topic : m_impl->m_topics) {
PublisherData* anyPublisher = nullptr;
for (auto&& publisher : topic->localPublishers) {
if (publisher->active) {
startup.Publish(publisher->handle, topic->handle, topic->name,
topic->typeStr, topic->properties, publisher->config);
network->Publish(publisher->handle, topic->handle, topic->name,
topic->typeStr, topic->properties, publisher->config);
anyPublisher = publisher;
}
}
if (anyPublisher && topic->lastValue) {
startup.SetValue(anyPublisher->handle, topic->lastValue);
network->SetValue(anyPublisher->handle, topic->lastValue);
}
}
for (auto&& subscriber : m_impl->m_subscribers) {
startup.Subscribe(subscriber->handle, {{subscriber->topic->name}},
subscriber->config);
network->Subscribe(subscriber->handle, {{subscriber->topic->name}},
subscriber->config);
}
for (auto&& subscriber : m_impl->m_multiSubscribers) {
startup.Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
network->Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
}
m_impl->m_network = network;
}

void LocalStorage::ClearNetwork() {
WPI_DEBUG4(m_impl->m_logger, "ClearNetwork()");
std::scoped_lock lock{m_mutex};
m_impl->m_network = nullptr;
// treat as an unannounce all from the network side
Expand Down
3 changes: 1 addition & 2 deletions ntcore/src/main/native/cpp/LocalStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ class LocalStorage final : public net::ILocalStorage {
bool ack) final;
void NetworkSetValue(NT_Topic topicHandle, const Value& value) final;

void StartNetwork(net::NetworkStartupInterface& startup,
net::NetworkInterface* network) final;
void StartNetwork(net::NetworkInterface* network) final;
void ClearNetwork() final;

// User functions. These are the actual implementations of the corresponding
Expand Down
13 changes: 5 additions & 8 deletions ntcore/src/main/native/cpp/NetworkClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,9 @@ void NCImpl3::TcpConnected(uv::Tcp& tcp) {
}
});

{
net3::ClientStartup3 startup{*m_clientImpl};
m_localStorage.StartNetwork(startup, &m_localQueue);
}
m_clientImpl->SetLocal(&m_localStorage);
m_localStorage.StartNetwork(&m_localQueue);
HandleLocal();
});

tcp.SetData(clientImpl);
Expand Down Expand Up @@ -429,11 +427,10 @@ void NCImpl4::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp) {
m_sendValuesTimer->Start(uv::Timer::Time{repeatMs},
uv::Timer::Time{repeatMs});
});
{
net::ClientStartup startup{*m_clientImpl};
m_localStorage.StartNetwork(startup, &m_localQueue);
}
m_clientImpl->SetLocal(&m_localStorage);
m_localStorage.StartNetwork(&m_localQueue);
HandleLocal();
m_clientImpl->SendInitial();
ws.closed.connect([this, &ws](uint16_t, std::string_view reason) {
if (!ws.GetStream().IsLoopClosing()) {
Disconnect(reason);
Expand Down
6 changes: 2 additions & 4 deletions ntcore/src/main/native/cpp/NetworkServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,9 @@ NSImpl::NSImpl(std::string_view persistentFilename,
m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize);
m_loopRunner.ExecAsync([=, this](uv::Loop& loop) {
// connect local storage to server
{
net::ServerStartup startup{m_serverImpl};
m_localStorage.StartNetwork(startup, &m_localQueue);
}
m_serverImpl.SetLocal(&m_localStorage);
m_localStorage.StartNetwork(&m_localQueue);
HandleLocal();

// load persistent file first, then initialize
uv::QueueWork(
Expand Down
78 changes: 35 additions & 43 deletions ntcore/src/main/native/cpp/net/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class CImpl : public ServerMessageHandler {
void HandleLocal(std::vector<ClientMessage>&& msgs);
bool SendControl(uint64_t curTimeMs);
void SendValues(uint64_t curTimeMs);
void SendInitialValues();
bool CheckNetworkReady();

// ServerMessageHandler interface
Expand Down Expand Up @@ -229,7 +230,7 @@ bool CImpl::SendControl(uint64_t curTimeMs) {
}

void CImpl::SendValues(uint64_t curTimeMs) {
DEBUG4("SendPeriodic({})", curTimeMs);
DEBUG4("SendValues({})", curTimeMs);

// can't send value updates until we have a RTT
if (!m_haveTimeOffset) {
Expand Down Expand Up @@ -268,6 +269,36 @@ void CImpl::SendValues(uint64_t curTimeMs) {
}
}

void CImpl::SendInitialValues() {
DEBUG4("SendInitialValues()");

// ensure all control messages are sent ahead of value updates
if (!SendControl(0)) {
return;
}

// only send time=0 values (as we don't have a RTT yet)
auto writer = m_wire.SendBinary();
for (auto&& pub : m_publishers) {
if (pub && !pub->outValues.empty()) {
bool sent = false;
for (auto&& val : pub->outValues) {
if (val.server_time() == 0) {
DEBUG4("Sending {} value time={} server_time={}", pub->handle,
val.time(), val.server_time());
WireEncodeBinary(writer.Add(), Handle{pub->handle}.GetIndex(), 0,
val);
sent = true;
}
}
if (sent) {
std::erase_if(pub->outValues,
[](const auto& v) { return v.server_time() == 0; });
}
}
}
}

bool CImpl::CheckNetworkReady() {
if (!m_wire.Ready()) {
++m_notReadyCount;
Expand Down Expand Up @@ -434,46 +465,7 @@ void ClientImpl::SetLocal(LocalInterface* local) {
m_impl->m_local = local;
}

ClientStartup::ClientStartup(ClientImpl& client)
: m_client{client},
m_binaryWriter{client.m_impl->m_wire.SendBinary()},
m_textWriter{client.m_impl->m_wire.SendText()} {}

ClientStartup::~ClientStartup() {
m_client.m_impl->m_wire.Flush();
}

void ClientStartup::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupPublish({}, {}, {}, {})",
pubHandle, topicHandle, name, typeStr);
m_client.m_impl->Publish(pubHandle, topicHandle, name, typeStr, properties,
options);
WireEncodePublish(m_textWriter.Add(), Handle{pubHandle}.GetIndex(), name,
typeStr, properties);
}

void ClientStartup::Subscribe(NT_Subscriber subHandle,
std::span<const std::string> prefixes,
const PubSubOptionsImpl& options) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSubscribe({})", subHandle);
WireEncodeSubscribe(m_textWriter.Add(), Handle{subHandle}.GetIndex(),
prefixes, options);
}

void ClientStartup::SetValue(NT_Publisher pubHandle, const Value& value) {
WPI_DEBUG4(m_client.m_impl->m_logger, "StartupSetValue({})", pubHandle);
// Similar to Client::SetValue(), except always set lastValue and send
unsigned int index = Handle{pubHandle}.GetIndex();
assert(index < m_client.m_impl->m_publishers.size() &&
m_client.m_impl->m_publishers[index]);
auto& publisher = *m_client.m_impl->m_publishers[index];
// only send time 0 values until we have a RTT
if (value.server_time() == 0) {
WireEncodeBinary(m_binaryWriter.Add(), index, 0, value);
} else {
publisher.outValues.emplace_back(value);
}
void ClientImpl::SendInitial() {
m_impl->SendInitialValues();
m_impl->m_wire.Flush();
}
26 changes: 1 addition & 25 deletions ntcore/src/main/native/cpp/net/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ class Value;
namespace nt::net {

struct ClientMessage;
class ClientStartup;
class WireConnection;

class ClientImpl {
friend class ClientStartup;

public:
ClientImpl(uint64_t curTimeMs, int inst, WireConnection& wire,
wpi::Logger& logger,
Expand All @@ -48,32 +45,11 @@ class ClientImpl {
void SendValues(uint64_t curTimeMs);

void SetLocal(LocalInterface* local);
void SendInitial();

private:
class Impl;
std::unique_ptr<Impl> m_impl;
};

class ClientStartup final : public NetworkStartupInterface {
public:
explicit ClientStartup(ClientImpl& client);
~ClientStartup() final;
ClientStartup(const ClientStartup&) = delete;
ClientStartup& operator=(const ClientStartup&) = delete;

// NetworkStartupInterface interface
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void Subscribe(NT_Subscriber subHandle, std::span<const std::string> prefixes,
const PubSubOptionsImpl& options) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;

private:
ClientImpl& m_client;
BinaryWriter m_binaryWriter;
TextWriter m_textWriter;
};

} // namespace nt::net
19 changes: 7 additions & 12 deletions ntcore/src/main/native/cpp/net/NetworkInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,27 @@ class LocalInterface {
virtual void NetworkSetValue(NT_Topic topicHandle, const Value& value) = 0;
};

class NetworkStartupInterface {
class NetworkInterface {
public:
virtual ~NetworkStartupInterface() = default;
virtual ~NetworkInterface() = default;

virtual void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) = 0;
virtual void Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void SetValue(NT_Publisher pubHandle, const Value& value) = 0;
};

class NetworkInterface : public NetworkStartupInterface {
public:
virtual void Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle) = 0;
virtual void SetProperties(NT_Topic topicHandle, std::string_view name,
const wpi::json& update) = 0;
virtual void Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) = 0;
virtual void Unsubscribe(NT_Subscriber subHandle) = 0;
virtual void SetValue(NT_Publisher pubHandle, const Value& value) = 0;
};

class ILocalStorage : public LocalInterface {
public:
virtual void StartNetwork(NetworkStartupInterface& startup,
NetworkInterface* network) = 0;
virtual void StartNetwork(NetworkInterface* network) = 0;
virtual void ClearNetwork() = 0;
};

Expand Down
22 changes: 1 addition & 21 deletions ntcore/src/main/native/cpp/net/ServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ class ClientData4Base : public ClientData, protected ClientMessageHandler {
};

class ClientDataLocal final : public ClientData4Base {
friend class net::ServerStartup;

public:
ClientDataLocal(SImpl& server, int id, wpi::Logger& logger)
: ClientData4Base{"", "", "", true, [](uint32_t) {}, server, id, logger} {
Expand Down Expand Up @@ -2300,6 +2298,7 @@ void ServerImpl::HandleLocal(std::span<const ClientMessage> msgs) {
}

void ServerImpl::SetLocal(LocalInterface* local) {
WPI_DEBUG4(m_impl->m_logger, "SetLocal()");
m_impl->m_local = local;

// create server meta topics
Expand Down Expand Up @@ -2362,22 +2361,3 @@ std::string ServerImpl::DumpPersistent() {
std::string ServerImpl::LoadPersistent(std::string_view in) {
return m_impl->LoadPersistent(in);
}

void ServerStartup::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) {
m_server.m_impl->m_localClient->ClientPublish(pubHandle, name, typeStr,
properties);
}

void ServerStartup::Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
m_server.m_impl->m_localClient->ClientSubscribe(subHandle, topicNames,
options);
}

void ServerStartup::SetValue(NT_Publisher pubHandle, const Value& value) {
m_server.m_impl->m_localClient->ClientSetValue(pubHandle, value);
}
21 changes: 0 additions & 21 deletions ntcore/src/main/native/cpp/net/ServerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ namespace nt::net {

struct ClientMessage;
class LocalInterface;
class ServerStartup;
class WireConnection;

class ServerImpl final {
friend class ServerStartup;

public:
using SetPeriodicFunc = std::function<void(uint32_t repeatMs)>;
using Connected3Func =
Expand Down Expand Up @@ -76,22 +73,4 @@ class ServerImpl final {
std::unique_ptr<Impl> m_impl;
};

class ServerStartup final : public NetworkStartupInterface {
public:
explicit ServerStartup(ServerImpl& server) : m_server{server} {}

// NetworkStartupInterface interface
void Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
const PubSubOptionsImpl& options) final;
void Subscribe(NT_Subscriber subHandle,
std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) final;
void SetValue(NT_Publisher pubHandle, const Value& value) final;

private:
ServerImpl& m_server;
};

} // namespace nt::net
Loading