diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c index 79f3b16e676..51be85381ad 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.c +++ b/plugins/in_kubernetes_events/kubernetes_events.c @@ -1085,8 +1085,31 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "set a database sync method. values: extra, full, normal and off." }, + { + FLB_CONFIG_MAP_BOOL, "db.locking", "false", + 0, FLB_TRUE, offsetof(struct k8s_events, db_locking), + "set exclusive locking mode, increase performance but don't allow " + "external connections to the database file." + }, + { + FLB_CONFIG_MAP_STR, "db.journal_mode", "WAL", + 0, FLB_TRUE, offsetof(struct k8s_events, db_journal_mode), + "set the journal mode for the database. values: DELETE, TRUNCATE, " + "PERSIST, MEMORY, WAL, OFF." + }, #endif + { + FLB_CONFIG_MAP_INT, "dns_retries", "6", + 0, FLB_TRUE, offsetof(struct k8s_events, dns_retries), + "dns lookup retries N times until the network starts working" + }, + { + FLB_CONFIG_MAP_TIME, "dns_wait_time", "30", + 0, FLB_TRUE, offsetof(struct k8s_events, dns_wait_time), + "dns interval between network status checks" + }, + /* EOF */ {0} }; diff --git a/plugins/in_kubernetes_events/kubernetes_events_conf.c b/plugins/in_kubernetes_events/kubernetes_events_conf.c index e84872f70e7..be28f49233b 100644 --- a/plugins/in_kubernetes_events/kubernetes_events_conf.c +++ b/plugins/in_kubernetes_events/kubernetes_events_conf.c @@ -232,6 +232,43 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins) } #ifdef FLB_HAVE_SQLDB + /* Database sync mode (needs to be set before opening the database) */ + ctx->db_sync = 1; /* default: sqlite sync 'normal' */ + tmp = flb_input_get_property("db.sync", ins); + if (tmp) { + if (strcasecmp(tmp, "extra") == 0) { + ctx->db_sync = 3; + } + else if (strcasecmp(tmp, "full") == 0) { + ctx->db_sync = 2; + } + else if (strcasecmp(tmp, "normal") == 0) { + ctx->db_sync = 1; + } + else if (strcasecmp(tmp, "off") == 0) { + ctx->db_sync = 0; + } + else { + flb_plg_error(ctx->ins, "invalid database 'db.sync' value: %s", tmp); + } + } + + /* Journal mode validation */ + tmp = flb_input_get_property("db.journal_mode", ins); + if (tmp) { + if (strcasecmp(tmp, "DELETE") != 0 && + strcasecmp(tmp, "TRUNCATE") != 0 && + strcasecmp(tmp, "PERSIST") != 0 && + strcasecmp(tmp, "MEMORY") != 0 && + strcasecmp(tmp, "WAL") != 0 && + strcasecmp(tmp, "OFF") != 0) { + + flb_plg_error(ctx->ins, "invalid db.journal_mode=%s", tmp); + k8s_events_conf_destroy(ctx); + return NULL; + } + } + /* Initialize database */ tmp = flb_input_get_property("db", ins); if (tmp) { diff --git a/tests/runtime/in_kubernetes_events.c b/tests/runtime/in_kubernetes_events.c index d17707057f2..4234f071859 100644 --- a/tests/runtime/in_kubernetes_events.c +++ b/tests/runtime/in_kubernetes_events.c @@ -303,6 +303,85 @@ static struct test_ctx *test_ctx_create(struct flb_lib_out_cb *data) return ctx; } +/* Create test context with additional config options for config parameter testing */ +static struct test_ctx *test_ctx_create_with_config(struct flb_lib_out_cb *data, + const char *db_sync, + const char *db_locking, + const char *db_journal_mode, + const char *dns_retries, + const char *dns_wait_time) +{ + int i_ffd; + int o_ffd; + int ret; + struct test_ctx *ctx = NULL; + char kube_url[512] = {0}; + + ctx = flb_calloc(1, sizeof(struct test_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("flb_calloc failed"); + flb_errno(); + return NULL; + } + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "3", + "Log_Level", "debug", + NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "kubernetes_events", NULL); + TEST_CHECK(i_ffd >= 0); + ctx->i_ffd = i_ffd; + + sprintf(kube_url, "http://%s:%d", KUBE_API_HOST, KUBE_API_PORT); + ret = flb_input_set(ctx->flb, i_ffd, + "kube_url", kube_url, + "kube_token_file", KUBE_TOKEN_FILE, + "kube_retention_time", "365000d", + "tls", "off", + "interval_sec", "1", + "interval_nsec", "0", + NULL); + TEST_CHECK(ret == 0); + + /* Set optional config parameters if provided */ + if (db_sync) { + ret = flb_input_set(ctx->flb, i_ffd, "db.sync", db_sync, NULL); + TEST_CHECK(ret == 0); + } + if (db_locking) { + ret = flb_input_set(ctx->flb, i_ffd, "db.locking", db_locking, NULL); + TEST_CHECK(ret == 0); + } + if (db_journal_mode) { + ret = flb_input_set(ctx->flb, i_ffd, "db.journal_mode", db_journal_mode, NULL); + TEST_CHECK(ret == 0); + } + if (dns_retries) { + ret = flb_input_set(ctx->flb, i_ffd, "dns_retries", dns_retries, NULL); + TEST_CHECK(ret == 0); + } + if (dns_wait_time) { + ret = flb_input_set(ctx->flb, i_ffd, "dns_wait_time", dns_wait_time, NULL); + TEST_CHECK(ret == 0); + } + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data); + ctx->o_ffd = o_ffd; + + flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + + return ctx; +} + static void test_ctx_destroy(struct test_ctx *ctx) { TEST_CHECK(ctx != NULL); @@ -444,10 +523,154 @@ void flb_test_events_with_chunkedrecv() test_ctx_destroy(ctx); } +/* Test valid db.sync values */ +void flb_test_config_db_sync_values() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + const char *sync_values[] = {"extra", "full", "normal", "off", NULL}; + int i; + + cb_data.cb = NULL; + cb_data.data = NULL; + + for (i = 0; sync_values[i] != NULL; i++) { + ctx = test_ctx_create_with_config(&cb_data, + sync_values[i], /* db.sync */ + NULL, /* db.locking */ + NULL, /* db.journal_mode */ + NULL, /* dns_retries */ + NULL); /* dns_wait_time */ + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create_with_config failed for db.sync=%s", sync_values[i]); + continue; + } + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + if (ret != 0) { + TEST_MSG("flb_start failed for db.sync=%s", sync_values[i]); + } + + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + } +} + +/* Test valid db.journal_mode values */ +void flb_test_config_db_journal_mode_values() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + const char *journal_modes[] = {"DELETE", "TRUNCATE", "PERSIST", "MEMORY", "WAL", "OFF", NULL}; + int i; + + cb_data.cb = NULL; + cb_data.data = NULL; + + for (i = 0; journal_modes[i] != NULL; i++) { + ctx = test_ctx_create_with_config(&cb_data, + NULL, /* db.sync */ + NULL, /* db.locking */ + journal_modes[i], /* db.journal_mode */ + NULL, /* dns_retries */ + NULL); /* dns_wait_time */ + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create_with_config failed for db.journal_mode=%s", journal_modes[i]); + continue; + } + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + if (ret != 0) { + TEST_MSG("flb_start failed for db.journal_mode=%s", journal_modes[i]); + } + + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + } +} + +/* Test valid db.locking values */ +void flb_test_config_db_locking_values() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + const char *locking_values[] = {"true", "false", NULL}; + int i; + + cb_data.cb = NULL; + cb_data.data = NULL; + + for (i = 0; locking_values[i] != NULL; i++) { + ctx = test_ctx_create_with_config(&cb_data, + NULL, /* db.sync */ + locking_values[i], /* db.locking */ + NULL, /* db.journal_mode */ + NULL, /* dns_retries */ + NULL); /* dns_wait_time */ + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create_with_config failed for db.locking=%s", locking_values[i]); + continue; + } + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + if (ret != 0) { + TEST_MSG("flb_start failed for db.locking=%s", locking_values[i]); + } + + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + } +} + +/* Test dns_retries and dns_wait_time config */ +void flb_test_config_dns_options() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + + cb_data.cb = NULL; + cb_data.data = NULL; + + ctx = test_ctx_create_with_config(&cb_data, + NULL, /* db.sync */ + NULL, /* db.locking */ + NULL, /* db.journal_mode */ + "10", /* dns_retries */ + "60"); /* dns_wait_time */ + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create_with_config failed for dns options"); + return; + } + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + if (ret != 0) { + TEST_MSG("flb_start failed for dns options"); + } + + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + TEST_LIST = { {"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp}, {"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp}, //{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv}, + {"config_db_sync_values", flb_test_config_db_sync_values}, + {"config_db_journal_mode_values", flb_test_config_db_journal_mode_values}, + {"config_db_locking_values", flb_test_config_db_locking_values}, + {"config_dns_options", flb_test_config_dns_options}, {NULL, NULL} };