Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement custom events in Azure #925

Merged
merged 5 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions contrib/opencensus-ext-azure/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,21 @@ Modifying Logs
logger.addHandler(handler)
logger.warning('Hello, World!')

Events
######

You can send `customEvent` telemetry in exactly the same way you would send `trace` telemetry except using the `AzureEventHandler` instead.

.. code:: python

import logging

from opencensus.ext.azure.log_exporter import AzureEventHandler

logger = logging.getLogger(__name__)
logger.addHandler(AzureEventHandler(connection_string='InstrumentationKey=<your-instrumentation_key-here>'))
logger.setLevel(logging.INFO)
logger.info('Hello, World!')

Metrics
~~~~~~~
Expand Down
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/correlated.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@
with tracer.span(name='test'):
logger.warning('In the span')
logger.warning('After the span')

input("...")
1 change: 1 addition & 0 deletions contrib/opencensus-ext-azure/examples/logs/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ def main():

if __name__ == '__main__':
main()
input("...")
27 changes: 27 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2019, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from opencensus.ext.azure.log_exporter import AzureEventHandler

logger = logging.getLogger(__name__)
# TODO: you need to specify the instrumentation key in a connection string
# and place it in the APPLICATIONINSIGHTS_CONNECTION_STRING
# environment variable.
logger.addHandler(AzureEventHandler())
logger.setLevel(logging.INFO)
logger.info('Hello, World!')

input("...")
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@
result = 1 / 0 # generate a ZeroDivisionError
except Exception:
logger.exception('Captured an exception.', extra=properties)

input("...")
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@
# environment variable.
logger.addHandler(AzureLogHandler())
logger.warning('Hello, World!')

input("...")
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from opencensus.ext.azure.common.protocol import (
Data,
Envelope,
Event,
ExceptionData,
Message,
)
Expand All @@ -33,17 +34,51 @@

logger = logging.getLogger(__name__)

__all__ = ['AzureLogHandler']
__all__ = ['AzureEventHandler', 'AzureLogHandler']


class BaseLogHandler(logging.Handler):
def __init__(self):

def __init__(self, **options):
super(BaseLogHandler, self).__init__()
self.options = Options(**options)
utils.validate_instrumentation_key(self.options.instrumentation_key)
if not 0 <= self.options.logging_sampling_rate <= 1:
raise ValueError('Sampling must be in the range: [0,1]')
self.export_interval = self.options.export_interval
self.max_batch_size = self.options.max_batch_size
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
)
self._telemetry_processors = []
self.addFilter(SamplingFilter(self.options.logging_sampling_rate))
self._queue = Queue(capacity=8192) # TODO: make this configurable
self._worker = Worker(self._queue, self)
self._worker.start()

def _export(self, batch, event=None): # pragma: NO COVER
try:
if batch:
envelopes = [self.log_record_to_envelope(x) for x in batch]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If apply_telemetry_processors() just needs an iterable, could use generator

Suggested change
envelopes = [self.log_record_to_envelope(x) for x in batch]
envelopes = (self.log_record_to_envelope(x) for x in batch)

envelopes = self.apply_telemetry_processors(envelopes)
result = self._transmit(envelopes)
if result > 0:
self.storage.put(envelopes, result)
if event:
if isinstance(event, QueueExitEvent):
self._transmit_from_storage() # send files before exit
return
if len(batch) < self.options.max_batch_size:
self._transmit_from_storage()
finally:
if event:
event.set()

def close(self):
self.storage.close()
self._worker.stop()

def createLock(self):
Expand All @@ -52,14 +87,7 @@ def createLock(self):
def emit(self, record):
self._queue.put(record, block=False)

def _export(self, batch, event=None):
try:
return self.export(batch)
finally:
if event:
event.set()

def export(self, batch):
def log_record_to_envelope(self, record):
raise NotImplementedError # pragma: NO COVER

def flush(self, timeout=None):
Expand Down Expand Up @@ -121,74 +149,18 @@ def filter(self, record):


class AzureLogHandler(TransportMixin, ProcessorMixin, BaseLogHandler):
"""Handler for logging to Microsoft Azure Monitor.

:param options: Options for the log handler.
"""

def __init__(self, **options):
self.options = Options(**options)
utils.validate_instrumentation_key(self.options.instrumentation_key)
if not 0 <= self.options.logging_sampling_rate <= 1:
raise ValueError('Sampling must be in the range: [0,1]')
self.export_interval = self.options.export_interval
self.max_batch_size = self.options.max_batch_size
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
)
self._telemetry_processors = []
super(AzureLogHandler, self).__init__()
self.addFilter(SamplingFilter(self.options.logging_sampling_rate))

def close(self):
self.storage.close()
super(AzureLogHandler, self).close()

def _export(self, batch, event=None): # pragma: NO COVER
try:
if batch:
envelopes = [self.log_record_to_envelope(x) for x in batch]
envelopes = self.apply_telemetry_processors(envelopes)
result = self._transmit(envelopes)
if result > 0:
self.storage.put(envelopes, result)
if event:
if isinstance(event, QueueExitEvent):
self._transmit_from_storage() # send files before exit
return
if len(batch) < self.options.max_batch_size:
self._transmit_from_storage()
finally:
if event:
event.set()
"""Handler for logging to Microsoft Azure Monitor."""

def log_record_to_envelope(self, record):
envelope = Envelope(
iKey=self.options.instrumentation_key,
tags=dict(utils.azure_monitor_context),
time=utils.timestamp_to_iso_str(record.created),
)
envelope = create_envelope(self.options.instrumentation_key, record)

envelope.tags['ai.operation.id'] = getattr(
record,
'traceId',
'00000000000000000000000000000000',
)
envelope.tags['ai.operation.parentId'] = '|{}.{}.'.format(
envelope.tags['ai.operation.id'],
getattr(record, 'spanId', '0000000000000000'),
)
properties = {
'process': record.processName,
'module': record.module,
'fileName': record.pathname,
'lineNumber': record.lineno,
'level': record.levelname,
}

if (hasattr(record, 'custom_dimensions') and
isinstance(record.custom_dimensions, dict)):
properties.update(record.custom_dimensions)
Expand Down Expand Up @@ -230,3 +202,43 @@ def log_record_to_envelope(self, record):
)
envelope.data = Data(baseData=data, baseType='MessageData')
return envelope


class AzureEventHandler(TransportMixin, ProcessorMixin, BaseLogHandler):
"""Handler for sending custom events to Microsoft Azure Monitor."""

def log_record_to_envelope(self, record):
envelope = create_envelope(self.options.instrumentation_key, record)

properties = {}
if (hasattr(record, 'custom_dimensions') and
isinstance(record.custom_dimensions, dict)):
properties.update(record.custom_dimensions)

envelope.name = 'Microsoft.ApplicationInsights.Event'
data = Event(
name=self.format(record),
properties=properties,
)
envelope.data = Data(baseData=data, baseType='EventData')

return envelope


def create_envelope(instrumentation_key, record):
envelope = Envelope(
iKey=instrumentation_key,
tags=dict(utils.azure_monitor_context),
time=utils.timestamp_to_iso_str(record.created),
)
envelope.tags['ai.operation.id'] = getattr(
record,
'traceId',
'00000000000000000000000000000000',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'0' * n?

)
envelope.tags['ai.operation.parentId'] = '|{}.{}.'.format(
envelope.tags['ai.operation.id'],
getattr(record, 'spanId', '0000000000000000'),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

)

return envelope
Loading