Skip to content

Commit 9498a4f

Browse files
Merge #261
261: [Feature] [add/update]DocumentsInBatches r=brunoocasali a=ahmednfwela Fixes #95 - [x] Tests added Co-authored-by: Ahmed Fwela <[email protected]>
2 parents 505355f + 210bc05 commit 9498a4f

File tree

6 files changed

+163
-4
lines changed

6 files changed

+163
-4
lines changed

lib/src/index.dart

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,29 @@ abstract class MeiliSearchIndex {
6262
String? primaryKey,
6363
});
6464

65+
/// Add a list of documents in batches of size [batchSize] by given [documents] and optional [primaryKey] parameter.
66+
/// If the index does not exist try to create a new index and add documents.
67+
Future<List<Task>> addDocumentsInBatches(
68+
List<Map<String, Object?>> documents, {
69+
int batchSize = 1000,
70+
String? primaryKey,
71+
});
72+
6573
/// Add a list of documents or update them if they already exist by given [documents] and optional [primaryKey] parameter.
6674
/// If index is not exists tries to create a new index and adds documents.
6775
Future<Task> updateDocuments(
6876
List<Map<String, Object?>> documents, {
6977
String? primaryKey,
7078
});
7179

80+
/// Add a list of documents or update them if they already exist in batches of size [batchSize] by given [documents] and optional [primaryKey] parameter.
81+
/// If index is not exists tries to create a new index and adds documents.
82+
Future<List<Task>> updateDocumentsInBatches(
83+
List<Map<String, Object?>> documents, {
84+
int batchSize = 1000,
85+
String? primaryKey,
86+
});
87+
7288
/// Delete one document by given [id].
7389
Future<Task> deleteDocument(Object id);
7490

lib/src/index_impl.dart

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import 'package:meilisearch/src/query_parameters/tasks_query.dart';
44
import 'package:meilisearch/src/result.dart';
55
import 'package:meilisearch/src/searchable.dart';
66
import 'package:meilisearch/src/tasks_results.dart';
7-
7+
import 'package:collection/collection.dart';
88
import 'client.dart';
99
import 'filter_builder/filter_builder_base.dart';
1010
import 'index.dart';
@@ -448,4 +448,28 @@ class MeiliSearchIndexImpl implements MeiliSearchIndex {
448448
Future<Task> getTask(int uid) async {
449449
return await client.getTask(uid);
450450
}
451+
452+
@override
453+
Future<List<Task>> addDocumentsInBatches(
454+
List<Map<String, Object?>> documents, {
455+
int batchSize = 1000,
456+
String? primaryKey,
457+
}) =>
458+
Future.wait(
459+
documents
460+
.slices(batchSize)
461+
.map((slice) => addDocuments(slice, primaryKey: primaryKey)),
462+
);
463+
464+
@override
465+
Future<List<Task>> updateDocumentsInBatches(
466+
List<Map<String, Object?>> documents, {
467+
int batchSize = 1000,
468+
String? primaryKey,
469+
}) =>
470+
Future.wait(
471+
documents
472+
.slices(batchSize)
473+
.map((slice) => updateDocuments(slice, primaryKey: primaryKey)),
474+
);
451475
}

test/documents_test.dart

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import 'package:meilisearch/meilisearch.dart';
22
import 'package:test/test.dart';
3-
43
import 'utils/books_data.dart';
54
import 'utils/wait_for.dart';
65
import 'utils/client.dart';
@@ -17,6 +16,23 @@ void main() {
1716
expect(docs.total, books.length);
1817
});
1918

19+
test('Add documents in batches', () async {
20+
final index = client.index(randomUid());
21+
const batchSize = 10;
22+
const totalCount = (batchSize * 4) + 1;
23+
const chunks = 5;
24+
25+
final tasks = await index.addDocumentsInBatches(
26+
dynamicBooks(totalCount),
27+
batchSize: batchSize,
28+
);
29+
30+
expect(tasks.length, chunks);
31+
await tasks.waitFor(client: client, timeout: Duration(seconds: 30));
32+
final docs = await index.getDocuments();
33+
expect(docs.total, totalCount);
34+
});
35+
2036
test('Add documents with primary key', () async {
2137
final index = client.index(randomUid());
2238
await index
@@ -37,6 +53,33 @@ void main() {
3753
expect(doc?['title'], equals('The Hobbit 2'));
3854
});
3955

56+
test('Update documents in batches', () async {
57+
const batchSize = 10;
58+
const chunks = 3;
59+
const totalCount = (batchSize * 2) + 1;
60+
final index = await createDynamicBooksIndex(count: totalCount);
61+
62+
final tasks = await index.updateDocumentsInBatches(
63+
List.generate(
64+
totalCount,
65+
(index) => {
66+
'book_id': index,
67+
'title': 'Updated Book $index',
68+
},
69+
),
70+
batchSize: batchSize,
71+
);
72+
73+
expect(tasks.length, chunks);
74+
await tasks.waitFor(client: client, timeout: Duration(seconds: 30));
75+
final docs = await index.getDocuments();
76+
expect(docs.total, totalCount);
77+
docs.results.map((element) {
78+
final bookId = element['book_id'];
79+
expect(element['title'], equals('Updated Book $bookId'));
80+
});
81+
});
82+
4083
test('Update documents and pass a primary key', () async {
4184
final uid = randomUid();
4285
var index = client.index(uid);

test/utils/books.dart

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,21 @@ import 'books_data.dart';
44
import 'client.dart';
55
import 'wait_for.dart';
66

7+
Future<MeiliSearchIndex> createDynamicBooksIndex({
8+
String? uid,
9+
required int count,
10+
}) async {
11+
final index = client.index(uid ?? randomUid());
12+
final docs = dynamicBooks(count);
13+
final response = await index.addDocuments(docs).waitFor(client: client);
14+
15+
if (response.status != 'succeeded') {
16+
throw Exception(
17+
'Impossible to process test suite, the documents were not added into the index.');
18+
}
19+
return index;
20+
}
21+
722
Future<MeiliSearchIndex> createBooksIndex({String? uid}) async {
823
return _createIndex(uid: uid);
924
}
@@ -12,8 +27,10 @@ Future<MeiliSearchIndex> createNestedBooksIndex({String? uid}) async {
1227
return _createIndex(uid: uid, isNested: true);
1328
}
1429

15-
Future<MeiliSearchIndex> _createIndex(
16-
{String? uid, bool isNested = false}) async {
30+
Future<MeiliSearchIndex> _createIndex({
31+
String? uid,
32+
bool isNested = false,
33+
}) async {
1734
final index = client.index(uid ?? randomUid());
1835
final docs = isNested ? nestedBooks : books;
1936
final response = await index.addDocuments(docs).waitFor(client: client);

test/utils/books_data.dart

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
List<Map<String, Object?>> dynamicBooks(int count) {
2+
final tags = List.generate(4, (index) => "Tag $index");
3+
return List.generate(
4+
count,
5+
(index) => {
6+
'book_id': index,
7+
'title': 'Book $index',
8+
'tag': tags[index % tags.length],
9+
},
10+
);
11+
}
12+
113
final books = [
214
{'book_id': 123, 'title': 'Pride and Prejudice', 'tag': 'Romance'},
315
{'book_id': 456, 'title': 'Le Petit Prince', 'tag': 'Tale'},

test/utils/wait_for.dart

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'package:meilisearch/meilisearch.dart';
2+
import 'package:collection/collection.dart';
23

34
extension TaskWaiter on Task {
45
Future<Task> waitFor({
@@ -22,6 +23,41 @@ extension TaskWaiter on Task {
2223
}
2324
}
2425

26+
extension TaskWaiterForLists on Iterable<Task> {
27+
Future<List<Task>> waitFor({
28+
required MeiliSearchClient client,
29+
Duration timeout = const Duration(seconds: 20),
30+
Duration interval = const Duration(milliseconds: 50),
31+
}) async {
32+
final endingTime = DateTime.now().add(timeout);
33+
final originalUids = toList();
34+
final remainingUids = map((e) => e.uid).whereNotNull().toList();
35+
final completedTasks = <int, Task>{};
36+
final statuses = ['enqueued', 'processing'];
37+
38+
while (DateTime.now().isBefore(endingTime)) {
39+
var taskRes =
40+
await client.getTasks(params: TasksQuery(uids: remainingUids));
41+
final tasks = taskRes.results;
42+
final completed = tasks.where((e) => !statuses.contains(e.status));
43+
44+
completedTasks.addEntries(completed.map((e) => MapEntry(e.uid!, e)));
45+
remainingUids
46+
.removeWhere((element) => completedTasks.containsKey(element));
47+
48+
if (remainingUids.isEmpty) {
49+
return originalUids
50+
.map((e) => completedTasks[e.uid])
51+
.whereNotNull()
52+
.toList();
53+
}
54+
await Future<void>.delayed(interval);
55+
}
56+
57+
throw Exception('The tasks $originalUids timed out.');
58+
}
59+
}
60+
2561
extension TaskWaiterForFutures on Future<Task> {
2662
Future<Task> waitFor({
2763
required MeiliSearchClient client,
@@ -32,3 +68,14 @@ extension TaskWaiterForFutures on Future<Task> {
3268
.waitFor(timeout: timeout, interval: interval, client: client);
3369
}
3470
}
71+
72+
extension TaskWaiterForFutureList on Future<Iterable<Task>> {
73+
Future<List<Task>> waitFor({
74+
required MeiliSearchClient client,
75+
Duration timeout = const Duration(seconds: 20),
76+
Duration interval = const Duration(milliseconds: 50),
77+
}) async {
78+
return await (await this)
79+
.waitFor(timeout: timeout, interval: interval, client: client);
80+
}
81+
}

0 commit comments

Comments
 (0)