-
Notifications
You must be signed in to change notification settings - Fork 0
/
async.go
388 lines (343 loc) · 17.1 KB
/
async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
package mongox
import (
"context"
"errors"
"sync"
"github.com/maxbolgarin/gorder"
"go.mongodb.org/mongo-driver/v2/mongo"
)
// DefaultAsyncRetries is the maximum number of retries for failed tasks in async mode.
const DefaultAsyncRetries = 10
// AsyncDatabase is a database client that handles operations asynchronously without waiting for them to complete.
// It is safe for concurrent use by multiple goroutines.
type AsyncDatabase struct {
db *Database
queue *gorder.Gorder[string]
log gorder.Logger
colls map[string]*AsyncCollection
mu sync.RWMutex
}
// Database returns the underlying Database.
func (m *AsyncDatabase) Database() *Database {
return m.db
}
// AsyncCollection returns an async collection object by name.
// It will create a new collection if it doesn't exist after first query.
func (m *AsyncDatabase) AsyncCollection(name string) *AsyncCollection {
m.mu.RLock()
coll, ok := m.colls[name]
m.mu.RUnlock()
if ok {
return coll
}
coll = &AsyncCollection{
coll: m.db.Collection(name),
queue: m.queue,
log: m.log,
}
m.mu.Lock()
m.colls[name] = coll
m.mu.Unlock()
return coll
}
// WithTransaction executes a transaction asynchronously.
// It will create a new session and execute a function inside a transaction.
// Warning! Transactions in MongoDB is available only for replica sets or Sharded Clusters, not for standalone servers.
func (m *AsyncDatabase) WithTransaction(queueKey, taskName string, fn func(ctx context.Context) error) {
if queueKey == "" {
queueKey = m.db.db.Name()
}
if taskName == "" {
taskName = m.db.db.Name() + "_transaction"
}
m.queue.Push(queueKey, taskName, func(ctx context.Context) error {
_, err := m.db.WithTransaction(ctx, func(ctx context.Context) (any, error) {
return nil, fn(ctx)
})
return err
})
}
// WithTask adds a function to execute it asynchronously.
// It won't handle errors like in collection method and will retry function in case of returning any error.
// If queue is empty, it will use the database name.
func (m *AsyncDatabase) WithTask(queueKey, taskName string, fn func(ctx context.Context) error) {
if queueKey == "" {
queueKey = m.db.db.Name()
}
if taskName == "" {
taskName = m.db.db.Name() + "_task"
}
m.queue.Push(queueKey, taskName, func(ctx context.Context) error {
return fn(ctx)
})
}
// AsyncCollection is a collection client that handles operations asynchronously without waiting for them to complete.
// It is safe for concurrent use by multiple goroutines.
// Tasks in different queues will be executed in parallel.
type AsyncCollection struct {
coll *Collection
queue *gorder.Gorder[string]
log gorder.Logger
}
// Insert inserts a document or many documents into the collection asynchronously without waiting.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrDuplicate, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) Insert(queueKey, taskName string, records ...any) {
ac.push(queueKey, taskName, "insert", func(ctx context.Context) error {
_, err := ac.coll.Insert(ctx, records...)
return err
})
}
// Upsert replaces a document in the collection or inserts it if it doesn't exist asynchronously without waiting.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) Upsert(queueKey, taskName string, record any, filter M) {
ac.push(queueKey, taskName, "upsert", func(ctx context.Context) error {
_, err := ac.coll.Upsert(ctx, record, filter)
return err
})
}
// ReplaceOne replaces a document in the collection asynchronously without waiting.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) ReplaceOne(queueKey, taskName string, record any, filter M) {
ac.push(queueKey, taskName, "replace", func(ctx context.Context) error {
return ac.coll.ReplaceOne(ctx, record, filter)
})
}
// SetFields sets fields in a document in the collection asynchronously without waiting.
// For example: {key1: value1, key2: value2} becomes {$set: {key1: value1, key2: value2}}.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) SetFields(queueKey, taskName string, filter, update M) {
ac.push(queueKey, taskName, "set_fields", func(ctx context.Context) error {
return ac.coll.SetFields(ctx, filter, update)
})
}
// UpdateOne updates a document in the collection asynchronously without waiting for it to complete.
// Update map/document must contain key beginning with '$', e.g. {$set: {key1: value1}}.
// Modifiers operate on fields. For example: {$mod: {<field>: ...}}.
// You can use predefined options from mongox, e.g. mongox.M{mongox.Inc: mongox.M{"number": 1}}.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) UpdateOne(queueKey, taskName string, filter, update M) {
ac.push(queueKey, taskName, "update_one", func(ctx context.Context) error {
return ac.coll.UpdateOne(ctx, filter, update)
})
}
// UpdateMany updates multi documents in the collection asynchronously without waiting for them to complete.
// Update map/document must contain key beginning with '$', e.g. {$set: {key1: value1}}.
// Modifiers operate on fields. For example: {$mod: {<field>: ...}}.
// You can use predefined options from mongox, e.g. mongox.M{mongox.Inc: mongox.M{"number": 1}}.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) UpdateMany(queueKey, taskName string, filter, update M) {
ac.push(queueKey, taskName, "update_many", func(ctx context.Context) error {
_, err := ac.coll.UpdateMany(ctx, filter, update)
return err
})
}
// UpdateOneFromDiff sets fields in a document in the collection using diff structure asynchronously without waiting for it to complete.
// Diff structure is a map of pointers to field names with their new values.
// E.g. if you have structure:
//
// type MyStruct struct {name string, index int}
//
// Diff structure will be:
//
// type MyStructDiff struct {name *string, index *int}
//
// It returns ErrNotFound if no document is updated.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) UpdateOneFromDiff(queueKey, taskName string, filter M, diff any) {
ac.push(queueKey, taskName, "update_from_diff", func(ctx context.Context) error {
return ac.coll.UpdateOneFromDiff(ctx, filter, diff)
})
}
// DeleteFields deletes fields in a document in the collection asynchronously without waiting for it to complete.
// For example: [key1, key2] becomes {$unset: {key1: "", key2: ""}}.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) DeleteFields(queueKey, taskName string, filter M, fields ...string) {
ac.push(queueKey, taskName, "delete_fields", func(ctx context.Context) error {
return ac.coll.DeleteFields(ctx, filter, fields...)
})
}
// DeleteOne deletes a document in the collection asynchronously without waiting for it to complete.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) DeleteOne(queueKey, taskName string, filter M) {
ac.push(queueKey, taskName, "delete_one", func(ctx context.Context) error {
return ac.coll.DeleteOne(ctx, filter)
})
}
// DeleteMany deletes multi documents in the collection asynchronously without waiting for them to complete.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) DeleteMany(queueKey, taskName string, filter M) {
ac.push(queueKey, taskName, "delete_many", func(ctx context.Context) error {
_, err := ac.coll.DeleteMany(ctx, filter)
return err
})
}
// BulkWrite executes bulk write operations in the collection asynchronously without waiting for them to complete.
// Use [BulkBuilder] to create models for bulk write operations.
// IsOrdered==true means that all operations are executed in the order they are added to the [BulkBuilder]
// and if any of them fails, the whole operation fails.
// IsOrdered==false means that all operations are executed in parallel and if any of them fails,
// the whole operation continues.
func (ac *AsyncCollection) BulkWrite(queueKey, taskName string, models []mongo.WriteModel, isOrdered bool) {
ac.push(queueKey, taskName, "bulk_write", func(ctx context.Context) error {
_, err := ac.coll.BulkWrite(ctx, models, isOrdered)
return err
})
}
func (ac *AsyncCollection) push(queueKey, taskName, opName string, f gorder.TaskFunc) {
if queueKey == "" {
queueKey = ac.coll.coll.Name()
}
if taskName == "" {
taskName = ac.coll.coll.Name() + "_" + opName
}
ac.queue.Push(queueKey, taskName, func(ctx context.Context) error {
return ac.HandleRetryError(f(ctx), taskName)
})
}
func (ac *AsyncCollection) HandleRetryError(err error, taskName string) error {
if err == nil {
return nil
}
switch {
case errors.Is(err, ErrNotFound):
// ErrNotFound is read error, it doesn't change state of the document and it can be throwed
ac.log.Error("document not found", "error", err, "collection", ac.coll.coll.Name(), "task", taskName, "flow", "async")
return nil
case errors.Is(err, ErrDuplicate):
// ErrDuplicate is a persistent error, there is no sense to retry it
ac.log.Error("duplicate", "error", err, "collection", ac.coll.coll.Name(), "task", taskName, "flow", "async")
return nil
case errors.Is(err, ErrInvalidArgument) ||
errors.Is(err, ErrBadValue) ||
errors.Is(err, ErrIndexNotFound) ||
errors.Is(err, ErrFailedToParse) ||
errors.Is(err, ErrTypeMismatch) ||
errors.Is(err, ErrIllegalOperation):
// ErrInvalidArgument means error with using mongo interface
// It is a persistent error and there is no sense to retry
ac.log.Error("invalid argument", "error", err, "collection", ac.coll.coll.Name(), "task", taskName, "flow", "async")
return nil
default: // network, timeout, server and other errors should be retried
return err
}
}
// QueueCollection is a async collection with predefined queue key.
type QueueCollection struct {
*AsyncCollection
name string
}
// QueueCollection returns a new QueueCollection.
func (qc *AsyncCollection) QueueCollection(name string) *QueueCollection {
return &QueueCollection{AsyncCollection: qc, name: name}
}
// Queue returns the queue key.
func (qc *QueueCollection) Queue() string {
return qc.name
}
// Insert inserts a document or many documents into the collection asynchronously without waiting.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrDuplicate, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) Insert(records ...any) {
qc.AsyncCollection.Insert(qc.name, "", records...)
}
// Upsert replaces a document in the collection or inserts it if it doesn't exist asynchronously without waiting.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) Upsert(record any, filter M) {
qc.AsyncCollection.Upsert(qc.name, "", record, filter)
}
// ReplaceOne replaces a document in the collection asynchronously without waiting.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) ReplaceOne(record any, filter M) {
qc.AsyncCollection.ReplaceOne(qc.name, "", record, filter)
}
// SetFields sets fields in a document in the collection asynchronously without waiting.
// For example: {key1: value1, key2: value2} becomes {$set: {key1: value1, key2: value2}}.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) SetFields(filter, update M) {
qc.AsyncCollection.SetFields(qc.name, "", filter, update)
}
// UpdateOne updates a document in the collection asynchronously without waiting for it to complete.
// Update map/document must contain key beginning with '$', e.g. {$set: {key1: value1}}.
// Modifiers operate on fields. For example: {$mod: {<field>: ...}}.
// You can use predefined options from mongox, e.g. mongox.M{mongox.Inc: mongox.M{"number": 1}}.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) UpdateOne(filter, update M) {
qc.AsyncCollection.UpdateOne(qc.name, "", filter, update)
}
// UpdateMany updates multi documents in the collection asynchronously without waiting for them to complete.
// Update map/document must contain key beginning with '$', e.g. {$set: {key1: value1}}.
// Modifiers operate on fields. For example: {$mod: {<field>: ...}}.
// You can use predefined options from mongox, e.g. mongox.M{mongox.Inc: mongox.M{"number": 1}}.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) UpdateMany(filter, update M) {
qc.AsyncCollection.UpdateMany(qc.name, "", filter, update)
}
// UpdateOneFromDiff sets fields in a document in the collection using diff structure asynchronously without waiting for it to complete.
// Diff structure is a map of pointers to field names with their new values.
// E.g. if you have structure:
//
// type MyStruct struct {name string, index int}
//
// Diff structure will be:
//
// type MyStructDiff struct {name *string, index *int}
//
// It returns ErrNotFound if no document is updated.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) UpdateOneFromDiff(filter M, diff any) {
qc.AsyncCollection.UpdateOneFromDiff(qc.name, "", filter, diff)
}
// DeleteFields deletes fields in a document in the collection asynchronously without waiting for it to complete.
// For example: [key1, key2] becomes {$unset: {key1: "", key2: ""}}.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) DeleteFields(filter M, fields ...string) {
qc.AsyncCollection.DeleteFields(qc.name, "", filter, fields...)
}
// DeleteOne deletes a document in the collection asynchronously without waiting for it to complete.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) DeleteOne(filter M) {
qc.AsyncCollection.DeleteOne(qc.name, "", filter)
}
// DeleteMany deletes multi documents in the collection asynchronously without waiting for them to complete.
// It start retrying in case of error for DefaultAsyncRetries times.
// It filters errors and won't retry in case of ErrNotFound, ErrInvalidArgument and some other errors.
func (qc *QueueCollection) DeleteMany(filter M) {
qc.AsyncCollection.DeleteMany(qc.name, "", filter)
}
// BulkWrite executes bulk write operations in the collection asynchronously without waiting for them to complete.
// Use [BulkBuilder] to create models for bulk write operations.
// IsOrdered==true means that all operations are executed in the order they are added to the [BulkBuilder]
// and if any of them fails, the whole operation fails.
// IsOrdered==false means that all operations are executed in parallel and if any of them fails,
// the whole operation continues.
func (qc *QueueCollection) BulkWrite(models []mongo.WriteModel, isOrdered bool) {
qc.AsyncCollection.BulkWrite(qc.name, "", models, isOrdered)
}