Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ set (LIB_HEADERS
ack_tracker.h
host-id.h
resolved-configurable-paths.h
window-size-counter.h
${PROJECT_BINARY_DIR}/lib/cfg-grammar.h
${PROJECT_BINARY_DIR}/lib/block-ref-grammar.h
${PROJECT_BINARY_DIR}/lib/cfg-lex.h
Expand Down Expand Up @@ -215,6 +216,7 @@ set(LIB_SOURCES
utf8utils.c
host-id.c
resolved-configurable-paths.c
window-size-counter.c
${COMPAT_SOURCES}
${CONTROL_SOURCES}
${DEBUGGER_SOURCES}
Expand Down
4 changes: 3 additions & 1 deletion lib/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ pkginclude_HEADERS += \
lib/late_ack_tracker.h \
lib/host-id.h \
lib/resolved-configurable-paths.h \
lib/pe-versioning.h
lib/pe-versioning.h \
lib/window-size-counter.h

# this is intentionally formatted so conflicts are less likely to arise. one name in every line.
lib_libsyslog_ng_la_SOURCES = \
Expand Down Expand Up @@ -255,6 +256,7 @@ lib_libsyslog_ng_la_SOURCES = \
$(transport_crypto_sources) \
lib/host-id.c \
lib/resolved-configurable-paths.c \
lib/window-size-counter.c \
\
lib/cfg-lex.l \
lib/cfg-grammar.y \
Expand Down
4 changes: 2 additions & 2 deletions lib/early_ack_tracker.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ early_ack_tracker_manage_msg_ack(AckTracker *s, LogMessage *msg, AckType ack_typ

if (ack_type == AT_SUSPENDED)
log_source_flow_control_suspend(self->super.source);
else
log_source_flow_control_adjust(self->super.source, 1);

log_source_flow_control_adjust(self->super.source, 1);

log_msg_unref(msg);
log_pipe_unref((LogPipe *)self->super.source);
Expand Down
5 changes: 4 additions & 1 deletion lib/late_ack_tracker.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ late_ack_tracker_manage_msg_ack(AckTracker *s, LogMessage *msg, AckType ack_type

ack_rec->acked = TRUE;

if (ack_type == AT_SUSPENDED)
log_source_flow_control_suspend(self->super.source);

late_ack_tracker_lock(s);
{
ack_range_length = _get_continuous_range_length(self);
Expand All @@ -172,7 +175,7 @@ late_ack_tracker_manage_msg_ack(AckTracker *s, LogMessage *msg, AckType ack_type
_drop_range(self, ack_range_length);

if (ack_type == AT_SUSPENDED)
log_source_flow_control_suspend(self->super.source);
log_source_flow_control_adjust_when_suspended(self->super.source, ack_range_length);
else
log_source_flow_control_adjust(self->super.source, ack_range_length);

Expand Down
53 changes: 36 additions & 17 deletions lib/logsource.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,25 @@ log_source_window_empty(LogSource *self)
}

static inline void
_flow_control_window_size_adjust(LogSource *self, guint32 window_size_increment)
_flow_control_window_size_adjust(LogSource *self, guint32 window_size_increment, gboolean last_ack_type_is_suspended)
{
guint32 old_window_size;
gboolean suspended;
gsize old_window_size = window_size_counter_add(&self->window_size, window_size_increment, &suspended);

window_size_increment += g_atomic_counter_get(&self->suspended_window_size);
old_window_size = g_atomic_counter_exchange_and_add(&self->window_size, window_size_increment);
g_atomic_counter_set(&self->suspended_window_size, 0);
msg_debug("Window size adjustment",
evt_tag_int("old_window_size", old_window_size),
evt_tag_int("window_size_increment", window_size_increment),
evt_tag_str("suspended_before_increment", suspended ? "TRUE" : "FALSE"),
evt_tag_str("last_ack_type_is_suspended", last_ack_type_is_suspended ? "TRUE" : "FALSE"));

if (old_window_size == 0)

gboolean need_to_resume_counter = !last_ack_type_is_suspended && suspended;
if (need_to_resume_counter)
window_size_counter_resume(&self->window_size);
if (old_window_size == 0 || need_to_resume_counter)
log_source_wakeup(self);
if (g_atomic_counter_get(&self->window_size) == self->options->init_window_size)

if (old_window_size+window_size_increment == self->options->init_window_size)
log_source_window_empty(self);
}

Expand Down Expand Up @@ -133,7 +141,14 @@ _flow_control_rate_adjust(LogSource *self)
void
log_source_flow_control_adjust(LogSource *self, guint32 window_size_increment)
{
_flow_control_window_size_adjust(self, window_size_increment);
_flow_control_window_size_adjust(self, window_size_increment, FALSE);
_flow_control_rate_adjust(self);
}

void
log_source_flow_control_adjust_when_suspended(LogSource *self, guint32 window_size_increment)
{
_flow_control_window_size_adjust(self, window_size_increment, TRUE);
_flow_control_rate_adjust(self);
}

Expand All @@ -153,11 +168,11 @@ log_source_msg_ack(LogMessage *msg, AckType ack_type)
void
log_source_flow_control_suspend(LogSource *self)
{
msg_debug("Source has been suspended", log_pipe_location_tag(&self->super));
msg_debug("Source has been suspended",
log_pipe_location_tag(&self->super),
evt_tag_str("function", __FUNCTION__));

g_atomic_counter_set(&self->suspended_window_size, g_atomic_counter_get(&self->window_size));
g_atomic_counter_set(&self->window_size, 0);
_flow_control_rate_adjust(self);
window_size_counter_suspend(&self->window_size);
}

void
Expand Down Expand Up @@ -259,10 +274,14 @@ log_source_post(LogSource *self, LogMessage *msg)
log_msg_add_ack(msg, &path_options);
msg->ack_func = log_source_msg_ack;

old_window_size = g_atomic_counter_exchange_and_add(&self->window_size, -1);
old_window_size = window_size_counter_sub(&self->window_size, 1, NULL);

if (G_UNLIKELY(old_window_size == 1))
msg_debug("Source has been suspended", log_pipe_location_tag(&self->super));
{
msg_debug("Source has been suspended",
log_pipe_location_tag(&self->super),
evt_tag_str("function", __FUNCTION__));
}

/*
* NOTE: this assertion validates that the source is not overflowing its
Expand Down Expand Up @@ -425,8 +444,8 @@ log_source_set_options(LogSource *self, LogSourceOptions *options,
* configuration and we received a SIGHUP. This means that opened
* connections will not have their window_size changed. */

if (g_atomic_counter_get(&self->window_size) == -1)
g_atomic_counter_set(&self->window_size, options->init_window_size);
if ((gint)window_size_counter_get(&self->window_size, NULL) == -1)
window_size_counter_set(&self->window_size, options->init_window_size);
self->options = options;
if (self->stats_id)
g_free(self->stats_id);
Expand All @@ -448,7 +467,7 @@ log_source_init_instance(LogSource *self, GlobalConfig *cfg)
self->super.free_fn = log_source_free;
self->super.init = log_source_init;
self->super.deinit = log_source_deinit;
g_atomic_counter_set(&self->window_size, -1);
window_size_counter_set(&self->window_size, (gsize)-1);
self->ack_tracker = NULL;
}

Expand Down
7 changes: 4 additions & 3 deletions lib/logsource.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "logpipe.h"
#include "stats/stats-registry.h"
#include "window-size-counter.h"

typedef struct _LogSourceOptions
{
Expand Down Expand Up @@ -66,8 +67,7 @@ struct _LogSource
gboolean pos_tracked;
gchar *stats_id;
gchar *stats_instance;
GAtomicCounter window_size;
GAtomicCounter suspended_window_size;
WindowSizeCounter window_size;
StatsCounterItem *last_message_seen;
StatsCounterItem *recvd_messages;
guint32 last_ack_count;
Expand All @@ -83,7 +83,7 @@ struct _LogSource
static inline gboolean
log_source_free_to_send(LogSource *self)
{
return g_atomic_counter_get(&self->window_size) > 0;
return !window_size_counter_suspended(&self->window_size);
}

static inline gint
Expand All @@ -109,6 +109,7 @@ void log_source_free(LogPipe *s);
void log_source_wakeup(LogSource *self);
void log_source_window_empty(LogSource *self);
void log_source_flow_control_adjust(LogSource *self, guint32 window_size_increment);
void log_source_flow_control_adjust_when_suspended(LogSource *self, guint32 window_size_increment);
void log_source_flow_control_suspend(LogSource *self);

void log_source_global_init(void);
Expand Down
1 change: 1 addition & 0 deletions lib/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_unit_test(CRITERION TARGET test_scratch_buffers)
add_unit_test(CRITERION TARGET test_timeutils)
add_unit_test(CRITERION TARGET test_messages)
add_unit_test(CRITERION TARGET test_atomic_gssize)
add_unit_test(CRITERION TARGET test_window_size_counter)

SET_DIRECTORY_PROPERTIES(PROPERTIES
ADDITIONAL_MAKE_CLEAN_FILES
Expand Down
8 changes: 7 additions & 1 deletion lib/tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ lib_tests_TESTS = \
lib/tests/test_utf8utils \
lib/tests/test_userdb \
lib/tests/test_str-utils \
lib/tests/test_atomic_gssize
lib/tests/test_atomic_gssize \
lib/tests/test_window_size_counter

EXTRA_DIST += lib/tests/CMakeLists.txt

Expand Down Expand Up @@ -111,6 +112,11 @@ lib_tests_test_atomic_gssize_CFLAGS = \
lib_tests_test_atomic_gssize_LDADD = \
$(TEST_LDADD)

lib_tests_test_window_size_counter_CFLAGS = \
$(TEST_CFLAGS)
lib_tests_test_window_size_counter_LDADD = \
$(TEST_LDADD)


CLEANFILES += \
test_values.persist \
Expand Down
94 changes: 94 additions & 0 deletions lib/tests/test_window_size_counter.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2018 Balabit
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#include "syslog-ng.h"
#include "window-size-counter.h"
#include <criterion/criterion.h>


Test(test_window_size_counter, suspend_resume)
{
WindowSizeCounter c;
gboolean suspended = FALSE;
window_size_counter_set(&c, 10);
cr_expect_not(window_size_counter_suspended(&c));

window_size_counter_sub(&c, 10, &suspended);
cr_expect_not(suspended);
cr_expect(window_size_counter_suspended(&c));

window_size_counter_add(&c, 10, &suspended);
cr_expect(suspended);
cr_expect_not(window_size_counter_suspended(&c));

window_size_counter_suspend(&c);
cr_expect(window_size_counter_suspended(&c));

gsize val = window_size_counter_get(&c, &suspended);
cr_expect(suspended);
cr_expect_eq(val, 10);

window_size_counter_add(&c, 1, &suspended);
cr_expect_eq(window_size_counter_get(&c, &suspended), 11);
window_size_counter_resume(&c);
cr_expect_not(window_size_counter_suspended(&c));
}

Test(test_window_size_counter, negative_value)
{
WindowSizeCounter c;
gboolean suspended = FALSE;
window_size_counter_set(&c, -1);
gint v = (gint)window_size_counter_get(&c, &suspended);
cr_assert_eq(v, -1);
}

Test(test_window_size_counter, suspend_resume_multiple_times)
{
WindowSizeCounter c;
window_size_counter_set(&c, window_size_counter_get_max());

window_size_counter_resume(&c);
cr_expect_not(window_size_counter_suspended(&c));
gboolean suspended;
cr_expect_eq(window_size_counter_get(&c, &suspended), window_size_counter_get_max());
cr_expect_not(suspended);
window_size_counter_resume(&c);
cr_expect_not(window_size_counter_suspended(&c));
cr_expect_eq(window_size_counter_get(&c, &suspended), window_size_counter_get_max());
cr_expect_not(suspended);

window_size_counter_suspend(&c);
cr_expect(window_size_counter_suspended(&c));
cr_expect_eq(window_size_counter_get(&c, &suspended), window_size_counter_get_max());
cr_expect(suspended);

window_size_counter_suspend(&c);
cr_expect(window_size_counter_suspended(&c));
cr_expect_eq(window_size_counter_get(&c, &suspended), window_size_counter_get_max());
cr_expect(suspended);
window_size_counter_resume(&c);
cr_expect_not(window_size_counter_suspended(&c));
cr_expect_eq(window_size_counter_get(&c, &suspended), window_size_counter_get_max());
cr_expect_not(suspended);
}
Loading