diff --git a/.gitignore b/.gitignore index c62b51c..fb84911 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ Gemfile.lock .bundle vendor .idea -*~ \ No newline at end of file +*~ +logstash/ diff --git a/lib/logstash/outputs/json_batch.rb b/lib/logstash/outputs/json_batch.rb index 759bc92..9d723af 100644 --- a/lib/logstash/outputs/json_batch.rb +++ b/lib/logstash/outputs/json_batch.rb @@ -23,7 +23,9 @@ class LogStash::Outputs::JSONBatch < LogStash::Outputs::Base config :idle_flush_time, :validate => :number, :default => 5 - config :retry_individual, :validate => :boolean, :default => false + config :retry_individual, :validate => :boolean, :default => true + + config :pool_max, :validate => :number, :default => 50 def register # Handle this deprecated option. TODO: remove the option @@ -35,7 +37,7 @@ def register # tokens must be added back by the client on success @request_tokens = SizedQueue.new(@pool_max) @pool_max.times {|t| @request_tokens << true } - + @total = 0 @requests = Array.new buffer_initialize( @@ -53,7 +55,7 @@ def register end # def register - def receive(event) + def receive(event, async_type=:background) buffer_receive(event) end #def event @@ -66,39 +68,40 @@ def flush(events, close=false) documents.push(document) end - make_request(documents, 0) + make_request(documents) + end + + def multi_receive(events) + events.each {|event| receive(event, :parallel)} + client.execute! end private - def make_request(documents, count) + def make_request(documents, async_type=:background) body = LogStash::Json.dump(documents) # Block waiting for a token - token = @request_tokens.pop - - + token = @request_tokens.pop if async_type == :background # Create an async request begin - request = client.send(:post, @url, :body => body, :headers => request_headers, :async => true) + request = client.send(async_type).send(:post, @url, :body => body, :headers => request_headers, :async => true) rescue Exception => e @logger.warn("An error occurred while indexing: #{e.message}") end - # with Maticore version < 0.5 using :async => true places the requests in an @async_requests - # list which is used & cleaned by Client#execute! but we are not using it here and we must - # purge it manually to avoid leaking requests. - client.clear_pending - # attach handlers before performing request request.on_complete do # Make sure we return the token to the pool - @request_tokens << token + @request_tokens << token if async_type == :background end request.on_success do |response| - #string = "Some "+ Time.new.inspect + " " + response - # + @total = @total + documents.length + logger.debug("Successfully submitted", + :docs => documents.length, + :response_code => response.code, + :total => @total) if response.code < 200 || response.code > 299 log_failure( "Encountered non-200 HTTP code #{response.code}", @@ -109,40 +112,26 @@ def make_request(documents, count) :retry_individual => @retry_individual) if documents.length > 1 && @retry_individual documents.each do |doc| - make_request([doc], 0) + make_request([doc]) end - else - puts "%s status code returned for %s docs @ %s\n" % [response.code, body, Time.new.inspect] end end end request.on_failure do |exception| - if count < 1000 - # Workaround due to Manticore sometimes trying to reuse stale connections after idling, - # essentially all threads will fail once and then it will succeeed. - # TODO: better http client - sleep 0.1 - make_request(documents, count + 1) - else - log_failure("Could not access URL", - :url => url, - :method => @http_method, - :body => body, - :headers => headers, - :message => exception.message, - :class => exception.class.name, - :backtrace => exception.backtrace - ) - end + log_failure("Could not access URL", + :url => url, + :method => @http_method, + :body => body, + :headers => headers, + :message => exception.message, + :class => exception.class.name, + :backtrace => exception.backtrace + ) end # Invoke it using the Manticore Executor (CachedThreadPool) directly - begin - request_async_background(request) - rescue Exception => e - @logger.warn("An error occurred while indexing: #{e.message}") - end + request.call if async_type == :background end # This is split into a separate method mostly to help testing @@ -150,15 +139,6 @@ def log_failure(message, opts) @logger.error("[HTTP Output Failure] #{message}", opts) end - # Manticore doesn't provide a way to attach handlers to background or async requests well - # It wants you to use futures. The #async method kinda works but expects single thread batches - # and background only returns futures. - # Proposed fix to manticore here: https://github.com/cheald/manticore/issues/32 - def request_async_background(request) - @method ||= client.executor.java_method(:submit, [java.util.concurrent.Callable.java_class]) - @method.call(request) - end - def request_headers() headers = @headers || {} headers["Content-Type"] ||= "application/json" diff --git a/logstash-output-json_batch.gemspec b/logstash-output-json_batch.gemspec index f0cc53e..62606fa 100644 --- a/logstash-output-json_batch.gemspec +++ b/logstash-output-json_batch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-json_batch' - s.version = '0.1.2' + s.version = '0.2.0' s.licenses = ['Apache License (2.0)'] s.summary = "This output lets you `POST` messages as JSON in a batched fashions" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" @@ -10,7 +10,7 @@ Gem::Specification.new do |s| s.require_paths = ["lib"] # Files - s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT'] + s.files = Dir['lib/**/*','spec/**/*','*.gemspec','*.md','Gemfile','LICENSE' ] # Tests s.test_files = s.files.grep(%r{^(test|spec|features)/}) @@ -19,12 +19,8 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } # Gem dependencies - s.add_runtime_dependency "logstash-core", ">= 2.1.0", "< 3.0.0" - s.add_runtime_dependency "logstash-mixin-http_client", ">= 2.0.2", "< 3.0.0" - - # Constrain Maticore dependency to less than 0.5.0 because of changes in the async handling - # see note in http.rb line 92-93 - s.add_runtime_dependency "manticore", "< 0.5.0" + s.add_runtime_dependency "logstash-core-plugin-api", "~> 1.0" + s.add_runtime_dependency "logstash-mixin-http_client", ">= 2.2.1", "< 3.0.0" s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'sinatra' diff --git a/spec/outputs/json_batch_spec.rb b/spec/outputs/json_batch_spec.rb deleted file mode 100644 index e69de29..0000000