diff --git a/lib/datadog/core/utils/array.rb b/lib/datadog/core/utils/array.rb
new file mode 100644
index 00000000000..d2ae7a5de05
--- /dev/null
+++ b/lib/datadog/core/utils/array.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+module Datadog
+  module Core
+    module Utils
+      # Common array-related utility functions.
+      module Array
+        def self.filter_map(array, &block)
+          if array.respond_to?(:filter_map)
+            # DEV: Supported since Ruby 2.7; saves an intermediate object creation.
+            array.filter_map(&block)
+          elsif array.is_a?(Enumerator::Lazy)
+            # You would think that .compact would work here, but it does not:
+            # the result of .map could be an Enumerator::Lazy instance, which
+            # does not implement #compact on Ruby 2.5/2.6.
+            array.map(&block).reject do |item|
+              item.nil?
+            end
+          else
+            array.each_with_object([]) do |item, memo|
+              new_item = block.call(item)
+              memo.push(new_item) unless new_item.nil?
+            end
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/lib/datadog/di/transport/http.rb b/lib/datadog/di/transport/http.rb
index 1e40d311922..86adf503329 100644
--- a/lib/datadog/di/transport/http.rb
+++ b/lib/datadog/di/transport/http.rb
@@ -40,9 +40,13 @@ def input(
           api_version: nil,
           headers: nil
         )
-          Core::Transport::HTTP.build(api_instance_class: Input::API::Instance,
+          Core::Transport::HTTP.build(
+            api_instance_class: Input::API::Instance,
             logger: logger,
-            agent_settings: agent_settings, api_version: api_version, headers: headers) do |transport|
+            agent_settings: agent_settings,
+            api_version: api_version,
+            headers: headers,
+          ) do |transport|
            apis = API.defaults

            transport.api API::INPUT, apis[API::INPUT]
diff --git a/lib/datadog/di/transport/input.rb b/lib/datadog/di/transport/input.rb
index 0b33e7a12f0..b559f97e120 100644
--- a/lib/datadog/di/transport/input.rb
+++ b/lib/datadog/di/transport/input.rb
@@ -1,6 +1,12 @@
 # frozen_string_literal: true
 
+require_relative '../../core/chunker'
+require_relative '../../core/encoding'
+require_relative '../../core/tag_builder'
 require_relative '../../core/transport/parcel'
+require_relative '../../core/transport/request'
+require_relative '../../core/utils/array'
+require_relative '../error'
 require_relative 'http/input'
 
 module Datadog
@@ -24,6 +30,25 @@ def initialize(parcel, serialized_tags)
         class Transport
           attr_reader :client, :apis, :default_api, :current_api_id, :logger
 
+          # The limit on an individual snapshot payload, aka "log line",
+          # is 1 MB.
+          #
+          # TODO: There is an RFC for snapshot pruning that should be
+          # implemented to reduce the size of snapshots below this limit,
+          # so that we can send a portion of the captured data rather
+          # than dropping the snapshot entirely.
+          MAX_SERIALIZED_SNAPSHOT_SIZE = 1024 * 1024
+
+          # The maximum chunk (batch) size that the intake permits is 5 MB.
+          #
+          # Two bytes are reserved for the [ and ] of the JSON array syntax.
+          MAX_CHUNK_SIZE = 5 * 1024 * 1024 - 2
+
+          # Try to send smaller payloads to avoid large network requests.
+          # If a payload is larger than the default chunk size but under
+          # the max chunk size, it will still be sent out.
+          DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024
+
           def initialize(apis, default_api, logger:)
             @apis = apis
             @logger = logger
@@ -36,9 +61,46 @@ def current_api
             end
           end
 
           def send_input(payload, tags)
-            json = JSON.dump(payload)
-            parcel = EncodedParcel.new(json)
+            # Tags are the same for all chunks; serialize them once.
+            serialized_tags = Core::TagBuilder.serialize_tags(tags)
+
+            encoder = Core::Encoding::JSONEncoder
+            encoded_snapshots = Core::Utils::Array.filter_map(payload) do |snapshot|
+              encoded = encoder.encode(snapshot)
+              if encoded.length > MAX_SERIALIZED_SNAPSHOT_SIZE
+                # Drop the snapshot.
+                # TODO: report via a telemetry metric?
+                logger.debug { "di: dropping too big snapshot" }
+                nil
+              else
+                encoded
+              end
+            end
+
+            Datadog::Core::Chunker.chunk_by_size(
+              encoded_snapshots, DEFAULT_CHUNK_SIZE,
+            ).each do |chunk|
+              # Snapshots that are too big were already dropped above.
+              # The limit on chunked payload length here is greater than
+              # the limit on individual snapshot size, so no chunk can
+              # exceed the intake limits.
+              chunked_payload = encoder.join(chunk)
+
+              # Rescue exceptions for each chunk individually, so that
+              # subsequent chunks are still attempted.
+              begin
+                send_input_chunk(chunked_payload, serialized_tags)
+              rescue => exc
+                logger.debug { "di: failed to send snapshot chunk: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
+              end
+            end
+
+            payload
+          end
+
+          def send_input_chunk(chunked_payload, serialized_tags)
+            parcel = EncodedParcel.new(chunked_payload)
             request = Request.new(parcel, serialized_tags)
 
             response = @client.send_input_payload(request)
diff --git a/lib/datadog/tracing/transport/traces.rb b/lib/datadog/tracing/transport/traces.rb
index a7b37915f87..5eaabea27a6 100644
--- a/lib/datadog/tracing/transport/traces.rb
+++ b/lib/datadog/tracing/transport/traces.rb
@@ -3,6 +3,7 @@
 require_relative '../../core/chunker'
 require_relative '../../core/transport/parcel'
 require_relative '../../core/transport/request'
+require_relative '../../core/utils/array'
 require_relative 'serializable_trace'
 require_relative 'trace_formatter'
 
@@ -65,11 +66,8 @@ def initialize(encoder, logger:, native_events_supported:, max_size: DEFAULT_MAX
       # @return [Enumerable[Array[Bytes,Integer]]] list of encoded chunks: each containing a byte array and
       #   number of traces
       def encode_in_chunks(traces)
-        encoded_traces = if traces.respond_to?(:filter_map)
-          # DEV Supported since Ruby 2.7, saves an intermediate object creation
-          traces.filter_map { |t| encode_one(t) }
-        else
-          traces.map { |t| encode_one(t) }.reject(&:nil?)
+        encoded_traces = Core::Utils::Array.filter_map(traces) do |trace|
+          encode_one(trace)
         end
 
         Datadog::Core::Chunker.chunk_by_size(encoded_traces, max_size).map do |chunk|
diff --git a/sig/datadog/core/utils/array.rbs b/sig/datadog/core/utils/array.rbs
new file mode 100644
index 00000000000..172f0fd13ec
--- /dev/null
+++ b/sig/datadog/core/utils/array.rbs
@@ -0,0 +1,9 @@
+module Datadog
+  module Core
+    module Utils
+      module Array
+        def self.filter_map: (::Array[untyped] array) { (untyped) -> untyped } -> ::Array[untyped]
+      end
+    end
+  end
+end
diff --git a/sig/datadog/di/transport/input.rbs b/sig/datadog/di/transport/input.rbs
index 117d812db6a..4bb26d3cb48 100644
--- a/sig/datadog/di/transport/input.rbs
+++ b/sig/datadog/di/transport/input.rbs
@@ -30,12 +30,18 @@ module Datadog
          attr_reader current_api_id: untyped
          attr_reader logger: untyped
 
+          MAX_SERIALIZED_SNAPSHOT_SIZE: ::Integer
+          MAX_CHUNK_SIZE: ::Integer
+          DEFAULT_CHUNK_SIZE: ::Integer
+
          def initialize: (untyped apis, untyped default_api, logger: untyped) -> void
 
          def current_api: () -> untyped
 
          def send_input: (untyped payload, untyped tags) -> untyped
+
+          def send_input_chunk: (untyped chunked_payload, untyped serialized_tags) -> untyped
        end
      end
    end
  end
diff --git a/spec/datadog/di/transport/input_spec.rb b/spec/datadog/di/transport/input_spec.rb
new file mode 100644
index 00000000000..de4379cc163
--- /dev/null
+++ b/spec/datadog/di/transport/input_spec.rb
@@ -0,0 +1,78 @@
+require 'datadog/di/spec_helper'
+require 'datadog/di/transport/http'
+
+RSpec.describe Datadog::DI::Transport::Input::Transport do
+  di_test
+
+  let(:transport) do
+    Datadog::DI::Transport::HTTP.input(agent_settings: agent_settings, logger: logger)
+  end
+
+  let(:agent_settings) { Datadog::Core::Configuration::AgentSettingsResolver.call(settings, logger: nil) }
+
+  let(:settings) do
+    Datadog::Core::Configuration::Settings.new
+  end
+
+  let(:logger) do
+    instance_double(Logger)
+  end
+
+  let(:tags) { [] }
+
+  context 'when the combined size of serialized snapshots exceeds the intake max' do
+    before do
+      # Reduce limits to make the test run faster and not require a lot of memory.
+      stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 100_000)
+      stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 200_000)
+    end
+
+    let(:snapshot) do
+      # It doesn't matter what the payload is; generate a fake one here.
+      # This payload serializes to 9781 bytes of JSON.
+      1000.times.map do |i|
+        [i, i]
+      end.to_h
+    end
+
+    let(:snapshots) do
+      # This serializes to 978201 bytes of JSON - just under 1 MB.
+      [snapshot] * 100
+    end
+
+    it 'chunks snapshots' do
+      # Just under 1 MB of payload with a ~100 KB default chunk size: we expect 10 chunks.
+      expect(transport).to receive(:send_input_chunk).exactly(10).times do |chunked_payload, _serialized_tags|
+        expect(chunked_payload.length).to be < 100_000
+        expect(chunked_payload.length).to be > 90_000
+      end
+      transport.send_input(snapshots, tags)
+    end
+
+    context 'when an individual snapshot exceeds the intake max' do
+      before do
+        # Reduce limits even more to force a reasonably-sized snapshot to be dropped.
+        stub_const('Datadog::DI::Transport::Input::Transport::MAX_SERIALIZED_SNAPSHOT_SIZE', 2_000)
+      end
+
+      let(:small_snapshot) do
+        20.times.map do |i|
+          [i, i]
+        end.to_h
+      end
+
+      let(:snapshots) do
+        [small_snapshot, snapshot]
+      end
+
+      it 'drops the snapshot that is too big' do
+        expect(transport).to receive(:send_input_chunk).once do |chunked_payload, _serialized_tags|
+          expect(chunked_payload.length).to be < 1_000
+          expect(chunked_payload.length).to be > 100
+        end
+        expect_lazy_log(logger, :debug, 'di: dropping too big snapshot')
+        transport.send_input(snapshots, tags)
+      end
+    end
+  end
+end
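
A quick usage sketch of the new Datadog::Core::Utils::Array.filter_map utility, for reviewers. This is a standalone illustration, not part of the patch, and assumes the gem's lib directory is on the load path. On Ruby 2.5/2.6 the lazy input takes the Enumerator::Lazy fallback branch; on 2.7+ both calls use the native filter_map. Either way the pipeline stays lazy, so an infinite source is safe.

require 'datadog/core/utils/array'

# Plain array: nil results from the block are dropped, matching
# Ruby 2.7+'s built-in Array#filter_map.
Datadog::Core::Utils::Array.filter_map([1, 2, 3, 4]) do |n|
  n * n if n.even?
end
# => [4, 16]

# Lazy enumerator: the result is still lazy, so we can take a finite
# prefix of an infinite source.
lazy = (1..Float::INFINITY).lazy
Datadog::Core::Utils::Array.filter_map(lazy) { |n| n if n.odd? }.first(3)
# => [1, 3, 5]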
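The chunking flow in send_input can be pictured with a simplified stand-in for Core::Chunker.chunk_by_size. This is a hypothetical sketch: the real chunker lives in lib/datadog/core/chunker.rb, and the greedy packing and bracket-joining below are assumptions made for illustration, modeled on what JSONEncoder.join appears to do in this patch.

require 'json'

# Greedy packing: keep appending encoded items to the current chunk
# until adding one more would push it past the limit (assumption about
# chunk_by_size's contract).
def chunk_by_size_sketch(encoded_items, limit)
  chunks = [[]]
  size = 0
  encoded_items.each do |item|
    if size + item.bytesize > limit && !chunks.last.empty?
      chunks << [] # start a new chunk
      size = 0
    end
    chunks.last << item
    size += item.bytesize
  end
  chunks
end

encoded = 10.times.map { |i| JSON.dump('id' => i, 'pad' => 'x' * 30) }
chunks = chunk_by_size_sketch(encoded, 100)
payloads = chunks.map { |chunk| "[#{chunk.join(',')}]" }
payloads.all? { |p| JSON.parse(p).is_a?(Array) } # => true

Under this packing, a chunk's items total at most the limit; a single oversized item would be emitted alone, but send_input already drops anything over MAX_SERIALIZED_SNAPSHOT_SIZE. So a payload joined against DEFAULT_CHUNK_SIZE (2 MB) stays far below the 5 MB intake maximum even after commas and brackets are added.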
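The size comments in the spec check out with plain Ruby arithmetic (no dd-trace-rb needed; this mirrors the spec's fake snapshot):

require 'json'

snapshot = 1000.times.map { |i| [i, i] }.to_h
JSON.dump(snapshot).bytesize # => 9781

# 100 copies joined into one JSON array: 100 * 9781 bytes of elements,
# 99 comma separators, and the surrounding [ and ] brackets; those are
# the same two bytes that MAX_CHUNK_SIZE subtracts.
(100 * 9781) + 99 + 2 # => 978201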