Implement custom events in Azure (#925)
lzchen authored Jun 29, 2020
1 parent 86d5bd7 commit dea0794
Showing 8 changed files with 323 additions and 69 deletions.
15 changes: 15 additions & 0 deletions contrib/opencensus-ext-azure/README.rst
@@ -121,6 +121,21 @@ Modifying Logs
logger.addHandler(handler)
logger.warning('Hello, World!')
Events
######

You can send `customEvent` telemetry in exactly the same way you would send `trace` telemetry, except that you use the `AzureEventHandler` instead.

.. code:: python

    import logging
    from opencensus.ext.azure.log_exporter import AzureEventHandler

    logger = logging.getLogger(__name__)
    logger.addHandler(AzureEventHandler(connection_string='InstrumentationKey=<your-instrumentation_key-here>'))
    logger.setLevel(logging.INFO)
    logger.info('Hello, World!')
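
The event's properties are populated from a `custom_dimensions` dictionary on the log record (see `log_record_to_envelope` in the handler diff below), so additional attributes can be attached through the standard `extra` keyword. A minimal sketch, where the event name and dimension names are illustrative:

.. code:: python

    import logging
    from opencensus.ext.azure.log_exporter import AzureEventHandler

    logger = logging.getLogger(__name__)
    logger.addHandler(AzureEventHandler(connection_string='InstrumentationKey=<your-instrumentation_key-here>'))
    logger.setLevel(logging.INFO)

    # The 'custom_dimensions' dict is copied into the customEvent's
    # properties; the formatted message becomes the event name.
    # The event name and dimension names below are only examples.
    logger.info('user_signup', extra={'custom_dimensions': {'plan': 'free', 'region': 'westus'}})
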
Metrics
~~~~~~~
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/correlated.py
@@ -36,3 +36,5 @@
with tracer.span(name='test'):
logger.warning('In the span')
logger.warning('After the span')

input("...")
1 change: 1 addition & 0 deletions contrib/opencensus-ext-azure/examples/logs/error.py
@@ -32,3 +32,4 @@ def main():

if __name__ == '__main__':
main()
input("...")
27 changes: 27 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/event.py
@@ -0,0 +1,27 @@
# Copyright 2019, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from opencensus.ext.azure.log_exporter import AzureEventHandler

logger = logging.getLogger(__name__)
# TODO: you need to specify the instrumentation key in a connection string
# and place it in the APPLICATIONINSIGHTS_CONNECTION_STRING
# environment variable.
logger.addHandler(AzureEventHandler())
logger.setLevel(logging.INFO)
logger.info('Hello, World!')

input("...")
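
As the TODO comment notes, this example expects the connection string in the `APPLICATIONINSIGHTS_CONNECTION_STRING` environment variable rather than as a constructor argument. A minimal sketch of setting it programmatically, assuming the variable is read when the handler is constructed without an explicit `connection_string` (the key value is a placeholder):

.. code:: python

    import logging
    import os

    # Placeholder value; assumed to be picked up when AzureEventHandler()
    # is constructed without a connection_string argument.
    os.environ['APPLICATIONINSIGHTS_CONNECTION_STRING'] = (
        'InstrumentationKey=<your-instrumentation_key-here>')

    from opencensus.ext.azure.log_exporter import AzureEventHandler

    logger = logging.getLogger(__name__)
    logger.addHandler(AzureEventHandler())
    logger.setLevel(logging.INFO)
    logger.info('Hello, World!')
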
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/properties.py
@@ -32,3 +32,5 @@
result = 1 / 0 # generate a ZeroDivisionError
except Exception:
logger.exception('Captured an exception.', extra=properties)

input("...")
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/simple.py
@@ -22,3 +22,5 @@
# environment variable.
logger.addHandler(AzureLogHandler())
logger.warning('Hello, World!')

input("...")
@@ -24,6 +24,7 @@
from opencensus.ext.azure.common.protocol import (
Data,
Envelope,
Event,
ExceptionData,
Message,
)
@@ -33,17 +34,51 @@

logger = logging.getLogger(__name__)

__all__ = ['AzureLogHandler']
__all__ = ['AzureEventHandler', 'AzureLogHandler']


class BaseLogHandler(logging.Handler):
def __init__(self):

def __init__(self, **options):
super(BaseLogHandler, self).__init__()
self.options = Options(**options)
utils.validate_instrumentation_key(self.options.instrumentation_key)
if not 0 <= self.options.logging_sampling_rate <= 1:
raise ValueError('Sampling must be in the range: [0,1]')
self.export_interval = self.options.export_interval
self.max_batch_size = self.options.max_batch_size
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
)
self._telemetry_processors = []
self.addFilter(SamplingFilter(self.options.logging_sampling_rate))
self._queue = Queue(capacity=8192) # TODO: make this configurable
self._worker = Worker(self._queue, self)
self._worker.start()

def _export(self, batch, event=None): # pragma: NO COVER
try:
if batch:
envelopes = [self.log_record_to_envelope(x) for x in batch]
envelopes = self.apply_telemetry_processors(envelopes)
result = self._transmit(envelopes)
if result > 0:
self.storage.put(envelopes, result)
if event:
if isinstance(event, QueueExitEvent):
self._transmit_from_storage() # send files before exit
return
if len(batch) < self.options.max_batch_size:
self._transmit_from_storage()
finally:
if event:
event.set()

def close(self):
self.storage.close()
self._worker.stop()

def createLock(self):
@@ -52,14 +87,7 @@ def createLock(self):
def emit(self, record):
self._queue.put(record, block=False)

def _export(self, batch, event=None):
try:
return self.export(batch)
finally:
if event:
event.set()

def export(self, batch):
def log_record_to_envelope(self, record):
raise NotImplementedError # pragma: NO COVER

def flush(self, timeout=None):
@@ -121,74 +149,18 @@ def filter(self, record):


class AzureLogHandler(TransportMixin, ProcessorMixin, BaseLogHandler):
"""Handler for logging to Microsoft Azure Monitor.
:param options: Options for the log handler.
"""

def __init__(self, **options):
self.options = Options(**options)
utils.validate_instrumentation_key(self.options.instrumentation_key)
if not 0 <= self.options.logging_sampling_rate <= 1:
raise ValueError('Sampling must be in the range: [0,1]')
self.export_interval = self.options.export_interval
self.max_batch_size = self.options.max_batch_size
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
)
self._telemetry_processors = []
super(AzureLogHandler, self).__init__()
self.addFilter(SamplingFilter(self.options.logging_sampling_rate))

def close(self):
self.storage.close()
super(AzureLogHandler, self).close()

def _export(self, batch, event=None): # pragma: NO COVER
try:
if batch:
envelopes = [self.log_record_to_envelope(x) for x in batch]
envelopes = self.apply_telemetry_processors(envelopes)
result = self._transmit(envelopes)
if result > 0:
self.storage.put(envelopes, result)
if event:
if isinstance(event, QueueExitEvent):
self._transmit_from_storage() # send files before exit
return
if len(batch) < self.options.max_batch_size:
self._transmit_from_storage()
finally:
if event:
event.set()
"""Handler for logging to Microsoft Azure Monitor."""

def log_record_to_envelope(self, record):
envelope = Envelope(
iKey=self.options.instrumentation_key,
tags=dict(utils.azure_monitor_context),
time=utils.timestamp_to_iso_str(record.created),
)
envelope = create_envelope(self.options.instrumentation_key, record)

envelope.tags['ai.operation.id'] = getattr(
record,
'traceId',
'00000000000000000000000000000000',
)
envelope.tags['ai.operation.parentId'] = '|{}.{}.'.format(
envelope.tags['ai.operation.id'],
getattr(record, 'spanId', '0000000000000000'),
)
properties = {
'process': record.processName,
'module': record.module,
'fileName': record.pathname,
'lineNumber': record.lineno,
'level': record.levelname,
}

if (hasattr(record, 'custom_dimensions') and
isinstance(record.custom_dimensions, dict)):
properties.update(record.custom_dimensions)
@@ -230,3 +202,43 @@ def log_record_to_envelope(self, record):
)
envelope.data = Data(baseData=data, baseType='MessageData')
return envelope


class AzureEventHandler(TransportMixin, ProcessorMixin, BaseLogHandler):
"""Handler for sending custom events to Microsoft Azure Monitor."""

def log_record_to_envelope(self, record):
envelope = create_envelope(self.options.instrumentation_key, record)

properties = {}
if (hasattr(record, 'custom_dimensions') and
isinstance(record.custom_dimensions, dict)):
properties.update(record.custom_dimensions)

envelope.name = 'Microsoft.ApplicationInsights.Event'
data = Event(
name=self.format(record),
properties=properties,
)
envelope.data = Data(baseData=data, baseType='EventData')

return envelope


def create_envelope(instrumentation_key, record):
envelope = Envelope(
iKey=instrumentation_key,
tags=dict(utils.azure_monitor_context),
time=utils.timestamp_to_iso_str(record.created),
)
envelope.tags['ai.operation.id'] = getattr(
record,
'traceId',
'00000000000000000000000000000000',
)
envelope.tags['ai.operation.parentId'] = '|{}.{}.'.format(
envelope.tags['ai.operation.id'],
getattr(record, 'spanId', '0000000000000000'),
)

return envelope
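
Because `AzureEventHandler` shares `ProcessorMixin` with `AzureLogHandler`, event envelopes pass through `apply_telemetry_processors` in `_export` before transmission. A minimal sketch of a processor that stamps the cloud role on outgoing events, assuming `add_telemetry_processor` behaves for this handler as it does for `AzureLogHandler` (handler name, role value, and message are illustrative):

.. code:: python

    import logging

    from opencensus.ext.azure.log_exporter import AzureEventHandler

    handler = AzureEventHandler(
        connection_string='InstrumentationKey=<your-instrumentation_key-here>')

    def add_cloud_role(envelope):
        # Called for each envelope in apply_telemetry_processors();
        # returning False would drop the envelope instead of sending it.
        envelope.tags['ai.cloud.role'] = 'event-demo'
        return True

    handler.add_telemetry_processor(add_cloud_role)

    logger = logging.getLogger(__name__)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info('Hello, World!')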