Skip to content
55 changes: 22 additions & 33 deletions app/jobs/fastly_log_processor.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'zlib'

class FastlyLogProcessor
class AlreadyProcessedError < ::StandardError; end
class LogFileNotFoundError < ::StandardError; end

attr_accessor :bucket, :key

Expand All @@ -11,28 +11,28 @@ def initialize(bucket, key)
end

def perform
counts = download_counts
StatsD.increment('fastly_log_processor.processed')

# Temporary feature flag while we roll out fastly log processing
if ENV['FASTLY_LOG_PROCESSOR_ENABLED'] != 'true'
# Just log & exit w/out updating stats
Delayed::Worker.logger.info "Processed Fastly log counts: #{counts.inspect}"
StatsD.increment('fastly_log_processor.processed')
log_ticket = LogTicket.pop(key: key, directory: bucket)
if log_ticket.nil?
StatsD.increment('fastly_log_processor.extra')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does extra mean? Can we make this more descriptive?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra, means a processor that was enqueued but didnt have any ticket/log to process. That should not happen, unless we manually queue jobs.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, okay.

return
end

# Check if this log has already been processed by another job
unless Redis.current.setnx(redis_key, 'processing')
raise FastlyLogProcessor::AlreadyProcessedError, "Already processed bucket: #{bucket} key: #{key}"
end

# Set a short expiry while updating
Redis.current.expire(redis_key, 2.minutes)
counts = download_counts(log_ticket)

Download.bulk_update(munge_for_bulk_update(counts))
# TODO: wrap this in a transation when download update is in the DB

Redis.current.set(redis_key, 'processed')
Redis.current.expire(redis_key, 30.days)
Delayed::Worker.logger.info "Processed Fastly log counts: #{counts.inspect}"
# Temporary feature flag while we roll out fastly log processing
if ENV['FASTLY_LOG_PROCESSOR_ENABLED'] == 'true'
updates = munge_for_bulk_update(counts)
Download.bulk_update(updates)
else
# Just log & exit w/out updating stats
StatsD.increment('fastly_log_processor.disabled')
end
log_ticket.update(status: "processed")
end

# Takes an enumerator of log lines and returns a hash of download counts
Expand All @@ -41,7 +41,11 @@ def perform
# 'rails-4.0.0' => 25,
# 'rails-4.2.0' => 50
# }
def download_counts(enumerator = log_lines)
def download_counts(log_ticket)
file = log_ticket.filesystem.get(key)
raise LogFileNotFoundError if file.nil?
enumerator = file.each_line

enumerator.each_with_object(Hash.new(0)) do |log_line, accum|
path, response_code = log_line.split[10, 2]
# Only count successful downloads
Expand All @@ -67,19 +71,4 @@ def munge_for_bulk_update(download_counts)
name ? [name, path, count] : nil
end.compact
end

def log_lines
s3_body.each_line
end

def s3_body
# TODO: Are rubygems' fastly logs gzipped?
io = Aws::S3::Object.new(bucket_name: bucket, key: key).get.body
io = Zlib::GzipReader.wrap(io) if key.end_with?('.gz')
io
end

def redis_key
"fastly-log:#{bucket}:#{key}"
end
end
23 changes: 23 additions & 0 deletions app/models/log_ticket.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
class LogTicket < ActiveRecord::Base
enum backend: [:s3, :local]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this dependent on the environment, rather than the model/record? We shouldn't ever have both in any one environment. For development it would be pretty easy to flip back and forth if needed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't ever have both in any one environment.

Not sure about that, specially on staging we could test both. Also if we ever need to fix something manually, we could use local FS easily to do that.


scope :pending, -> { limit(1).lock(true).select("id").where(status: "pending").order("id ASC") }

def self.pop(key: nil, directory: nil)
scope = pending
scope = scope.where(key: key) if key
scope = scope.where(directory: directory) if directory
sql = scope.to_sql

find_by_sql(["UPDATE #{quoted_table_name} SET status = ? WHERE id IN (#{sql}) RETURNING *", 'processing']).first
end

def filesystem
@fs ||=
if s3?
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO it will be better to hide this branching and passing credentials in RubygemFs class since it is really similar code to that used in RubygemFs#instance.

for example:

@filesystem ||= RubygemFS.new(directory: directory, s3: s3?)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, i will play with that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I had another idea also:

@filesystem ||= RubygemFS.new(directory: directory, backend: backend)

Just to move responsibility of backend selecting to one place.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with moving this into RubygemsFs.new, it's pretty brittle to make LogTicket know about the different classes and arguments.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arthurnn FYI, you could stub out the S3 responses to use a local log file in tests. That's what I did in the FastlyLogProcessor unit tests.

Here's a good blog post all about stubbing AWS responses.

That seems simpler than supporting pluggable RubygemFS backends for LogTicket.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good thing about supporting local file is so I can test using development environment too, not just on test. That was a simple win, as we already had the API RubygemFS

RubygemFs::S3.new(bucket: directory)
else
RubygemFs::Local.new(directory)
end
end
end
14 changes: 14 additions & 0 deletions db/migrate/20160227194735_create_log_tickets.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class CreateLogTickets < ActiveRecord::Migration
def change
create_table :log_tickets do |t|
t.string :key
t.string :directory
t.integer :backend, default: 0
t.string :status

t.timestamps null: false
end

add_index :log_tickets, [:directory, :key], unique: true
end
end
13 changes: 12 additions & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 20150709170542) do
ActiveRecord::Schema.define(version: 20160227194735) do

# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
Expand Down Expand Up @@ -76,6 +76,17 @@

add_index "linksets", ["rubygem_id"], name: "index_linksets_on_rubygem_id", using: :btree

create_table "log_tickets", force: :cascade do |t|
t.string "key"
t.string "directory"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, calling S3 buckets directories is confusing b/c key could contain slashes.

If you get rid of the pluggable backends and use S3 stubbing instead, I recommend renaming this to bucket to more clearly map to the S3 domain model.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to make this specific to S3. Thats why I called directory.
Indeed if we removed the backed we could make it specific.

However as I explained above, we already have the s3/local abstraction in RubygemFS, so I made that work, and it allow us to test local development environment too.. End-to-end, so I can just drop a file in a folder, and that will be processed locally.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @ktheory here, but I can see the value for testing.

t.integer "backend", default: 0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arthurnn: IMHO, having a facade wrapper around 3rd-party APIs is great, so that's a big 👍 for this PR and RubygemFs. It's definitely useful to test with local file; and the pattern I was using of stubbing out AWS API calls is cumbersome.

OTOH, adding the backend column to db schema to support a testing feature is a smell to me. And the current implementation leads to ​_more_​ test cases, with both the stubbed-out S3 backend, and the local FS backend.

So my pie-in-the-sky ideal would be a way to support S3 stubbing/local files in dev & test envs, without needing the database.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One solution I could do is, instead of having a backend flag on the log file level, I could add a ENV var that would either pull the log from s3 or local fs.
Only draw back about this, is that if we ever wanna fix one file that went wrong, and we wanted to manually drop that file in production some where and have the process working on it, we would have to hack something on top of that.

Also, the extra tests are like 3 tests, 12 lines of code. As the facade itself(RubygemsFS) is already been tested as a unit.

I think having a backend per log, will give us more flexibility and it actually hasn't cost much in terms of code, so thats why I am pushing this approach.

Also, this allow us to process files from other backends in the future, lets say we start pushing logs to kafka, and we want to process them. I know this sounds a bit like crazy talk now, but for the current use case we have, having two backends sounded like a good abstraction area to me.

t.string "status"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
end

add_index "log_tickets", ["directory", "key"], name: "index_log_tickets_on_directory_and_key", unique: true, using: :btree

create_table "oauth_access_grants", force: :cascade do |t|
t.integer "resource_owner_id", null: false
t.integer "application_id", null: false
Expand Down
33 changes: 18 additions & 15 deletions lib/rubygem_fs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,17 @@ def self.instance
if Rails.env.development?
RubygemFs::Local.new
else
RubygemFs::S3.new(access_key_id: ENV['S3_KEY'],
secret_access_key: ENV['S3_SECRET'],
region: Gemcutter.config['s3_region'],
endpoint: "https://#{Gemcutter.config['s3_endpoint']}")
RubygemFs::S3.new
end
end

def self.mock!
@fs = RubygemFs::Local.new
@fs.define_singleton_method(:base_dir) do
@dir ||= Dir.mktmpdir
end
@fs = RubygemFs::Local.new(Dir.mktmpdir)
end

def self.s3!(host)
@fs = RubygemFs::S3.new(access_key_id: 'k',
secret_access_key: 's',
region: Gemcutter.config['s3_region'],
endpoint: host,
force_path_style: true)
@fs.define_singleton_method(:init) do
Expand All @@ -31,6 +24,10 @@ def self.s3!(host)
end

class Local
def initialize(base_dir = nil)
@base_dir = base_dir
end

def store(key, body, _metadata = {})
FileUtils.mkdir_p File.dirname("#{base_dir}/#{key}")
File.open("#{base_dir}/#{key}", 'wb') do |f|
Expand All @@ -51,13 +48,19 @@ def remove(key)
end

def base_dir
Rails.root.join('server')
@base_dir || Rails.root.join('server')
end
end

class S3
def initialize(config)
@config = config
def initialize(config = {})
@bucket = config.delete(:bucket)
@config = {
access_key_id: ENV['S3_KEY'],
secret_access_key: ENV['S3_SECRET'],
region: Gemcutter.config['s3_region'],
endpoint: "https://#{Gemcutter.config['s3_endpoint']}"
}.merge(config)
end

def store(key, body, metadata = {})
Expand All @@ -81,12 +84,12 @@ def restore(key)
s3.delete_object(key: key, bucket: bucket, version_id: version_id)
end

private

def bucket
Gemcutter.config['s3_bucket']
@bucket || Gemcutter.config['s3_bucket']
end

private

def s3
@s3 ||= Aws::S3::Client.new(@config)
end
Expand Down
9 changes: 6 additions & 3 deletions lib/shoryuken/sqs_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ def perform(_sqs_msg, body)

StatsD.increment('fastly_log_processor.s3_entry_fetched')

ActiveRecord::Base.transaction do
s3_objects.each do |bucket, key|
StatsD.increment('fastly_log_processor.enqueued')
s3_objects.each do |bucket, key|
StatsD.increment('fastly_log_processor.enqueued')
begin
LogTicket.create!(backend: "s3", key: key, directory: bucket, status: "pending")
Delayed::Job.enqueue FastlyLogProcessor.new(bucket, key), priority: PRIORITIES[:stats]
rescue ActiveRecord::RecordNotUnique
StatsD.increment('fastly_log_processor.duplicated')
end
end
end
Expand Down
73 changes: 50 additions & 23 deletions test/unit/fastly_log_processor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase
"json-1.8.2" => 4,
"no-such-gem-1.2.3" => 1
}
@log_ticket = LogTicket.create!(backend: 's3', directory: 'test-bucket', key: 'fastly-fake.log', status: "pending")

Aws.config[:s3] = {
stub_responses: { get_object: { body: @sample_log } }
Expand All @@ -28,25 +29,21 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase
ENV['FASTLY_LOG_PROCESSOR_ENABLED'] = @orig_fastly_log_processor_enabled
end

context "#s3_body" do
should "return a readable object " do
assert @job.s3_body.respond_to?(:read)
end
end

context "#log_lines" do
should "return an enumerator" do
assert_kind_of Enumerator, @job.log_lines
context "#download_counts" do
should "process file from s3" do
assert_equal @sample_log_counts, @job.download_counts(@log_ticket)
end

should "have values" do
assert @job.log_lines.first
should "process file from local fs" do
@log_ticket.update(backend: 'local', directory: 'test/sample_logs')
assert_equal @sample_log_counts, @job.download_counts(@log_ticket)
end
end

context "#download_counts" do
should "be correct" do
assert_equal @sample_log_counts, @job.download_counts
should "fail if dont find the file" do
@log_ticket.update(backend: 'local', directory: 'foobar')
assert_raises FastlyLogProcessor::LogFileNotFoundError do
@job.download_counts(@log_ticket)
end
end
end

Expand Down Expand Up @@ -77,27 +74,57 @@ class FastlyLogProcessorTest < ActiveSupport::TestCase
end

context '#perform' do
should "not double count" do
assert_equal 0, Rubygem.find_by_name('json').downloads
3.times { @job.perform }
assert_equal 7, Rubygem.find_by_name('json').downloads
end

should "update download counts" do
@job.perform

@sample_log_counts
.reject { |k, _| k == "no-such-gem-1.2.3" }
.each do |name, expected_count|
assert_equal expected_count, Version.find_by_full_name(name).downloads_count
assert_equal expected_count, Version.find_by_full_name(name).downloads_count, "invalid value for #{name}"
end

assert_equal 7, Rubygem.find_by_name('json').downloads
assert_equal "processed", @log_ticket.reload.status
end

should 'set the redis key' do
should "not run if already processed" do
assert_equal 0, Rubygem.find_by_name('json').downloads
@log_ticket.update(status: 'processed')
@job.perform
assert_equal 'processed', Redis.current.get(@job.redis_key)
assert_in_delta Redis.current.ttl(@job.redis_key), 30.days, 10

assert_equal 0, Rubygem.find_by_name('json').downloads
end

should "not mark as processed if anything fails" do
def @job.download_counts(_)
raise "woops"
end
assert_raises { @job.perform }
refute_equal "processed", @log_ticket.reload.status
end

should "fail if already run" do
Redis.current.set(@job.redis_key, 'processed')
assert_raises(FastlyLogProcessor::AlreadyProcessedError) { @job.perform }
should "not re-process if it failed" do
def @job.download_counts(_)
raise "woops"
end
assert_raises { @job.perform }

@job = FastlyLogProcessor.new('test-bucket', 'fastly-fake.log')
@job.perform
assert_equal 0, Rubygem.find_by_name('json').downloads
end

should "only process the right file" do
ticket = LogTicket.create!(backend: 's3', directory: 'test-bucket', key: 'fastly-fake.2.log', status: "pending")

@job.perform
assert_equal "pending", ticket.reload.status
assert_equal "processed", @log_ticket.reload.status
end
end
end
Expand Down
Loading