diff --git a/CHANGELOG.textile b/CHANGELOG.textile index 67853f27..9d532538 100644 --- a/CHANGELOG.textile +++ b/CHANGELOG.textile @@ -2,6 +2,8 @@ h1(#changelog). Changelog h2. Version 0.6.0 +* Fix #292 adding support for server-specific channel statistics to prevent information leakage between different server instances + * Fix #307 adding support for Nginx 1.23.0+ * Update gems and other test dependencies * [javascript] Remove deprecated javascript event usage (Thanks @d-javu) diff --git a/README.textile b/README.textile index 6a478a6f..b8a9ac70 100644 --- a/README.textile +++ b/README.textile @@ -135,45 +135,46 @@ There is a javascript client implementation "here":javascript_client, which is f h1(#directives). Directives -(1) Defining locations, (2) Main configuration, (3) Subscribers configuration, (4) Publishers configuration, (5) Channels Statistics configuration, (6) WebSocket configuration - -(head). | Directive | (1) | (2) | (3) | (4) | (5) | (6) | -| "push_stream_channels_statistics":push_stream_channels_statistics | x | - | - | - | - | - | -| "push_stream_publisher":push_stream_publisher | x | - | - | - | - | - | -| "push_stream_subscriber":push_stream_subscriber | x | - | - | - | - | - | -| "push_stream_shared_memory_size":push_stream_shared_memory_size | - | x | - | - | - | - | -| "push_stream_channel_deleted_message_text":push_stream_channel_deleted_message_text | - | x | - | - | - | - | -| "push_stream_channel_inactivity_time":push_stream_channel_inactivity_time | - | x | - | - | - | - | -| "push_stream_ping_message_text":push_stream_ping_message_text | - | x | - | - | - | - | -| "push_stream_timeout_with_body":push_stream_timeout_with_body | - | x | - | - | - | - | -| "push_stream_message_ttl":push_stream_message_ttl | - | x | - | - | - | - | -| "push_stream_max_subscribers_per_channel":push_stream_max_subscribers_per_channel | - | x | - | - | - | - | -| "push_stream_max_messages_stored_per_channel":push_stream_max_messages_stored_per_channel | - | x | - | - | - | - | -| "push_stream_max_channel_id_length":push_stream_max_channel_id_length | - | x | - | - | - | - | -| "push_stream_max_number_of_channels":push_stream_max_number_of_channels | - | x | - | - | - | - | -| "push_stream_max_number_of_wildcard_channels":push_stream_max_number_of_wildcard_channels | - | x | - | - | - | - | -| "push_stream_wildcard_channel_prefix":push_stream_wildcard_channel_prefix | - | x | - | - | - | - | -| "push_stream_events_channel_id":push_stream_events_channel_id | - | x | - | - | - | - | -| "push_stream_channels_path":push_stream_channels_path | - | - | x | x | x | x | -| "push_stream_store_messages":push_stream_store_messages | - | - | - | x | - | x | -| "push_stream_channel_info_on_publish":push_stream_channel_info_on_publish | - | - | - | x | - | - | -| "push_stream_authorized_channels_only":push_stream_authorized_channels_only | - | - | x | - | - | x | -| "push_stream_header_template_file":push_stream_header_template_file | - | - | x | - | - | x | -| "push_stream_header_template":push_stream_header_template | - | - | x | - | - | x | -| "push_stream_message_template":push_stream_message_template | - | - | x | - | - | x | -| "push_stream_footer_template":push_stream_footer_template | - | - | x | - | - | x | -| "push_stream_wildcard_channel_max_qtd":push_stream_wildcard_channel_max_qtd | - | - | x | - | - | x | -| "push_stream_ping_message_interval":push_stream_ping_message_interval | - | - | x | - | - | x | -| "push_stream_subscriber_connection_ttl":push_stream_subscriber_connection_ttl | - | - | x | - | - | x | -| "push_stream_longpolling_connection_ttl":push_stream_longpolling_connection_ttl | - | - | x | - | - | - | -| "push_stream_websocket_allow_publish":push_stream_websocket_allow_publish | - | - | - | - | - | x | -| "push_stream_last_received_message_time":push_stream_last_received_message_time | - | - | x | - | - | - | -| "push_stream_last_received_message_tag":push_stream_last_received_message_tag | - | - | x | - | - | - | -| "push_stream_last_event_id":push_stream_last_event_id | - | - | x | - | - | - | -| "push_stream_user_agent":push_stream_user_agent | - | - | x | - | - | - | -| "push_stream_padding_by_user_agent":push_stream_padding_by_user_agent | - | - | x | - | - | - | -| "push_stream_allowed_origins":push_stream_allowed_origins | - | - | x | - | - | - | -| "push_stream_allow_connections_to_events_channel":push_stream_allow_connections_to_events_channel | - | - | x | - | - | x | +(1) Defining locations, (2) Main configuration, (3) Subscribers configuration, (4) Publishers configuration, (5) Channels Statistics configuration, (6) WebSocket configuration, (7) Server configuration + +(head). | Directive | (1) | (2) | (3) | (4) | (5) | (6) | (7) | +| "push_stream_channels_statistics":push_stream_channels_statistics | x | - | - | - | - | - | - | +| "push_stream_publisher":push_stream_publisher | x | - | - | - | - | - | - | +| "push_stream_subscriber":push_stream_subscriber | x | - | - | - | - | - | - | +| "push_stream_shared_memory_size":push_stream_shared_memory_size | - | x | - | - | - | - | - | +| "push_stream_channel_deleted_message_text":push_stream_channel_deleted_message_text | - | x | - | - | - | - | - | +| "push_stream_channel_inactivity_time":push_stream_channel_inactivity_time | - | x | - | - | - | - | - | +| "push_stream_ping_message_text":push_stream_ping_message_text | - | x | - | - | - | - | - | +| "push_stream_timeout_with_body":push_stream_timeout_with_body | - | x | - | - | - | - | - | +| "push_stream_message_ttl":push_stream_message_ttl | - | x | - | - | - | - | - | +| "push_stream_max_subscribers_per_channel":push_stream_max_subscribers_per_channel | - | x | - | - | - | - | - | +| "push_stream_max_messages_stored_per_channel":push_stream_max_messages_stored_per_channel | - | x | - | - | - | - | - | +| "push_stream_max_channel_id_length":push_stream_max_channel_id_length | - | x | - | - | - | - | - | +| "push_stream_max_number_of_channels":push_stream_max_number_of_channels | - | x | - | - | - | - | - | +| "push_stream_max_number_of_wildcard_channels":push_stream_max_number_of_wildcard_channels | - | x | - | - | - | - | - | +| "push_stream_wildcard_channel_prefix":push_stream_wildcard_channel_prefix | - | x | - | - | - | - | - | +| "push_stream_events_channel_id":push_stream_events_channel_id | - | x | - | - | - | - | - | +| "push_stream_server_id":push_stream_server_id | - | - | - | - | - | - | x | +| "push_stream_channels_path":push_stream_channels_path | - | - | x | x | x | x | - | +| "push_stream_store_messages":push_stream_store_messages | - | - | - | x | - | x | - | +| "push_stream_channel_info_on_publish":push_stream_channel_info_on_publish | - | - | - | x | - | - | - | +| "push_stream_authorized_channels_only":push_stream_authorized_channels_only | - | - | x | - | - | x | - | +| "push_stream_header_template_file":push_stream_header_template_file | - | - | x | - | - | x | - | +| "push_stream_header_template":push_stream_header_template | - | - | x | - | - | x | - | +| "push_stream_message_template":push_stream_message_template | - | - | x | - | - | x | - | +| "push_stream_footer_template":push_stream_footer_template | - | - | x | - | - | x | - | +| "push_stream_wildcard_channel_max_qtd":push_stream_wildcard_channel_max_qtd | - | - | x | - | - | x | - | +| "push_stream_ping_message_interval":push_stream_ping_message_interval | - | - | x | - | - | x | - | +| "push_stream_subscriber_connection_ttl":push_stream_subscriber_connection_ttl | - | - | x | - | - | x | - | +| "push_stream_longpolling_connection_ttl":push_stream_longpolling_connection_ttl | - | - | x | - | - | - | - | +| "push_stream_websocket_allow_publish":push_stream_websocket_allow_publish | - | - | - | - | - | x | - | +| "push_stream_last_received_message_time":push_stream_last_received_message_time | - | - | x | - | - | - | - | +| "push_stream_last_received_message_tag":push_stream_last_received_message_tag | - | - | x | - | - | - | - | +| "push_stream_last_event_id":push_stream_last_event_id | - | - | x | - | - | - | - | +| "push_stream_user_agent":push_stream_user_agent | - | - | x | - | - | - | - | +| "push_stream_padding_by_user_agent":push_stream_padding_by_user_agent | - | - | x | - | - | - | - | +| "push_stream_allowed_origins":push_stream_allowed_origins | - | - | x | - | - | - | - | +| "push_stream_allow_connections_to_events_channel":push_stream_allow_connections_to_events_channel | - | - | x | - | - | x | - | h1(#installation). Installation @@ -284,5 +285,6 @@ h1(#contributors). Contributors [push_stream_allowed_origins]docs/directives/subscribers.textile#push_stream_allowed_origins [push_stream_websocket_allow_publish]docs/directives/subscribers.textile#push_stream_websocket_allow_publish [push_stream_allow_connections_to_events_channel]docs/directives/subscribers.textile#push_stream_allow_connections_to_events_channel +[push_stream_server_id]docs/directives/server.textile#push_stream_server_id [wiki]https://github.com/wandenberg/nginx-push-stream-module/wiki/_pages [nginx_debugging]http://wiki.nginx.org/Debugging diff --git a/docs/directives/server.textile b/docs/directives/server.textile new file mode 100644 index 00000000..ab65e2c7 --- /dev/null +++ b/docs/directives/server.textile @@ -0,0 +1,26 @@ +h1(#server_configuration). Server Configuration + +h2(#push_stream_server_id). push_stream_server_id + +*syntax:* _push_stream_server_id string_ + +*default:* _none_ + +*context:* _server_ + +Sets a unique identifier for the server instance. This is used to separate statistics between different server instances running on the same host. When this directive is set, the channels-stats endpoint will only report published_messages and stored_messages for the current server instance, preventing information leakage between different server instances. + +Example: +
+server {
+    listen 5601;
+    push_stream_server_id "server_5601";
+    ...
+}
+
+server {
+    listen 5602;
+    push_stream_server_id "server_5602";
+    ...
+}
+
diff --git a/include/ngx_http_push_stream_module.h b/include/ngx_http_push_stream_module.h
index 3f6303ce..fbe0d3c0 100644
--- a/include/ngx_http_push_stream_module.h
+++ b/include/ngx_http_push_stream_module.h
@@ -276,6 +276,9 @@ struct ngx_http_push_stream_shm_data_s {
     ngx_uint_t                              published_messages; // # of published messagens in all channels
     ngx_uint_t                              stored_messages;    // # of messages being stored
     ngx_uint_t                              subscribers;        // # of subscribers in all channels
+    ngx_uint_t                              server_published_messages; // # of published messages in current server instance
+    ngx_uint_t                              server_stored_messages;    // # of messages being stored in current server instance
+    ngx_str_t                               server_id;          // server instance identifier
     ngx_queue_t                             messages_trash;
     ngx_shmtx_t                             messages_trash_mutex;
     ngx_shmtx_sh_t                          messages_trash_lock;
diff --git a/include/ngx_http_push_stream_module_setup.h b/include/ngx_http_push_stream_module_setup.h
index c6706132..5f338f1b 100644
--- a/include/ngx_http_push_stream_module_setup.h
+++ b/include/ngx_http_push_stream_module_setup.h
@@ -74,6 +74,7 @@ static void *       ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf);
 static char *       ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
 
 // shared memory
+char *              ngx_http_push_stream_set_server_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
 char *              ngx_http_push_stream_set_shm_size_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
 ngx_int_t           ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data);
 ngx_int_t           ngx_http_push_stream_init_global_shm_zone(ngx_shm_zone_t *shm_zone, void *data);
diff --git a/src/ngx_http_push_stream_module.c b/src/ngx_http_push_stream_module.c
index ce74dcb9..fa25b702 100644
--- a/src/ngx_http_push_stream_module.c
+++ b/src/ngx_http_push_stream_module.c
@@ -102,7 +102,12 @@ ngx_http_push_stream_send_response_all_channels_info_summarized(ngx_http_request
         return NGX_HTTP_INTERNAL_SERVER_ERROR;
     }
 
-    ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->wildcard_channels, data->published_messages, data->stored_messages, data->messages_in_trash, data->channels_in_delete, data->channels_in_trash, data->subscribers, ngx_time() - data->startup, subscribers_by_workers);
+    // Use server-specific counters if available
+    if (data->server_id.len > 0) {
+        ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->wildcard_channels, data->server_published_messages, data->server_stored_messages, data->messages_in_trash, data->channels_in_delete, data->channels_in_trash, data->subscribers, ngx_time() - data->startup, subscribers_by_workers);
+    } else {
+        ngx_sprintf(text->data, (char *) subtype->format_summarized->data, hostname->data, currenttime->data, data->channels, data->wildcard_channels, data->published_messages, data->stored_messages, data->messages_in_trash, data->channels_in_delete, data->channels_in_trash, data->subscribers, ngx_time() - data->startup, subscribers_by_workers);
+    }
     text->len = ngx_strlen(text->data);
 
     return ngx_http_push_stream_send_response(r, text, subtype->content_type, NGX_HTTP_OK);
diff --git a/src/ngx_http_push_stream_module_setup.c b/src/ngx_http_push_stream_module_setup.c
index c9ed8f18..9fa95032 100644
--- a/src/ngx_http_push_stream_module_setup.c
+++ b/src/ngx_http_push_stream_module_setup.c
@@ -127,6 +127,12 @@ static ngx_command_t    ngx_http_push_stream_commands[] = {
         NGX_HTTP_MAIN_CONF_OFFSET,
         offsetof(ngx_http_push_stream_main_conf_t, events_channel_id),
         NULL },
+    { ngx_string("push_stream_server_id"),
+        NGX_HTTP_SRV_CONF|NGX_CONF_TAKE1,
+        ngx_http_push_stream_set_server_id,
+        0,
+        0,
+        NULL },
 
     /* Location directives */
     { ngx_string("push_stream_channels_path"),
@@ -888,6 +894,58 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
 }
 
 
+// server id
+char *
+ngx_http_push_stream_set_server_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+    ngx_str_t                           *value;
+    ngx_http_push_stream_global_shm_data_t *global_data;
+    ngx_queue_t                            *q;
+    ngx_http_push_stream_shm_data_t        *data;
+    ngx_slab_pool_t                        *shpool;
+    ngx_str_t                              *server_id;
+
+    if (ngx_http_push_stream_global_shm_zone == NULL) {
+        ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push_stream_server_id can only be used after push_stream_shared_memory_size");
+        return NGX_CONF_ERROR;
+    }
+
+    value = cf->args->elts;
+    global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
+
+    for (q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) {
+        data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue);
+        shpool = data->shpool;
+
+        ngx_shmtx_lock(&shpool->mutex);
+
+        // Free previous server_id if it exists
+        if (data->server_id.data != NULL) {
+            ngx_slab_free_locked(shpool, data->server_id.data);
+            data->server_id.data = NULL;
+            data->server_id.len = 0;
+        }
+
+        // Allocate and set the new server_id
+        server_id = &value[1];
+        data->server_id.len = server_id->len;
+        if ((data->server_id.data = ngx_slab_alloc_locked(shpool, server_id->len)) == NULL) {
+            ngx_shmtx_unlock(&shpool->mutex);
+            ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: unable to allocate memory to store the server id");
+            return NGX_CONF_ERROR;
+        }
+        ngx_memcpy(data->server_id.data, server_id->data, server_id->len);
+
+        // Reset the server-specific counters
+        data->server_published_messages = 0;
+        data->server_stored_messages = 0;
+
+        ngx_shmtx_unlock(&shpool->mutex);
+    }
+
+    return NGX_CONF_OK;
+}
+
 // shared memory
 char *
 ngx_http_push_stream_set_shm_size_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
@@ -1076,6 +1134,10 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
     d->published_messages = 0;
     d->stored_messages = 0;
     d->subscribers = 0;
+    d->server_published_messages = 0;
+    d->server_stored_messages = 0;
+    d->server_id.len = 0;
+    d->server_id.data = NULL;
     d->channels_in_delete = 0;
     d->channels_in_trash = 0;
     d->messages_in_trash = 0;
diff --git a/src/ngx_http_push_stream_module_utils.c b/src/ngx_http_push_stream_module_utils.c
index fdc29618..b837f734 100644
--- a/src/ngx_http_push_stream_module_utils.c
+++ b/src/ngx_http_push_stream_module_utils.c
@@ -168,6 +168,9 @@ ngx_http_push_stream_collect_deleted_channels_data(ngx_http_push_stream_shm_data
         if (qtd_removed > 0) {
             ngx_shmtx_lock(&data->channels_queue_mutex);
             NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed);
+            if (data->server_id.len > 0) {
+                NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->server_stored_messages, qtd_removed);
+            }
             ngx_shmtx_unlock(&data->channels_queue_mutex);
         }
 
@@ -437,6 +440,15 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, n
         if (store_messages) {
             data->stored_messages++;
         }
+
+        // Update server-specific counters
+        if (data->server_id.len > 0) {
+            data->server_published_messages++;
+            if (store_messages) {
+                data->server_stored_messages++;
+            }
+            NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->server_stored_messages, qtd_removed);
+        }
         ngx_shmtx_unlock(&data->channels_queue_mutex);
     }
 
@@ -1064,6 +1076,9 @@ ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data
 
         qtd_removed = ngx_http_push_stream_ensure_qtd_of_messages(data, channel, (force) ? 0 : channel->stored_messages, 1);
         NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->stored_messages, qtd_removed);
+        if (data->server_id.len > 0) {
+            NGX_HTTP_PUSH_STREAM_DECREMENT_COUNTER_BY(data->server_stored_messages, qtd_removed);
+        }
     }
 
     ngx_shmtx_unlock(&data->channels_queue_mutex);