Skip to content

Commit

Permalink
workers_enabled and fifo rc
Browse files Browse the repository at this point in the history
  • Loading branch information
macskas committed Nov 5, 2021
1 parent 7d6b597 commit 22e0686
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 39 deletions.
7 changes: 5 additions & 2 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

#define AUTHOR "macskas"
#include <cstring>
extern "C" {
// needed for the WORKERS_ENABLED
#include <sys/socket.h>
#include <event2/listener.h>
};

#if (__GNUC__ && __GNUC_MINOR__ && __GNUC_PATCHLEVEL__)
#define GCC_VERSION (__GNUC__ * 10000 \
Expand Down Expand Up @@ -49,8 +54,6 @@

#if defined(LEV_OPT_REUSEABLE_PORT) && defined(SO_REUSEPORT)
#define WORKERS_ENABLED 1
#else
#define WORKERS_ENABLED 0
#endif

#ifndef LEV_OPT_REUSEABLE_PORT
Expand Down
9 changes: 9 additions & 0 deletions config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,20 @@ int config::check_config() {
}
}

#ifndef WORKERS_ENABLED
if (nsca_workers != 1) {
nsca_workers = 1;
warning_sprintf("[config] nsca_workers not supported by the kernel or the libevent library, new value=%d", nsca_workers);
this->Set("nsca_workers", "1");
}
#endif

if (nsca_workers <= 0 || nsca_workers > 2000) {
warning_sprintf("[config] nsca_workers(%d) <= 0 || > 2000, new value=%d", nsca_workers, 4);
nsca_workers = 4;
this->Set("nsca_workers", "4");
}

if (nsca_threads_per_worker < 0) {
warning_sprintf("[config] nsca_threads_per_worker < 0 (%d), new value=%d", nsca_threads_per_worker, 4);
this->Set("nsca_threads_per_worker", "4");
Expand Down
5 changes: 3 additions & 2 deletions crypt_thread_t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
#include "log.h"
#include "crypt_thread_t.h"

crypt_thread_t::crypt_thread_t() {
crypt_thread_t::crypt_thread_t(int myTheadId) {
this->shutdown_requested = false;
this->myThread = new std::thread(crypt_thread_t::loop_proxy, this);
this->thread_id = myTheadId;
}

crypt_thread_t::~crypt_thread_t() {
Expand Down Expand Up @@ -56,7 +57,7 @@ void crypt_thread_t::loop() {
if (queueItem.method == THREADMANAGER_METHOD_DECRYPT_PACKET) {
if (this->CI) {
queueItem.networkClient->set_CI(this->CI);
queueItem.networkClient->process_queue();
queueItem.networkClient->process_queue(this->thread_id);
}
}
mtx.lock();
Expand Down
3 changes: 2 additions & 1 deletion crypt_thread_t.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ class crypt_queue_item_t {

class crypt_thread_t {
private:
int thread_id = 0;
std::thread *myThread = nullptr;
struct crypt_instance *CI = nullptr;
std::mutex mtx;
std::condition_variable cv;
volatile bool shutdown_requested;
std::queue<crypt_queue_item_t> queue_items;
public:
crypt_thread_t();
crypt_thread_t(int myThreadId);
~crypt_thread_t();

void loop();
Expand Down
7 changes: 7 additions & 0 deletions fifo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ fifo_client::fifo_client(std::string command_file) {
this->fp = nullptr;
debug_sprintf("[%s]", __PRETTY_FUNCTION__);
}

fifo_client::~fifo_client() {
if (this->fp) {
fflush(this->fp);
fclose(this->fp);
this->fp = nullptr;
this->fd = -1;
}
debug_sprintf("[%s]", __PRETTY_FUNCTION__);
}

Expand Down
5 changes: 1 addition & 4 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void do_help(const std::string &programName)
std::cout << " -c [FILE] configfile" << std::endl;
std::cout << " -d verbose output" << std::endl;
std::cout << " -f foreground" << std::endl;
#if WORKERS_ENABLED
#ifdef WORKERS_ENABLED
std::cout << " -n [MAX_WORKERS] max workers - between 0 and 100" << std::endl;
#endif
std::cout << " -t [THREADS] max_threads_per_worker - between 0 and 1000" << std::endl;
Expand Down Expand Up @@ -111,9 +111,6 @@ int mainloop(char **argv, bool daemonize)
atexit(signal_atexit);

int workers = (int)cfg->GetInt("nsca_workers", 4);
#if WORKERS_ENABLED == 0
workers = 1;
#endif
pm->setProcessMode(PROCESS_MAIN);
pm->setPids();
if (pm->lock(argv[0])) {
Expand Down
32 changes: 24 additions & 8 deletions network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ extern "C"
network::network() : ev_base(nullptr), listener(nullptr), shutdown_requested(false), now(0) {
this->connections = 0;
this->counter = 0;
this->fifoClient = nullptr;
this->fifoClients = nullptr;
this->resultPathClient = nullptr;
time(&(this->now));
this->cnow.assign(ctime((&this->now)));
Expand All @@ -52,7 +52,11 @@ network::network() : ev_base(nullptr), listener(nullptr), shutdown_requested(fal
this->decryption_mode = (int)(cfg->GetInt("decryption_mode", 0));

if (!this->command_file.empty()) {
this->fifoClient = new fifo_client(this->command_file);
auto iters = this->nsca_threads_per_worker > 0 ? this->nsca_threads_per_worker : 1;
this->fifoClients = new fifo_client*[this->nsca_threads_per_worker];
for (auto i=0; i<iters; i++) {
this->fifoClients[i] = new fifo_client(this->command_file);
}
}
if (!this->check_result_path.empty()) {
this->resultPathClient = new result_path_client(this);
Expand Down Expand Up @@ -93,9 +97,13 @@ network::network() : ev_base(nullptr), listener(nullptr), shutdown_requested(fal
}

network::~network() {
if (this->fifoClient != nullptr) {
delete this->fifoClient;
this->fifoClient = nullptr;
if (this->fifoClients != nullptr) {
auto iters = this->nsca_threads_per_worker > 0 ? this->nsca_threads_per_worker : 1;
for (auto i=0; i<iters; i++) {
delete this->fifoClients[i];
}
delete [] this->fifoClients;
this->fifoClients = nullptr;
}

if (this->resultPathClient != nullptr) {
Expand Down Expand Up @@ -215,7 +223,11 @@ void network::run()
this->listener = evconnlistener_new_bind(this->ev_base,
reinterpret_cast<evconnlistener_cb>(network::listener_proxy),
(void *)this,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE_PORT,
#ifdef WORKERS_ENABLED
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE_PORT,
#else
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
#endif
-1,
(struct sockaddr*)&ss,
socklen);
Expand All @@ -224,8 +236,11 @@ void network::run()
}
assert(this->listener);

if (this->fifoClient) {
this->fifoClient->take_variables(this);
if (this->fifoClients) {
auto iters = this->nsca_threads_per_worker > 0 ? this->nsca_threads_per_worker : 1;
for (auto i=0; i<iters; i++) {
this->fifoClients[i]->take_variables(this);
}
}

this->start_timer();
Expand Down Expand Up @@ -255,6 +270,7 @@ void network::run()
if (this->resultPathClient) {
this->resultPathClient->uninit();
}

event_base_free(this->ev_base);
this->ev_base = nullptr;
debug_sprintf("[%s] finished.", __PRETTY_FUNCTION__);
Expand Down
3 changes: 2 additions & 1 deletion network.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define RPS_RESOLUTION 5

#include <string>
#include <vector>

#include "nsca_utils.h"
#include "stat_writer.h"
Expand Down Expand Up @@ -47,7 +48,7 @@ class network {
uint32_t connections = 0;
uint64_t counter = 0;
uint64_t counter_prev = 0;
class fifo_client *fifoClient = nullptr;
class fifo_client **fifoClients = nullptr;
class result_path_client *resultPathClient = nullptr;
class stat_writer StatWriter;

Expand Down
22 changes: 11 additions & 11 deletions network_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void network_client::connection_closed() {
if (this->parent->nsca_threads_per_worker > 0) {
threadManager::getInstance()->add_job(THREADMANAGER_METHOD_DECRYPT_PACKET, this);
} else {
this->process_queue();
this->process_queue(0);
}
} else {
delete this;
Expand Down Expand Up @@ -282,19 +282,19 @@ void network_client::readcb(struct bufferevent *bev)
this->data_packets.push_back(data_packet_pair);
}

void network_client::process_queue() {
void network_client::process_queue(int thread_id) {
if (this->parent->decryption_method > 1) {
this->process_queue_mcrypt();
this->process_queue_mcrypt(thread_id);
} else {
this->process_queue_nomcrypt();
this->process_queue_nomcrypt(thread_id);
}

debug_sprintf("[%s] [success=%d failed=%d]", __PRETTY_FUNCTION__, d_success, d_failed );
this->parent->report_success_failed(d_success, d_failed);
delete this;
}

void network_client::process_queue_mcrypt() {
void network_client::process_queue_mcrypt(int thread_id) {
this->debug_message(__PRETTY_FUNCTION__ );

int i = 1;
Expand Down Expand Up @@ -355,7 +355,7 @@ void network_client::process_queue_mcrypt() {
}
}

this->send_receive_message(receive_packet);
this->send_receive_message(receive_packet, thread_id);

i++;
free(data_packet_pair);
Expand All @@ -364,7 +364,7 @@ void network_client::process_queue_mcrypt() {
this->debug_message("process stop");
}

void network_client::process_queue_nomcrypt() {
void network_client::process_queue_nomcrypt(int thread_id) {
this->debug_message(__PRETTY_FUNCTION__ );

int i = 1;
Expand Down Expand Up @@ -412,7 +412,7 @@ void network_client::process_queue_nomcrypt() {
}
}

this->send_receive_message(receive_packet);
this->send_receive_message(receive_packet, thread_id);

i++;
free(data_packet_pair);
Expand All @@ -429,14 +429,14 @@ void network_client::set_CI(struct crypt_instance *myCI) {
//encrypt_init(this->parent->password.c_str(), this->parent->decryption_method, this->parent->shared_transmission_iv, &(this->CI));
}

void network_client::send_receive_message(data_packet *receive_packet)
void network_client::send_receive_message(data_packet *receive_packet, int thread_id)
{
int rc = 0;
int return_code = ntohs(receive_packet->return_code);

rc = 0;
if (this->parent->fifoClient && !this->parent->command_file.empty()) {
if (this->parent->fifoClient->command(receive_packet->host_name, receive_packet->svc_description,
if (this->parent->fifoClients && !this->parent->command_file.empty()) {
if (this->parent->fifoClients[thread_id]->command(receive_packet->host_name, receive_packet->svc_description,
return_code, receive_packet->plugin_output) == 0) {
rc++;
}
Expand Down
8 changes: 4 additions & 4 deletions network_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ class network_client {
static void writecb_proxy(bufferevent *, void *);
static void eventcb_proxy(struct bufferevent *, short , void *);

void process_queue();
void process_queue_mcrypt();
void process_queue_nomcrypt();
void process_queue(int thread_id);
void process_queue_mcrypt(int thread_id);
void process_queue_nomcrypt(int thread_id);

void set_CI(struct crypt_instance *);
void send_receive_message(data_packet*);
void send_receive_message(data_packet*, int thread_id);
};

#endif //NSCA_NETWORK_CLIENT_H
12 changes: 7 additions & 5 deletions nsca_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ int encrypt_init(const char *password,int encryption_method,char *received_iv,st
CI->iv_size = 0;
CI->blocksize=1; /* block size = 1 byte w/ CFB mode */
CI->keysize=7; /* default to 56 bit key length */
CI->mcrypt_mode = nullptr;
CI->mcrypt_algorithm = nullptr;

/* XOR or no encryption */
if(encryption_method==ENCRYPT_NONE || encryption_method==ENCRYPT_XOR)
return OK;

CI->mcrypt_mode = (char*)malloc(64); /* CFB = 8-bit cipher-feedback mode */
CI->mcrypt_algorithm = (char*)malloc(128);

Expand All @@ -101,11 +108,6 @@ int encrypt_init(const char *password,int encryption_method,char *received_iv,st
strcpy(CI->mcrypt_mode, "cfb");
strcpy(CI->mcrypt_algorithm, "unknown");

/* XOR or no encryption */
if(encryption_method==ENCRYPT_NONE || encryption_method==ENCRYPT_XOR)
return OK;


/* get the name of the mcrypt encryption algorithm to use */
switch(encryption_method){
case ENCRYPT_DES:
Expand Down
2 changes: 1 addition & 1 deletion threadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void threadManager::start() {
for (int i=0; i<this->max_crypt_threads; i++) {
lCI = nullptr;
encrypt_init(password.c_str(), decryption_method, this->shared_transmitted_iv, &lCI);
lct = new crypt_thread_t;
lct = new crypt_thread_t(i);
assert(lct);
lct->set_CI(lCI);
this->crypt_threads[i] = lct;
Expand Down

0 comments on commit 22e0686

Please sign in to comment.