Skip to content

Commit 2d5115b

Browse files
danilapogagolybev
authored andcommitted
Add reserved backends table processing (#7)
Co-authored-by: danilapog <[email protected]> Co-committed-by: danilapog <[email protected]>
1 parent 5615a1b commit 2d5115b

9 files changed

+306
-21
lines changed

Dockerfile.balancer

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ RUN apt-get update -y && \
2222
rm -f /etc/nginx/conf.d/default.conf \
2323
/usr/local/openresty/nginx/conf/nginx.conf
2424

25-
COPY scripts/ds-ep-observer.py /ds_ep_observer/ds-ep-observer.py
25+
COPY scripts/ds-ep-observer.py \
26+
scripts/ds-pod-observer.py \
27+
/ds_ep_observer/
2628
COPY config/balancer/conf.d/balancer-server.conf /etc/nginx/conf.d/
2729
COPY config/balancer/conf.d/handler-server.conf /etc/nginx/conf.d/
2830
COPY config/balancer/nginx.conf /usr/local/openresty/nginx/conf/
2931
COPY config/balancer/lua/configuration.lua /etc/nginx/lua/
32+
COPY config/balancer/lua/docs_balancer.lua /etc/nginx/lua/
3033
COPY --chmod=755 balancer-docker-entrypoint.py /docker_entrypoint.py
3134

3235
EXPOSE 80 443

balancer-docker-entrypoint.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ def running_services():
3939
try:
4040
running_nginx = ["/usr/local/openresty/bin/openresty", "-g", "daemon off;"]
4141
running_get_ds_ep = ["python3", "/ds_ep_observer/ds-ep-observer.py"]
42-
all_cmd = [running_nginx, running_get_ds_ep]
42+
running_get_ds_pod = ["python3", "/ds_ep_observer/ds-pod-observer.py"]
43+
all_cmd = [running_nginx, running_get_ds_ep, running_get_ds_pod]
4344
for cmd in all_cmd:
4445
cmd_process = subprocess.Popen(cmd)
4546
logger_endpoints_ds.info(f'The "{cmd_process.pid}" process has been running')

config/balancer/conf.d/balancer-server.conf

+9-11
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ server {
55
listen [::]:80 default_server reuseport backlog=4096 ;
66

77
# liveness/readness/readiness probes location
8-
location /healthcheck {
8+
location /balancer-healthcheck {
99
return 200;
1010
}
1111

@@ -67,20 +67,18 @@ server {
6767
proxy_cookie_domain off;
6868
proxy_cookie_path off;
6969

70-
# In case of errors try the next upstream server before returning an error
71-
proxy_next_upstream error timeout;
72-
proxy_next_upstream_timeout 0;
73-
proxy_next_upstream_tries 3;
74-
7570
proxy_cache docs_cache;
7671

7772
add_header X-Cache-Status $upstream_cache_status;
7873

7974
include /etc/nginx/mnt_config/balancer-lua.conf;
80-
81-
error_page 500 502 503 504 /50x.html;
82-
location = /50x.html {
83-
root /usr/local/openresty/nginx/html;
84-
}
75+
76+
proxy_next_upstream error timeout;
77+
proxy_next_upstream_timeout 0;
78+
proxy_next_upstream_tries 3;
79+
80+
proxy_pass http://main_balancer;
81+
82+
proxy_redirect off;
8583
}
8684
}

config/balancer/conf.d/handler-server.conf

+12-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,18 @@ server {
1515

1616
content_by_lua_block {
1717
local configuration = require("configuration")
18-
configuration.handle_endpoints()
18+
configuration.handle()
19+
}
20+
}
21+
22+
location /configuration_reserved {
23+
client_max_body_size 21M;
24+
client_body_buffer_size 21M;
25+
proxy_buffering off;
26+
27+
content_by_lua_block {
28+
local configuration = require("configuration")
29+
configuration.handle()
1930
}
2031
}
2132
}

config/balancer/lua/configuration.lua

+35-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local endpoints_data = ngx.shared.endpoints_data
2+
local reserved_data = ngx.shared.reserved_data
23
local cjson = require("cjson.safe")
34

45
local _M = {}
@@ -19,10 +20,24 @@ function _M.get_backends_data()
1920
return endpoints_data:get("backends")
2021
end
2122

22-
function _M.handle_endpoints()
23-
local value = endpoints_data:get("backends")
23+
function _M.get_reserved_data()
24+
return reserved_data:get("backends")
25+
end
26+
27+
local function handle_endpoints(type)
28+
local dict
29+
local dict_str
2430

25-
print("[BALANCER.HANDLER]: New endpoints update request income")
31+
if type == "live" then
32+
dict = ngx.shared.endpoints_data
33+
dict_str = "LIVE_ENDPOINTS"
34+
else if type == "reserved" then
35+
dict = ngx.shared.reserved_data
36+
dict_str = "RESERVED_ENDPOINTS"
37+
end
38+
end
39+
40+
print(string.format("[BALANCER.HANDLER]: New endpoints update request income. Used dict: %s", dict_str))
2641
print(ngx.var.request_method)
2742
if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
2843
ngx.status = ngx.HTTP_BAD_REQUEST
@@ -41,16 +56,16 @@ function _M.handle_endpoints()
4156
local none_status = check_none(endpoints)
4257

4358
if none_status then
44-
print("[BALANCER.HANDLER]: Empty endpoint table is come, seting backends to none")
59+
print(string.format("[BALANCER.HANDLER]: Empty endpoint table is come, seting backends to none in dict: %s", dict_str))
4560
local none_endpoints = '[{"address": "none"}]'
46-
local success, err = endpoints_data:set("backends", none_endpoints)
61+
local success, err = dict:set("backends", none_endpoints)
4762
if not success then
4863
ngx.log(ngx.ERR, "[BALANCER.HANDLER]: dynamic-configuration: error updating configuration: " .. tostring(err))
4964
ngx.status = ngx.HTTP_BAD_REQUEST
5065
return
5166
end
5267
else
53-
local success, err = endpoints_data:set("backends", endpoints)
68+
local success, err = dict:set("backends", endpoints)
5469
if not success then
5570
ngx.log(ngx.ERR, "[BALANCER.HANDLER]: dynamic-configuration: error updating configuration: " .. tostring(err))
5671
ngx.status = ngx.HTTP_BAD_REQUEST
@@ -59,6 +74,20 @@ function _M.handle_endpoints()
5974
end
6075

6176
ngx.status = ngx.HTTP_CREATED
77+
end
78+
79+
function _M.handle()
80+
if ngx.var.request_uri == "/configuration" then
81+
local type = "live"
82+
handle_endpoints(type)
83+
return
84+
end
85+
86+
if ngx.var.request_uri == "/configuration_reserved" then
87+
local type = "reserved"
88+
handle_endpoints(type)
89+
return
90+
end
6291
end
6392

6493
return _M

config/balancer/lua/docs_balancer.lua

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
local _M = {}
2+
function _M.balance_ep()
3+
local configuration = require("configuration")
4+
local cjson = require("cjson.safe")
5+
local ngx_balancer = require("ngx.balancer")
6+
local data
7+
local random_endpoint
8+
local request_uri = ngx.var.request_uri
9+
local ver = request_uri:match("/([%d%.%-]+%-[^/]+)/")
10+
local wopisrc = ngx.var.arg_WOPISrc
11+
local shardkey = ngx.var.arg_shardkey
12+
13+
local rr_live_dict = ngx.shared.rr_live_index
14+
local rr_reserved_dict = ngx.shared.rr_reserved_index
15+
16+
local api_key
17+
18+
if wopisrc then
19+
api_key = wopisrc
20+
end
21+
if shardkey then
22+
api_key = shardkey
23+
end
24+
25+
repeat
26+
data = configuration.get_backends_data()
27+
print(data)
28+
local decoded_table = cjson.decode(data)
29+
print(tostring(decoded_table))
30+
local address = decoded_table[1].address
31+
print(cjson.encode(address))
32+
if address == "none" then
33+
ngx.sleep(1)
34+
print("No active shards found, waiting...")
35+
end
36+
until address ~= "none"
37+
local decoded_data = cjson.decode(data)
38+
local matching_addresses = {}
39+
40+
if not api_key and not ver then
41+
for _, entry in ipairs(decoded_data) do
42+
table.insert(matching_addresses, entry.address .. ":" .. entry.port)
43+
end
44+
end
45+
if api_key then
46+
for _, entry in ipairs(decoded_data) do
47+
table.insert(matching_addresses, entry.address .. ":" .. entry.port)
48+
end
49+
else
50+
if ver then
51+
-- Iterate through the decoded table
52+
for _, entry in ipairs(decoded_data) do
53+
if entry.ver == ver then
54+
table.insert(matching_addresses, entry.address .. ":" .. entry.port)
55+
end
56+
end
57+
end
58+
end
59+
60+
if ver and next(matching_addresses) == nil then
61+
print(string.format("WARN: Can't find endpoint in live table. VER: %s", ver))
62+
local reserved_data
63+
repeat
64+
reserved_data = configuration.get_reserved_data()
65+
print(string.format("RESERVED_DATA:%s", reserved_data))
66+
local decoded_reserved_table = cjson.decode(reserved_data)
67+
print(tostring(decoded_reserved_table))
68+
local address = decoded_reserved_table[1].address
69+
print(cjson.encode(address))
70+
if address == "none" then
71+
ngx.sleep(1)
72+
print("No active shards found, waiting...")
73+
end
74+
until address ~= "none"
75+
local reserved_decoded_data = cjson.decode(reserved_data)
76+
local reserved_addresses = {}
77+
for _, entry in ipairs(reserved_decoded_data) do
78+
if entry.ver == ver then
79+
table.insert(reserved_addresses, entry.address .. ":" .. entry.port)
80+
end
81+
end
82+
83+
local idx = rr_reserved_dict:get("last_used_index") or 1
84+
85+
random_endpoint = reserved_addresses[idx]
86+
87+
-- Update to the next index for the next request
88+
idx = idx + 1
89+
if idx > #matching_addresses then
90+
idx = 1
91+
end
92+
93+
rr_reserved_dict:set("last_used_index", idx)
94+
else
95+
local idx = rr_live_dict:get("last_used_index") or 1
96+
97+
random_endpoint = matching_addresses[idx]
98+
99+
-- Update to the next index for the next request
100+
idx = idx + 1
101+
if idx > #matching_addresses then
102+
idx = 1
103+
end
104+
105+
rr_live_dict:set("last_used_index", idx)
106+
end
107+
108+
if api_key then
109+
return random_endpoint
110+
else
111+
ngx_balancer.set_more_tries(1)
112+
113+
local ok, err = ngx_balancer.set_current_peer(random_endpoint)
114+
if not ok then
115+
ngx.log(ngx.ERR, "error while setting current upstream peer ", peer,
116+
": ", err)
117+
end
118+
end
119+
end
120+
return _M

config/balancer/nginx.conf

+37
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
worker_processes 1;
22

3+
worker_rlimit_nofile 1047552;
4+
35
# Enables the use of JIT for regular expressions to speed-up their processing.
46
pcre_jit on;
57

@@ -16,9 +18,29 @@ events {
1618
}
1719

1820
http {
21+
init_by_lua_block {
22+
ok, res = pcall(require, "configuration")
23+
if not ok then
24+
error("require failed: " .. tostring(res))
25+
else
26+
configuration = res
27+
end
28+
ok, res = pcall(require, "docs_balancer")
29+
if not ok then
30+
error("require failed: " .. tostring(res))
31+
else
32+
docs_balancer = res
33+
end
34+
}
1935
lua_package_path "/etc/nginx/lua/?.lua;;";
2036
# dict where backends will be stored
2137
lua_shared_dict endpoints_data 20M;
38+
# dict where reserved backends will be stored
39+
lua_shared_dict reserved_data 20M;
40+
# dict where live backends index is stored
41+
lua_shared_dict rr_live_index 10M;
42+
# dict where reserved index is stored
43+
lua_shared_dict rr_reserved_index 10M;
2244

2345
include mime.types;
2446
default_type application/octet-stream;
@@ -47,6 +69,21 @@ http {
4769
"" $request_id;
4870
}
4971

72+
upstream main_balancer {
73+
74+
server 0.0.0.1; # placeholder
75+
76+
balancer_by_lua_block {
77+
docs_balancer.balance_ep()
78+
}
79+
80+
keepalive 320;
81+
keepalive_time 1h;
82+
keepalive_timeout 60s;
83+
keepalive_requests 10000;
84+
85+
}
86+
5087
resolver local=on;
5188

5289
log_subrequest on;

scripts/ds-ep-observer.py

-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ def get_ep_list(ep_ds, ep_port):
5858
total_result = {}
5959
for ep_ip in ep_ds:
6060
try:
61-
url = ep_ip.ip + ':' + ep_port
6261
pod_name = ep_ip.target_ref.name
6362
ver_ds = read_pod_annotation(pod_name)
6463
total_result['address'] = ep_ip.ip

0 commit comments

Comments
 (0)