Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ Gemfile.lock
.bundle
vendor
.idea
*~
*~
logstash/
82 changes: 31 additions & 51 deletions lib/logstash/outputs/json_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class LogStash::Outputs::JSONBatch < LogStash::Outputs::Base

config :idle_flush_time, :validate => :number, :default => 5

config :retry_individual, :validate => :boolean, :default => false
config :retry_individual, :validate => :boolean, :default => true

config :pool_max, :validate => :number, :default => 50

def register
# Handle this deprecated option. TODO: remove the option
Expand All @@ -35,7 +37,7 @@ def register
# tokens must be added back by the client on success
@request_tokens = SizedQueue.new(@pool_max)
@pool_max.times {|t| @request_tokens << true }

@total = 0
@requests = Array.new

buffer_initialize(
Expand All @@ -53,7 +55,7 @@ def register

end # def register

def receive(event)
def receive(event, async_type=:background)
buffer_receive(event)
end #def event

Expand All @@ -66,39 +68,40 @@ def flush(events, close=false)
documents.push(document)
end

make_request(documents, 0)
make_request(documents)
end

def multi_receive(events)
events.each {|event| receive(event, :parallel)}
client.execute!
end

private

def make_request(documents, count)
def make_request(documents, async_type=:background)
body = LogStash::Json.dump(documents)
# Block waiting for a token
token = @request_tokens.pop


token = @request_tokens.pop if async_type == :background

# Create an async request
begin
request = client.send(:post, @url, :body => body, :headers => request_headers, :async => true)
request = client.send(async_type).send(:post, @url, :body => body, :headers => request_headers, :async => true)
rescue Exception => e
@logger.warn("An error occurred while indexing: #{e.message}")
end

# with Maticore version < 0.5 using :async => true places the requests in an @async_requests
# list which is used & cleaned by Client#execute! but we are not using it here and we must
# purge it manually to avoid leaking requests.
client.clear_pending

# attach handlers before performing request
request.on_complete do
# Make sure we return the token to the pool
@request_tokens << token
@request_tokens << token if async_type == :background
end

request.on_success do |response|
#string = "Some "+ Time.new.inspect + " " + response
#
@total = @total + documents.length
logger.debug("Successfully submitted",
:docs => documents.length,
:response_code => response.code,
:total => @total)
if response.code < 200 || response.code > 299
log_failure(
"Encountered non-200 HTTP code #{response.code}",
Expand All @@ -109,56 +112,33 @@ def make_request(documents, count)
:retry_individual => @retry_individual)
if documents.length > 1 && @retry_individual
documents.each do |doc|
make_request([doc], 0)
make_request([doc])
end
else
puts "%s status code returned for %s docs @ %s\n" % [response.code, body, Time.new.inspect]
end
end
end

request.on_failure do |exception|
if count < 1000
# Workaround due to Manticore sometimes trying to reuse stale connections after idling,
# essentially all threads will fail once and then it will succeeed.
# TODO: better http client
sleep 0.1
make_request(documents, count + 1)
else
log_failure("Could not access URL",
:url => url,
:method => @http_method,
:body => body,
:headers => headers,
:message => exception.message,
:class => exception.class.name,
:backtrace => exception.backtrace
)
end
log_failure("Could not access URL",
:url => url,
:method => @http_method,
:body => body,
:headers => headers,
:message => exception.message,
:class => exception.class.name,
:backtrace => exception.backtrace
)
end

# Invoke it using the Manticore Executor (CachedThreadPool) directly
begin
request_async_background(request)
rescue Exception => e
@logger.warn("An error occurred while indexing: #{e.message}")
end
request.call if async_type == :background
end

# This is split into a separate method mostly to help testing
def log_failure(message, opts)
@logger.error("[HTTP Output Failure] #{message}", opts)
end

# Manticore doesn't provide a way to attach handlers to background or async requests well
# It wants you to use futures. The #async method kinda works but expects single thread batches
# and background only returns futures.
# Proposed fix to manticore here: https://github.com/cheald/manticore/issues/32
def request_async_background(request)
@method ||= client.executor.java_method(:submit, [java.util.concurrent.Callable.java_class])
@method.call(request)
end

def request_headers()
headers = @headers || {}
headers["Content-Type"] ||= "application/json"
Expand Down
12 changes: 4 additions & 8 deletions logstash-output-json_batch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-json_batch'
s.version = '0.1.2'
s.version = '0.2.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This output lets you `POST` messages as JSON in a batched fashions"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -10,7 +10,7 @@ Gem::Specification.new do |s|
s.require_paths = ["lib"]

# Files
s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']
s.files = Dir['lib/**/*','spec/**/*','*.gemspec','*.md','Gemfile','LICENSE' ]

# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})
Expand All @@ -19,12 +19,8 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }

# Gem dependencies
s.add_runtime_dependency "logstash-core", ">= 2.1.0", "< 3.0.0"
s.add_runtime_dependency "logstash-mixin-http_client", ">= 2.0.2", "< 3.0.0"

# Constrain Maticore dependency to less than 0.5.0 because of changes in the async handling
# see note in http.rb line 92-93
s.add_runtime_dependency "manticore", "< 0.5.0"
s.add_runtime_dependency "logstash-core-plugin-api", "~> 1.0"
s.add_runtime_dependency "logstash-mixin-http_client", ">= 2.2.1", "< 3.0.0"

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'sinatra'
Expand Down
Empty file removed spec/outputs/json_batch_spec.rb
Empty file.