Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions newrelic/api/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ def normalize_name(self, name, rule_type="url"):
return self._agent.normalize_name(self._name, name, rule_type)
return name, False

def compute_sampled(self):
def compute_sampled(self, full_granularity, section, *args, **kwargs):
if not self.active or not self.settings.distributed_tracing.enabled:
return False

return self._agent.compute_sampled(self._name)
return self._agent.compute_sampled(self._name, full_granularity, section, *args, **kwargs)


def application_instance(name=None, activate=True):
Expand Down
29 changes: 13 additions & 16 deletions newrelic/api/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ def _update_agent_attributes(self):
def user_attributes(self):
return create_attributes(self._custom_params, DST_ALL, self.attribute_filter)

def sampling_algo_compute_sampled_and_priority(self, priority, sampled):
def sampling_algo_compute_sampled_and_priority(self, priority, sampled, sampler_kwargs):
# self._priority and self._sampled are set when parsing the W3C tracestate
# or newrelic DT headers and may be overridden in _make_sampling_decision
# based on the configuration. The only time they are set in here is when the
Expand All @@ -1016,25 +1016,21 @@ def sampling_algo_compute_sampled_and_priority(self, priority, sampled):
priority = float(f"{random.random():.6f}") # noqa: S311
if sampled is None:
_logger.debug("No trusted account id found. Sampling decision will be made by adaptive sampling algorithm.")
sampled = self._application.compute_sampled()
sampled = self._application.compute_sampled(**sampler_kwargs)
if sampled:
priority += 1
return priority, sampled

def _compute_sampled_and_priority(
self,
priority,
sampled,
remote_parent_sampled_path,
remote_parent_sampled_setting,
remote_parent_not_sampled_path,
remote_parent_not_sampled_setting,
self, priority, sampled, full_granularity, remote_parent_sampled_setting, remote_parent_not_sampled_setting
):
if self._remote_parent_sampled is None:
section = 0
config = "default" # Use sampling algo.
_logger.debug("Sampling decision made based on no remote parent sampling decision present.")
elif self._remote_parent_sampled:
setting_path = remote_parent_sampled_path
section = 1
setting_path = f"distributed_tracing.sampler.{'full_granularity' if full_granularity else 'partial_granularity'}.remote_parent_sampled"
config = remote_parent_sampled_setting
_logger.debug(
"Sampling decision made based on remote_parent_sampled=%s and %s=%s.",
Expand All @@ -1043,7 +1039,8 @@ def _compute_sampled_and_priority(
config,
)
else: # self._remote_parent_sampled is False.
setting_path = remote_parent_not_sampled_path
section = 2
setting_path = f"distributed_tracing.sampler.{'full_granularity' if full_granularity else 'partial_granularity'}.remote_parent_not_sampled"
config = remote_parent_not_sampled_setting
_logger.debug(
"Sampling decision made based on remote_parent_sampled=%s and %s=%s.",
Expand All @@ -1064,7 +1061,9 @@ def _compute_sampled_and_priority(
_logger.debug(
"Let adaptive sampler algorithm decide based on sampled=%s and priority=%s.", sampled, priority
)
priority, sampled = self.sampling_algo_compute_sampled_and_priority(priority, sampled)
priority, sampled = self.sampling_algo_compute_sampled_and_priority(
priority, sampled, {"full_granularity": full_granularity, "section": section}
)
return priority, sampled

def _make_sampling_decision(self):
Expand All @@ -1084,9 +1083,8 @@ def _make_sampling_decision(self):
computed_priority, computed_sampled = self._compute_sampled_and_priority(
priority,
sampled,
remote_parent_sampled_path="distributed_tracing.sampler.full_granularity.remote_parent_sampled",
full_granularity=True,
remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.full_granularity.remote_parent_sampled,
remote_parent_not_sampled_path="distributed_tracing.sampler.full_granularity.remote_parent_not_sampled",
remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.full_granularity.remote_parent_not_sampled,
)
_logger.debug("Full granularity sampling decision was %s with priority=%s.", sampled, priority)
Expand All @@ -1102,9 +1100,8 @@ def _make_sampling_decision(self):
self._priority, self._sampled = self._compute_sampled_and_priority(
priority,
sampled,
remote_parent_sampled_path="distributed_tracing.sampler.partial_granularity.remote_parent_sampled",
full_granularity=False,
remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.partial_granularity.remote_parent_sampled,
remote_parent_not_sampled_path="distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled",
remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled,
)
_logger.debug(
Expand Down
4 changes: 2 additions & 2 deletions newrelic/core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,9 @@ def normalize_name(self, app_name, name, rule_type="url"):

return application.normalize_name(name, rule_type)

def compute_sampled(self, app_name):
def compute_sampled(self, app_name, full_granularity, section, *args, **kwargs):
application = self._applications.get(app_name, None)
return application.compute_sampled()
return application.compute_sampled(full_granularity, section, *args, **kwargs)

def _harvest_shutdown_is_set(self):
try:
Expand Down
17 changes: 6 additions & 11 deletions newrelic/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from functools import partial

from newrelic.common.object_names import callable_name
from newrelic.core.adaptive_sampler import AdaptiveSampler
from newrelic.core.agent_control_health import (
HealthStatus,
agent_control_health_instance,
Expand All @@ -37,6 +36,7 @@
from newrelic.core.internal_metrics import InternalTrace, InternalTraceContext, internal_count_metric, internal_metric
from newrelic.core.profile_sessions import profile_session_manager
from newrelic.core.rules_engine import RulesEngine, SegmentCollapseEngine
from newrelic.core.samplers.sampler_proxy import SamplerProxy
from newrelic.core.stats_engine import CustomMetrics, StatsEngine
from newrelic.network.exceptions import (
DiscardDataForRequest,
Expand Down Expand Up @@ -78,7 +78,7 @@ def __init__(self, app_name, linked_applications=None):
self._transaction_count = 0
self._last_transaction = 0.0

self.adaptive_sampler = None
self.sampler = None

self._global_events_account = 0

Expand Down Expand Up @@ -156,11 +156,11 @@ def configuration(self):
def active(self):
return self.configuration is not None

def compute_sampled(self):
if self.adaptive_sampler is None:
def compute_sampled(self, full_granularity, section, *args, **kwargs):
if self.sampler is None:
return False

return self.adaptive_sampler.compute_sampled()
return self.sampler.compute_sampled(full_granularity, section, *args, **kwargs)

def dump(self, file):
"""Dumps details about the application to the file object."""
Expand Down Expand Up @@ -501,12 +501,7 @@ def connect_to_data_collector(self, activate_agent):

with self._stats_lock:
self._stats_engine.reset_stats(configuration, reset_stream=True)

if configuration.serverless_mode.enabled:
sampling_target_period = 60.0
else:
sampling_target_period = configuration.sampling_target_period_in_seconds
self.adaptive_sampler = AdaptiveSampler(configuration.sampling_target, sampling_target_period)
self.sampler = SamplerProxy(configuration)

active_session.connect_span_stream(self._stats_engine.span_stream, self.record_custom_metric)

Expand Down
14 changes: 14 additions & 0 deletions newrelic/core/samplers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

34 changes: 34 additions & 0 deletions newrelic/core/samplers/sampler_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from newrelic.core.samplers.adaptive_sampler import AdaptiveSampler


class SamplerProxy:
def __init__(self, settings):
if settings.serverless_mode.enabled:
sampling_target_period = 60.0
else:
sampling_target_period = settings.sampling_target_period_in_seconds
adaptive_sampler = AdaptiveSampler(settings.sampling_target, sampling_target_period)
self._samplers = [adaptive_sampler]

def get_sampler(self, full_granularity, section):
return self._samplers[0]

def compute_sampled(self, full_granularity, section, *args, **kwargs):
"""
full_granularity: True is full granularity, False is partial granularity
section: 0-root, 1-remote_parent_sampled, 2-remote_parent_not_sampled
"""
return self.get_sampler(full_granularity, section).compute_sampled(*args, **kwargs)
12 changes: 6 additions & 6 deletions tests/agent_features/test_distributed_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,11 @@ def test_distributed_trace_remote_parent_sampling_decision_full_granularity(
)
if expected_adaptive_sampling_algo_called:
function_called_decorator = validate_function_called(
"newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled"
"newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled"
)
else:
function_called_decorator = validate_function_not_called(
"newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled"
"newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled"
)

@function_called_decorator
Expand Down Expand Up @@ -684,11 +684,11 @@ def test_distributed_trace_remote_parent_sampling_decision_partial_granularity(
)
if expected_adaptive_sampling_algo_called:
function_called_decorator = validate_function_called(
"newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled"
"newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled"
)
else:
function_called_decorator = validate_function_not_called(
"newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled"
"newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled"
)

@function_called_decorator
Expand Down Expand Up @@ -752,11 +752,11 @@ def test_distributed_trace_remote_parent_sampling_decision_between_full_and_part
)
if expected_adaptive_sampling_algo_called:
function_called_decorator = validate_function_called(
"newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled"
"newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled"
)
else:
function_called_decorator = validate_function_not_called(
"newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled"
"newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled"
)

@function_called_decorator
Expand Down
26 changes: 13 additions & 13 deletions tests/agent_unittests/test_harvest_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,30 +542,30 @@ def test_adaptive_sampling(transaction_node, monkeypatch):
app = Application("Python Agent Test (Harvest Loop)")

# Should always return false for sampling prior to connect
assert app.compute_sampled() is False
assert app.compute_sampled(True, 0) is False

app.connect_to_data_collector(None)

# First harvest, first N should be sampled
for _ in range(settings.sampling_target):
assert app.compute_sampled() is True
assert app.compute_sampled(True, 0) is True

assert app.compute_sampled() is False
assert app.compute_sampled(True, 0) is False

# fix random.randrange to return 0
monkeypatch.setattr(random, "randrange", lambda *args, **kwargs: 0)

# Multiple resets should behave the same
for _ in range(2):
# Set the last_reset to longer than the period so a reset will occur.
app.adaptive_sampler.last_reset = time.time() - app.adaptive_sampler.period
app.sampler.get_sampler(True, 0).last_reset = time.time() - app.sampler.get_sampler(True, 0).period

# Subsequent harvests should allow sampling of 2X the target
for _ in range(2 * settings.sampling_target):
assert app.compute_sampled() is True
assert app.compute_sampled(True, 0) is True

# No further samples should be saved
assert app.compute_sampled() is False
assert app.compute_sampled(True, 0) is False


@override_generic_settings(
Expand Down Expand Up @@ -709,20 +709,20 @@ def test_serverless_mode_adaptive_sampling(time_to_next_reset, computed_count, c
app = Application("Python Agent Test (Harvest Loop)")

app.connect_to_data_collector(None)
app.adaptive_sampler.computed_count = 123
app.adaptive_sampler.last_reset = time.time() - 60 + time_to_next_reset
app.sampler.get_sampler(True, 0).computed_count = 123
app.sampler.get_sampler(True, 0).last_reset = time.time() - 60 + time_to_next_reset

assert app.compute_sampled() is True
assert app.adaptive_sampler.computed_count == computed_count
assert app.adaptive_sampler.computed_count_last == computed_count_last
assert app.compute_sampled(True, 0) is True
assert app.sampler.get_sampler(True, 0).computed_count == computed_count
assert app.sampler.get_sampler(True, 0).computed_count_last == computed_count_last


@validate_function_not_called("newrelic.core.adaptive_sampler", "AdaptiveSampler._reset")
@validate_function_not_called("newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler._reset")
@override_generic_settings(settings, {"developer_mode": True})
def test_compute_sampled_no_reset():
app = Application("Python Agent Test (Harvest Loop)")
app.connect_to_data_collector(None)
assert app.compute_sampled() is True
assert app.compute_sampled(True, 0) is True


def test_analytic_event_sampling_info():
Expand Down
2 changes: 1 addition & 1 deletion tests/cross_agent/test_distributed_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def load_tests():


def override_compute_sampled(override):
@transient_function_wrapper("newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled")
@transient_function_wrapper("newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled")
def _override_compute_sampled(wrapped, instance, args, kwargs):
if override:
return True
Expand Down
2 changes: 1 addition & 1 deletion tests/cross_agent/test_w3c_trace_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def target_wsgi_application(environ, start_response):


def override_compute_sampled(override):
@transient_function_wrapper("newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled")
@transient_function_wrapper("newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled")
def _override_compute_sampled(wrapped, instance, args, kwargs):
if override:
return True
Expand Down
2 changes: 1 addition & 1 deletion tests/testing_support/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ def _bind_params(application, *args, **kwargs):

@function_wrapper
def dt_enabled(wrapped, instance, args, kwargs):
@transient_function_wrapper("newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled")
@transient_function_wrapper("newrelic.core.samplers.adaptive_sampler", "AdaptiveSampler.compute_sampled")
def force_sampled(wrapped, instance, args, kwargs):
wrapped(*args, **kwargs)
return True
Expand Down
Loading