2323 MetricReader ,
2424)
2525
26- from azure .core .exceptions import HttpResponseError
2726from azure .core .pipeline .policies import ContentDecodePolicy
2827from azure .monitor .opentelemetry .exporter ._quickpulse ._constants import (
2928 _LONG_PING_INTERVAL_SECONDS ,
3029 _POST_CANCEL_INTERVAL_SECONDS ,
3130 _POST_INTERVAL_SECONDS ,
31+ _QUICKPULSE_ETAG_HEADER_NAME ,
3232 _QUICKPULSE_SUBSCRIBED_HEADER_NAME ,
3333)
3434from azure .monitor .opentelemetry .exporter ._quickpulse ._generated ._configuration import QuickpulseClientConfiguration
3535from azure .monitor .opentelemetry .exporter ._quickpulse ._generated ._client import QuickpulseClient
3636from azure .monitor .opentelemetry .exporter ._quickpulse ._generated .models import MonitoringDataPoint
3737from azure .monitor .opentelemetry .exporter ._quickpulse ._policy import _QuickpulseRedirectPolicy
3838from azure .monitor .opentelemetry .exporter ._quickpulse ._state import (
39+ _get_and_clear_quickpulse_documents ,
3940 _get_global_quickpulse_state ,
41+ _get_quickpulse_etag ,
4042 _is_ping_state ,
4143 _set_global_quickpulse_state ,
42- _get_and_clear_quickpulse_documents ,
44+ _set_quickpulse_etag ,
4345 _QuickpulseState ,
4446)
4547from azure .monitor .opentelemetry .exporter ._quickpulse ._utils import (
4648 _metric_to_quick_pulse_data_points ,
49+ _update_filter_configuration ,
4750)
4851from azure .monitor .opentelemetry .exporter ._connection_string_parser import ConnectionStringParser
4952from azure .monitor .opentelemetry .exporter ._utils import (
@@ -143,13 +146,14 @@ def export(
143146 base_monitoring_data_point = base_monitoring_data_point ,
144147 documents = _get_and_clear_quickpulse_documents (),
145148 )
146-
149+ configuration_etag = _get_quickpulse_etag () or ""
147150 token = attach (set_value (_SUPPRESS_INSTRUMENTATION_KEY , True ))
148151 try :
149152 post_response = self ._client .publish ( # type: ignore
150153 endpoint = self ._live_endpoint ,
151154 monitoring_data_points = data_points ,
152- ikey = self ._instrumentation_key ,
155+ ikey = self ._instrumentation_key , # type: ignore
156+ configuration_etag = configuration_etag ,
153157 transmission_time = _ticks_since_dot_net_epoch (),
154158 cls = _Response ,
155159 )
@@ -163,6 +167,19 @@ def export(
163167 if header != "true" :
164168 # User leaving the live metrics page will be treated as an unsuccessful
165169 result = MetricExportResult .FAILURE
170+ else :
171+ # Check if etag has changed
172+ etag = post_response ._response_headers .get ( # pylint: disable=protected-access
173+ _QUICKPULSE_ETAG_HEADER_NAME # pylint: disable=protected-access
174+ )
175+ if etag and etag != configuration_etag :
176+ config = (
177+ post_response ._pipeline_response .http_response .content # pylint: disable=protected-access
178+ )
179+ # Content will only be populated if configuration has changed (etag is different)
180+ if config :
181+ # Update and apply configuration changes
182+ _update_filter_configuration (etag , config )
166183 except Exception : # pylint: disable=broad-except,invalid-name
167184 _logger .exception ("Exception occurred while publishing live metrics." )
168185 result = MetricExportResult .FAILURE
@@ -201,21 +218,23 @@ def shutdown(
201218 def _ping (self , monitoring_data_point : MonitoringDataPoint ) -> Optional [_Response ]:
202219 ping_response = None
203220 token = attach (set_value (_SUPPRESS_INSTRUMENTATION_KEY , True ))
221+ etag = _get_quickpulse_etag () or ""
204222 try :
205223 ping_response = self ._client .is_subscribed ( # type: ignore
206224 endpoint = self ._live_endpoint ,
207225 monitoring_data_point = monitoring_data_point ,
208- ikey = self ._instrumentation_key ,
226+ ikey = self ._instrumentation_key , # type: ignore
209227 transmission_time = _ticks_since_dot_net_epoch (),
210228 machine_name = monitoring_data_point .machine_name ,
211229 instance_name = monitoring_data_point .instance ,
212230 stream_id = monitoring_data_point .stream_id ,
213231 role_name = monitoring_data_point .role_name ,
214- invariant_version = monitoring_data_point .invariant_version ,
232+ invariant_version = monitoring_data_point .invariant_version , # type: ignore
233+ configuration_etag = etag ,
215234 cls = _Response ,
216235 )
217236 return ping_response # type: ignore
218- except HttpResponseError :
237+ except Exception : # pylint: disable=broad-except,invalid-name
219238 _logger .exception ("Exception occurred while pinging live metrics." )
220239 detach (token )
221240 return ping_response
@@ -243,28 +262,42 @@ def __init__(
243262 )
244263 self ._worker .start ()
245264
265+ # pylint: disable=protected-access
266+ # pylint: disable=too-many-nested-blocks
246267 def _ticker (self ) -> None :
247268 if _is_ping_state ():
248269 # Send a ping if elapsed number of request meets the threshold
249270 if self ._elapsed_num_seconds % _get_global_quickpulse_state ().value == 0 :
250- ping_response = self ._exporter ._ping ( # pylint: disable=protected-access
271+ ping_response = self ._exporter ._ping (
251272 self ._base_monitoring_data_point ,
252273 )
253274 if ping_response :
254- header = ping_response ._response_headers .get ( # pylint: disable=protected-access
255- _QUICKPULSE_SUBSCRIBED_HEADER_NAME
256- )
257- if header and header == "true" :
258- # Switch state to post if subscribed
259- _set_global_quickpulse_state (_QuickpulseState .POST_SHORT )
260- self ._elapsed_num_seconds = 0
261- else :
262- # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
263- if (
264- _get_global_quickpulse_state () is _QuickpulseState .PING_SHORT
265- and self ._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
266- ):
267- _set_global_quickpulse_state (_QuickpulseState .PING_LONG )
275+ try :
276+ subscribed = ping_response ._response_headers .get (_QUICKPULSE_SUBSCRIBED_HEADER_NAME )
277+ if subscribed and subscribed == "true" :
278+ # Switch state to post if subscribed
279+ _set_global_quickpulse_state (_QuickpulseState .POST_SHORT )
280+ self ._elapsed_num_seconds = 0
281+ # Update config etag
282+ etag = ping_response ._response_headers .get (_QUICKPULSE_ETAG_HEADER_NAME )
283+ if etag is None :
284+ etag = ""
285+ if _get_quickpulse_etag () != etag :
286+ _set_quickpulse_etag (etag )
287+ # TODO: Set default document filter config from response body
288+ # config = ping_response._pipeline_response.http_response.content
289+ else :
290+ # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
291+ if (
292+ _get_global_quickpulse_state () is _QuickpulseState .PING_SHORT
293+ and self ._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
294+ ):
295+ _set_global_quickpulse_state (_QuickpulseState .PING_LONG )
296+ # Reset etag to default if not subscribed
297+ _set_quickpulse_etag ("" )
298+ except Exception : # pylint: disable=broad-except,invalid-name
299+ _logger .exception ("Exception occurred while pinging live metrics." )
300+ _set_quickpulse_etag ("" )
268301 # TODO: Implement redirect
269302 else :
270303 # Erroneous ping responses instigate backoff logic
@@ -274,6 +307,8 @@ def _ticker(self) -> None:
274307 and self ._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
275308 ):
276309 _set_global_quickpulse_state (_QuickpulseState .PING_LONG )
310+ # Reset etag to default if error
311+ _set_quickpulse_etag ("" )
277312 else :
278313 try :
279314 self .collect ()
@@ -283,6 +318,8 @@ def _ticker(self) -> None:
283318 # And resume pinging
284319 if self ._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS :
285320 _set_global_quickpulse_state (_QuickpulseState .PING_SHORT )
321+ # Reset etag to default
322+ _set_quickpulse_etag ("" )
286323 self ._elapsed_num_seconds = 0
287324
288325 self ._elapsed_num_seconds += 1
0 commit comments