Skip to content

Commit

Permalink
MDEV-26851 Add interface to monitor connections in Galera
Browse files Browse the repository at this point in the history
  • Loading branch information
janlindstrom committed Oct 14, 2024
1 parent 3f19350 commit 1df9e8a
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 7 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Authors from Codership Oy:
* Daniele Sciascia <[email protected]>, Codership Oy
* Philip Stoev <[email protected]>, Codership Oy
* Mario Karuza <[email protected]>, Codership Oy
* Jan Lindström <[email protected]>, Codership Oy
[Codership employees, add name and email/username above this line, but leave this line intact]

Other contributors:
Expand Down
2 changes: 2 additions & 0 deletions galera/src/galera-sym.map
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
wsrep_init_config_service_v1;
wsrep_deinit_config_service_v1;
wsrep_node_isolation_mode_set_v1;
wsrep_init_connection_monitor_service_v1;
wsrep_deinit_connection_monitor_service_v1;

local: *;
};
13 changes: 12 additions & 1 deletion galera/src/wsrep_provider.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2010-2021 Codership Oy <[email protected]>
// Copyright (C) 2010-2024 Codership Oy <[email protected]>
//

#include "wsrep_api.h"
Expand All @@ -23,6 +23,7 @@
#include "gu_event_service.hpp"
#include "wsrep_config_service.h"
#include "wsrep_node_isolation.h"
#include "wsrep_connection_monitor_service.h"

#include <cassert>

Expand Down Expand Up @@ -1941,4 +1942,14 @@ wsrep_node_isolation_mode_set_v1(enum wsrep_node_isolation_mode mode)
return WSREP_NODE_ISOLATION_SUCCESS;
}

extern "C"
int wsrep_init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t *connection_monitor_service)
{
return gu::init_connection_monitor_service_v1(connection_monitor_service);
}

extern "C" void wsrep_deinit_connection_monitor_service_v1()
{
gu::deinit_connection_monitor_service_v1();
}

101 changes: 100 additions & 1 deletion galerautils/src/gu_asio.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2014-2020 Codership Oy <[email protected]>
// Copyright (C) 2014-2024 Codership Oy <[email protected]>
//

#include "gu_config.hpp"
Expand Down Expand Up @@ -53,6 +53,8 @@ static wsrep_tls_service_v1_t* gu_tls_service(0);

static wsrep_allowlist_service_v1_t* gu_allowlist_service(0);

static wsrep_connection_monitor_service_v1_t* gu_connection_monitor_service(0);

//
// AsioIpAddress wrapper
//
Expand Down Expand Up @@ -955,3 +957,100 @@ void gu::deinit_allowlist_service_v1()
std::atomic<enum wsrep_node_isolation_mode> gu::gu_asio_node_isolation_mode{
WSREP_NODE_ISOLATION_NOT_ISOLATED
};

//
// ConnectionMonitor
//

static std::mutex gu_connection_monitor_service_init_mutex;
static size_t gu_connection_monitor_service_usage;

int gu::init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t* connection_monitor)
{
log_info << "init_connection_monitor_service_v1";
std::lock_guard<std::mutex> lock(gu_connection_monitor_service_init_mutex);
++gu_connection_monitor_service_usage;
if (gu_connection_monitor_service)
{
assert(gu_connection_monitor_service == connection_monitor);
return 0;
}
gu_connection_monitor_service = connection_monitor;
return 0;
}

void gu::deinit_connection_monitor_service_v1()
{
log_info << "deinit_connection_monitor_service_v1";
std::lock_guard<std::mutex> lock(gu_connection_monitor_service_init_mutex);
assert(gu_connection_monitor_service_usage > 0);
--gu_connection_monitor_service_usage;
if (gu_connection_monitor_service_usage == 0)
gu_connection_monitor_service = 0;
}

void gu::connection_monitor_connect(wsrep_connection_key_t id,
const std::string& scheme,
const std::string& local_addr,
const std::string& remote_uuid,
const std::string& remote_addr)
{
if (gu_connection_monitor_service == nullptr)
{
return; // No action
}

// tcp://127.0.0.1:19006 --> 127.0.0.1:19006
std::string ra = remote_addr.substr(6, remote_addr.length());
std::string la = local_addr.substr(6, local_addr.length());

wsrep_buf_t const remote = {remote_uuid.c_str(), remote_uuid.length() };
wsrep_buf_t const lscheme = {scheme.c_str(), scheme.length() };
wsrep_buf_t const raddr = {ra.c_str(), ra.length() };
wsrep_buf_t const laddr = {la.c_str(), la.length() };

gu_connection_monitor_service->connection_monitor_connect_cb(
gu_connection_monitor_service->context,
id,
&lscheme,
&laddr,
&remote,
&raddr);
}

void gu::connection_monitor_disconnect(wsrep_connection_key_t id)
{
if (gu_connection_monitor_service == nullptr)
{
return; // No action
}

gu_connection_monitor_service->connection_monitor_disconnect_cb(
gu_connection_monitor_service->context,
id);
}

void gu::connection_monitor_ssl_info(wsrep_connection_key_t id,
const std::string& chipher,
const std::string& issuer,
const std::string& subject,
const std::string& version)
{
if (gu_connection_monitor_service == nullptr)
{
return; // No action
}

wsrep_buf_t const ch = {chipher.c_str(), chipher.length() };
wsrep_buf_t const iss = {issuer.c_str(), issuer.length() };
wsrep_buf_t const sub = {subject.c_str(), subject.length() };
wsrep_buf_t const vers = {version.c_str(), version.length() };

gu_connection_monitor_service->connection_monitor_ssl_info_cb(
gu_connection_monitor_service->context,
id,
&ch,
&iss,
&sub,
&vers);
}
18 changes: 18 additions & 0 deletions galerautils/src/gu_asio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "wsrep_tls_service.h"
#include "wsrep_allowlist_service.h"
#include "wsrep_node_isolation.h"
#include "wsrep_connection_monitor_service.h"

#include <netinet/tcp.h> // tcp_info

Expand Down Expand Up @@ -808,6 +809,23 @@ namespace gu
extern std::atomic<enum wsrep_node_isolation_mode>
gu_asio_node_isolation_mode;

/* Init/deinit global connection monitoring service hooks */
int init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t*);
void deinit_connection_monitor_service_v1();
/* Connection monitor connect callback */
void connection_monitor_connect(wsrep_connection_key_t id,
const std::string& scheme,
const std::string& local_addr,
const std::string& remote_uuid,
const std::string& remote_addr);
/* Connection monitor disconnect callback */
void connection_monitor_disconnect(wsrep_connection_key_t id);
/* Connection monitor ssl info callback */
void connection_monitor_ssl_info(wsrep_connection_key_t id,
const std::string& chipher,
const std::string& issuer,
const std::string& subject,
const std::string& version);
}

#endif // GU_ASIO_HPP
27 changes: 26 additions & 1 deletion galerautils/src/gu_asio_stream_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class AsioTcpStreamEngine : public gu::AsioStreamEngine
{
return gu::AsioErrorCode(last_error_, gu_asio_system_category);
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE {}

private:
void clear_error() { last_error_ = 0; }
int fd_;
Expand All @@ -99,7 +103,7 @@ class AsioTcpStreamEngine : public gu::AsioStreamEngine
#ifdef GALERA_HAVE_SSL

#include <openssl/ssl.h>

#include <openssl/x509.h>
#if OPENSSL_VERSION_NUMBER >= 0x1010100fL
#define HAVE_READ_EX
#define HAVE_WRITE_EX
Expand Down Expand Up @@ -187,6 +191,21 @@ class AsioSslStreamEngine : public gu::AsioStreamEngine
last_verify_error_);
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE
{
clear_error();
chipher = SSL_get_cipher(ssl_);
X509 *ssl_cert = SSL_get_peer_certificate(ssl_);
if (ssl_cert != nullptr)
{
subject = X509_NAME_oneline(X509_get_subject_name(ssl_cert), 0, 0);
issuer = X509_NAME_oneline(X509_get_issuer_name(ssl_cert), 0, 0);
X509_free(ssl_cert);
}
version = SSL_get_version(ssl_);
}

private:
void clear_error()
{
Expand Down Expand Up @@ -560,6 +579,9 @@ class AsioDynamicStreamEngine : public gu::AsioStreamEngine
return engine_->last_error();
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE {}

private:
bool socket_poll(long msec)
{
Expand Down Expand Up @@ -687,6 +709,9 @@ class AsioWsrepStreamEngine : public gu::AsioStreamEngine
return gu::AsioErrorCode(last_error_value_, last_error_category_,
&stream_);
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE {}
private:

enum op_status map_status(enum wsrep_tls_result status)
Expand Down
6 changes: 6 additions & 0 deletions galerautils/src/gu_asio_stream_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ namespace gu
AsioIoService&, const std::string& scheme, int fd,
bool non_blocking);

/**
* Fetch SSL/TLS information
*/
virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) = 0;

protected:
AsioStreamEngine() { }
};
Expand Down
42 changes: 42 additions & 0 deletions galerautils/src/gu_asio_stream_react.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ void gu::AsioStreamReact::close() try
{
GU_ASIO_DEBUG(debug_print() << "Socket not open on close");
}

gu::connection_monitor_disconnect((wsrep_connection_key_t)this);
socket_.close();
}
// Catch all the possible exceptions here, not only asio ones.
Expand Down Expand Up @@ -198,11 +200,32 @@ void gu::AsioStreamReact::connect(const gu::URI& uri) try
socket_.connect(resolve_result->endpoint());
connected_ = true;
prepare_engine(false);
assign_addresses();

auto result(engine_->client_handshake());
switch (result)
{
case AsioStreamEngine::success:
{
gu::connection_monitor_connect((wsrep_connection_key_t)this,
scheme_,
local_addr_,
"", // remote uuid
remote_addr_);
#ifdef GALERA_HAVE_SSL
if (engine_->scheme() == gu::scheme::ssl)
{
std::string chipher, issuer, subject, version;
engine_->get_SSL_info(chipher, issuer, subject, version);
gu::connection_monitor_ssl_info((wsrep_connection_key_t)this,
chipher,
issuer,
subject,
version);
}
#endif
return;
}
case AsioStreamEngine::want_read:
case AsioStreamEngine::want_write:
case AsioStreamEngine::eof:
Expand Down Expand Up @@ -389,6 +412,25 @@ void gu::AsioStreamReact::connect_handler(
set_socket_options(socket_);
prepare_engine(true);
assign_addresses();

gu::connection_monitor_connect((wsrep_connection_key_t)this,
scheme_,
local_addr_,
"", // remote uuid
remote_addr_);

#ifdef GALERA_HAVE_SSL
if (engine_->scheme() == gu::scheme::ssl)
{
std::string chipher, issuer, subject, version;
engine_->get_SSL_info(chipher, issuer, subject, version);
gu::connection_monitor_ssl_info((wsrep_connection_key_t)this,
chipher,
issuer,
subject,
version);
}
#endif
GU_ASIO_DEBUG(debug_print()
<< " AsioStreamReact::connect_handler: init handshake");
auto result(engine_->client_handshake());
Expand Down
3 changes: 3 additions & 0 deletions galerautils/tests/gu_asio_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class MockStreamEngine : public gu::AsioStreamEngine
}
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE {}

enum op_status next_result;
int next_error;
size_t count_client_handshake_called;
Expand Down
2 changes: 1 addition & 1 deletion gcomm/src/asio_tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class gcomm::AsioTcpSocket :
virtual std::string local_addr() const GALERA_OVERRIDE;
virtual std::string remote_addr() const GALERA_OVERRIDE;
virtual State state() const GALERA_OVERRIDE { return state_; }
virtual SocketId id() const GALERA_OVERRIDE { return &socket_; }
virtual SocketId id() const GALERA_OVERRIDE { return socket_.get(); }
virtual SocketStats stats() const GALERA_OVERRIDE;
private:
// AsioSocketHandler interface
Expand Down
21 changes: 20 additions & 1 deletion gcomm/src/gmcast.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2009-2019 Codership Oy <[email protected]>
* Copyright (C) 2009-2024 Codership Oy <[email protected]>
*/

#include "gmcast.hpp"
Expand Down Expand Up @@ -584,6 +584,16 @@ void gcomm::GMCast::gmcast_connect(const std::string& remote_addr)
segment_,
group_name_);

std::ostringstream os;
os << peer->remote_uuid().full_str();
log_info << ":::JAN:::gcomm::GMCast::gmcast_connect";
gu::connection_monitor_connect((wsrep_connection_key_t)tp->id(),
get_scheme(pnet_, use_ssl_, dynamic_socket_),
peer->local_addr(),
os.str(),
remote_addr);


std::pair<ProtoMap::iterator, bool> ret =
proto_map_->insert(std::make_pair(tp->id(), peer));

Expand Down Expand Up @@ -676,6 +686,15 @@ void gcomm::GMCast::handle_established(Proto* est)
// UUID checks are handled during protocol handshake
assert(est->remote_uuid() != uuid());

std::ostringstream os;
os << est->remote_uuid().full_str();
log_info << ":::JAN::: gcomm::GMCast::handle_established";
gu::connection_monitor_connect((wsrep_connection_key_t)est->socket()->id(),
get_scheme(pnet_, use_ssl_, dynamic_socket_),
est->local_addr(),
os.str(),
est->remote_addr());

if (is_evicted(est->remote_uuid()))
{
log_warn << "Closing connection to evicted node " << est->remote_uuid();
Expand Down
Loading

0 comments on commit 1df9e8a

Please sign in to comment.