Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## 1.0.0b45 (Unreleased)

### Features Added
- Added new log record processor to support Trace Based Sampling
([#43811](https://github.com/Azure/azure-sdk-for-python/pull/43811))
- Added Operation Name Propagation for Dependencies and Logs
([#43588](https://github.com/Azure/azure-sdk-for-python/pull/43588))
- Added local storage support for multiple users on the same Linux system
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from typing import Optional, Dict, Any

from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
from opentelemetry.trace import get_current_span


class _AzureBatchLogRecordProcessor(BatchLogRecordProcessor):
    """Batch log record processor for Azure Monitor with optional trace-based sampling of logs."""

    def __init__(
        self,
        log_exporter: LogExporter,
        options: Optional[Dict[str, Any]] = None,
    ):
        """Create the processor and read its trace-based sampling option.

        :param log_exporter: The exporter handed to the parent ``BatchLogRecordProcessor``.
        :type log_exporter: LogExporter
        :param options: Optional configuration dictionary. Supported options:
            - enable_trace_based_sampling_for_logs(bool): Enable trace-based sampling for logs.
        :type options: Optional[Dict[str, Any]]
        """
        super().__init__(log_exporter)
        self._options = options if options else {}
        # Stays ``None`` when the key is absent; only its truthiness is consulted in on_emit.
        self._enable_trace_based_sampling_for_logs = self._options.get(
            "enable_trace_based_sampling_for_logs"
        )

    def on_emit(self, log_data: LogData) -> None:
        # cspell: disable
        """Forward a log record to the batch processor unless its trace is unsampled.

        Filtering only happens when ``enable_trace_based_sampling_for_logs`` is truthy. In that case
        the span is looked up from the log record's context, and the record is dropped when the span
        context is valid and its trace flags are not sampled. Records without a log record, without a
        context, or with an invalid span context are always forwarded.

        :param log_data: Contains the log record to be exported
        :type log_data: LogData
        """
        # cspell: enable
        if self._enable_trace_based_sampling_for_logs:
            record = getattr(log_data, "log_record", None)
            record_context = getattr(record, "context", None) if record is not None else None
            if record_context is not None:
                span_context = get_current_span(record_context).get_span_context()
                if span_context.is_valid and not span_context.trace_flags.sampled:
                    # Drop the record: it belongs to a trace that was not sampled.
                    return
        super().on_emit(log_data)
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
import os
import unittest
from unittest import mock

from opentelemetry.sdk import _logs
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry._logs.severity import SeverityNumber
from opentelemetry.trace import TraceFlags

from azure.monitor.opentelemetry.exporter.export.logs._exporter import (
AzureMonitorLogExporter,
)
from azure.monitor.opentelemetry.exporter.export.logs._processor import (
_AzureBatchLogRecordProcessor,
)


# pylint: disable=protected-access
class TestAzureBatchLogRecordProcessor(unittest.TestCase):
    """Test cases for the Azure Monitor Batch Log Record Processor with trace-based sampling."""

    # Patch targets shared by every on_emit test.
    _PARENT_ON_EMIT = "opentelemetry.sdk._logs.export.BatchLogRecordProcessor.on_emit"
    _GET_CURRENT_SPAN = "azure.monitor.opentelemetry.exporter.export.logs._processor.get_current_span"

    @classmethod
    def setUpClass(cls):
        # Plain assignment replaces any pre-existing value, so no prior pop is needed.
        os.environ["APPINSIGHTS_INSTRUMENTATIONKEY"] = "1234abcd-5678-4efa-8abc-1234567890ab"
        os.environ["APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"] = "true"
        cls._exporter = AzureMonitorLogExporter()

    @staticmethod
    def _make_span(is_valid, sampled=None):
        """Build a mock span whose span context reports the given validity and sampled flag.

        :param is_valid: Value returned by ``span_context.is_valid``.
        :param sampled: Value for ``span_context.trace_flags.sampled``; left as an
            auto-created mock attribute when ``None``.
        :return: The mock span.
        """
        span_context = mock.Mock()
        span_context.is_valid = is_valid
        if sampled is not None:
            span_context.trace_flags.sampled = sampled
        span = mock.Mock()
        span.get_span_context.return_value = span_context
        return span

    @staticmethod
    def _make_log_data(context, **overrides):
        """Build a ``LogData`` with the default test fields, applying any keyword overrides.

        :param context: Value passed as the log record's ``context``.
        :param overrides: ``LogRecord`` keyword arguments replacing the defaults.
        :return: The constructed ``LogData``.
        """
        fields = {
            "timestamp": 1646865018558419456,
            "trace_id": 125960616039069540489478540494783893221,
            "span_id": 2909973987304607650,
            "severity_text": "INFO",
            "trace_flags": TraceFlags.DEFAULT,
            "severity_number": SeverityNumber.INFO,
            "body": "Test log",
            "context": context,
        }
        fields.update(overrides)
        return _logs.LogData(_logs.LogRecord(**fields), InstrumentationScope("test_name"))

    def _make_processor(self, enabled):
        """Create a processor with ``enable_trace_based_sampling_for_logs`` set to ``enabled``.

        Passing ``None`` creates the processor with an empty options dictionary.
        """
        options = {} if enabled is None else {"enable_trace_based_sampling_for_logs": enabled}
        return _AzureBatchLogRecordProcessor(self._exporter, options=options)

    def _emit(self, processor, log_data, span=None):
        """Call ``processor.on_emit`` with the parent ``on_emit`` mocked out.

        :param processor: The processor under test.
        :param log_data: The ``LogData`` to emit.
        :param span: When given, ``get_current_span`` in the processor module is patched
            to return it; otherwise the real function is used.
        :return: The mock that replaced the parent class's ``on_emit``.
        """
        with mock.patch(self._PARENT_ON_EMIT) as parent_on_emit_mock:
            if span is None:
                processor.on_emit(log_data)
            else:
                with mock.patch(self._GET_CURRENT_SPAN, return_value=span):
                    processor.on_emit(log_data)
        return parent_on_emit_mock

    def test_processor_initialization_without_trace_based_sampling(self):
        """Test processor initialization without trace-based sampling enabled."""
        processor = _AzureBatchLogRecordProcessor(self._exporter, options={})
        self.assertFalse(processor._enable_trace_based_sampling_for_logs)

    def test_processor_initialization_with_trace_based_sampling(self):
        """Test processor initialization with trace-based sampling enabled."""
        processor = _AzureBatchLogRecordProcessor(
            self._exporter,
            options={"enable_trace_based_sampling_for_logs": True},
        )
        self.assertTrue(processor._enable_trace_based_sampling_for_logs)

    def test_processor_initialization_without_options(self):
        """Test processor initialization without options."""
        processor = _AzureBatchLogRecordProcessor(self._exporter)
        self.assertIsNone(processor._enable_trace_based_sampling_for_logs)

    def test_on_emit_with_trace_based_sampling_disabled(self):
        """Test on_emit does not filter logs when trace-based sampling is disabled."""
        processor = self._make_processor(enabled=None)
        # Patch in a valid, not-sampled span so the record WOULD be dropped if the
        # disabled flag were ignored; without the patch this test could never fail.
        span = self._make_span(is_valid=True, sampled=False)
        log_data = self._make_log_data(mock.Mock())

        parent_on_emit_mock = self._emit(processor, log_data, span=span)

        # Parent on_emit should be called because trace-based sampling is disabled
        parent_on_emit_mock.assert_called_once()

    def test_on_emit_with_trace_based_sampling_enabled_and_unsampled_trace(self):  # cspell:disable-line
        """Test on_emit filters logs when trace-based sampling is enabled and trace is unsampled."""  # cspell:disable-line
        processor = self._make_processor(enabled=True)
        span = self._make_span(is_valid=True, sampled=False)
        log_data = self._make_log_data(mock.Mock())

        parent_on_emit_mock = self._emit(processor, log_data, span=span)

        # Parent on_emit should NOT be called because the trace is not sampled and filtering is enabled
        parent_on_emit_mock.assert_not_called()

    def test_on_emit_with_trace_based_sampling_enabled_and_sampled_trace(self):
        """Test on_emit does not filter logs when trace-based sampling is enabled and trace is sampled."""
        processor = self._make_processor(enabled=True)
        span = self._make_span(is_valid=True, sampled=True)
        log_data = self._make_log_data(mock.Mock(), trace_flags=TraceFlags.SAMPLED)

        parent_on_emit_mock = self._emit(processor, log_data, span=span)

        # Parent on_emit should be called because trace is sampled
        parent_on_emit_mock.assert_called_once()

    def test_on_emit_with_trace_based_sampling_enabled_and_invalid_span_context(self):
        """Test on_emit does not filter logs with invalid span context."""
        processor = self._make_processor(enabled=True)
        span = self._make_span(is_valid=False)
        log_data = self._make_log_data(mock.Mock())

        parent_on_emit_mock = self._emit(processor, log_data, span=span)

        # Parent on_emit should be called because span context is invalid
        parent_on_emit_mock.assert_called_once()

    def test_on_emit_with_trace_based_sampling_enabled_and_no_context(self):
        """Test on_emit does not filter logs when there is no log record context."""
        processor = self._make_processor(enabled=True)
        log_data = self._make_log_data(None)

        parent_on_emit_mock = self._emit(processor, log_data)

        # Parent on_emit should be called because there's no context
        parent_on_emit_mock.assert_called_once()

    def test_on_emit_integration_with_multiple_log_records(self):
        """Integration test: verify processor handles multiple log records correctly with trace-based sampling."""
        processor = self._make_processor(enabled=True)
        shared_context = mock.Mock()

        span_not_sampled = self._make_span(is_valid=True, sampled=False)
        span_sampled = self._make_span(is_valid=True, sampled=True)

        log_data_not_sampled = self._make_log_data(
            shared_context,
            body="Unsampled log",  # cspell:disable-line
        )
        log_data_sampled = self._make_log_data(
            shared_context,
            timestamp=1646865018558419457,
            trace_id=125960616039069540489478540494783893222,
            span_id=2909973987304607651,
            trace_flags=TraceFlags.SAMPLED,
            body="Sampled log",
        )

        # Each emit gets a fresh parent mock, so no explicit reset is needed between records.
        # The record whose trace is not sampled is filtered out.
        self._emit(processor, log_data_not_sampled, span=span_not_sampled).assert_not_called()

        # The record whose trace is sampled is forwarded to the parent.
        self._emit(processor, log_data_sampled, span=span_sampled).assert_called_once()
2 changes: 2 additions & 0 deletions sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Add `enable_trace_based_sampling_for_logs` configuration option to filter out logs whose associated trace is not sampled
([#43811](https://github.com/Azure/azure-sdk-for-python/pull/43811))
- Performance Counters
([#43262](https://github.com/Azure/azure-sdk-for-python/pull/43262))
- Adding more diagnostic log message IDs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
SAMPLING_TRACES_PER_SECOND_ARG,
SPAN_PROCESSORS_ARG,
VIEWS_ARG,
ENABLE_TRACE_BASED_SAMPLING_ARG,
)
from azure.monitor.opentelemetry._types import ConfigurationValue
from azure.monitor.opentelemetry.exporter._quickpulse import ( # pylint: disable=import-error,no-name-in-module
Expand Down Expand Up @@ -109,6 +110,8 @@ def configure_azure_monitor(**kwargs) -> None: # pylint: disable=C4758
`<tempfile.gettempdir()>/Microsoft/AzureMonitor/opentelemetry-python-<your-instrumentation-key>`.
:keyword list[~opentelemetry.sdk.metrics.view.View] views: List of `View` objects to configure and filter
metric output.
:keyword bool enable_trace_based_sampling_for_logs: Boolean value to determine whether to enable trace based
sampling for logs. Defaults to `False`
:rtype: None
"""

Expand Down Expand Up @@ -203,7 +206,7 @@ def _setup_logging(configurations: Dict[str, ConfigurationValue]):
try:
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from azure.monitor.opentelemetry.exporter.export.logs._processor import _AzureBatchLogRecordProcessor

from azure.monitor.opentelemetry.exporter import ( # pylint: disable=import-error,no-name-in-module
AzureMonitorLogExporter
Expand All @@ -212,15 +215,17 @@ def _setup_logging(configurations: Dict[str, ConfigurationValue]):
resource: Resource = configurations[RESOURCE_ARG] # type: ignore
enable_performance_counters_config = configurations[ENABLE_PERFORMANCE_COUNTERS_ARG]
logger_provider = LoggerProvider(resource=resource)
enable_trace_based_sampling_for_logs = configurations[ENABLE_TRACE_BASED_SAMPLING_ARG]
if configurations.get(ENABLE_LIVE_METRICS_ARG):
qlp = _QuickpulseLogRecordProcessor()
logger_provider.add_log_record_processor(qlp)
if enable_performance_counters_config:
pclp = _PerformanceCountersLogRecordProcessor()
logger_provider.add_log_record_processor(pclp)
log_exporter = AzureMonitorLogExporter(**configurations)
log_record_processor = BatchLogRecordProcessor(
log_record_processor = _AzureBatchLogRecordProcessor(
log_exporter,
{"enable_trace_based_sampling_for_logs": enable_trace_based_sampling_for_logs},
)
logger_provider.add_log_record_processor(log_record_processor)
set_logger_provider(logger_provider)
Expand Down
Loading