29 changes: 29 additions & 0 deletions lib/datadog/core/utils/array.rb
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module Datadog
module Core
module Utils
# Common array-related utility functions.
module Array
def self.filter_map(array, &block)
if array.respond_to?(:filter_map)
# DEV Supported since Ruby 2.7, saves an intermediate object creation
array.filter_map(&block)
elsif array.is_a?(Enumerator::Lazy)
# You might expect .compact to work here, but it does not:
# the result of .map on a lazy enumerator is another Enumerator::Lazy
# instance, which does not implement #compact on Ruby 2.5/2.6.
array.map(&block).reject do |item|
item.nil?
end
else
array.each_with_object([]) do |item, memo|
new_item = block.call(item)
memo.push(new_item) unless new_item.nil?
end
end
end
end
end
end
end
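
A minimal usage sketch of the new helper (standalone Ruby, not part of the diff; assumes the gem's lib directory is on the load path):

require 'datadog/core/utils/array'

# Plain array: nil results from the block are dropped.
Datadog::Core::Utils::Array.filter_map([1, 2, 3, 4]) { |n| n * 10 if n.even? }
# => [20, 40]

# Lazy enumerator (the Ruby 2.5/2.6 branch): the result stays lazy,
# so it is safe to use with unbounded sources.
lazy = (1..Float::INFINITY).lazy
Datadog::Core::Utils::Array.filter_map(lazy) { |n| n * 10 if n.even? }.first(3)
# => [20, 40, 60]
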
8 changes: 6 additions & 2 deletions lib/datadog/di/transport/http.rb
@@ -40,9 +40,13 @@ def input(
api_version: nil,
headers: nil
)
Core::Transport::HTTP.build(api_instance_class: Input::API::Instance,
Core::Transport::HTTP.build(
api_instance_class: Input::API::Instance,
logger: logger,
agent_settings: agent_settings, api_version: api_version, headers: headers) do |transport|
agent_settings: agent_settings,
api_version: api_version,
headers: headers,
) do |transport|
apis = API.defaults

transport.api API::INPUT, apis[API::INPUT]
64 changes: 62 additions & 2 deletions lib/datadog/di/transport/input.rb
@@ -1,6 +1,11 @@
# frozen_string_literal: true

require_relative '../../core/chunker'
require_relative '../../core/encoding'
require_relative '../../core/tag_builder'
require_relative '../../core/transport/parcel'
require_relative '../../core/transport/request'
require_relative '../../core/utils/array'
require_relative '../error'
require_relative 'http/input'

module Datadog
@@ -24,6 +29,25 @@ def initialize(parcel, serialized_tags)
class Transport
attr_reader :client, :apis, :default_api, :current_api_id, :logger

# The limit on an individual snapshot payload, aka "log line",
# is 1 MB.
#
# TODO There is an RFC for snapshot pruning that should be
# implemented to reduce snapshot size below this limit, so that
# we can send a portion of the captured data rather than dropping
# the snapshot entirely.
MAX_SERIALIZED_SNAPSHOT_SIZE = 1024 * 1024

# The maximum chunk (batch) size that the intake permits is 5 MB.
#
# Two bytes are reserved for the [ and ] of the JSON array wrapper.
MAX_CHUNK_SIZE = 5 * 1024 * 1024 - 2

# Prefer smaller payloads to avoid large network requests.
# A payload that is larger than the default chunk size but under
# the max chunk size will still be sent out.
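#
# Illustrative arithmetic only (not from the intake spec): with these
# defaults, 600 snapshots of roughly 10 KB each (about 6 MB of JSON in
# total) would go out as three chunks of roughly 2 MB each, while any
# single snapshot whose JSON exceeds 1 MB would be dropped outright.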
DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024

def initialize(apis, default_api, logger:)
@apis = apis
@logger = logger
@@ -36,9 +60,45 @@ def current_api
end

def send_input(payload, tags)
json = JSON.dump(payload)
parcel = EncodedParcel.new(json)
# Tags are the same for all chunks; serialize them only once.
serialized_tags = Core::TagBuilder.serialize_tags(tags)

encoder = Core::Encoding::JSONEncoder
encoded_snapshots = Core::Utils::Array.filter_map(payload) do |snapshot|
encoded = encoder.encode(snapshot)
if encoded.length > MAX_SERIALIZED_SNAPSHOT_SIZE
# Drop the snapshot.
# TODO report via telemetry metric?
logger.debug { "di: dropping too big snapshot" }
nil
else
encoded
end
end

Datadog::Core::Chunker.chunk_by_size(
encoded_snapshots, DEFAULT_CHUNK_SIZE,
).each do |chunk|
# Snapshots that are too big were already dropped above.
# The chunk size limit is greater than the per-snapshot
# size limit, therefore no chunk assembled here can exceed
# the intake limit.
chunked_payload = encoder.join(chunk)

# Rescue exceptions per chunk so that a failure does not
# prevent subsequent chunks from being sent.
begin
send_input_chunk(chunked_payload, serialized_tags)
rescue => exc
logger.debug { "di: failed to send snapshot chunk: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
end
end

payload
end

def send_input_chunk(chunked_payload, serialized_tags)
parcel = EncodedParcel.new(chunked_payload)
request = Request.new(parcel, serialized_tags)

response = @client.send_input_payload(request)
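
A rough sketch (not part of the diff) of the pipeline that send_input now implements, using the same Core helpers the new code relies on; the values and the tiny size limit are made up for illustration, and the require paths assume the gem's lib directory is on the load path:

require 'datadog/core/chunker'
require 'datadog/core/encoding'

encoder = Datadog::Core::Encoding::JSONEncoder

# Stand-ins for serialized probe snapshots.
snapshots = [{ id: 1 }, { id: 2 }, { id: 3 }].map { |s| encoder.encode(s) }

# Group the encoded snapshots so each chunk stays under a size limit
# (an artificially tiny limit here, so this example splits into
# more than one chunk).
chunks = Datadog::Core::Chunker.chunk_by_size(snapshots, 12)

chunks.each do |chunk|
  # encoder.join wraps the chunk into a JSON array, e.g. '[{"id":1},{"id":2}]'.
  # In send_input, each joined body becomes one request to the debugger
  # intake, and a failed chunk does not prevent later chunks from being sent.
  puts encoder.join(chunk)
end
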
8 changes: 3 additions & 5 deletions lib/datadog/tracing/transport/traces.rb
@@ -3,6 +3,7 @@
require_relative '../../core/chunker'
require_relative '../../core/transport/parcel'
require_relative '../../core/transport/request'
require_relative '../../core/utils/array'
require_relative 'serializable_trace'
require_relative 'trace_formatter'

@@ -65,11 +66,8 @@ def initialize(encoder, logger:, native_events_supported:, max_size: DEFAULT_MAX
# @return [Enumerable[Array[Bytes,Integer]]] list of encoded chunks: each containing a byte array and
# number of traces
def encode_in_chunks(traces)
encoded_traces = if traces.respond_to?(:filter_map)
# DEV Supported since Ruby 2.7, saves an intermediate object creation
traces.filter_map { |t| encode_one(t) }
else
traces.map { |t| encode_one(t) }.reject(&:nil?)
encoded_traces = Core::Utils::Array.filter_map(traces) do |trace|
encode_one(trace)
end

Datadog::Core::Chunker.chunk_by_size(encoded_traces, max_size).map do |chunk|
9 changes: 9 additions & 0 deletions sig/datadog/core/utils/array.rbs
@@ -0,0 +1,9 @@
module Datadog
module Core
module Utils
module Array
def self.filter_map: (::Array[untyped] array) { (untyped) -> untyped } -> ::Array[untyped]
end
end
end
end
5 changes: 5 additions & 0 deletions sig/datadog/di/transport/input.rbs
@@ -30,12 +30,17 @@ module Datadog
attr_reader current_api_id: untyped

attr_reader logger: untyped
MAX_SERIALIZED_SNAPSHOT_SIZE: ::Integer
MAX_CHUNK_SIZE: ::Integer
DEFAULT_CHUNK_SIZE: ::Integer

def initialize: (untyped apis, untyped default_api, logger: untyped) -> void

def current_api: () -> untyped

def send_input: (untyped payload, untyped tags) -> untyped

def send_input_chunk: (untyped chunked_payload, untyped serialized_tags) -> untyped
end
end
end
78 changes: 78 additions & 0 deletions spec/datadog/di/transport/input_spec.rb
@@ -0,0 +1,78 @@
require "datadog/di/spec_helper"
require 'datadog/di/transport/http'

RSpec.describe Datadog::DI::Transport::Input::Transport do
di_test

let(:transport) do
Datadog::DI::Transport::HTTP.input(agent_settings: agent_settings, logger: logger)
end

let(:agent_settings) { Datadog::Core::Configuration::AgentSettingsResolver.call(settings, logger: nil) }

let(:settings) do
Datadog::Core::Configuration::Settings.new
end

let(:logger) do
instance_double(Logger)
end

let(:tags) { [] }

context 'when the combined size of snapshots serialized exceeds intake max' do
before do
# Reduce limits to make the test run faster and not require a lot of memory
stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 100_000)
stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 200_000)
end

let(:snapshot) do
# It doesn't matter what the payload is; generate a fake one here.
# This payload serializes to 9781 bytes of JSON.
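# (10 single-digit keys at 5 bytes per "k":v pair, 90 two-digit keys
# at 7 bytes, and 900 three-digit keys at 9 bytes is 8780 bytes,
# plus 999 commas and 2 braces = 9781.)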
1000.times.map do |i|
[i, i]
end.to_h
end

let(:snapshots) do
# This serializes to 978201 bytes of JSON - just under 1 MB.
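# (100 * 9781 bytes of snapshots, plus 99 commas and 2 brackets = 978201.)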
[snapshot] * 100
end

it 'chunks snapshots' do
# Payload is just under 1 MB and the stubbed default chunk size is ~100 KB, so we expect 10 chunks
expect(transport).to receive(:send_input_chunk).exactly(10).times do |chunked_payload, serialized_tags|
expect(chunked_payload.length).to be < 100_000
expect(chunked_payload.length).to be > 90_000
end
transport.send_input(snapshots, tags)
end

context 'when individual snapshot exceeds intake max' do
before do
# Reduce limits even more to force a reasonably-sized snapshot to be dropped
stub_const('Datadog::DI::Transport::Input::Transport::MAX_SERIALIZED_SNAPSHOT_SIZE', 2_000)
end

let(:small_snapshot) do
20.times.map do |i|
[i, i]
end.to_h
end

let(:snapshots) do
[small_snapshot, snapshot]
end

it 'drops snapshot that is too big' do
expect(transport).to receive(:send_input_chunk).once do |chunked_payload, serialized_tags|
expect(chunked_payload.length).to be < 1_000
expect(chunked_payload.length).to be > 100
end
expect_lazy_log(logger, :debug, 'di: dropping too big snapshot')
transport.send_input(snapshots, tags)
end
end
end
end