Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 280 additions & 0 deletions bin/query-cloudwatch
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
#!/usr/bin/env ruby
Dir.chdir(File.dirname(__FILE__)) { require 'bundler/setup' }

require 'active_support'
require 'active_support/core_ext/integer/time'
require 'active_support/core_ext/object/blank'
require 'aws-sdk-cloudwatchlogs'
require 'concurrent-ruby'
require 'csv'
require 'json'
require 'optparse'
require 'optparse/time'

$LOAD_PATH.unshift(File.expand_path(File.join(__dir__, '../lib')))
require 'reporting/cloudwatch_client'

# CLI for running a CloudWatch Insights query across a (possibly long) time
# range by splitting it into slices and streaming the combined results.
class QueryCloudwatch
  # Bag of parsed CLI options, produced by QueryCloudwatch.parse!
  Config = Struct.new(
    :log,
    :group,
    :slice,
    :env,
    :app,
    :from,
    :to,
    :query,
    :format,
    :complete,
    :progress,
    :wait_duration,
    keyword_init: true,
  )

  attr_reader :config

  # @param [Config] config parsed CLI options
  def initialize(config)
    @config = config
  end

  # Runs the query and streams each result row to stdout, one formatted
  # line per row.
  # @param [IO] stdout destination for formatted rows
  def run(stdout: STDOUT)
    cloudwatch_client.fetch(
      query: config.query,
      from: config.from,
      to: config.to,
    ) do |row|
      stdout.puts format_response(row)
    end
  rescue Interrupt
    # catch interrupts (ctrl-c) and directly exit, this skips printing the backtrace
    exit 1
  end

  # Relies on the fact that hashes preserve insertion order
  # @param [Hash] row
  # @return [String] the row serialized according to config.format
  # @raise [RuntimeError] if config.format is not :csv or :json
  def format_response(row)
    case config.format
    when :csv
      row.values.to_csv
    when :json
      row.to_json
    else
      raise "unknown format #{config.format}"
    end
  end

  # Memoized client. Only forwards :wait_duration when it was explicitly
  # set, so the client's own default applies otherwise.
  # @return [Reporting::CloudwatchClient]
  def cloudwatch_client
    @cloudwatch_client ||= Reporting::CloudwatchClient.new(
      ensure_complete_logs: config.complete,
      log_group_name: config.group,
      progress: config.progress,
      slice_interval: config.slice,
      **config.to_h.slice(:wait_duration).compact,
    )
  end

  # Parses CLI arguments into a Config. Reads the query from stdin when
  # --query is absent and stdin is not a TTY. Prints usage errors and
  # exits(1) when required options are missing.
  # @return [Config]
  def self.parse!(argv:, stdin:, stdout:, now: Time.now)
    config = Config.new(
      log: nil,
      group: nil,
      slice: parse_duration('1w'),
      format: :csv,
      to: now,
      complete: false,
      progress: true,
    )

    # rubocop:disable Metrics/BlockLength
    OptionParser.new do |opts|
      opts.banner = <<~TXT
        Usage
        =======================

        #{$PROGRAM_NAME} [OPTIONS]

        Script to query cloudwatch, splits a query up across multiple timeframes and combines
        results (useful for querying a long period of time)

        Examples
        =======================

        Query last 7 days, shorthand log, group query in STDIN

          #{$PROGRAM_NAME} --from 7d --env int --app idp --log events.log <<QUERY
            fields @timestamp
            | limit 9999
          QUERY

        Query full log group, full timestamps, query as an argument

          #{$PROGRAM_NAME} \\
            --group int_/srv/idp/shared/log/production.log \\
            --start "2020-01-01T00:00:00-00:00" \\
            --to "2020-12-31T23:59:59-00:00" \\
            --query "fields @timestamp | limit 9999"

        Timestamps
        =======================

        * Can be anything Ruby's Time.parse understands:

            "2021-01-01T00:00:00-07:00"
            "2021-01-01 00:00:00 -0700"

        * Can be represented as duration shorthands:

            10min - 10 minutes ago
            9h    - 9 hours ago
            8d    - 8 days ago
            7w    - 7 weeks ago
            6mon  - 6 months ago
            5y    - 5 years ago

        Options
        =======================
      TXT

      opts.on('--env ENV', '(optional)') do |env|
        config.env = env
      end

      opts.on('--app APP', '(optional)') do |app|
        config.app = app
      end

      opts.on('--log LOG', 'shorthand log group (ex "events.log"), needs --app and --env') do |log|
        config.log = log
      end

      opts.on(
        '--group GROUP',
        'full log group (ex "int_/srv/idp/shared/log/production.log")',
      ) do |group|
        config.group = group
      end

      opts.on(
        '--from STR', '--start STR',
        'query start, see Timestamps above for examples'
      ) do |str|
        # Shorthand durations ("7d") are relative to now; otherwise parse as
        # an absolute timestamp
        if (duration = parse_duration(str))
          config.from = duration.ago(now)
        else
          config.from = Time.parse(str) # rubocop:disable Rails/TimeZone
        end
      end

      opts.on(
        '--to STR', '--end STR',
        '(optional, defaults to now) query end, see Timestamps above for examples'
      ) do |str|
        if (duration = parse_duration(str))
          config.to = duration.ago(now)
        else
          config.to = Time.parse(str) # rubocop:disable Rails/TimeZone
        end
      end

      opts.on('--query QUERY', 'Cloudwatch Insights query') do |query|
        config.query = query
      end

      opts.on('--slice SLICE', '(optional) query slice size duration, defaults to 1w') do |slice|
        config.slice = parse_duration(slice)
      end

      opts.on('--json', 'format output as newline-delimited JSON (nd-json)') do
        config.format = :json
      end

      opts.on('--csv', '(default) format output as CSV') do
        config.format = :csv
      end

      opts.on(
        '--[no-]complete',
        'whether or not to split query slices if exactly 10k rows are returned, defaults to off',
      ) do |complete|
        config.complete = complete
      end

      opts.on('--[no-]progress', 'whether or not show a progress bar, defaults to on') do |progress|
        config.progress = progress
      end

      opts.on('-h', '--help', 'Prints this help') do
        stdout.puts opts
        exit
        return # rubocop:disable Lint/NonLocalExitFromIterator, Lint/UnreachableCode
      end
    end.parse!(argv)
    # rubocop:enable Metrics/BlockLength

    # --app/--env/--log together are shorthand for a full --group
    if (app = config.app) && (env = config.env) && !(log = config.log).blank?
      config.group = build_group(app: app, env: env, log: log)
    end

    errors = []

    if config.group.blank?
      errors << 'ERROR: missing log group --group or (--app and --env and --log)'
    end

    errors << 'ERROR: missing time range --from or --start' if !config.from

    if !config.query
      if stdin.tty?
        errors << 'ERROR: missing query, either STDIN or --query'
      else
        config.query = stdin.read
      end
    end

    if !errors.empty?
      stdout.puts errors
      exit 1
      return # rubocop:disable Lint/UnreachableCode
    end

    config
  end

  # Builds a full CloudWatch log group name from its parts.
  # @param [String] log ex production.log
  # @param [String] app ex idp
  # @param [String] env ex int
  # @return [String] ex "int_/srv/idp/shared/log/production.log"
  def self.build_group(log:, app:, env:)
    "#{env}_/srv/#{app}/shared/log/#{log}"
  end

  # @param [String] value a string such as 1min, 2h, 3d, 4w, 5mon, 6y
  # @return [ActiveSupport::Duration, nil] nil when value is not a known
  #   duration shorthand (callers use this to fall back to Time.parse)
  def self.parse_duration(value)
    if (match = value.match(/^(?<number>\d+)(?<unit>\D+)$/))
      number = Integer(match[:number], 10)

      duration = case match[:unit]
      when 'min'
        number.minutes
      when 'h'
        number.hours
      when 'd'
        number.days
      when 'w'
        number.weeks
      when 'mon'
        number.months
      when 'y'
        number.years
      end

      return duration if duration
    end
  end
end

# Entry point: only runs when executed directly (skipped when the file is
# loaded by tests), parsing real ARGV/STDIN and streaming results to STDOUT.
if __FILE__ == $PROGRAM_NAME
  config = QueryCloudwatch.parse!(argv: ARGV, stdin: STDIN, stdout: STDOUT)

  QueryCloudwatch.new(config).run
end
41 changes: 34 additions & 7 deletions lib/reporting/cloudwatch_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class CloudwatchClient
DEFAULT_WAIT_DURATION = 3
MAX_RESULTS_LIMIT = 10_000

attr_reader :num_threads, :wait_duration, :slice_interval, :logger
attr_reader :num_threads, :wait_duration, :slice_interval, :logger, :log_group_name

# @param [Boolean] ensure_complete_logs when true, will detect when queries return exactly
# 10,000 rows (Cloudwatch Insights max limit) and then recursively split the query window into
Expand All @@ -26,24 +26,34 @@ def initialize(
wait_duration: DEFAULT_WAIT_DURATION,
slice_interval: 1.day,
logger: nil,
progress: true
progress: true,
log_group_name: 'prod_/srv/idp/shared/log/events.log'
)
@ensure_complete_logs = ensure_complete_logs
@num_threads = num_threads
@wait_duration = wait_duration
@slice_interval = slice_interval
@logger = logger
@progress = progress
@log_group_name = log_group_name
end

# Either both (from, to) or time_slices must be provided
# @param [#to_s] query
# @param [Time] from
# @param [Time] to
# @param [Array<Range<Time>>] time_slices Pass an to use specific slices
# @return [Array<Hash>]
# @raise [ArgumentError] raised when incorrect time parameters are received
# @overload fetch(query:, from:, to:)
# The block-less form returns the array of *all* results at the end
# @return [Array<Hash>]
# @overload fetch(query: time_slices:) { |row| "..." }
# The block form yields each result row as its ready to the block and returns nil
# @yieldparam [Hash] row a row of the query result
# @return [nil]
def fetch(query:, from: nil, to: nil, time_slices: nil)
results = Concurrent::Array.new
results = Concurrent::Array.new if !block_given?
each_result_queue = Queue.new if block_given?
in_progress = Concurrent::Hash.new(0)

# Time slices to query, a tuple of range_to_query, range_id [Range<Time>, Integer]
Expand All @@ -68,6 +78,7 @@ def fetch(query:, from: nil, to: nil, time_slices: nil)
log(:debug, "starting query, queue_size=#{queue.length} num_threads=#{num_threads}")
log(:info, "=== query ===\n#{query}\n=== query ===")

# rubocop:disable Metrics/BlockLength
threads = num_threads.times.map do |thread_idx|
Thread.new do
while (range, range_id = queue.pop)
Expand All @@ -91,7 +102,14 @@ def fetch(query:, from: nil, to: nil, time_slices: nil)
else
log(:debug, "worker finished, slice_duration=#{end_time - start_time}")
in_progress[range_id] -= 1
results.concat(parse_results(response.results))
parsed_results = parse_results(response.results)

results&.concat(parsed_results)
if each_result_queue
parsed_results.each do |row|
each_result_queue << row
end
end
end
end

Expand All @@ -102,13 +120,23 @@ def fetch(query:, from: nil, to: nil, time_slices: nil)
thread.abort_on_exception = true
end
end
# rubocop:enable Metrics/BlockLength

if each_result_queue
threads << Thread.new do
while (row = each_result_queue.pop)
yield row
end
end
end

until (num_in_progress = in_progress.sum(&:last)).zero?
@progress_bar&.refresh
log(:debug, "waiting, num_in_progress=#{num_in_progress}, queue_size=#{queue.size}")
sleep wait_duration
end
queue.close
each_result_queue&.close
threads.each(&:value) # wait for all threads

@progress_bar&.finish
Expand All @@ -126,8 +154,7 @@ def fetch_one(query:, start_time:, end_time:)
log(:debug, "starting query: #{start_time}..#{end_time}")

query_id = aws_client.start_query(
# NOTE: this should be configurable as well
log_group_name: 'prod_/srv/idp/shared/log/events.log',
log_group_name: log_group_name,
start_time:,
end_time:,
query_string: query.to_s,
Expand Down
Loading