Skip to content

Commit ed0f754

Browse files
committed
feat: implement DocumentUploadQueue class (#33)
https://coveord.atlassian.net/browse/LENS-856
1 parent c3025bb commit ed0f754

File tree

3 files changed

+280
-3
lines changed

3 files changed

+280
-3
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/target/
22
.env
33
/.idea/
4+
.vscode
Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,105 @@
11
package com.coveo.pushapiclient;
22

33
import java.io.IOException;
4+
import java.util.ArrayList;
45

5-
// TODO: LENS-851 - Make public
6+
/**
7+
* Represents a queue for uploading documents using a specified upload strategy
8+
*/
69
class DocumentUploadQueue {
710
private final UploadStrategy uploader;
11+
private final int maxQueueSize = 5 * 1024 * 1024;
12+
private ArrayList<DocumentBuilder> documentToAddList;
13+
private ArrayList<DeleteDocument> documentToDeleteList;
14+
private int size;
815

16+
/**
17+
* Constructs a new DocumentUploadQueue object with a default maximum queue size
18+
* limit of 5MB.
19+
*
20+
* @param uploader The upload strategy to be used for document uploads.
21+
*/
922
public DocumentUploadQueue(UploadStrategy uploader) {
23+
this.documentToAddList = new ArrayList<>();
24+
this.documentToDeleteList = new ArrayList<>();
1025
this.uploader = uploader;
1126
}
1227

28+
/**
29+
* Flushes the accumulated documents by applying the upload strategy.
30+
*
31+
* @throws IOException If an I/O error occurs during the upload.
32+
* @throws InterruptedException If the upload process is interrupted.
33+
*/
1334
public void flush() throws IOException, InterruptedException {
14-
throw new UnsupportedOperationException("Unimplemented method (TODO: LENS-856)");
35+
if (this.isEmpty()) {
36+
return;
37+
}
38+
BatchUpdate batch = this.getBatch();
39+
// TODO: LENS-871: support concurrent requests
40+
this.uploader.apply(batch);
41+
this.size = 0;
42+
this.documentToAddList.clear();
43+
this.documentToDeleteList.clear();
1544
}
1645

46+
/**
47+
* Adds a {@link DocumentBuilder} to the upload queue and flushes the queue if
48+
* it exceeds the maximum content length.
49+
* See {@link DocumentUploadQueue#flush}.
50+
*
51+
* @param document The document to be added to the index.
52+
* @throws IOException If an I/O error occurs during the upload.
53+
* @throws InterruptedException If the upload process is interrupted.
54+
*/
1755
public void add(DocumentBuilder document) throws IOException, InterruptedException {
18-
throw new UnsupportedOperationException("Unimplemented method (TODO: LENS-856)");
56+
if (document == null) {
57+
return;
58+
}
59+
60+
final int sizeOfDoc = document.marshal().getBytes().length;
61+
if (this.size + sizeOfDoc >= this.maxQueueSize) {
62+
this.flush();
63+
}
64+
if (document != null) {
65+
documentToAddList.add(document);
66+
this.size += sizeOfDoc;
67+
}
68+
}
69+
70+
/**
71+
* Adds a {@link DeleteDocument} to the upload queue and flushes the queue if
72+
* it exceeds the maximum content length.
73+
* See {@link DocumentUploadQueue#flush}.
74+
*
75+
* @param document The document to be delete from the index.
76+
* @throws IOException If an I/O error occurs during the upload.
77+
* @throws InterruptedException If the upload process is interrupted.
78+
*/
79+
public void add(DeleteDocument document) throws IOException, InterruptedException {
80+
if (document == null) {
81+
return;
82+
}
83+
84+
final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length;
85+
if (this.size + sizeOfDoc >= this.maxQueueSize) {
86+
this.flush();
87+
}
88+
if (document != null) {
89+
documentToDeleteList.add(document);
90+
this.size += sizeOfDoc;
91+
}
92+
}
93+
94+
public BatchUpdate getBatch() {
95+
return new BatchUpdate(
96+
new ArrayList<DocumentBuilder>(this.documentToAddList),
97+
new ArrayList<DeleteDocument>(this.documentToDeleteList));
98+
}
99+
100+
public boolean isEmpty() {
101+
// TODO: LENS-843: include partial document updates
102+
return documentToAddList.isEmpty() && documentToDeleteList.isEmpty();
19103
}
20104

21105
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package com.coveo.pushapiclient;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assert.assertTrue;
6+
import static org.mockito.ArgumentMatchers.any;
7+
import static org.mockito.Mockito.times;
8+
import static org.mockito.Mockito.verify;
9+
10+
import java.io.IOException;
11+
import java.util.ArrayList;
12+
13+
import org.junit.After;
14+
import org.junit.Before;
15+
import org.junit.Test;
16+
import org.mockito.InjectMocks;
17+
import org.mockito.Mock;
18+
import org.mockito.MockitoAnnotations;
19+
20+
public class DocumentUploadQueueTest {
21+
22+
@Mock
23+
private UploadStrategy uploadStrategy;
24+
25+
@InjectMocks
26+
private DocumentUploadQueue queue;
27+
28+
private AutoCloseable closeable;
29+
private DocumentBuilder documentToAdd;
30+
private DeleteDocument documentToDelete;
31+
32+
private int oneMegaByte = 1 * 1024 * 1024;
33+
34+
private String generateStringFromBytes(int numBytes) {
35+
// Check if the number of bytes is valid
36+
if (numBytes <= 0) {
37+
return "";
38+
}
39+
40+
// Create a byte array with the specified length
41+
byte[] bytes = new byte[numBytes];
42+
43+
// Fill the byte array with a pattern of ASCII characters
44+
byte pattern = 65; // ASCII value for 'A'
45+
for (int i = 0; i < numBytes; i++) {
46+
bytes[i] = pattern;
47+
}
48+
49+
return new String(bytes);
50+
}
51+
52+
private DocumentBuilder generateDocumentFromSize(int numBytes) {
53+
return new DocumentBuilder("https://my.document.uri?ref=1",
54+
"My bulky document")
55+
.withData(generateStringFromBytes(numBytes));
56+
}
57+
58+
@Before
59+
public void setup() {
60+
String twoMegaByteData = generateStringFromBytes(2 * oneMegaByte);
61+
62+
documentToAdd = new DocumentBuilder(
63+
"https://my.document.uri?ref=1",
64+
"My new document")
65+
.withData(twoMegaByteData);
66+
67+
documentToDelete = new DeleteDocument("https://my.document.uri?ref=3");
68+
69+
closeable = MockitoAnnotations.openMocks(this);
70+
}
71+
72+
@After
73+
public void closeService() throws Exception {
74+
closeable.close();
75+
}
76+
77+
@Test
78+
public void testIsEmpty() throws IOException, InterruptedException {
79+
assertTrue(queue.isEmpty());
80+
}
81+
82+
@Test
83+
public void testIsNotEmpty() throws IOException, InterruptedException {
84+
queue.add(documentToAdd);
85+
assertFalse(queue.isEmpty());
86+
}
87+
88+
@Test
89+
public void testShouldReturnBatch() throws IOException, InterruptedException {
90+
BatchUpdate batchUpdate = new BatchUpdate(
91+
new ArrayList<>() {
92+
{
93+
add(documentToAdd);
94+
}
95+
}, new ArrayList<>() {
96+
{
97+
add(documentToDelete);
98+
}
99+
});
100+
queue.add(documentToAdd);
101+
queue.add(documentToDelete);
102+
103+
assertEquals(batchUpdate, queue.getBatch());
104+
}
105+
106+
@Test
107+
public void testFlushShouldNotUploadDocumentaWhenRequiredSizeIsNotMet() throws IOException, InterruptedException {
108+
queue.add(documentToAdd);
109+
queue.add(documentToDelete);
110+
111+
verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class));
112+
}
113+
114+
@Test
115+
public void testShouldAutomaticallyFlushAccumulatedDocuments() throws IOException, InterruptedException {
116+
DocumentBuilder firstBulkyDocument = generateDocumentFromSize(2 * oneMegaByte);
117+
DocumentBuilder secondBulkyDocument = generateDocumentFromSize(2 * oneMegaByte);
118+
DocumentBuilder thirdBulkyDocument = generateDocumentFromSize(2 * oneMegaByte);
119+
ArrayList<DeleteDocument> emptyList = new ArrayList<>();
120+
BatchUpdate firstBatch = new BatchUpdate(
121+
new ArrayList<>() {
122+
{
123+
add(firstBulkyDocument);
124+
add(secondBulkyDocument);
125+
}
126+
}, emptyList);
127+
128+
// Adding 3 documents of 2MB to the queue. After adding the first 2 documents,
129+
// the queue size will reach 6MB, which exceeds the maximum queue size
130+
// limit. Therefore, the 2 first added documents will automatically be uploaded
131+
// to the source.
132+
queue.add(firstBulkyDocument);
133+
queue.add(secondBulkyDocument);
134+
135+
// The 3rd document added to the queue will be included in a separate batch,
136+
// which will not be uploaded unless the `flush()` method is called or until the
137+
// queue size limit has been reached
138+
queue.add(thirdBulkyDocument);
139+
140+
verify(uploadStrategy, times(1)).apply(any(BatchUpdate.class));
141+
verify(uploadStrategy, times(1)).apply(firstBatch);
142+
}
143+
144+
@Test
145+
public void testShouldManuallyFlushAccumulatedDocuments() throws IOException, InterruptedException {
146+
DocumentBuilder firstBulkyDocument = generateDocumentFromSize(2 * oneMegaByte);
147+
DocumentBuilder secondBulkyDocument = generateDocumentFromSize(2 * oneMegaByte);
148+
DocumentBuilder thirdBulkyDocument = generateDocumentFromSize(2 * oneMegaByte);
149+
ArrayList<DeleteDocument> emptyList = new ArrayList<>();
150+
BatchUpdate firstBatch = new BatchUpdate(
151+
new ArrayList<>() {
152+
{
153+
add(firstBulkyDocument);
154+
add(secondBulkyDocument);
155+
}
156+
}, emptyList);
157+
158+
BatchUpdate secondBatch = new BatchUpdate(
159+
new ArrayList<>() {
160+
{
161+
add(thirdBulkyDocument);
162+
}
163+
}, emptyList);
164+
165+
// Adding 3 documents of 2MB to the queue. After adding the first 2 documents,
166+
// the queue size will reach 6MB, which exceeds the maximum queue size
167+
// limit. Therefore, the 2 first added documents will automatically be uploaded
168+
// to the source.
169+
queue.add(firstBulkyDocument);
170+
queue.add(secondBulkyDocument);
171+
queue.add(thirdBulkyDocument);
172+
173+
queue.flush();
174+
175+
// Additional flush will have no effect if documents where already flushed
176+
queue.flush();
177+
178+
verify(uploadStrategy, times(2)).apply(any(BatchUpdate.class));
179+
verify(uploadStrategy, times(1)).apply(firstBatch);
180+
verify(uploadStrategy, times(1)).apply(secondBatch);
181+
}
182+
183+
@Test
184+
public void testAddingEmptyDocument() throws IOException, InterruptedException {
185+
DocumentBuilder nullDocument = null;
186+
187+
queue.add(nullDocument);
188+
queue.flush();
189+
190+
verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class));
191+
}
192+
}

0 commit comments

Comments
 (0)