-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathload_balancer.rb
114 lines (99 loc) · 3.18 KB
/
load_balancer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin/output'
require 'fluent/plugin/out_forward/error'
module Fluent::Plugin
class ForwardOutput < Output
# Weighted round-robin selection over a set of forward nodes.
#
# rebuild_weight_array publishes an array in which every node appears in
# proportion to its weight; select_healthy_node walks that array with a shared
# cursor and hands the first available node to the caller's block.
class LoadBalancer
  # @param log [#debug, #warn] logger used while the weight array is rebuilt
  def initialize(log)
    @log = log
    @weight_array = []
    # One seed per balancer: every rebuild shuffles with Random.new(@rand_seed),
    # so the same weighted slots always end up in the same order.
    @rand_seed = Random.new.seed
    @rr = 0
    # Guards @rr and the replacement of @weight_array.
    @mutex = Mutex.new
  end

  # Yields available nodes in round-robin order until the block succeeds.
  #
  # @yield [node] a node whose available? returned a truthy value
  # @return [Array] two elements: the block's return value and the node used
  # @raise [StandardError] the most recent error raised by the block when
  #   every attempted node failed
  # @raise [NoNodesAvailable] when no node could be tried at all
  def select_healthy_node
    error = nil

    # Don't care about the change of @weight_array's size while looping since
    # it's only used for determining the number of loops and it is not so important.
    wlen = @weight_array.size
    wlen.times do
      node = @mutex.synchronize do
        slots = @weight_array
        # rebuild_weight_array may have published an empty array after wlen
        # was read; a modulo by its size would raise ZeroDivisionError.
        unless slots.empty?
          r = @rr % slots.size
          @rr = (r + 1) % slots.size
          slots[r]
        end
      end
      # Nothing left to pick from: fall through to the raises below.
      break if node.nil?
      next unless node.available?

      begin
        ret = yield node
        return ret, node
      rescue => e
        # for load balancing during detecting crashed servers
        error = e # use the latest error
      end
    end

    raise error if error
    raise NoNodesAvailable, "no nodes are available"
  end

  # Recomputes the weighted slot array from +nodes+ and publishes it.
  #
  # Regular (non-standby) nodes are always included, available or not;
  # select_healthy_node skips the unavailable ones. Available standby nodes
  # are added, in order, until they cover the weight of the unavailable
  # regular nodes.
  #
  # @param nodes [Array] objects responding to standby?, available?, weight,
  #   host and port
  # @return [Array] the newly published weight array (empty when there is no
  #   candidate node or no candidate has a positive weight)
  def rebuild_weight_array(nodes)
    standby_nodes, regular_nodes = nodes.partition(&:standby?)

    lost_weight = regular_nodes.reject(&:available?).sum(&:weight)
    @log.debug("rebuilding weight array", lost_weight: lost_weight)

    if lost_weight > 0
      standby_nodes.each do |n|
        next unless n.available?

        regular_nodes << n
        @log.warn "using standby node #{n.host}:#{n.port}", weight: n.weight
        lost_weight -= n.weight
        break if lost_weight <= 0
      end
    end

    weight_array = []

    if regular_nodes.empty?
      @log.warn('No nodes are available')
      return @mutex.synchronize { @weight_array = weight_array }
    end

    # Reduce the weights by their gcd so the array stays as short as possible.
    # The gcd is 0 when every weight is 0; dividing by it would raise
    # ZeroDivisionError, so no slots are generated in that case.
    gcd = regular_nodes.map(&:weight).inject(0) { |acc, w| acc.gcd(w) }
    unless gcd.zero?
      regular_nodes.each do |n|
        (n.weight / gcd).times { weight_array << n }
      end
    end

    if weight_array.empty?
      @log.warn('No nodes have a positive weight')
      return @mutex.synchronize { @weight_array = weight_array }
    end

    # for load balancing during detecting crashed servers
    # (repeats the slots by an integer factor, giving at most six per node)
    coe = (regular_nodes.size * 6) / weight_array.size
    weight_array *= coe if coe > 1

    # Seeded shuffle: the order is reproducible for a given balancer.
    r = Random.new(@rand_seed)
    weight_array.sort_by! { r.rand }

    @mutex.synchronize { @weight_array = weight_array }
  end

  # Alternative names for the two public operations.
  alias select_service select_healthy_node
  alias rebalance rebuild_weight_array
end
end
end