-- sveta.lua
-- Implements aggregation of abstract data by update date and number of updates,
-- grouped by (id, data).
-- Allows descending sorting by date or by popularity (2_week_count, then total_count),
-- where 2_week_count is the number of updates over the last two weeks;
-- the two-week window is shifted automatically by a background fiber every 24 hours.
--
-- Space configuration:
-- space[4].enabled = 1
-- space[4].index[0].type = "HASH"
-- space[4].index[0].unique = 1
-- space[4].index[0].key_field[0].fieldno = 0
-- space[4].index[0].key_field[0].type = "NUM"
-- space[4].index[0].key_field[1].fieldno = 1
-- space[4].index[0].key_field[1].type = "STR"
-- space[4].index[1].type = "AVLTREE"
-- space[4].index[1].unique = 0
-- space[4].index[1].key_field[0].fieldno = 0
-- space[4].index[1].key_field[0].type = "NUM"
-- space[4].index[1].key_field[1].fieldno = 3
-- space[4].index[1].key_field[1].type = "NUM"
-- space[4].index[2].type = "AVLTREE"
-- space[4].index[2].unique = 0
-- space[4].index[2].key_field[0].fieldno = 0
-- space[4].index[2].key_field[0].type = "NUM"
-- space[4].index[2].key_field[1].fieldno = 4
-- space[4].index[2].key_field[1].type = "NUM"
-- space[4].index[2].key_field[2].fieldno = 2
-- space[4].index[2].key_field[2].type = "NUM"
-- space[4].index[3].type = "AVLTREE"
-- space[4].index[3].unique = 1
-- space[4].index[3].key_field[0].fieldno = 3
-- space[4].index[3].key_field[0].type = "NUM"
-- space[4].index[3].key_field[1].fieldno = 0
-- space[4].index[3].key_field[1].type = "NUM"
-- space[4].index[3].key_field[2].fieldno = 1
-- space[4].index[3].key_field[2].type = "STR"
--
-- tuple structure:
-- uid, query, total_count, last_ts, 2_week_count, latest_counter(today), ... , earliest_counter(2 weeks ago)
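--
-- Exported procedures (argument decoding below assumes they are invoked as
-- stored procedures, so user_id, limit and timestamp arrive as packed 32-bit
-- integers and are decoded with box.unpack('i', ...)):
--   add_query(user_id, query)                 -- count an update that happened just now
--   add_old_query(user_id, query, timestamp)  -- backfill an update with an explicit timestamp
--   select_recent(user_id, limit)             -- most recently updated (uid, query) pairs, newest first
--   select_2_week_popular(user_id, limit)     -- most popular pairs over the two-week window
--   delete_query(user_id, query)              -- drop a single aggregate
--   delete_all(user_id)                       -- drop every aggregate of the user
-- A minimal call sketch from Lua itself (hypothetical values):
--   add_query(box.pack('i', 1), 'tarantool')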
local space_no = 4
local delete_chunk = 100
local max_query_size = 1024
local seconds_per_day = 86400
local two_weeks = 14
local index_of_user_id = 0
local index_of_query = 1
local index_of_total_counter = 2
local index_of_last_ts = 3
local index_of_2_week_counter = 4
local index_of_latest_counter = 5
local index_of_earliest_counter = 18
local old_query_days = 60
local timeout = 0.005
local max_attempts = 5
local n_fibers = 10
local move_channel = box.ipc.channel(n_fibers)
local delete_channel = box.ipc.channel(n_fibers)
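-- work queues between the daily passes below and the worker fibers started at
-- the end of this file; capacity is one slot per worker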
function add_query(user_id, query)
if string.len(query) > max_query_size then
error("too long query")
end
local uid = box.unpack('i', user_id)
local tuple = box.update(space_no, {uid, query}, "+p=p+p+p", index_of_total_counter, 1, index_of_last_ts, box.time(), index_of_2_week_counter, 1, index_of_latest_counter, 1)
if tuple == nil then
local new_tuple = {}
new_tuple[index_of_user_id] = uid
new_tuple[index_of_query] = query
new_tuple[index_of_total_counter] = 1
new_tuple[index_of_last_ts] = box.time()
new_tuple[index_of_2_week_counter] = 1
new_tuple[index_of_latest_counter] = 1
for i=index_of_latest_counter+1,index_of_earliest_counter do
new_tuple[i] = 0
end
return box.insert(space_no, new_tuple)
end
return tuple
end
function add_old_query(user_id, query, timestamp)
if string.len(query) > max_query_size then
error("too long query")
end
local uid = box.unpack('i', user_id)
local ts = box.unpack('i', timestamp)
if ts > box.time() then
error("unable to add query in future")
end
local days_ago = math.ceil(box.time()/seconds_per_day) - math.ceil(ts/seconds_per_day)
local tuple = box.select(space_no, 0, uid, query)
if tuple == nil then
local new_tuple = {}
new_tuple[index_of_user_id] = uid
new_tuple[index_of_query] = query
new_tuple[index_of_total_counter] = 1
new_tuple[index_of_last_ts] = ts
new_tuple[index_of_2_week_counter] = 0 -- overwritten below if the update falls inside the two-week window
for i=index_of_latest_counter,index_of_earliest_counter do
new_tuple[i] = 0
end
if( days_ago < two_weeks ) then
new_tuple[index_of_2_week_counter] = 1
new_tuple[index_of_latest_counter + days_ago] = 1
end
return box.insert(space_no, new_tuple)
else
local new_ts = math.max(ts, box.unpack('i', tuple[index_of_last_ts]))
if( days_ago < two_weeks ) then
return box.update(space_no, {uid, query}, "+p=p+p+p", index_of_total_counter, 1,index_of_last_ts, new_ts,
index_of_2_week_counter, 1, index_of_latest_counter + days_ago, 1)
else
return box.update(space_no, {uid, query}, "+p=p", index_of_total_counter, 1, index_of_last_ts, new_ts)
end
end
end
function delete_query(user_id, query)
return box.delete(space_no, box.unpack('i', user_id), query)
end
function delete_all(user_id)
local uid = box.unpack('i', user_id)
while true do
-- delete in chunks of delete_chunk tuples
local tuples = {box.select_limit(space_no, 1, 0, delete_chunk, uid)}
if( #tuples == 0 ) then
break
end
for _, tuple in ipairs(tuples) do
box.delete(space_no, uid, tuple[1])
end
end
end
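-- walk the given secondary index in descending order for one user and return
-- up to `limit` rows of (uid, query, total_count, last_ts, 2_week_count)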
local function select_queries_by_index(uid, limit, index)
local resp = {}
local i = 0
for tuple in box.space[space_no].index[index]:iterator(box.index.REQ, uid) do
if i == limit then break end
table.insert(resp, {tuple[index_of_user_id], tuple[index_of_query], tuple[index_of_total_counter], tuple[index_of_last_ts], tuple[index_of_2_week_counter]})
i = i + 1
end
return unpack(resp)
end
function select_recent(user_id, limit)
local uid = box.unpack('i', user_id)
local lim = box.unpack('i', limit)
return select_queries_by_index(uid, lim, 1)
end
function select_2_week_popular(user_id, limit)
local uid = box.unpack('i', user_id)
local lim = box.unpack('i', limit)
return select_queries_by_index(uid, lim, 2)
end
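-- shift the per-day counters by one position when a new day starts: today's slot
-- becomes 0, every other slot moves one step towards 'earliest', and the day that
-- fell out of the window is subtracted from 2_week_count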
local function move_counters(tuple)
local new_tuple = {}
for i = index_of_user_id,index_of_2_week_counter do
new_tuple[i] = tuple[i]
end
new_tuple[index_of_latest_counter] = box.pack('i', 0)
for i=index_of_latest_counter+1,index_of_earliest_counter do
new_tuple[i] = tuple[i - 1]
end
new_tuple[index_of_2_week_counter] = box.pack('i', box.unpack('i', new_tuple[index_of_2_week_counter]) - box.unpack('i', tuple[index_of_earliest_counter]))
return box.replace(space_no, new_tuple)
end
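-- worker fiber: takes tuples from move_channel and applies move_counters to them,
-- retrying a failing tuple up to max_attempts times with a short sleep in between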
local function move_counters_fiber()
local tpl = move_channel:get()
local count = 0
while true do
local status, result = pcall(move_counters, tpl)
if status then
--success
count = 0
tpl = move_channel:get()
else
--exception: retry the same tuple up to max_attempts times
count = count + 1
if count == max_attempts then
print('max attempts reached for moving counters for user ', tpl[0], ' query ', tpl[1])
count = 0
tpl = move_channel:get()
else
box.fiber.sleep(timeout)
end
end
end
end
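-- worker fiber: takes tuples from delete_channel and deletes them by (uid, query),
-- with the same retry policy as the move workers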
local function delete_old_queries_fiber()
local tpl = delete_channel:get()
local count = 0
while true do
local status, result = pcall(box.delete, space_no, tpl[0], tpl[1])
if status then
--success
count = 0
tpl = delete_channel:get()
else
--exception: retry the same key up to max_attempts times
count = count + 1
if count == max_attempts then
print('max attempts reached for deleting query for user ', tpl[0], ' query ', tpl[1])
count = 0
tpl = delete_channel:get()
else
box.fiber.sleep(timeout)
end
end
end
end
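-- daily pass: walk index 3 (last_ts, uid, query) forward starting from
-- 'two weeks and one day ago', i.e. over tuples updated recently enough that
-- their per-day counters can still be non-zero, and hand each one to the
-- move workers, n_fibers tuples per select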
local function move_all_counters()
local n = 0
print('start moving counters')
local start_time = box.time()
local tuples = {box.select_range(space_no, 3, n_fibers, box.time() - (two_weeks + 1) * seconds_per_day)}
local last_tuple = nil
while true do
if #tuples == 0 then
break
end
for _, t in pairs(tuples) do
move_channel:put(t)
n = n + 1
if n % 1000 == 0 then
box.fiber.sleep(0)
end
last_tuple = t
end
tuples = {box.select_range(space_no, 3, n_fibers, last_tuple[index_of_last_ts], last_tuple[index_of_user_id], last_tuple[index_of_query])}
table.remove(tuples, 1) -- the range starts at the last processed tuple; drop it and keep the array contiguous
end
print('finish moving counters. elapsed ', box.time() - start_time, ' seconds moved ', n, ' tuples')
end
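-- daily pass: walk index 3 in reverse starting from 'old_query_days ago',
-- i.e. over tuples not updated for at least old_query_days days, and hand
-- each one to the delete workers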
local function delete_old_queries()
local n = 0
print('start delete old queries')
local start_time = box.time()
local tuples = {box.select_reverse_range(space_no, 3, n_fibers, box.time() - old_query_days * seconds_per_day)}
local last_tuple = nil
while true do
if #tuples == 0 then
break
end
for _, t in pairs(tuples) do
delete_channel:put(t)
n = n + 1
if n % 1000 == 0 then
box.fiber.sleep(0)
end
last_tuple = t
end
tuples = {box.select_reverse_range(space_no, 3, n_fibers, last_tuple[index_of_last_ts], last_tuple[index_of_user_id], last_tuple[index_of_query])}
end
print('finish delete old queries. elapsed ', box.time() - start_time, ' seconds, ', n, ' delete requests')
end
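-- scheduler fiber: sleep until one second past the next day boundary of
-- box.time(), then run the move and delete passes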
local function move_and_delete_fiber()
while true do
local time = box.time()
local sleep_time = math.ceil(time/seconds_per_day)*seconds_per_day - time + 1
print('move_and_delete_fiber: sleep for ', sleep_time, ' seconds')
box.fiber.sleep(sleep_time)
move_all_counters()
delete_old_queries()
end
end
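-- on (re)load of this file: kill fibers left over from a previous load, then start
-- one scheduler fiber plus n_fibers move workers and n_fibers delete workers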
if sveta_started_fibers ~= nil then
for _, fid in pairs(sveta_started_fibers) do
box.fiber.kill(fid)
end
end
sveta_started_fibers = {}
local fiber = box.fiber.wrap(move_and_delete_fiber)
table.insert(sveta_started_fibers, box.fiber.id(fiber))
for i = 1, n_fibers do
fiber = box.fiber.wrap(move_counters_fiber)
table.insert(sveta_started_fibers, box.fiber.id(fiber))
fiber = box.fiber.wrap(delete_old_queries_fiber)
table.insert(sveta_started_fibers, box.fiber.id(fiber))
end