Skip to content

Commit

Permalink
Merge pull request #1809 from fluent/improve-on-logging
Browse files Browse the repository at this point in the history
Improve on logging
  • Loading branch information
repeatedly authored Jan 5, 2018
2 parents 5fc0ea1 + d6760e2 commit ce6cda0
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 23 deletions.
24 changes: 12 additions & 12 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ def skipped_type?(type)
end
end

def on_trace(&block)
def on_trace
return if @level > LEVEL_TRACE
block.call if block
yield
end

def trace(*args, &block)
Expand All @@ -282,9 +282,9 @@ def trace_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_TRACE)
end

def on_debug(&block)
def on_debug
return if @level > LEVEL_DEBUG
block.call if block
yield
end

def debug(*args, &block)
Expand All @@ -302,9 +302,9 @@ def debug_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_DEBUG)
end

def on_info(&block)
def on_info
return if @level > LEVEL_INFO
block.call if block
yield
end

def info(*args, &block)
Expand All @@ -322,9 +322,9 @@ def info_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_INFO)
end

def on_warn(&block)
def on_warn
return if @level > LEVEL_WARN
block.call if block
yield
end

def warn(*args, &block)
Expand All @@ -342,9 +342,9 @@ def warn_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_WARN)
end

def on_error(&block)
def on_error
return if @level > LEVEL_ERROR
block.call if block
yield
end

def error(*args, &block)
Expand All @@ -362,9 +362,9 @@ def error_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_ERROR)
end

def on_fatal(&block)
def on_fatal
return if @level > LEVEL_FATAL
block.call if block
yield
end

def fatal(*args, &block)
Expand Down
26 changes: 17 additions & 9 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ def new_metadata(timekey: nil, tag: nil, variables: nil)
end

def add_metadata(metadata)
log.trace "adding metadata", instance: self.object_id, metadata: metadata
log.on_trace { log.trace "adding metadata", instance: self.object_id, metadata: metadata }

synchronize do
if i = @metadata_list.index(metadata)
@metadata_list[i]
Expand All @@ -263,7 +264,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
return if metadata_and_data.size < 1
raise BufferOverflowError, "buffer space has too many data" unless storable?

log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size
log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size }

staged_bytesize = 0
operated_chunks = []
Expand Down Expand Up @@ -382,7 +383,8 @@ def queued?(metadata=nil)
end

def enqueue_chunk(metadata)
log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata
log.on_trace { log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata }

chunk = synchronize do
@stage.delete(metadata)
end
Expand All @@ -406,7 +408,8 @@ def enqueue_chunk(metadata)
end

def enqueue_unstaged_chunk(chunk)
log.trace "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata
log.on_trace { log.trace "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata }

synchronize do
chunk.synchronize do
metadata = chunk.metadata
Expand All @@ -419,7 +422,8 @@ def enqueue_unstaged_chunk(chunk)
end

def enqueue_all
log.trace "enqueueing all chunks in buffer", instance: self.object_id
log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id }

if block_given?
synchronize{ @stage.keys }.each do |metadata|
# NOTE: The following line might cause data race depending on Ruby implementations except CRuby
Expand All @@ -438,7 +442,8 @@ def enqueue_all

def dequeue_chunk
return nil if @queue.empty?
log.trace "dequeueing a chunk", instance: self.object_id
log.on_trace { log.trace "dequeueing a chunk", instance: self.object_id }

synchronize do
chunk = @queue.shift

Expand All @@ -453,7 +458,8 @@ def dequeue_chunk
end

def takeback_chunk(chunk_id)
log.trace "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id)
log.on_trace { log.trace "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id) }

synchronize do
chunk = @dequeued.delete(chunk_id)
return false unless chunk # already purged by other thread
Expand All @@ -470,7 +476,8 @@ def purge_chunk(chunk_id)
return nil unless chunk # purged by other threads

metadata = chunk.metadata
log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
log.on_trace { log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata }

begin
bytesize = chunk.bytesize
chunk.purge
Expand All @@ -489,7 +496,8 @@ def purge_chunk(chunk_id)
end

def clear_queue!
log.trace "clearing queue", instance: self.object_id
log.on_trace { log.trace "clearing queue", instance: self.object_id }

synchronize do
until @queue.empty?
begin
Expand Down
5 changes: 3 additions & 2 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,8 @@ def handle_stream_simple(tag, es, enqueue: false)
end

def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed
log.on_trace { log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed }

if delayed
@dequeued_chunks_mutex.synchronize do
@dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
Expand Down Expand Up @@ -1057,7 +1058,7 @@ def try_flush
chunk = @buffer.dequeue_chunk
return unless chunk

log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id)
log.on_trace { log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) }

output = self
using_secondary = false
Expand Down

0 comments on commit ce6cda0

Please sign in to comment.