Skip to content

Commit

Permalink
Testing out a sub pub zmq proxy. The proxy subscribes to a publisher …
Browse files Browse the repository at this point in the history
…and then publishes to subscribers.
  • Loading branch information
0gap committed Dec 12, 2021
1 parent 8fa9ca3 commit 18ae3d1
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 0 deletions.
14 changes: 14 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ add_library(flat_msg OBJECT lib/LogEntrySerializer.cc lib/LogEntrySerializer.h
)
add_dependencies(flat_msg flat_compiled_headers)

add_library(zmq_components OBJECT
lib/publisher.h
lib/subscriber.h
lib/pub_sub_proxy.h
lib/binding.h)
target_include_directories(zmq_components PRIVATE lib)
set_target_properties(zmq_components PROPERTIES LINKER_LANGUAGE CXX)

add_executable(msg_logging main.cpp)
add_dependencies(msg_logging flat_msg)
target_include_directories(msg_logging BEFORE PRIVATE lib)
Expand All @@ -46,4 +54,10 @@ add_executable(zmq_server
zmq_consumer/zmq_consumer.cc)
target_link_libraries(zmq_server ${CONAN_LIBS} flat_msg)

add_executable(try_pub_sub_zmq_proxy
pub_sub_zmq_proxy/pub_sub_zmq_proxy_main.cc)
target_include_directories(try_pub_sub_zmq_proxy PRIVATE lib)
target_link_libraries(try_pub_sub_zmq_proxy ${CONAN_LIBS} flat_msg zmq_components)


set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/bin/)
60 changes: 60 additions & 0 deletions lib/binding.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//
// Created by zerogap on 12/12/21.
//

#ifndef MSG_TEST_BINDING_H
#define MSG_TEST_BINDING_H

#include <iostream>
#include <vector>
#include <absl/strings/str_split.h>
#include <absl/strings/str_cat.h>
#include <glog/logging.h>

struct Binding
{
std::string protocol;
std::string host;
int64_t port;
std::string full_string;
std::string starred_string;
explicit Binding(std::string b)
{
auto it = std::remove(b.begin(), b.end(), '/');
b.erase(it, b.end());
std::vector<std::string> parts = absl::StrSplit(b, ':');

protocol = parts[0];
host = parts[1];
LOG(INFO) << "Creating binding from " << b;
if (parts.size() > 2)
{
LOG(INFO) << "Port string: " << parts[2];
std::stringstream s(parts[2]);
s >> port;
LOG(INFO) << "To finally get: " << protocol << "://" << host << ":"
<< port;
full_string = protocol + "://" + host + ":" + absl::StrCat(port);
starred_string = protocol + "://*:" + absl::StrCat(port);
}
else
{
full_string = protocol + "://" + host;
starred_string = protocol + "://*";
LOG(INFO) << "To finally get: " << protocol << "://" << host;
}
}
[[nodiscard]] std::string star_host() const
{
// return protocol + "://*:" + std::to_string(port);
return starred_string;
}

operator std::string() const
{
return full_string;
// return protocol + "://" + host + ":" + absl::StrCat(port);
}
};

#endif//MSG_TEST_BINDING_H
89 changes: 89 additions & 0 deletions lib/pub_sub_proxy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//
// Created by zerogap on 12/12/21.
//

#ifndef MSG_TEST_PUB_SUB_PROXY_H
#define MSG_TEST_PUB_SUB_PROXY_H

#include "binding.h"

struct PubSubProxy
{
const Binding frontend_binding_;
const Binding backend_binding_;
const Binding control_binding_;

zmq::socket_ref front_sock_ref_;
zmq::context_t* ctx_;

std::thread work_thread_;

PubSubProxy(Binding frontend_connection, Binding backend_binding,
Binding control_bindind, zmq::context_t* ctx)
: frontend_binding_(frontend_connection),
backend_binding_(backend_binding),
control_binding_(control_bindind),
ctx_{ ctx },
work_thread_(&PubSubProxy::run, this)
{
}
~PubSubProxy()
{
ctx_->shutdown();
work_thread_.join();
}
void stop()
{
zmq::context_t ctx(1);
zmq::socket_t control_sock(ctx, zmq::socket_type::push);
control_sock.connect(control_binding_);

const std::string terminate_com = "TERMINATE";
if (control_sock.send(zmq::const_buffer(zmq::buffer(terminate_com)),
zmq::send_flags::dontwait))
{
LOG(INFO) << "Control socket in proxy stop was sent correctly";
}
else
{
LOG(INFO) << "Control socket in proxy stop was NOT sent";
}
}
void run()
{
zmq::socket_t frontend(*ctx_, zmq::socket_type::sub);
zmq::socket_t backend(*ctx_, zmq::socket_type::pub);
zmq::socket_t control_sock(*ctx_, zmq::socket_type::pull);
try
{
front_sock_ref_ = zmq::socket_ref(frontend);
frontend.connect(frontend_binding_);
frontend.set(zmq::sockopt::subscribe, "");
}
catch (const zmq::error_t& e)
{
LOG(INFO) << "frontend proxy, num: " << e.num() << " msg : " << e.what();
}
try
{
backend.bind(backend_binding_.star_host());
}
catch (const zmq::error_t& e)
{
LOG(INFO) << "backend proxy, num: " << e.num() << " msg : " << e.what();
}
try
{
control_sock.bind(control_binding_.star_host());
zmq::proxy_steerable(zmq::socket_ref(frontend), zmq::socket_ref(backend),
nullptr, zmq::socket_ref(control_sock));
}
catch (const zmq::error_t& e)
{
LOG(INFO) << "proxy, num: " << e.num() << " msg : " << e.what();
}
LOG(INFO) << "proxy terminated";
}
};

#endif//MSG_TEST_PUB_SUB_PROXY_H
84 changes: 84 additions & 0 deletions lib/publisher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//
// Created by zerogap on 12/12/21.
//

#ifndef MSG_TEST_PUBLISHER_H
#define MSG_TEST_PUBLISHER_H

#include <LogEntrySerializer.h>

#include <atomic>
#include <thread>
#include <zmq.hpp>
#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)
#define __CODE_POINT__ (__FILE__ "::" STR(__LINE__))

struct Publisher
{
std::atomic_bool terminate;
int id_;
std::thread worker_thread_;
zmq::context_t *ctx_;
std::string binding_server_;

Publisher(int id, std::string host)
: terminate{false},
id_{id},
worker_thread_(&Publisher::run, this),
binding_server_{host}
{
}

~Publisher()
{
stop();
terminate.store(true);
worker_thread_.join();
ctx_->close();
LOG(INFO) << "Publisher " << id_ << " terminating";
}
void stop()
{
if (!terminate)
{
terminate.store(true);
ctx_->shutdown();
}
}
void run()
{
LOG(INFO) << "Publisher starting";
zmq::context_t ctx(1);
ctx_ = &ctx;
zmq::socket_t publisher_socket(ctx, zmq::socket_type::pub);
// Remember, you cannot have multiple binds to a port
std::this_thread::sleep_for(std::chrono::seconds(3));
publisher_socket.bind(binding_server_);//"tcp://*:9999");
LOG(INFO) << "Publisher " << id_ << " running";
int c{0};
try
{
while (!terminate)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
LogEntrySerializer log_to_send(Logger::LogLevel::INSANE, __CODE_POINT__,
"hello from msg " + std::to_string(c),
"publisher " + std::to_string(id_),
"hostname" + std::to_string(id_));
log_to_send.export_fb();
publisher_socket.send(zmq::const_buffer(zmq::buffer(log_to_send.get_data())),
zmq::send_flags::dontwait);
++c;
}
}
catch (zmq::error_t &e)
{
LOG(INFO) << "error from publisher " << id_ << ", num: " << e.num()
<< " msg : " << e.what();
}
LOG(INFO) << "publisher " << id_ << " terminated work loop";
}
};

#endif//MSG_TEST_PUBLISHER_H
83 changes: 83 additions & 0 deletions lib/subscriber.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// Created by zerogap on 12/12/21.
//

#ifndef MSG_TEST_SUBSCRIBER_H
#define MSG_TEST_SUBSCRIBER_H

#include <LogEntrySerializer.h>

#include <atomic>
#include <thread>
#include <utility>
#include <zmq.hpp>

struct Subscriber
{
std::atomic_bool terminate;
int id_;
std::thread consuming_thread_;
zmq::socket_ref subscriber_socket_;
zmq::context_t *ctx_;
std::string connecting_server_;

Subscriber(int id, std::string host)
: terminate{ false },
id_{ id },
consuming_thread_(&Subscriber::run, this),
connecting_server_{ std::move(host) }
{
}
~Subscriber()
{
stop();
LOG(INFO) << "Subscriber " << id_ << " deleted";
terminate.store(true);
consuming_thread_.join();
ctx_->close();
}
void stop()
{
if (!terminate)
{
terminate.store(true);
subscriber_socket_.disconnect(connecting_server_);
ctx_->shutdown();
}
}
void run()
{
zmq::context_t ctx(1);
try
{
zmq::socket_t subscriber_socket(ctx, zmq::socket_type::sub);
ctx_ = &ctx;
subscriber_socket_ = zmq::socket_ref(subscriber_socket);
std::this_thread::sleep_for(std::chrono::seconds(2));

subscriber_socket.connect(connecting_server_);
subscriber_socket.set(zmq::sockopt::subscribe, "");
LOG(INFO) << "Subscriber " << id_ << " running";
zmq::message_t body;
while (!terminate)
{
auto s = subscriber_socket.recv(body);
if (s)
{
auto log_entry = Logger::GetLogEntry(body.data());
LOG(INFO) << "Subscriber " << id_ << " received a log entry \n\t"
<< log_entry->log_msg()->str() << " from service "
<< log_entry->srvc_name()->str();
}
}
}
catch (zmq::error_t &e)
{
LOG(INFO) << "error from subscriber " << id_ << ", num: " << e.num()
<< " msg : " << e.what();
}
LOG(INFO) << "subscriber " << id_ << " terminated work loop";
}
};

#endif//MSG_TEST_SUBSCRIBER_H
36 changes: 36 additions & 0 deletions pub_sub_zmq_proxy/pub_sub_zmq_proxy_main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// Created by zerogap on 12/12/21.
//

#include <glog/logging.h>

#include <thread>
#include <zmq.hpp>

#include "publisher.h"
#include "subscriber.h"
#include "pub_sub_proxy.h"

int main()
{
zmq::context_t ctx(1);
PubSubProxy proxy(Binding("tcp://127.0.0.1:9999"),
Binding("tcp://127.0.0.1:8888"),
Binding("tcp://127.0.0.1:8890"), &ctx);
std::this_thread::sleep_for(std::chrono::seconds (1));
Subscriber sub1(1, "tcp://127.0.0.1:8888");
Subscriber sub2(2, "tcp://127.0.0.1:8888");

// Just, give everyone a bit of time to settle until you start publishing
std::this_thread::sleep_for(std::chrono::seconds(2));
{
Publisher pub(1, "tcp://*:9999");
std::this_thread::sleep_for(std::chrono::seconds(6));
pub.stop();
}
sub1.stop();
sub2.stop();
std::this_thread::sleep_for(std::chrono::seconds(6));
LOG(INFO) << "Shutting down";
proxy.stop();
}

0 comments on commit 18ae3d1

Please sign in to comment.