diff --git a/docs/TelemetryDevelopment.md b/docs/TelemetryDevelopment.md new file mode 100644 index 00000000000..cd6fee4ddb5 --- /dev/null +++ b/docs/TelemetryDevelopment.md @@ -0,0 +1,66 @@ +# Telemetry Development + +## Telemetry Presence + +`dd-trace-rb` is written to assume that the telemetry component is always +present. If telemetry is disabled, the component is still created but does +nothing. + +Most components call methods on `telemetry` unconditionally. There are two +exceptons: DI and Data Streams are written to assume that `telemetry` may be nil. +However, this assumption is not necessary and these components may be +changed in the future to assume that `telemetry` is always present. + +## Event Submission Prior To Start + +Telemetry is unique among other components in that it permits events to be +submitted to it prior to its worker starting. This is done so that errors +during `Datadog.configure` processing can be reported via telemetry, because +the errors can be produced prior to telemetry worker starting. The telemetry +component keeps the events and sends them after the worker starts. + +## Initial Event + +`dd-trace-rb` can be initialized multiple times during application boot. +For example, if customers follow our documentation and require +`datadog/auto_instrument`, and call `Datadog.configure`, they would get +`Datadog.configure` invoked two times total (the first time by `auto_instrument`) +and thus telemetry instance would be created twice. This happens in the +applications used with system tests. + +System tests, on the other hand, require that there is only one `app-started` +event emitted, because they think the application is launched once. +To deal with this we have a hack in the telemetry code to send an +`app-client-configuration-change` event instead of the second `app-started` +event. This is implemented via the `SynthAppClientConfigurationChange` class. + +## Fork Handling + +We must send telemetry data from forked children. + +Telemetry started out as a diagnostic tool used during application boot, +but is now used for reporting application liveness (and settings/state) +throughout the application lifetime. Live Debugger / Dynamic Instrumentation, +for example, require ongoing `app-heartbeat` events emitted via telemetry +to provide a working UI to customers. + +It is somewhat common for customers to preload the application in the parent +web server process and process requests from children. This means telemetry +is initialized from the parent process, and it must emit events in the +forked children. + +We use the standard worker `after_fork` handler to recreated the worker +thread in forked children. However, there are two caveats to keep in mind +which are specific to telemetry: + +1. Due to telemetry permitting event submission prior to its start, it is +not sufficient to simply reset the state from the worker's `perform` method, +as is done in other components. We must only reset the state when we are +in the forked child, otherwise we'll trash any events submitted to telemetry +prior to its worker starting. + +2. The child process is a brand new application as far as the backend/UI is +concerned, having a new runtime ID, and therefore the initial event in the +forked child must always be `app-started`. Since we track the initial event +in the telemetry component, this event must be changed to `app-started` in +forked children regardless of what it was in the parent. diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb index 7c704bcceda..39c7ed598c7 100644 --- a/lib/datadog/core/telemetry/component.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -14,14 +14,26 @@ module Datadog module Core module Telemetry - # Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle. - # Note: Telemetry does not spawn its worker thread in fork processes, thus no telemetry is sent in forked processes. + # Telemetry entry point, coordinates sending telemetry events at + # various points in application lifecycle. # # @api private class Component ENDPOINT_COLLECTION_MESSAGE_LIMIT = 300 - attr_reader :enabled, :logger, :transport, :worker + ONLY_ONCE = Utils::OnlyOnce.new + + attr_reader :enabled + attr_reader :logger + attr_reader :transport + attr_reader :worker + attr_reader :settings + attr_reader :agent_settings + attr_reader :metrics_manager + + # Alias for consistency with other components. + # TODO Remove +enabled+ method + alias_method :enabled?, :enabled include Core::Utils::Forking include Telemetry::Logging @@ -50,6 +62,17 @@ def initialize( # standard:disable Metrics/MethodLength logger:, enabled: ) + ONLY_ONCE.run do + Utils::AtForkMonkeyPatch.apply! + + # All of the other at fork monkey patch callbacks reference + # globals, follow that pattern here to avoid having the component + # referenced via the at fork callbacks. + Datadog::Core::Utils::AtForkMonkeyPatch.at_fork(:child) do + Datadog.send(:components, allow_initialization: false)&.telemetry&.after_fork + end + end + @enabled = enabled @log_collection_enabled = settings.telemetry.log_collection_enabled @logger = logger @@ -110,7 +133,7 @@ def disable! end def start(initial_event_is_change = false, components:) - return if !@enabled + return unless enabled? initial_event = if initial_event_is_change Event::SynthAppClientConfigurationChange.new( @@ -136,19 +159,19 @@ def shutdown! end def emit_closing! - return if !@enabled || forked? + return unless enabled? @worker.enqueue(Event::AppClosing.new) end def integrations_change! - return if !@enabled || forked? + return unless enabled? @worker.enqueue(Event::AppIntegrationsChange.new) end def log!(event) - return if !@enabled || forked? || !@log_collection_enabled + return unless enabled? && @log_collection_enabled @worker.enqueue(event) end @@ -158,22 +181,22 @@ def log!(event) # been flushed, or nil if telemetry is disabled. # # @api private - def flush - return if !@enabled || forked? + def flush(timeout: nil) + return unless enabled? - @worker.flush + @worker.flush(timeout: timeout) end # Report configuration changes caused by Remote Configuration. def client_configuration_change!(changes) - return if !@enabled || forked? + return unless enabled? @worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config')) end # Report application endpoints def app_endpoints_loaded(endpoints, page_size: ENDPOINT_COLLECTION_MESSAGE_LIMIT) - return if !@enabled || forked? + return unless enabled? endpoints.each_slice(page_size).with_index do |endpoints_slice, i| @worker.enqueue(Event::AppEndpointsLoaded.new(endpoints_slice, is_first: i.zero?)) @@ -204,6 +227,22 @@ def rate(namespace, metric_name, value, tags: {}, common: true) def distribution(namespace, metric_name, value, tags: {}, common: true) @metrics_manager.distribution(namespace, metric_name, value, tags: tags, common: common) end + + # When a fork happens, we generally need to do two things inside the + # child proess: + # 1. Restart the worker. + # 2. Discard any events and metrics that were submitted in the + # parent process (because they will be sent out in the parent + # process, sending them in the child would cause duplicate + # submission). + def after_fork + # We cannot simply create a new instance of metrics manager because + # it is referenced from other objects (e.g. the worker). + # We must reset the existing instance. + @metrics_manager.clear + + worker&.send(:after_fork_monkey_patched) + end end end end diff --git a/lib/datadog/core/telemetry/event/app_started.rb b/lib/datadog/core/telemetry/event/app_started.rb index bd2a5715152..81039ecf033 100644 --- a/lib/datadog/core/telemetry/event/app_started.rb +++ b/lib/datadog/core/telemetry/event/app_started.rb @@ -11,6 +11,12 @@ class AppStarted < Base def initialize(components:) # To not hold a reference to the component tree, generate # the event payload here in the constructor. + # + # Important: do not store data that contains (or is derived from) + # the runtime_id or sequence numbers. + # This event is reused when a process forks, but in the + # child process the runtime_id would be different and sequence + # number is reset. @configuration = configuration(components.settings, components.agent_settings) @install_signature = install_signature(components.settings) @products = products(components) @@ -30,6 +36,15 @@ def payload } end + # Whether the event is actually the app-started event. + # For the app-started event we follow up by sending + # app-dependencies-loaded, if the event is + # app-client-configuration-change we don't send + # app-dependencies-loaded. + def app_started? + true + end + private def products(components) diff --git a/lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb b/lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb index 6ddbe9cffeb..0b35328f638 100644 --- a/lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb +++ b/lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb @@ -28,13 +28,36 @@ module Event # and app-closing events. class SynthAppClientConfigurationChange < AppStarted def type - 'app-client-configuration-change' + if reset? + super + else + 'app-client-configuration-change' + end end def payload - { - configuration: @configuration, - } + if reset? + super + else + { + configuration: @configuration, + } + end + end + + def app_started? + reset? + end + + # Revert this event to a "regular" AppStarted event. + # + # Used in after_fork to send the AppStarted event in child processes. + def reset! + @reset = true + end + + def reset? + defined?(@reset) && !!@reset end end end diff --git a/lib/datadog/core/telemetry/metrics_manager.rb b/lib/datadog/core/telemetry/metrics_manager.rb index 95750728c64..c8c49e3277d 100644 --- a/lib/datadog/core/telemetry/metrics_manager.rb +++ b/lib/datadog/core/telemetry/metrics_manager.rb @@ -6,8 +6,11 @@ module Datadog module Core module Telemetry # MetricsManager aggregates and flushes metrics and distributions + # + # @api private class MetricsManager attr_reader :enabled + attr_reader :collections def initialize(aggregation_interval:, enabled:) @interval = aggregation_interval @@ -68,6 +71,12 @@ def disable! @enabled = false end + def clear + @mutex.synchronize do + @collections = {} + end + end + private def fetch_or_create_collection(namespace) diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index a1b869842bc..7287fbf45a5 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -9,7 +9,10 @@ module Datadog module Core module Telemetry - # Accumulates events and sends them to the API at a regular interval, including heartbeat event. + # Accumulates events and sends them to the API at a regular interval, + # including heartbeat event. + # + # @api private class Worker include Core::Workers::Queue include Core::Workers::Polling @@ -40,11 +43,23 @@ def initialize( self.enabled = enabled # Workers::IntervalLoop settings self.loop_base_interval = metrics_aggregation_interval_seconds - self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP + # We actually restart the worker after fork, but this is done + # via the AtForkMonkeyPatch rather than the worker fork policy + # because we also need to reset state outside of the worker + # (e.g. the metrics). + self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART @shutdown_timeout = shutdown_timeout @buffer_size = buffer_size + initialize_state + end + + # To make the method calls clear, the initialization code is in this + # method called +initialize_state+ which is called from +after_fork+. + # This way users of this class (e.g. telemetry Component) do not + # need to invoke +initialize_state+ directly, which can be confusing. + private def initialize_state self.buffer = buffer_klass.new(@buffer_size) @initial_event_once = Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES) @@ -53,12 +68,13 @@ def initialize( attr_reader :logger attr_reader :initial_event_once attr_reader :initial_event + attr_reader :emitter # Returns true if worker thread is successfully started, # false if worker thread was not started but telemetry is enabled, # nil if telemetry is disabled. def start(initial_event) - return if !enabled? || forked? + return unless enabled? @initial_event = initial_event @@ -79,7 +95,7 @@ def stop(force_stop = false, timeout = @shutdown_timeout) # for not enqueueing event (presently) is that telemetry is disabled # altogether, and in this case other methods return nil. def enqueue(event) - return if !enabled? || forked? + return unless enabled? buffer.push(event) true @@ -111,7 +127,7 @@ def flush(timeout: nil) private def perform(*events) - return if !enabled? || forked? + return unless enabled? if need_initial_event? started! @@ -167,7 +183,9 @@ def started! # dependencies and send the new ones. # System tests demand only one instance of this event per # dependency. - send_event(Event::AppDependenciesLoaded.new) if @dependency_collection && initial_event.class.eql?(Telemetry::Event::AppStarted) # standard:disable Style/ClassEqualityComparison: + if @dependency_collection && initial_event.app_started? + send_event(Event::AppDependenciesLoaded.new) + end true else @@ -220,6 +238,45 @@ def disable_on_not_found!(response) disable! end + # Call this method in a forked child to reset the state of this worker. + # + # Discard any accumulated events since they will be sent by + # the parent. + # Discard any accumulated metrics. + # Restart the worker thread, if it was running in the parent process. + # + # This method cannot be called +after_fork+ because workers define + # and call +after_fork+ which is supposed to do different things. + def after_fork_monkey_patched + # If telemetry is disabled, we still reset the state to avoid + # having wrong state. It is possible that in the future telemetry + # will be re-enabled after errors. + initialize_state + # In the child process, we get a new runtime_id. + # As such we need to send AppStarted event. + # In the parent process, the event may have been the + # SynthAppClientConfigurationChange instead of AppStarted, + # and in that case we need to convert it to the "regular" + # AppStarted event. + if defined?(@initial_event) && @initial_event.is_a?(Event::SynthAppClientConfigurationChange) + # It would be great to just replace the initial event in + # +initialize_state+ method. Unfortunately this event requires + # the entire component tree to build its payload, which we + # 1) don't currently have in telemetry and + # 2) don't want to keep a permanent reference to in any case. + # Therefore we have this +reset!+ method that changes the + # event type while keeping the payload. + @initial_event.reset! # steep:ignore NoMethod + end + + if enabled? && !worker.nil? + # Start the background thread if it was started in the parent + # process (which requires telemetry to be enabled). + # This should be done after all of the state resets. + perform + end + end + # Deduplicate logs by counting the number of repeated occurrences of the same log # entry and replacing them with a single entry with the calculated `count` value. # Non-log events are unchanged. diff --git a/sig/datadog/core/telemetry/component.rbs b/sig/datadog/core/telemetry/component.rbs index 0d59476e33f..baa5c8e2a2a 100644 --- a/sig/datadog/core/telemetry/component.rbs +++ b/sig/datadog/core/telemetry/component.rbs @@ -3,6 +3,7 @@ module Datadog module Telemetry class Component ENDPOINT_COLLECTION_MESSAGE_LIMIT: Integer + ONLY_ONCE: Core::Utils::OnlyOnce @enabled: bool @log_collection_enabled: bool @@ -23,6 +24,8 @@ module Datadog def self.build: (untyped settings, Datadog::Core::Configuration::AgentSettings agent_settings, Datadog::Core::Logger logger) -> Component def initialize: (logger: Core::Logger, settings: untyped, agent_settings: Datadog::Core::Configuration::AgentSettings, enabled: true | false) -> void + + def enabled?: -> bool def disable!: () -> void @@ -30,7 +33,7 @@ module Datadog def shutdown!: () -> void - def flush: () -> void + def flush: (?timeout: Numeric?) -> bool? def client_configuration_change!: (Array[[String, Numeric | bool | String]] changes) -> void @@ -49,6 +52,8 @@ module Datadog def rate: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void def distribution: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void + + def after_fork: () -> void end end end diff --git a/sig/datadog/core/telemetry/event/app_started.rbs b/sig/datadog/core/telemetry/event/app_started.rbs index d0b7871149e..68bfba41755 100644 --- a/sig/datadog/core/telemetry/event/app_started.rbs +++ b/sig/datadog/core/telemetry/event/app_started.rbs @@ -8,6 +8,8 @@ module Datadog def type: () -> "app-started" def payload: () -> { products: untyped, configuration: untyped, install_signature: untyped } + + def app_started?: -> bool private diff --git a/sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs b/sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs index 24331290384..188d7bcc9b1 100644 --- a/sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs +++ b/sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs @@ -3,9 +3,13 @@ module Datadog module Telemetry module Event class SynthAppClientConfigurationChange < AppStarted - def type: () -> "app-client-configuration-change" + def type: -> ("app-client-configuration-change" | "app-started") - def payload: () -> { configuration: untyped } + def payload: () -> { ?products: untyped, configuration: untyped, ?install_signature: untyped } + + def reset?: -> bool + + def reset!: -> void end end end diff --git a/sig/datadog/core/telemetry/metrics_manager.rbs b/sig/datadog/core/telemetry/metrics_manager.rbs index 50369469732..44d44385b1c 100644 --- a/sig/datadog/core/telemetry/metrics_manager.rbs +++ b/sig/datadog/core/telemetry/metrics_manager.rbs @@ -27,6 +27,8 @@ module Datadog def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base] def disable!: () -> void + + def clear: () -> void private diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index dd557d1c8af..ee2db35a082 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -22,10 +22,11 @@ module Datadog @logger: ::Logger attr_reader logger: ::Logger - attr_reader initial_event: Telemetry::Event::Base + attr_reader initial_event: Telemetry::Event::AppStarted attr_reader initial_event_once: Datadog::Core::Utils::OnlyOnceSuccessful def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, metrics_manager: MetricsManager, ?shutdown_timeout: Float | Integer, ?buffer_size: Integer, dependency_collection: bool, logger: ::Logger) -> void + def initialize_state: -> void def start: (Telemetry::Event::AppStarted initial_event) -> void @@ -60,6 +61,8 @@ module Datadog def buffer_klass: () -> untyped def idle?: () -> bool + + def after_fork_monkey_patched: () -> void end end end diff --git a/sig/datadog/core/utils/only_once_successful.rbs b/sig/datadog/core/utils/only_once_successful.rbs index 9e6729807a1..803d9b51058 100644 --- a/sig/datadog/core/utils/only_once_successful.rbs +++ b/sig/datadog/core/utils/only_once_successful.rbs @@ -8,15 +8,15 @@ module Datadog def initialize: (?Integer? limit) -> void - def success?: () -> bool + def success?: -> bool - def failed?: () -> bool + def failed?: -> bool private - def check_limit!: () -> void + def check_limit!: -> void - def limited?: () -> bool + def limited?: -> bool end end end diff --git a/spec/datadog/core/crashtracking/component_spec.rb b/spec/datadog/core/crashtracking/component_spec.rb index 3d5bde26158..036f8d8d087 100644 --- a/spec/datadog/core/crashtracking/component_spec.rb +++ b/spec/datadog/core/crashtracking/component_spec.rb @@ -368,6 +368,7 @@ def ruby_method_with_c_calls crash_stack_helper_class.new.top_level_ruby_method end + expect(runtime_stack).to be_a(Hash) frames = runtime_stack[:frames] # Check that the crashing function is captured diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb index b32c5ab9a7a..e2e65f112f2 100644 --- a/spec/datadog/core/telemetry/component_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -365,7 +365,7 @@ event = instance_double(Datadog::Core::Telemetry::Event::Log) telemetry.log!(event) - expect(worker).not_to have_received(:enqueue) + expect(worker).to have_received(:enqueue).with(event) end end end diff --git a/spec/datadog/core/telemetry/integration/full_integration_spec.rb b/spec/datadog/core/telemetry/integration/full_integration_spec.rb index 2b1fbb54830..8bd6f55bef6 100644 --- a/spec/datadog/core/telemetry/integration/full_integration_spec.rb +++ b/spec/datadog/core/telemetry/integration/full_integration_spec.rb @@ -5,6 +5,8 @@ require 'datadog/core/telemetry/component' RSpec.describe 'Telemetry full integration tests' do + skip_unless_integration_testing_enabled + context 'when Datadog.configure is used' do let(:worker1) do double(Datadog::Core::Telemetry::Worker) diff --git a/spec/datadog/core/telemetry/integration/telemetry_spec.rb b/spec/datadog/core/telemetry/integration/telemetry_spec.rb index a7a3802f39e..3741efe1b30 100644 --- a/spec/datadog/core/telemetry/integration/telemetry_spec.rb +++ b/spec/datadog/core/telemetry/integration/telemetry_spec.rb @@ -5,6 +5,8 @@ require 'datadog/core/telemetry/component' RSpec.describe 'Telemetry integration tests' do + skip_unless_integration_testing_enabled + # Although the tests override the environment variables, if any, # with programmatic configuration, that may produce warnings from the # configuration code. Remove environment variables to suppress the warnings. diff --git a/spec/datadog/core/telemetry/metrics_integration_spec.rb b/spec/datadog/core/telemetry/metrics_integration_spec.rb new file mode 100644 index 00000000000..1c1973b2566 --- /dev/null +++ b/spec/datadog/core/telemetry/metrics_integration_spec.rb @@ -0,0 +1,199 @@ +require 'spec_helper' + +RSpec.describe Datadog::Core::Telemetry::Component do + before(:all) do + if RUBY_VERSION < '2.6' + # The tests here are flaking in CI on Ruby 2.5. + # Once I add diagnostics to investigate why they are failing, they + # stop failing. + # After 3 weeks of trying to figure this out I am skipping + # the failing runtimes. + skip 'flaky in CI' + end + + # We need to ensure the patch is present. + # There is a unit test for the patcher itself which clears the callbacks, + # we need to reinstall our callback if the callback got installed before + # that test is run and this test is run even later. + described_class.const_get(:ONLY_ONCE).send(:reset_ran_once_state_for_tests) + + # Clear out existing handlers so that our handler is registered exactly once. + Datadog::Core::Utils::AtForkMonkeyPatch.const_get(:AT_FORK_CHILD_BLOCKS).clear + end + + let(:settings) do + Datadog::Core::Configuration::Settings.new.tap do |settings| + settings.telemetry.enabled = true + # Reduce the number of generated events + settings.telemetry.dependency_collection = false + end + end + + let(:component) do + described_class.build(settings, agent_settings, logger) + end + + let(:agent_settings) do + Datadog::Core::Configuration::AgentSettingsResolver.call(settings) + end + + let(:logger) { logger_allowing_debug } + + # Uncomment for debugging to see the log entries. + #let(:logger) { Logger.new(STDERR) } + + let(:components) do + double(Datadog::Core::Configuration::Components, + settings: settings, + agent_settings: agent_settings, + # This is required for the forking tests. + telemetry: component, + # Forking test logges to this logger in the forked child process. + logger: logger, + # Crash tracking registers a handler via at fork monkey patch, + # this handler tries to access the crash tracking component from the + # global component tree. + crashtracker: nil,) + end + + after do + component.shutdown! + end + + let(:initial_event) do + double(Datadog::Core::Telemetry::Event::AppStarted, + payload: {hello: 'world'}, + type: 'app-started', + app_started?: true,) + end + + let(:response) do + double(Datadog::Core::Transport::HTTP::Response, + ok?: true,) + end + + let(:events) { [] } + + def assert_events(events) + expect(events.length).to eq 2 + expect(events.first).to be initial_event + expect(events[1]).to be_a(Datadog::Core::Telemetry::Event::MessageBatch) + expect(events[1].events.length).to eq 1 + metrics_event = events[1].events.first + expect(metrics_event).to be_a(Datadog::Core::Telemetry::Event::GenerateMetrics) + expect(metrics_event.payload).to match( + namespace: 'ns', + series: [ + metric: 'hello', + points: [[Integer, 1]], + type: 'count', + tags: [], + common: true, + ], + ) + end + + context 'when worker is started before metrics are submitted' do + it 'emits metrics' do + expect(Datadog::Core::Telemetry::Event::AppStarted).to receive(:new).and_return(initial_event) + expect(component.worker).to receive(:send_event).twice do |event| + events << event + response + end.ordered + component.start(components: components) + component.inc('ns', 'hello', 1) + # Assert that the flush succeeded, because we were sometimes not + # getting both of the events. + expect(component.flush).to be true + + assert_events(events) + end + end + + context 'when metrics are submitted before worker is started' do + it 'emits metrics' do + expect(Datadog::Core::Telemetry::Event::AppStarted).to receive(:new).and_return(initial_event) + expect(component.worker).to receive(:send_event).twice do |event| + events << event + response + end.ordered + component.inc('ns', 'hello', 1) + expect(component.worker.running?).to be false + component.start(components: components) + # Assert that the flush succeeded, because we were sometimes not + # getting both of the events. + expect(component.flush).to be true + + assert_events(events) + end + + # Submitting metrics in parent with the worker running is racy - we + # don't know if the worker in the parent will flush the events before + # the fork executes. + # Only test the forking case when worker is started after the fork + # (in the forked child). + context 'in forked child' do + forking_platform_only + + before do + # after_fork handler goes through the global variable. + expect(Datadog).to receive(:components).at_least(:once).and_return(components) + end + + it 'emits child but not parent metrics' do + expect(Datadog::Core::Telemetry::Event::AppStarted).to receive(:new).and_return(initial_event) + expect(component.worker).to receive(:send_event).twice do |event| + events << event + response + end.ordered + component.inc('ns', 'hello', 1) + expect(component.worker.running?).to be false + + expect(component.metrics_manager.collections.keys).to eq(%w[ns]) + + # The timeout for each flush is 15 seconds, and we perform two + # flushes. Thus the total timeout needs to be at least 30 seconds. + expect_in_fork(timeout_seconds: 40) do + # We expect namespaces to have been reset. + expect(component.metrics_manager.collections).to be_empty + + component.inc('child-ns', 'child-metric', 1) + expect(component.worker.running?).to be false + + # We expect only child namespace to be present. + expect(component.metrics_manager.collections.keys).to eq(%w[child-ns]) + + component.start(components: components) + expect(component.flush).to be true + + expect(events.length).to eq 2 + # We are going to have an initial event in the child + expect(events.first).to be initial_event + expect(events[1]).to be_a(Datadog::Core::Telemetry::Event::MessageBatch) + expect(events[1].events.length).to eq 1 + metrics_event = events[1].events.first + expect(metrics_event).to be_a(Datadog::Core::Telemetry::Event::GenerateMetrics) + expect(metrics_event.payload).to match( + namespace: 'child-ns', + series: [ + # Child only - no parent metric sent. + metric: 'child-metric', + points: [[Integer, 1]], + type: 'count', + tags: [], + common: true, + ], + ) + end + + # The events added in the parent should be sent in the parent. + # We still haven't started the worker in parent - do so now. + component.start(components: components) + # Assert that the flush succeeded, because we were sometimes not + # getting both of the events. + expect(component.flush).to be true + assert_events(events) + end + end + end +end diff --git a/spec/datadog/core/telemetry/worker_forking_spec.rb b/spec/datadog/core/telemetry/worker_forking_spec.rb new file mode 100644 index 00000000000..0ac033e9a31 --- /dev/null +++ b/spec/datadog/core/telemetry/worker_forking_spec.rb @@ -0,0 +1,249 @@ +require 'spec_helper' + +RSpec.describe Datadog::Core::Telemetry::Component do + forking_platform_only + + before(:all) do + # We need to ensure the patch is present. + # There is a unit test for the patcher itself which clears the callbacks, + # we need to reinstall our callback if the callback got installed before + # that test is run and this test is run even later. + described_class.const_get(:ONLY_ONCE).send(:reset_ran_once_state_for_tests) + + # Clear out existing handlers so that our handler is registered exactly once. + Datadog::Core::Utils::AtForkMonkeyPatch.const_get(:AT_FORK_CHILD_BLOCKS).clear + end + + let(:sent_payloads) { [] } + + let(:handler_proc) do + lambda do |req, _res| + expect(req.content_type).to eq('application/json') + payload = JSON.parse(req.body) + sent_payloads << { + headers: req.header, + payload: payload, + } + end + end + + http_server do |http_server| + http_server.mount_proc('/telemetry/proxy/api/v2/apmtelemetry', &handler_proc) + end + + let(:settings) do + Datadog::Core::Configuration::Settings.new.tap do |settings| + settings.telemetry.enabled = true + # Host may be overridden by environment variables + settings.agent.host = 'localhost' + settings.agent.port = http_server_port + # In this test we want to assert on dependency events + settings.telemetry.dependency_collection = true + # In CI tests can take a long time to run, avoid extra heartbeat events + settings.telemetry.heartbeat_interval_seconds = 1_000 + end + end + + let(:component) do + described_class.build(settings, agent_settings, logger) + end + + let(:agent_settings) do + Datadog::Core::Configuration::AgentSettingsResolver.call(settings) + end + + let(:logger) { logger_allowing_debug } + + # Uncomment for debugging to see the log entries. + #let(:logger) { Logger.new(STDERR) } + + let(:components) do + double(Datadog::Core::Configuration::Components, + settings: settings, + agent_settings: agent_settings, + # This is required for the forking tests. + telemetry: component, + # Forking test logges to this logger in the forked child process. + logger: logger, + # Crash tracking registers a handler via at fork monkey patch, + # this handler tries to access the crash tracking component from the + # global component tree. + crashtracker: nil, + profiler: nil, + dynamic_instrumentation: nil,) + end + + after do + component.shutdown! + end + + let(:initial_event) do + double(Datadog::Core::Telemetry::Event::AppStarted, + payload: {hello: 'world'}, + type: 'app-started', + app_started?: true,) + end + + let(:response) do + double(Datadog::Core::Transport::HTTP::Response, + ok?: true,) + end + + context 'when telemetry is disabled' do + before do + settings.telemetry.enabled = false + end + + it 'stays disabled in child process' do + expect(component.enabled?).to be false + expect(component.worker).to be nil + + expect_in_fork do + expect(component.enabled?).to be false + expect(component.worker).to be nil + end + end + end + + context 'when telemetry is enabled' do + before do + settings.telemetry.enabled = true + end + + before do + # after_fork handler goes through the global variable. + # + # Cannot use +expect+ here because the call is in child process. + allow(Datadog).to receive(:components).and_return(components) + end + + it 'stays enabled in child process' do + expect(component.enabled?).to be true + expect(component.worker).to be_a(Datadog::Core::Telemetry::Worker) + expect(component.worker.enabled?).to be true + + expect_in_fork do + expect(component.enabled?).to be true + expect(component.worker.enabled?).to be true + end + end + + context 'when worker is running' do + before do + # Reduce interval between event submissions in worker + # to make the test run faster. + expect(component.worker).to receive(:loop_wait_time).at_least(:once).and_return(1) + end + + before do + component.worker.start(initial_event) + end + + it 'restarts worker after fork' do + expect(component.enabled?).to be true + expect(component.worker).to be_a(Datadog::Core::Telemetry::Worker) + expect(component.worker.enabled?).to be true + expect(component.worker.running?).to be true + + expect_in_fork do + expect(component.enabled?).to be true + expect(component.worker.enabled?).to be true + expect(component.worker.running?).to be true + + # Queueing an event will restart the worker in the forked child. + component.worker.enqueue(Datadog::Core::Telemetry::Event::AppHeartbeat.new) + + expect(component.worker.running?).to be true + end + end + end + + describe 'events generated in forked child' do + before do + # Reduce interval between event submissions in worker + # to make the test run faster. + expect(component.worker).to receive(:loop_wait_time).at_least(:once).and_return(1) + end + + # Behavior in the child should be the same regardless of what + # was sent in the parent, because the child is a new application + # (process) from the backend's perspective. + def fork_and_assert + sent_payloads.clear + + expect_in_fork do + expect(component.worker).to be_running + + component.worker.enqueue(Datadog::Core::Telemetry::Event::AppHeartbeat.new) + + component.flush + end + + expect(sent_payloads.length).to eq 3 + + payload = sent_payloads[0].fetch(:payload) + expect(payload).to include( + 'request_type' => 'app-started', + ) + payload = sent_payloads[1].fetch(:payload) + # The app-dependencies-loaded assertion is also critical here, + # since there is no other test coverage for the + # app-dependencies-loaded event being sent in the forked child. + expect(payload).to include( + 'request_type' => 'app-dependencies-loaded', + ) + payload = sent_payloads[2].fetch(:payload) + expect(payload).to include( + 'request_type' => 'message-batch', + ) + expect(payload.fetch('payload').first).to include( + 'request_type' => 'app-heartbeat', + ) + end + + context 'when initial event is AppStarted' do + let(:initial_event) do + Datadog::Core::Telemetry::Event::AppStarted.new(components: components) + end + + it 'produces correct events in the child' do + component.worker.start(initial_event) + component.flush + + expect(sent_payloads.length).to eq 2 + + payload = sent_payloads[0].fetch(:payload) + expect(payload).to include( + 'request_type' => 'app-started', + ) + payload = sent_payloads[1].fetch(:payload) + expect(payload).to include( + 'request_type' => 'app-dependencies-loaded', + ) + + fork_and_assert + end + end + + context 'when initial event is SynthAppClientConfigurationChange' do + let(:initial_event) do + Datadog::Core::Telemetry::Event::SynthAppClientConfigurationChange.new(components: Datadog.send(:components)) + end + + it 'produces correct events in the child' do + component.worker.start(initial_event) + component.flush + + expect(sent_payloads.length).to eq 1 + + payload = sent_payloads[0].fetch(:payload) + expect(payload).to include( + 'request_type' => 'app-client-configuration-change', + ) + + fork_and_assert + end + end + end + end +end diff --git a/spec/datadog/di/spec_helper.rb b/spec/datadog/di/spec_helper.rb index b7ad4ec8d79..13c313d61f2 100644 --- a/spec/datadog/di/spec_helper.rb +++ b/spec/datadog/di/spec_helper.rb @@ -99,14 +99,6 @@ def deactivate_code_tracking end end - def ruby_2_only - if RUBY_VERSION >= '3' - before(:all) do - skip "Test is only for Ruby 2" - end - end - end - def di_test if PlatformHelpers.jruby? before(:all) do diff --git a/spec/support/core_helpers.rb b/spec/support/core_helpers.rb index 5caca383079..1757d031f2d 100644 --- a/spec/support/core_helpers.rb +++ b/spec/support/core_helpers.rb @@ -71,6 +71,14 @@ def skip_unless_integration_testing_enabled end end + def skip_unless_fork_supported + unless Process.respond_to?(:fork) + before(:all) do + skip 'Fork is not supported on current platform' + end + end + end + # Positional and keyword arguments are both accepted to make the method # work on Ruby 2.5/2.6 and 2.7+. In practice only one type of arguments # should be used in any given call. diff --git a/spec/support/platform_helpers.rb b/spec/support/platform_helpers.rb index 4ac5337e2d9..958cbf0367c 100644 --- a/spec/support/platform_helpers.rb +++ b/spec/support/platform_helpers.rb @@ -85,5 +85,21 @@ def skip_any_instance_on_buggy_jruby end end end + + def ruby_2_only + if RUBY_VERSION >= '3' + before(:all) do + skip "Test is only for Ruby 2" + end + end + end + + def forking_platform_only + if PlatformHelpers.jruby? + before(:all) do + skip "Test requires fork to be implemented, JRuby does not" + end + end + end end end diff --git a/spec/support/synchronization_helpers.rb b/spec/support/synchronization_helpers.rb index a433031f80c..a114450376a 100644 --- a/spec/support/synchronization_helpers.rb +++ b/spec/support/synchronization_helpers.rb @@ -1,11 +1,18 @@ require 'English' module SynchronizationHelpers - def expect_in_fork(fork_expectations: nil, timeout_seconds: 10, trigger_stacktrace_on_kill: false) + def expect_in_fork(fork_expectations: nil, timeout_seconds: 10, trigger_stacktrace_on_kill: false, debug: false) fork_expectations ||= proc { |status:, stdout:, stderr:| expect(status && status.success?).to be(true), "STDOUT:`#{stdout}` STDERR:`#{stderr}" } + if debug + rv = expect_in_fork_debug(fork_expectations: fork_expectations) do + yield + end + return rv + end + fork_stdout = Tempfile.new('datadog-rspec-expect-in-fork-stdout') fork_stderr = Tempfile.new('datadog-rspec-expect-in-fork-stderr') begin @@ -66,6 +73,17 @@ def expect_in_fork(fork_expectations: nil, timeout_seconds: 10, trigger_stacktra end end + # Debug version of expect_in_fork that does not redirect I/O streams and + # has no timeout on execution. The idea is to use it for interactive + # debugging where you would set a break point in the fork. + def expect_in_fork_debug(fork_expectations:, timeout_seconds: 10, trigger_stacktrace_on_kill: false) + pid = fork do + yield + end + _, status = Process.wait2(pid) + fork_expectations.call(status: status, stdout: '', stderr: '') + end + # Waits for the condition provided by the block argument to return truthy. # # Waits for 5 seconds by default.