Skip to content

Commit

Permalink
Move several cloud config bits to telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
jnunemaker committed Nov 26, 2023
1 parent d707465 commit bcfed9e
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 58 deletions.
42 changes: 14 additions & 28 deletions lib/flipper/cloud/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,15 @@ class Configuration
# Public: Should telemetry be enabled or not (default: false).
attr_accessor :telemetry_enabled

# Public: The Integer for Float number of seconds between submission of
# telemetry to Cloud (default: 60, minimum: 10).
attr_reader :telemetry_interval

# Public: The Integer or Float number of seconds to wait for telemetry
# to shutdown (default: 5).
attr_accessor :telemetry_shutdown_timeout

def initialize(options = {})
setup_auth options
setup_log options
setup_http options
setup_sync options
setup_adapter options
setup_telemetry options
end

# Public: Change the telemetry interval.
def telemetry_interval=(value)
value = Typecast.to_float(value)
@telemetry_interval = value
enforce_minimum :telemetry_interval, 10
value
end

# Public: Read or customize the http adapter. Calling without a block will
# perform a read. Calling with a block yields the cloud adapter
# for customization.
Expand Down Expand Up @@ -187,6 +172,17 @@ def setup_auth(options)
set_option :token, options, required: true
end

def setup_log(options)
set_option :logging_enabled, options, default: true, typecast: :boolean
set_option :logger, options, from_env: false, default: -> {
if logging_enabled
Logger.new(STDOUT)
else
Logger.new("/dev/null")
end
}
end

def setup_http(options)
set_option :url, options, default: DEFAULT_URL
set_option :debug_output, options, from_env: false
Expand All @@ -206,18 +202,8 @@ def setup_adapter(options)
end

def setup_telemetry(options)
set_option :telemetry_interval, options, default: 60 # typecast and minimum set in writer method.
set_option :telemetry_shutdown_timeout, options, default: 5, typecast: :float, minimum: 0.1
set_option :logging_enabled, options, default: true, typecast: :boolean
set_option :logger, options, from_env: false, default: -> {
if logging_enabled
Logger.new(STDOUT)
else
Logger.new("/dev/null")
end
}

# Needs to be after url and other telemetry config assignments.
# Needs to be after url and token assignments because they are used for
# uniqueness in Telemetry.instance_for.
set_option :telemetry, options, from_env: false, default: -> {
Telemetry.instance_for(self)
}
Expand Down
56 changes: 45 additions & 11 deletions lib/flipper/cloud/telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "concurrent/executor/fixed_thread_pool"
require "flipper/cloud/telemetry/metric"
require "flipper/cloud/telemetry/metric_storage"
require "flipper/cloud/telemetry/submitter"

module Flipper
module Cloud
Expand All @@ -28,16 +29,35 @@ def self.instance_for(cloud_configuration)
end
end

attr_reader :cloud_configuration, :metric_storage, :pool, :timer
# Public: The cloud configuration to use for this telemetry instance.
attr_reader :cloud_configuration

# Internal: Where the metrics are stored between cloud submissions.
attr_reader :metric_storage

# Internal: The pool of background threads that submits metrics to cloud.
attr_reader :pool

# Internal: The timer that triggers draining the metrics to the pool.
attr_reader :timer

# Internal: The interval in seconds for how often telemetry should be sent to cloud.
attr_reader :interval

# Internal: The timeout in seconds for how long to wait for the pool to shutdown.
attr_reader :shutdown_timeout

# Internal: The proc that is called to submit metrics to cloud.
attr_accessor :submitter

def initialize(cloud_configuration)
@pid = $$
@cloud_configuration = cloud_configuration
@submitter = ->(drained) {
Telemetry::Submitter.new(@cloud_configuration).call(drained)
self.submitter = ->(drained) {
Submitter.new(@cloud_configuration).call(drained)
}
self.interval = ENV.fetch("FLIPPER_TELEMETRY_INTERVAL", 60).to_f
self.shutdown_timeout = ENV.fetch("FLIPPER_TELEMETRY_SHUTDOWN_TIMEOUT", 5).to_f
start
at_exit { stop }
end
Expand All @@ -52,7 +72,7 @@ def record(name, payload)
@metric_storage.increment metric
end

# Start all the tasks and setup new metric storage.
# Public: Start all the tasks and setup new metric storage.
def start
info "action=start"

Expand All @@ -65,12 +85,12 @@ def start
})

@timer = Concurrent::TimerTask.execute({
execution_interval: @cloud_configuration.telemetry_interval,
execution_interval: interval,
name: "flipper-telemetry-post-to-pool-timer".freeze,
}) { post_to_pool }
end

# Shuts down all the tasks and tries to flush any remaining info to Cloud.
# Public: Shuts down all the tasks and tries to flush any remaining info to Cloud.
def stop
info "action=stop"

Expand All @@ -88,17 +108,31 @@ def stop
post_to_pool # one last drain
debug "action=pool_shutdown_start"
@pool.shutdown
pool_termination_result = @pool.wait_for_termination(@cloud_configuration.telemetry_shutdown_timeout)
pool_termination_result = @pool.wait_for_termination(@shutdown_timeout)
@pool.kill unless pool_termination_result
debug "action=pool_shutdown_end result=#{pool_termination_result}"
end
end

# Public: Restart all the tasks and reset the storage.
def restart
stop
start
end

# Internal: Sets the interval in seconds for how often telemetry should be sent to cloud.
def interval=(value)
new_interval = [Typecast.to_float(value), 10].max
@timer&.execution_interval = new_interval
@interval = new_interval
end

# Internal: Sets the timeout in seconds for how long to wait for the pool to shutdown.
def shutdown_timeout=(value)
new_shutdown_timeout = [Typecast.to_float(value), 0.1].max
@shutdown_timeout = new_shutdown_timeout
end

private

def detect_forking
Expand All @@ -109,13 +143,15 @@ def detect_forking
end
end

# Drains the metric storage and enqueues the metrics to be posted to cloud.
def post_to_pool
drained = @metric_storage.drain
return if drained.empty?
debug "action=post_to_pool metrics=#{drained.size}"
@pool.post { post_to_cloud(drained) }
end

# Posts the drained metrics to cloud.
def post_to_cloud(drained)
debug "action=post_to_cloud metrics=#{drained.size}"
response, error = submitter.call(drained)
Expand All @@ -124,10 +160,8 @@ def post_to_cloud(drained)
# thus may have a telemetry-interval header for us to respect.
response ||= error.response if error && error.respond_to?(:response)

if response && telemetry_interval = response["telemetry-interval"]
telemetry_interval = telemetry_interval.to_i
@timer.execution_interval = telemetry_interval
@cloud_configuration.telemetry_interval = telemetry_interval
if response && interval = response["telemetry-interval"]
self.interval = interval.to_f
end
rescue => error
error "action=post_to_cloud error=#{error.inspect}"
Expand Down
15 changes: 0 additions & 15 deletions spec/flipper/cloud/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,6 @@
expect(poller.interval).to eq(20)
end

it "can set telemetry_interval" do
instance = described_class.new(required_options.merge(telemetry_interval: 10))
expect(instance.telemetry_interval).to eq(10)
end

it "defaults telemetry_interval" do
instance = described_class.new(required_options)
expect(instance.telemetry_interval).to eq(60)
end

it "cannot set telemetry_interval to lower than 10" do
instance = described_class.new(required_options.merge(telemetry_interval: 9))
expect(instance.telemetry_interval).to eq(10)
end

it "can set debug_output" do
instance = described_class.new(required_options.merge(debug_output: STDOUT))
expect(instance.debug_output).to eq(STDOUT)
Expand Down
8 changes: 4 additions & 4 deletions spec/flipper/cloud/telemetry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
})
telemetry.stop

expect(cloud_configuration.telemetry_interval).to eq(60)
expect(telemetry.interval).to eq(60)
expect(telemetry.timer.execution_interval).to eq(60)
expect(stub).to have_been_requested
end
Expand All @@ -38,7 +38,7 @@
})
telemetry.stop

expect(cloud_configuration.telemetry_interval).to eq(120)
expect(telemetry.interval).to eq(120)
expect(telemetry.timer.execution_interval).to eq(120)
expect(stub).to have_been_requested
end
Expand Down Expand Up @@ -69,7 +69,7 @@
telemetry.stop

# Check the conig interval and the timer interval.
expect(cloud_configuration.telemetry_interval).to eq(120)
expect(telemetry.interval).to eq(120)
expect(telemetry.timer.execution_interval).to eq(120)
expect(stub).to have_been_requested.times(10)
end
Expand Down Expand Up @@ -99,7 +99,7 @@
})
telemetry.stop

expect(cloud_configuration.telemetry_interval).to eq(60)
expect(telemetry.interval).to eq(60)
expect(telemetry.timer.execution_interval).to eq(60)
expect(stub).to have_been_requested.times(10)
end
Expand Down

0 comments on commit bcfed9e

Please sign in to comment.