Merged
66 changes: 66 additions & 0 deletions docs/TelemetryDevelopment.md
@@ -0,0 +1,66 @@
# Telemetry Development

## Telemetry Presence

`dd-trace-rb` is written to assume that the telemetry component is always
present. If telemetry is disabled, the component is still created but does
nothing.

Most components call methods on `telemetry` unconditionally. There are two
exceptions: DI and Data Streams are written to assume that `telemetry` may be nil.
However, this assumption is not necessary, and these components may be
changed in the future to assume that `telemetry` is always present.
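
The "always present, does nothing when disabled" behavior can be sketched
roughly as follows. This is a minimal illustration, not the real component:
`SketchTelemetry`, its `report` method and the `sent` array are hypothetical
names used only for this example.

```ruby
# Hypothetical stand-in for the telemetry component; not the real class.
class SketchTelemetry
  attr_reader :sent

  def initialize(enabled:)
    @enabled = enabled
    @sent = []
  end

  def enabled?
    @enabled
  end

  # Callers invoke this unconditionally. A disabled component ignores
  # the call instead of requiring a nil check at every call site.
  def report(event)
    return unless enabled?

    @sent << event
    true
  end
end

components = [
  SketchTelemetry.new(enabled: true),
  SketchTelemetry.new(enabled: false),
]

# No `telemetry&.report` or `if telemetry` needed at the call site.
components.each { |telemetry| telemetry.report("integrations-change") }
```

The point of the design is that call sites stay identical whether telemetry
is enabled or not.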

## Event Submission Prior To Start

Telemetry is unique among components in that it permits events to be
submitted to it before its worker starts. This allows errors raised
during `Datadog.configure` processing to be reported via telemetry, since
such errors can occur before the telemetry worker starts. The telemetry
component keeps the events and sends them after the worker starts.
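
A minimal sketch of this "accept now, deliver after start" behavior is shown
below. It assumes a plain in-memory array as the buffer and delivers events
synchronously; `SketchWorker` and `delivered` are hypothetical names, and the
real worker sends events from a background thread at an interval.

```ruby
# Hypothetical worker sketch: events are accepted before start and
# delivered only once the worker has been started.
class SketchWorker
  attr_reader :delivered

  def initialize
    @buffer = []
    @delivered = []
    @started = false
  end

  # Always accepts the event. Delivery happens immediately only if the
  # worker is already running (a simplification of interval-based sending).
  def enqueue(event)
    @buffer << event
    flush if @started
    true
  end

  # Starting the worker delivers everything queued so far.
  def start
    @started = true
    flush
  end

  private

  def flush
    @delivered.concat(@buffer)
    @buffer.clear
  end
end

worker = SketchWorker.new
worker.enqueue("configuration-error") # reported during configuration
worker.start                          # the queued event is delivered here
```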

## Initial Event

`dd-trace-rb` can be initialized multiple times during application boot.
For example, if customers follow our documentation and require
`datadog/auto_instrument`, and also call `Datadog.configure`, then
`Datadog.configure` is invoked twice in total (the first time by
`auto_instrument`), and thus the telemetry instance is created twice. This
happens in the applications used with system tests.

System tests, on the other hand, require that only one `app-started`
event is emitted, because they assume the application is launched once.
To deal with this, we have a hack in the telemetry code that sends an
`app-client-configuration-change` event instead of the second `app-started`
event. This is implemented via the `SynthAppClientConfigurationChange` class.
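
The selection of the initial event type can be sketched as below. The helper
`sketch_initial_event_type` is hypothetical and returns only the event type
string; in the component, the `initial_event_is_change` argument of `start`
decides which event object is built.

```ruby
# Hypothetical helper, not part of dd-trace-rb: picks the type of the
# initial event depending on whether this is a repeated initialization.
def sketch_initial_event_type(initial_event_is_change)
  if initial_event_is_change
    "app-client-configuration-change"
  else
    "app-started"
  end
end

# Two initializations during boot: first by auto_instrument, then by an
# explicit Datadog.configure call. Only the first reports app-started.
boot_sequence = [false, true]
event_types = boot_sequence.map { |is_change| sketch_initial_event_type(is_change) }
```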

## Fork Handling

We must send telemetry data from forked children.

Telemetry started out as a diagnostic tool used during application boot,
but is now used for reporting application liveness (and settings/state)
throughout the application lifetime. Live Debugger / Dynamic Instrumentation,
for example, requires ongoing `app-heartbeat` events emitted via telemetry
to provide a working UI to customers.

It is somewhat common for customers to preload the application in the parent
web server process and process requests from children. This means telemetry
is initialized from the parent process, and it must emit events in the
forked children.

We use the standard worker `after_fork` handler to recreate the worker
thread in forked children. However, there are two caveats specific to
telemetry to keep in mind:

1. Due to telemetry permitting event submission prior to its start, it is
not sufficient to simply reset the state from the worker's `perform` method,
as is done in other components. We must only reset the state when we are
in the forked child, otherwise we'll trash any events submitted to telemetry
prior to its worker starting.

2. The child process is a brand new application as far as the backend/UI is
concerned, having a new runtime ID, and therefore the initial event in the
forked child must always be `app-started`. Since we track the initial event
in the telemetry component, this event must be changed to `app-started` in
forked children regardless of what it was in the parent.
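
The two caveats above can be sketched together as follows. This is a
simplified model that simulates the fork by calling `after_fork` directly
instead of forking a process. All `Sketch*` classes are hypothetical; only
the `reset!`, `reset?` and `app_started?` method names mirror the ones used
in the telemetry code.

```ruby
# Hypothetical event classes modeled on the two initial event kinds.
class SketchAppStarted
  def type
    "app-started"
  end

  def app_started?
    true
  end
end

class SketchSynthChange < SketchAppStarted
  def type
    if reset?
      super
    else
      "app-client-configuration-change"
    end
  end

  def app_started?
    reset?
  end

  # Revert to a regular app-started event (used in forked children).
  def reset!
    @reset = true
  end

  def reset?
    !!@reset
  end
end

# Hypothetical worker: state is reset only from the fork handler, so
# events queued before start in the parent are never discarded there.
class SketchForkAwareWorker
  attr_reader :buffer, :initial_event

  def initialize(initial_event)
    @initial_event = initial_event
    @buffer = []
  end

  def enqueue(event)
    @buffer << event
  end

  # Assumption: this runs only in the forked child.
  def after_fork
    @buffer = [] # the parent sends its own queued events
    @initial_event.reset! if @initial_event.respond_to?(:reset!)
  end
end

worker = SketchForkAwareWorker.new(SketchSynthChange.new)
worker.enqueue("queued-in-parent")
worker.after_fork # simulated child: empty buffer, initial event is app-started
```

In a real fork the child operates on its own copy of the event, so resetting
it there does not affect the parent.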
29 changes: 19 additions & 10 deletions lib/datadog/core/telemetry/component.rb
Expand Up @@ -14,14 +14,23 @@
module Datadog
module Core
module Telemetry
# Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle.
# Note: Telemetry does not spawn its worker thread in fork processes, thus no telemetry is sent in forked processes.
# Telemetry entry point, coordinates sending telemetry events at
# various points in application lifecycle.
#
# @api private
class Component
ENDPOINT_COLLECTION_MESSAGE_LIMIT = 300

attr_reader :enabled, :logger, :transport, :worker
attr_reader :enabled
attr_reader :logger
attr_reader :transport
attr_reader :worker
attr_reader :settings
attr_reader :agent_settings

# Alias for consistency with other components.
# TODO Remove +enabled+ method
alias_method :enabled?, :enabled

include Core::Utils::Forking
include Telemetry::Logging
Expand Down Expand Up @@ -110,7 +119,7 @@ def disable!
end

def start(initial_event_is_change = false, components:)
return if !@enabled
return unless enabled?

initial_event = if initial_event_is_change
Event::SynthAppClientConfigurationChange.new(
Expand All @@ -136,19 +145,19 @@ def shutdown!
end

def emit_closing!
return if !@enabled || forked?
return unless enabled?

@worker.enqueue(Event::AppClosing.new)
end

def integrations_change!
return if !@enabled || forked?
return unless enabled?

@worker.enqueue(Event::AppIntegrationsChange.new)
end

def log!(event)
return if !@enabled || forked? || !@log_collection_enabled
return unless enabled? && @log_collection_enabled

@worker.enqueue(event)
end
Expand All @@ -159,21 +168,21 @@ def log!(event)
#
# @api private
def flush
return if !@enabled || forked?
return unless enabled?

@worker.flush
end

# Report configuration changes caused by Remote Configuration.
def client_configuration_change!(changes)
return if !@enabled || forked?
return unless enabled?

@worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config'))
end

# Report application endpoints
def app_endpoints_loaded(endpoints, page_size: ENDPOINT_COLLECTION_MESSAGE_LIMIT)
return if !@enabled || forked?
return unless enabled?

endpoints.each_slice(page_size).with_index do |endpoints_slice, i|
@worker.enqueue(Event::AppEndpointsLoaded.new(endpoints_slice, is_first: i.zero?))
15 changes: 15 additions & 0 deletions lib/datadog/core/telemetry/event/app_started.rb
Expand Up @@ -11,6 +11,12 @@ class AppStarted < Base
def initialize(components:)
# To not hold a reference to the component tree, generate
# the event payload here in the constructor.
#
# Important: do not store data that contains (or is derived from)
# the runtime_id or sequence numbers.
# This event is reused when a process forks, but in the
# child process the runtime_id would be different and sequence
# number is reset.
@configuration = configuration(components.settings, components.agent_settings)
@install_signature = install_signature(components.settings)
@products = products(components)
Expand All @@ -30,6 +36,15 @@ def payload
}
end

# Whether the event is actually the app-started event.
# For the app-started event we follow up by sending
# app-dependencies-loaded, if the event is
# app-client-configuration-change we don't send
# app-dependencies-loaded.
def app_started?
true
end

private

def products(components)
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,36 @@ module Event
# and app-closing events.
class SynthAppClientConfigurationChange < AppStarted
def type
'app-client-configuration-change'
if reset?
super
else
'app-client-configuration-change'
end
end

def payload
{
configuration: @configuration,
}
if reset?
super
else
{
configuration: @configuration,
}
end
end

def app_started?
reset?
end

# Revert this event to a "regular" AppStarted event.
#
# Used in after_fork to send the AppStarted event in child processes.
def reset!
@reset = true
end

def reset?
!!@reset
end
end
end
57 changes: 51 additions & 6 deletions lib/datadog/core/telemetry/worker.rb
Expand Up @@ -9,7 +9,10 @@
module Datadog
module Core
module Telemetry
# Accumulates events and sends them to the API at a regular interval, including heartbeat event.
# Accumulates events and sends them to the API at a regular interval,
# including heartbeat event.
#
# @api private
class Worker
include Core::Workers::Queue
include Core::Workers::Polling
Expand Down Expand Up @@ -40,11 +43,15 @@ def initialize(
self.enabled = enabled
# Workers::IntervalLoop settings
self.loop_base_interval = metrics_aggregation_interval_seconds
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART

@shutdown_timeout = shutdown_timeout
@buffer_size = buffer_size

initialize_state
end

private def initialize_state
self.buffer = buffer_klass.new(@buffer_size)

@initial_event_once = Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES)
Expand All @@ -53,12 +60,13 @@ def initialize(
attr_reader :logger
attr_reader :initial_event_once
attr_reader :initial_event
attr_reader :emitter

# Returns true if worker thread is successfully started,
# false if worker thread was not started but telemetry is enabled,
# nil if telemetry is disabled.
def start(initial_event)
return if !enabled? || forked?
return unless enabled?

@initial_event = initial_event

Expand All @@ -79,7 +87,21 @@ def stop(force_stop = false, timeout = @shutdown_timeout)
# for not enqueueing event (presently) is that telemetry is disabled
# altogether, and in this case other methods return nil.
def enqueue(event)
return if !enabled? || forked?
return unless enabled?

# Start the worker if needed, including in forked children.
# Needs to be done before pushing to buffer since perform
# may invoke after_fork handler which resets the buffer.
#
# Telemetry is special in that it permits events to be submitted
# to the worker with the worker not running, and the worker is
# explicitly started later (to maintain proper initialization order).
# Thus here we can't just call perform unconditionally and must
# check if the worker is supposed to be running, and only call
# perform in that case.
if worker && !worker.alive?
perform
end

buffer.push(event)
true
Expand Down Expand Up @@ -133,7 +155,7 @@ def flush
private

def perform(*events)
return if !enabled? || forked?
return unless enabled?

if need_initial_event?
started!
Expand Down Expand Up @@ -189,7 +211,9 @@ def started!
# dependencies and send the new ones.
# System tests demand only one instance of this event per
# dependency.
send_event(Event::AppDependenciesLoaded.new) if @dependency_collection && initial_event.class.eql?(Telemetry::Event::AppStarted) # standard:disable Style/ClassEqualityComparison:
if @dependency_collection && initial_event.app_started?
send_event(Event::AppDependenciesLoaded.new)
end

true
else
Expand Down Expand Up @@ -240,6 +264,27 @@ def disable_on_not_found!(response)
disable!
end

# Stop the worker after fork without sending closing event.
# The closing event will be (or should be) sent by the worker
# in the parent process.
# Also, discard any accumulated events since they will be sent by
# the parent.
def after_fork
# If telemetry is disabled, we still reset the state to avoid
# having wrong state. It is possible that in the future telemetry
# will be re-enabled after errors.
initialize_state
# In the child process, we get a new runtime_id.
# As such we need to send AppStarted event.
# In the parent process, the event may have been the
# SynthAppClientConfigurationChange instead of AppStarted,
# and in that case we need to convert it to the "regular"
# AppStarted event.
if @initial_event.is_a?(Event::SynthAppClientConfigurationChange)
@initial_event.reset! # steep:ignore
end
end

# Deduplicate logs by counting the number of repeated occurrences of the same log
# entry and replacing them with a single entry with the calculated `count` value.
# Non-log events are unchanged.
2 changes: 2 additions & 0 deletions sig/datadog/core/telemetry/component.rbs
Expand Up @@ -23,6 +23,8 @@ module Datadog
def self.build: (untyped settings, Datadog::Core::Configuration::AgentSettings agent_settings, Datadog::Core::Logger logger) -> Component

def initialize: (logger: Core::Logger, settings: untyped, agent_settings: Datadog::Core::Configuration::AgentSettings, enabled: true | false) -> void

def enabled?: -> bool

def disable!: () -> void

2 changes: 2 additions & 0 deletions sig/datadog/core/telemetry/event/app_started.rbs
Expand Up @@ -8,6 +8,8 @@ module Datadog
def type: () -> "app-started"

def payload: () -> { products: untyped, configuration: untyped, install_signature: untyped }

def app_started?: -> bool

private

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ module Datadog
module Telemetry
module Event
class SynthAppClientConfigurationChange < AppStarted
def type: () -> "app-client-configuration-change"
def type: -> ("app-client-configuration-change" | "app-started")

def payload: () -> { configuration: untyped }
def payload: () -> { ?products: untyped, configuration: untyped, ?install_signature: untyped }

def reset?: -> bool

def reset!: -> void
end
end
end
3 changes: 2 additions & 1 deletion sig/datadog/core/telemetry/worker.rbs
Expand Up @@ -22,10 +22,11 @@ module Datadog
@logger: ::Logger

attr_reader logger: ::Logger
attr_reader initial_event: Telemetry::Event::Base
attr_reader initial_event: Telemetry::Event::AppStarted
attr_reader initial_event_once: Datadog::Core::Utils::OnlyOnceSuccessful

def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, metrics_manager: MetricsManager, ?shutdown_timeout: Float | Integer, ?buffer_size: Integer, dependency_collection: bool, logger: ::Logger) -> void
def initialize_state: -> void

def start: (Telemetry::Event::Base initial_event) -> void
