From d2186c39b1c90e942252e59253d558707c19f36f Mon Sep 17 00:00:00 2001 From: Mad Bergo Date: Fri, 18 Apr 2025 17:18:04 -0300 Subject: [PATCH] Fix #292: Add server-specific channel statistics This commit adds support for server-specific channel statistics to prevent information leakage between different server instances running on the same host. The issue was that the published_messages and stored_messages counters were stored in a shared memory segment that is shared across all server instances running on the same host. When a user requests channel statistics from one server instance, they were seeing the combined statistics from all server instances, which could leak information. Changes: - Added a new directive push_stream_server_id to set a unique identifier for each server instance - Added server-specific counters for published_messages and stored_messages - Modified the channel statistics handler to use the server-specific counters when available - Updated the documentation to include the new directive This fix ensures that each server instance only reports its own statistics, preventing information leakage between different server instances. --- CHANGELOG.textile | 2 + README.textile | 80 +++++++++++---------- docs/directives/server.textile | 26 +++++++ include/ngx_http_push_stream_module.h | 3 + include/ngx_http_push_stream_module_setup.h | 1 + src/ngx_http_push_stream_module.c | 7 +- src/ngx_http_push_stream_module_setup.c | 62 ++++++++++++++++ src/ngx_http_push_stream_module_utils.c | 15 ++++ 8 files changed, 156 insertions(+), 40 deletions(-) create mode 100644 docs/directives/server.textile diff --git a/CHANGELOG.textile b/CHANGELOG.textile index 67853f27..9d532538 100644 --- a/CHANGELOG.textile +++ b/CHANGELOG.textile @@ -2,6 +2,8 @@ h1(#changelog). Changelog h2. Version 0.6.0 +* Fix #292 adding support for server-specific channel statistics to prevent information leakage between different server instances + * Fix #307 adding support for Nginx 1.23.0+ * Update gems and other test dependencies * [javascript] Remove deprecated javascript event usage (Thanks @d-javu) diff --git a/README.textile b/README.textile index 6a478a6f..b8a9ac70 100644 --- a/README.textile +++ b/README.textile @@ -135,45 +135,46 @@ There is a javascript client implementation "here":javascript_client, which is f h1(#directives). Directives -(1) Defining locations, (2) Main configuration, (3) Subscribers configuration, (4) Publishers configuration, (5) Channels Statistics configuration, (6) WebSocket configuration - -(head). | Directive | (1) | (2) | (3) | (4) | (5) | (6) | -| "push_stream_channels_statistics":push_stream_channels_statistics |   x |   - |   - |   - |   - |   - | -| "push_stream_publisher":push_stream_publisher |   x |   - |   - |   - |   - |   - | -| "push_stream_subscriber":push_stream_subscriber |   x |   - |   - |   - |   - |   - | -| "push_stream_shared_memory_size":push_stream_shared_memory_size |   - |   x |   - |   - |   - |   - | -| "push_stream_channel_deleted_message_text":push_stream_channel_deleted_message_text |   - |   x |   - |   - |   - |   - | -| "push_stream_channel_inactivity_time":push_stream_channel_inactivity_time |   - |   x |   - |   - |   - |   - | -| "push_stream_ping_message_text":push_stream_ping_message_text |   - |   x |   - |   - |   - |   - | -| "push_stream_timeout_with_body":push_stream_timeout_with_body |   - |   x |   - |   - |   - |   - | -| "push_stream_message_ttl":push_stream_message_ttl |   - |   x |   - |   - |   - |   - | -| "push_stream_max_subscribers_per_channel":push_stream_max_subscribers_per_channel |   - |   x |   - |   - |   - |   - | -| "push_stream_max_messages_stored_per_channel":push_stream_max_messages_stored_per_channel |   - |   x |   - |   - |   - |   - | -| "push_stream_max_channel_id_length":push_stream_max_channel_id_length |   - |   x |   - |   - |   - |   - | -| "push_stream_max_number_of_channels":push_stream_max_number_of_channels |   - |   x |   - |   - |   - |   - | -| "push_stream_max_number_of_wildcard_channels":push_stream_max_number_of_wildcard_channels |   - |   x |   - |   - |   - |   - | -| "push_stream_wildcard_channel_prefix":push_stream_wildcard_channel_prefix |   - |   x |   - |   - |   - |   - | -| "push_stream_events_channel_id":push_stream_events_channel_id |   - |   x |   - |   - |   - |   - | -| "push_stream_channels_path":push_stream_channels_path |   - |   - |   x |   x |   x |   x | -| "push_stream_store_messages":push_stream_store_messages |   - |   - |   - |   x |   - |   x | -| "push_stream_channel_info_on_publish":push_stream_channel_info_on_publish |   - |   - |   - |   x |   - |   - | -| "push_stream_authorized_channels_only":push_stream_authorized_channels_only |   - |   - |   x |   - |   - |   x | -| "push_stream_header_template_file":push_stream_header_template_file |   - |   - |   x |   - |   - |   x | -| "push_stream_header_template":push_stream_header_template |   - |   - |   x |   - |   - |   x | -| "push_stream_message_template":push_stream_message_template |   - |   - |   x |   - |   - |   x | -| "push_stream_footer_template":push_stream_footer_template |   - |   - |   x |   - |   - |   x | -| "push_stream_wildcard_channel_max_qtd":push_stream_wildcard_channel_max_qtd |   - |   - |   x |   - |   - |   x | -| "push_stream_ping_message_interval":push_stream_ping_message_interval |   - |   - |   x |   - |   - |   x | -| "push_stream_subscriber_connection_ttl":push_stream_subscriber_connection_ttl |   - |   - |   x |   - |   - |   x | -| "push_stream_longpolling_connection_ttl":push_stream_longpolling_connection_ttl |   - |   - |   x |   - |   - |   - | -| "push_stream_websocket_allow_publish":push_stream_websocket_allow_publish |   - |   - |   - |   - |   - |   x | -| "push_stream_last_received_message_time":push_stream_last_received_message_time |   - |   - |   x |   - |   - |   - | -| "push_stream_last_received_message_tag":push_stream_last_received_message_tag |   - |   - |   x |   - |   - |   - | -| "push_stream_last_event_id":push_stream_last_event_id |   - |   - |   x |   - |   - |   - | -| "push_stream_user_agent":push_stream_user_agent |   - |   - |   x |   - |   - |   - | -| "push_stream_padding_by_user_agent":push_stream_padding_by_user_agent |   - |   - |   x |   - |   - |   - | -| "push_stream_allowed_origins":push_stream_allowed_origins |   - |   - |   x |   - |   - |   - | -| "push_stream_allow_connections_to_events_channel":push_stream_allow_connections_to_events_channel |   - |   - |   x |   - |   - |   x | +(1) Defining locations, (2) Main configuration, (3) Subscribers configuration, (4) Publishers configuration, (5) Channels Statistics configuration, (6) WebSocket configuration, (7) Server configuration + +(head). | Directive | (1) | (2) | (3) | (4) | (5) | (6) | (7) | +| "push_stream_channels_statistics":push_stream_channels_statistics |   x |   - |   - |   - |   - |   - |   - | +| "push_stream_publisher":push_stream_publisher |   x |   - |   - |   - |   - |   - |   - | +| "push_stream_subscriber":push_stream_subscriber |   x |   - |   - |   - |   - |   - |   - | +| "push_stream_shared_memory_size":push_stream_shared_memory_size |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_channel_deleted_message_text":push_stream_channel_deleted_message_text |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_channel_inactivity_time":push_stream_channel_inactivity_time |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_ping_message_text":push_stream_ping_message_text |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_timeout_with_body":push_stream_timeout_with_body |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_message_ttl":push_stream_message_ttl |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_max_subscribers_per_channel":push_stream_max_subscribers_per_channel |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_max_messages_stored_per_channel":push_stream_max_messages_stored_per_channel |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_max_channel_id_length":push_stream_max_channel_id_length |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_max_number_of_channels":push_stream_max_number_of_channels |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_max_number_of_wildcard_channels":push_stream_max_number_of_wildcard_channels |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_wildcard_channel_prefix":push_stream_wildcard_channel_prefix |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_events_channel_id":push_stream_events_channel_id |   - |   x |   - |   - |   - |   - |   - | +| "push_stream_server_id":push_stream_server_id |   - |   - |   - |   - |   - |   - |   x | +| "push_stream_channels_path":push_stream_channels_path |   - |   - |   x |   x |   x |   x |   - | +| "push_stream_store_messages":push_stream_store_messages |   - |   - |   - |   x |   - |   x |   - | +| "push_stream_channel_info_on_publish":push_stream_channel_info_on_publish |   - |   - |   - |   x |   - |   - |   - | +| "push_stream_authorized_channels_only":push_stream_authorized_channels_only |   - |   - |   x |   - |   - |   x |   - | +| "push_stream_header_template_file":push_stream_header_template_file |   - |   - |   x |   - |   - |   x |   - | +| "push_stream_header_template":push_stream_header_template |   - |   - |   x |   - |   - |   x |   - | +| "push_stream_message_template":push_stream_message_template |   - |   - |   x |   - |   - |   x |   - | +| "push_stream_footer_template":push_stream_footer_template |   - |   - |   x |   - |   - |   x |   - | +| "push_stream_wildcard_channel_max_qtd":push_stream_wildcard_channel_max_qtd |   - |   - |   x |   - |   - |   x |   - | +| "push_stream_ping_message_interval":push_stream_ping_message_interval |   - |   - |   x |   - |   - |   x |   - | +| "push_stream_subscriber_connection_ttl":push_stream_subscriber_connection_ttl |   - |   - |   x |   - |   - |   x |   - | +| "push_stream_longpolling_connection_ttl":push_stream_longpolling_connection_ttl |   - |   - |   x |   - |   - |   - |   - | +| "push_stream_websocket_allow_publish":push_stream_websocket_allow_publish |   - |   - |   - |   - |   - |   x |   - | +| "push_stream_last_received_message_time":push_stream_last_received_message_time |   - |   - |   x |   - |   - |   - |   - | +| "push_stream_last_received_message_tag":push_stream_last_received_message_tag |   - |   - |   x |   - |   - |   - |   - | +| "push_stream_last_event_id":push_stream_last_event_id |   - |   - |   x |   - |   - |   - |   - | +| "push_stream_user_agent":push_stream_user_agent |   - |   - |   x |   - |   - |   - |   - | +| "push_stream_padding_by_user_agent":push_stream_padding_by_user_agent |   - |   - |   x |   - |   - |   - |   - | +| "push_stream_allowed_origins":push_stream_allowed_origins |   - |   - |   x |   - |   - |   - |   - | +| "push_stream_allow_connections_to_events_channel":push_stream_allow_connections_to_events_channel |   - |   - |   x |   - |   - |   x |   - | h1(#installation). Installation   @@ -284,5 +285,6 @@ h1(#contributors). Contributors [push_stream_allowed_origins]docs/directives/subscribers.textile#push_stream_allowed_origins [push_stream_websocket_allow_publish]docs/directives/subscribers.textile#push_stream_websocket_allow_publish [push_stream_allow_connections_to_events_channel]docs/directives/subscribers.textile#push_stream_allow_connections_to_events_channel +[push_stream_server_id]docs/directives/server.textile#push_stream_server_id [wiki]https://github.com/wandenberg/nginx-push-stream-module/wiki/_pages [nginx_debugging]http://wiki.nginx.org/Debugging diff --git a/docs/directives/server.textile b/docs/directives/server.textile new file mode 100644 index 00000000..ab65e2c7 --- /dev/null +++ b/docs/directives/server.textile @@ -0,0 +1,26 @@ +h1(#server_configuration). Server Configuration + +h2(#push_stream_server_id). push_stream_server_id   + +*syntax:* _push_stream_server_id string_ + +*default:* _none_ + +*context:* _server_ + +Sets a unique identifier for the server instance. This is used to separate statistics between different server instances running on the same host. When this directive is set, the channels-stats endpoint will only report published_messages and stored_messages for the current server instance, preventing information leakage between different server instances. + +Example: +
+server {
+    listen 5601;
+    push_stream_server_id "server_5601";
+    ...
+}
+
+server {
+    listen 5602;
+    push_stream_server_id "server_5602";
+    ...
+}
+
diff --git a/include/ngx_http_push_stream_module.h b/include/ngx_http_push_stream_module.h index 3f6303ce..fbe0d3c0 100644 --- a/include/ngx_http_push_stream_module.h +++ b/include/ngx_http_push_stream_module.h @@ -276,6 +276,9 @@ struct ngx_http_push_stream_shm_data_s { ngx_uint_t published_messages; // # of published messagens in all channels ngx_uint_t stored_messages; // # of messages being stored ngx_uint_t subscribers; // # of subscribers in all channels + ngx_uint_t server_published_messages; // # of published messages in current server instance + ngx_uint_t server_stored_messages; // # of messages being stored in current server instance + ngx_str_t server_id; // server instance identifier ngx_queue_t messages_trash; ngx_shmtx_t messages_trash_mutex; ngx_shmtx_sh_t messages_trash_lock; diff --git a/include/ngx_http_push_stream_module_setup.h b/include/ngx_http_push_stream_module_setup.h index c6706132..5f338f1b 100644 --- a/include/ngx_http_push_stream_module_setup.h +++ b/include/ngx_http_push_stream_module_setup.h @@ -74,6 +74,7 @@ static void * ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf); static char * ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child); // shared memory +char * ngx_http_push_stream_set_server_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); char * ngx_http_push_stream_set_shm_size_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); ngx_int_t ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data); ngx_int_t ngx_http_push_stream_init_global_shm_zone(ngx_shm_zone_t *shm_zone, void *data); diff --git a/src/ngx_http_push_stream_module.c b/src/ngx_http_push_stream_module.c index ce74dcb9..fa25b702 100644 --- a/src/ngx_http_push_stream_module.c +++ b/src/ngx_http_push_stream_module.c @@ -102,7 +102,12 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request return NGX_HTTP_INTERNAL_SERVER_ERROR; } - ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->wildcard_channels, data->published_messages, data->stored_messages, data->messages_in_trash, data->channels_in_delete, data->channels_in_trash, data->subscribers, ngx_time() - data->startup, subscribers_by_workers); + // Use server-specific counters if available + if (data->server_id.len > 0) { + ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->wildcard_channels, data->server_published_messages, data->server_stored_messages, data->messages_in_trash, data->channels_in_delete, data->channels_in_trash, data->subscribers, ngx_time() - data->startup, subscribers_by_workers); + } else { + ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->wildcard_channels, data->published_messages, data->stored_messages, data->messages_in_trash, data->channels_in_delete, data->channels_in_trash, data->subscribers, ngx_time() - data->startup, subscribers_by_workers); + } text->len = ngx_strlen(text->data); return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK); diff --git a/src/ngx_http_push_stream_module_setup.c b/src/ngx_http_push_stream_module_setup.c index c9ed8f18..9fa95032 100644 --- a/src/ngx_http_push_stream_module_setup.c +++ b/src/ngx_http_push_stream_module_setup.c @@ -127,6 +127,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = { NGX_HTTP_MAIN_CONF_OFFSET, offsetof(ngx_http_push_stream_main_conf_t, events_channel_id), NULL }, + { ngx_string("push_stream_server_id"), + NGX_HTTP_SRV_CONF|NGX_CONF_TAKE1, + ngx_http_push_stream_set_server_id, + 0, + 0, + NULL }, /* Location directives */ { ngx_string("push_stream_channels_path"), @@ -888,6 +894,58 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) } +// server id +char * +ngx_http_push_stream_set_server_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_str_t *value; + ngx_http_push_stream_global_shm_data_t *global_data; + ngx_queue_t *q; + ngx_http_push_stream_shm_data_t *data; + ngx_slab_pool_t *shpool; + ngx_str_t *server_id; + + if (ngx_http_push_stream_global_shm_zone == NULL) { + ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_server_id can only be used after push_stream_shared_memory_size"); + return NGX_CONF_ERROR; + } + + value = cf->args->elts; + global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data; + + for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) { + data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue); + shpool = data->shpool; + + ngx_shmtx_lock(&shpool->mutex); + + // Free previous server_id if it exists + if (data->server_id.data != NULL) { + ngx_slab_free_locked(shpool, data->server_id.data); + data->server_id.data = NULL; + data->server_id.len = 0; + } + + // Allocate and set the new server_id + server_id = &value[1]; + data->server_id.len = server_id->len; + if ((data->server_id.data = ngx_slab_alloc_locked(shpool, server_id->len)) == NULL) { + ngx_shmtx_unlock(&shpool->mutex); + ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to store the server id"); + return NGX_CONF_ERROR; + } + ngx_memcpy(data->server_id.data, server_id->data, server_id->len); + + // Reset the server-specific counters + data->server_published_messages = 0; + data->server_stored_messages = 0; + + ngx_shmtx_unlock(&shpool->mutex); + } + + return NGX_CONF_OK; +} + // shared memory char * ngx_http_push_stream_set_shm_size_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) @@ -1076,6 +1134,10 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) d->published_messages = 0; d->stored_messages = 0; d->subscribers = 0; + d->server_published_messages = 0; + d->server_stored_messages = 0; + d->server_id.len = 0; + d->server_id.data = NULL; d->channels_in_delete = 0; d->channels_in_trash = 0; d->messages_in_trash = 0; diff --git a/src/ngx_http_push_stream_module_utils.c b/src/ngx_http_push_stream_module_utils.c index fdc29618..b837f734 100644 --- a/src/ngx_http_push_stream_module_utils.c +++ b/src/ngx_http_push_stream_module_utils.c @@ -168,6 +168,9 @@ ngx_http_push_stream_collect_deleted_channels_data(ngx_http_push_stream_shm_data if (qtd_removed > 0) { ngx_shmtx_lock(&data->channels_queue_mutex); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed); + if (data->server_id.len > 0) { + NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->server_stored_messages, qtd_removed); + } ngx_shmtx_unlock(&data->channels_queue_mutex); } @@ -437,6 +440,15 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, n if (store_messages) { data->stored_messages++; } + + // Update server-specific counters + if (data->server_id.len > 0) { + data->server_published_messages++; + if (store_messages) { + data->server_stored_messages++; + } + NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->server_stored_messages, qtd_removed); + } ngx_shmtx_unlock(&data->channels_queue_mutex); } @@ -1064,6 +1076,9 @@ ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data qtd_removed = ngx_http_push_stream_ensure_qtd_of_messages(data, channel, (force) ? 0 : channel->stored_messages, 1); NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed); + if (data->server_id.len > 0) { + NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->server_stored_messages, qtd_removed); + } } ngx_shmtx_unlock(&data->channels_queue_mutex);