diff --git a/bin/query-cloudwatch b/bin/query-cloudwatch new file mode 100755 index 00000000000..bdd13291abe --- /dev/null +++ b/bin/query-cloudwatch @@ -0,0 +1,280 @@ +#!/usr/bin/env ruby +Dir.chdir(File.dirname(__FILE__)) { require 'bundler/setup' } + +require 'active_support' +require 'active_support/core_ext/integer/time' +require 'aws-sdk-cloudwatchlogs' +require 'concurrent-ruby' +require 'csv' +require 'json' +require 'optparse' +require 'optparse/time' + +$LOAD_PATH.unshift(File.expand_path(File.join(__dir__, '../lib'))) +require 'reporting/cloudwatch_client' + +class QueryCloudwatch + Config = Struct.new( + :log, + :group, + :slice, + :env, + :app, + :from, + :to, + :query, + :format, + :complete, + :progress, + :wait_duration, + keyword_init: true, + ) + + attr_reader :config + + def initialize(config) + @config = config + end + + def run(stdout: STDOUT) + cloudwatch_client.fetch( + query: config.query, + from: config.from, + to: config.to, + ) do |row| + stdout.puts format_response(row) + end + rescue Interrupt + # catch interrupts (ctrl-c) and directly exit, this skips printing the backtrace + exit 1 + end + + # Relies on the fact that hashes preserve insertion order + # @param [Hash] row + # @return [String] + def format_response(row) + case config.format + when :csv + row.values.to_csv + when :json + row.to_json + else + raise "unknown format #{config.format}" + end + end + + def cloudwatch_client + @cloudwatch_client ||= Reporting::CloudwatchClient.new( + ensure_complete_logs: config.complete, + log_group_name: config.group, + progress: config.progress, + slice_interval: config.slice, + **config.to_h.slice(:wait_duration).compact, + ) + end + + # @return [Config] + def self.parse!(argv:, stdin:, stdout:, now: Time.now) + config = Config.new( + log: nil, + group: nil, + slice: parse_duration('1w'), + format: :csv, + to: now, + complete: false, + progress: true, + ) + + # rubocop:disable Metrics/BlockLength + OptionParser.new do |opts| + opts.banner = 
<<~TXT + Usage + ======================= + + #{$PROGRAM_NAME} [OPTIONS] + + Script to query cloudwatch, splits a query up across multiple timeframes and combines + results (useful for querying a long period of time) + + Examples + ======================= + + Query last 7 days, shorthand log, group query in STDIN + + #{$PROGRAM_NAME} --from 7d --env int --app idp --log events.log <(?<number>\d+)(?<unit>\D+)$/)) + number = Integer(match[:number], 10) + + duration = case match[:unit] + when 'min' + number.minutes + when 'h' + number.hours + when 'd' + number.days + when 'w' + number.weeks + when 'mon' + number.months + when 'y' + number.years + end + + return duration if duration + end + end +end + +if __FILE__ == $PROGRAM_NAME + config = QueryCloudwatch.parse!(argv: ARGV, stdin: STDIN, stdout: STDOUT) + + QueryCloudwatch.new(config).run +end diff --git a/lib/reporting/cloudwatch_client.rb b/lib/reporting/cloudwatch_client.rb index d787af98b20..cc101b98ee0 100644 --- a/lib/reporting/cloudwatch_client.rb +++ b/lib/reporting/cloudwatch_client.rb @@ -8,7 +8,7 @@ class CloudwatchClient DEFAULT_WAIT_DURATION = 3 MAX_RESULTS_LIMIT = 10_000 - attr_reader :num_threads, :wait_duration, :slice_interval, :logger + attr_reader :num_threads, :wait_duration, :slice_interval, :logger, :log_group_name # @param [Boolean] ensure_complete_logs when true, will detect when queries return exactly # 10,000 rows (Cloudwatch Insights max limit) and then recursively split the query window into @@ -26,7 +26,8 @@ def initialize( wait_duration: DEFAULT_WAIT_DURATION, slice_interval: 1.day, logger: nil, - progress: true + progress: true, + log_group_name: 'prod_/srv/idp/shared/log/events.log' ) @ensure_complete_logs = ensure_complete_logs @num_threads = num_threads @@ -34,6 +35,7 @@ def initialize( @slice_interval = slice_interval @logger = logger @progress = progress + @log_group_name = log_group_name end # Either both (from, to) or time_slices must be provided @@ -41,9 +43,17 @@ def initialize( # @param [Time] 
from # @param [Time] to # @param [Array<Range<Time>>] time_slices Pass an Array to use specific slices - # @return [Array<Hash>] + # @raise [ArgumentError] raised when incorrect time parameters are received + # @overload fetch(query:, from:, to:) + # The block-less form returns the array of *all* results at the end + # @return [Array<Hash>] + # @overload fetch(query:, time_slices:) { |row| "..." } + # The block form yields each result row as it's ready to the block and returns nil + # @yieldparam [Hash] row a row of the query result + # @return [nil] def fetch(query:, from: nil, to: nil, time_slices: nil) - results = Concurrent::Array.new + results = Concurrent::Array.new if !block_given? + each_result_queue = Queue.new if block_given? in_progress = Concurrent::Hash.new(0) # Time slices to query, a tuple of range_to_query, range_id [Range