Skip to content

Commit

Permalink
delete unused method and move to private
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Jul 22, 2019
1 parent aeff561 commit 9cd7d38
Showing 1 changed file with 25 additions and 31 deletions.
56 changes: 25 additions & 31 deletions lib/fluent/plugin/out_forward/ack_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ def ack_reader(select_interval)
end
end

ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do
def expired?(now)
time + timeout < now
end
end

Ack = Struct.new(:id, :handler, :node) do
def enqueue(sock)
handler.enqueue(node, sock, id)
end
end

def create_ack(id, node)
Ack.new(id, self, node)
end

def enqueue(node, sock, cid)
info = ACKWaitingSockInfo.new(sock, cid, Base64.encode64(cid), node, Fluent::Clock.now, @timeout)
@mutex.synchronize do
@ack_waitings << info
end
end

private

def read_ack_from_sock(sock)
begin
raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length)
Expand Down Expand Up @@ -120,37 +145,6 @@ def read_ack_from_sock(sock)
delete(info)
end

def synchronize
@mutex.synchronize do
yield
end
end

ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do
def expired?(now)
time + timeout < now
end
end

Ack = Struct.new(:id, :handler, :node) do
def enqueue(sock)
handler.enqueue(node, sock, id)
end
end

def create_ack(id, node)
Ack.new(id, self, node)
end

def enqueue(node, sock, cid)
info = ACKWaitingSockInfo.new(sock, cid, Base64.encode64(cid), node, Fluent::Clock.now, @timeout)
@mutex.synchronize do
@ack_waitings << info
end
end

private

def dump_unique_id_hex(unique_id)
Fluent::UniqueId.hex(unique_id)
end
Expand Down

0 comments on commit 9cd7d38

Please sign in to comment.