Skip to content

Commit 75d605f

Browse files
committed
feat: implement StreamService class (#32)
https://coveord.atlassian.net/browse/LENS-838
1 parent d21d776 commit 75d605f

File tree

11 files changed

+389
-3
lines changed

11 files changed

+389
-3
lines changed

.github/workflows/build.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ jobs:
1515

1616
steps:
1717
- uses: actions/checkout@v2
18-
- name: Set up JDK 16
18+
- name: Set up JDK 11
1919
uses: actions/setup-java@v2
2020
with:
21-
java-version: '16'
21+
java-version: '11'
2222
distribution: 'adopt'
2323
- name: Build with Maven
2424
run: mvn -B package --file pom.xml

.github/workflows/codeql.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ jobs:
1414
analyze-java:
1515
uses: coveo/public-actions/.github/workflows/java-maven-openjdk11-codeql.yml@main
1616
with:
17-
runs-on: "['linux', 'x64', 'ec2.instance-type = t3.large']"
17+
runs-on: ubuntu-latest
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.coveo.pushapiclient;
2+
3+
import java.io.IOException;
4+
5+
// TODO: LENS-851 - Make public
6+
class DocumentUploadQueue {
7+
private final UploadStrategy uploader;
8+
9+
public DocumentUploadQueue(UploadStrategy uploader) {
10+
this.uploader = uploader;
11+
}
12+
13+
public void flush() throws IOException, InterruptedException {
14+
throw new UnsupportedOperationException("Unimplemented method (TODO: LENS-856)");
15+
}
16+
17+
public void add(DocumentBuilder document) throws IOException, InterruptedException {
18+
throw new UnsupportedOperationException("Unimplemented method (TODO: LENS-856)");
19+
}
20+
21+
}

src/main/java/com/coveo/pushapiclient/PlatformClient.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,47 @@ public HttpResponse<String> deleteDocument(String sourceId, String documentId, B
282282
return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
283283
}
284284

285+
public HttpResponse<String> openStream(String sourceId) throws IOException, InterruptedException {
286+
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
287+
// TODO: LENS-875: standardize string manipulation
288+
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/open", sourceId));
289+
290+
// TODO: LENS-876: reduce code duplication
291+
HttpRequest request = HttpRequest.newBuilder()
292+
.headers(headers)
293+
.uri(uri)
294+
.POST(HttpRequest.BodyPublishers.ofString(""))
295+
.build();
296+
297+
return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
298+
}
299+
300+
public HttpResponse<String> closeStream(String sourceId, String streamId) throws IOException, InterruptedException {
301+
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
302+
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/%s/close", sourceId, streamId));
303+
304+
HttpRequest request = HttpRequest.newBuilder()
305+
.headers(headers)
306+
.uri(uri)
307+
.POST(HttpRequest.BodyPublishers.ofString(""))
308+
.build();
309+
310+
return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
311+
}
312+
313+
public HttpResponse<String> requireStreamChunk(String sourceId, String streamId) throws IOException, InterruptedException {
314+
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
315+
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/%s/chunk", sourceId, streamId));
316+
317+
HttpRequest request = HttpRequest.newBuilder()
318+
.headers(headers)
319+
.uri(uri)
320+
.POST(HttpRequest.BodyPublishers.ofString(""))
321+
.build();
322+
323+
return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
324+
}
325+
285326
/**
286327
* Create a file container. See [Creating a File Container](https://docs.coveo.com/en/43).
287328
*
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.coveo.pushapiclient;
2+
3+
import java.util.Map;
4+
5+
public class StreamResponse {
6+
public String uploadUri;
7+
public String fileId;
8+
public String streamId;
9+
public Map<String, String> requiredHeaders;
10+
11+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package com.coveo.pushapiclient;
2+
3+
import java.io.IOException;
4+
import java.net.http.HttpResponse;
5+
6+
import com.coveo.pushapiclient.exceptions.NoOpenStreamException;
7+
import com.google.gson.Gson;
8+
9+
// TODO: LENS-851 - Make public
10+
class StreamService {
11+
private final StreamEnabledSource source;
12+
private final PlatformClient platformClient;
13+
private StreamServiceInternal service;
14+
private String streamId;
15+
private DocumentUploadQueue queue;
16+
17+
/**
18+
* Creates a service to stream your documents to the provided source by
19+
* interacting with the Stream API.
20+
*
21+
* <p>
22+
* To perform <a href="https://docs.coveo.com/en/l62e0540">full document
23+
* updates</a>, use the {@PushService}, since pushing documents with the
24+
* {@StreamService} is equivalent to triggering a full source rebuild. The
25+
* {@StreamService} can also be used for an initial catalog upload.
26+
*
27+
* @param source The source to which you want to send your documents.
28+
*/
29+
public StreamService(StreamEnabledSource source) {
30+
String apiKey = source.getApiKey();
31+
String organizationId = source.getOrganizationId();
32+
PlatformUrl platformUrl = source.getPlatformUrl();
33+
UploadStrategy uploader = this.getUploadStrategy();
34+
35+
this.source = source;
36+
this.queue = new DocumentUploadQueue(uploader);
37+
this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl);
38+
this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient);
39+
}
40+
41+
/**
42+
* Adds documents to the previously specified source.
43+
* This function will open a stream before uploading documents into it.
44+
*
45+
* <p>
46+
* If called several times, the service will automatically batch documents and
47+
* create new stream chunks whenever the data payload exceeds the
48+
* <a href="https://docs.coveo.com/en/lb4a0344#stream-api-limits">batch size limit</a>
49+
* set for the Stream API.
50+
*
51+
* <p>
52+
* Once there are no more documents to add, it is important to call the {@link StreamService#close} function
53+
* in order to send any buffered documents and close the open stream.
54+
* Otherwise, changes will not be reflected in the index.
55+
*
56+
* <p>
57+
* <pre>
58+
* {@code
59+
* //...
60+
* StreamService service = new StreamService(source));
61+
* for (DocumentBuilder document : fictionalDocumentList) {
62+
* service.add(document);
63+
* }
64+
* service.close(document);
65+
* </pre>
66+
*
67+
* <p>
68+
* For more code samples, visit <a href="TODO: LENS-840">Stream data to your catalog source</a>
69+
*
70+
* @param document The documentBuilder to add to your source
71+
* @throws InterruptedException
72+
* @throws IOException
73+
*/
74+
public void add(DocumentBuilder document) throws IOException, InterruptedException {
75+
this.service.add(document);
76+
}
77+
78+
/**
79+
* Sends any buffered documents and <a href="https://docs.coveo.com/en/lb4a0344#step-3-close-the-stream">closes the stream</a>.
80+
*
81+
* <p>
82+
* Upon invoking this method, any indexed items not added through this {@link StreamService} instance will be removed.
83+
* All documents added from the initialization of the service until the invocation of the {@link StreamService#close} function
84+
* will completely replace the previous content of the source.
85+
*
86+
* <p>
87+
* When you upload a catalog into a source, it will replace the previous content
88+
* of the source completely. Expect a 15-minute delay for the removal of the old
89+
* items from the index.
90+
*
91+
* @return
92+
* @throws IOException
93+
* @throws InterruptedException
94+
* @throws NoOpenStreamException
95+
*/
96+
public HttpResponse<String> close() throws IOException, InterruptedException, NoOpenStreamException {
97+
return this.service.close();
98+
}
99+
100+
private UploadStrategy getUploadStrategy() {
101+
return (batchUpdate) -> {
102+
String sourceId = this.getSourceId();
103+
HttpResponse<String> resFileContainer = this.platformClient.requireStreamChunk(sourceId, this.streamId);
104+
FileContainer fileContainer = new Gson().fromJson(resFileContainer.body(), FileContainer.class);
105+
String batchUpdateJson = new Gson().toJson(batchUpdate.marshal());
106+
return this.platformClient.uploadContentToFileContainer(fileContainer,
107+
batchUpdateJson);
108+
109+
};
110+
}
111+
112+
private String getSourceId() {
113+
return this.source.getId();
114+
}
115+
116+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.coveo.pushapiclient;
2+
3+
import java.io.IOException;
4+
import java.net.http.HttpResponse;
5+
6+
import com.coveo.pushapiclient.exceptions.NoOpenStreamException;
7+
import com.google.gson.Gson;
8+
9+
/**
10+
* For internal use only. Made to easily test the service without having to use PowerMock
11+
*/
12+
class StreamServiceInternal {
13+
private final StreamEnabledSource source;
14+
private final PlatformClient platformClient;
15+
private String streamId;
16+
private DocumentUploadQueue queue;
17+
18+
public StreamServiceInternal(StreamEnabledSource source, DocumentUploadQueue queue, PlatformClient platformClient) {
19+
this.source = source;
20+
this.queue = queue;
21+
this.platformClient = platformClient;
22+
}
23+
24+
public void add(DocumentBuilder document) throws IOException, InterruptedException {
25+
if (this.streamId == null) {
26+
this.streamId = this.getStreamId();
27+
}
28+
queue.add(document);
29+
}
30+
31+
public HttpResponse<String> close() throws IOException, InterruptedException, NoOpenStreamException {
32+
if (this.streamId == null) {
33+
throw new NoOpenStreamException(
34+
"No open stream detected. A stream will automatically be opened once you start adding documents.");
35+
}
36+
queue.flush();
37+
String sourceId = this.getSourceId();
38+
return this.platformClient.closeStream(sourceId, this.streamId);
39+
}
40+
41+
private String getStreamId() throws IOException, InterruptedException {
42+
String sourceId = this.getSourceId();
43+
HttpResponse<String> response = this.platformClient.openStream(sourceId);
44+
StreamResponse streamResponse = new Gson().fromJson(response.body(), StreamResponse.class);
45+
return streamResponse.streamId;
46+
}
47+
48+
private String getSourceId() {
49+
return this.source.getId();
50+
}
51+
52+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.coveo.pushapiclient;
2+
3+
import java.io.IOException;
4+
import java.net.http.HttpResponse;
5+
6+
@FunctionalInterface
7+
public interface UploadStrategy {
8+
HttpResponse<String> apply(BatchUpdate batchUpdate) throws IOException, InterruptedException;
9+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.coveo.pushapiclient.exceptions;
2+
3+
public class NoOpenStreamException extends Exception {
4+
public NoOpenStreamException(String errorMessage) {
5+
super(errorMessage);
6+
}
7+
}

src/test/java/com/coveo/pushapiclient/PlatformClientTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,39 @@ public void testPushFileContainerContent() throws IOException, InterruptedExcept
266266
assertAuthorizationHeader();
267267
}
268268

269+
@Test
270+
public void testOpenStream() throws IOException, InterruptedException {
271+
client.openStream("my_source");
272+
verify(httpClient).send(argument.capture(), any(HttpResponse.BodyHandlers.ofString().getClass()));
273+
274+
assertEquals("POST", argument.getValue().method());
275+
assertTrue(argument.getValue().uri().getPath().contains("the_org_id/sources/my_source/stream/open"));
276+
assertApplicationJsonHeader();
277+
assertAuthorizationHeader();
278+
}
279+
280+
@Test
281+
public void testRequireStreamChunk() throws IOException, InterruptedException {
282+
client.requireStreamChunk("my_source", "stream_id");
283+
verify(httpClient).send(argument.capture(), any(HttpResponse.BodyHandlers.ofString().getClass()));
284+
285+
assertEquals("POST", argument.getValue().method());
286+
assertTrue(argument.getValue().uri().getPath().contains("the_org_id/sources/my_source/stream/stream_id/chunk"));
287+
assertApplicationJsonHeader();
288+
assertAuthorizationHeader();
289+
}
290+
291+
@Test
292+
public void testCloseStream() throws IOException, InterruptedException {
293+
client.closeStream("my_source", "stream_id");
294+
verify(httpClient).send(argument.capture(), any(HttpResponse.BodyHandlers.ofString().getClass()));
295+
296+
assertEquals("POST", argument.getValue().method());
297+
assertTrue(argument.getValue().uri().getPath().contains("the_org_id/sources/my_source/stream/stream_id/close"));
298+
assertApplicationJsonHeader();
299+
assertAuthorizationHeader();
300+
}
301+
269302
@Test
270303
public void testDeleteDocument() throws IOException, InterruptedException {
271304
client.deleteDocument("my_source", document().uri, true);

0 commit comments

Comments
 (0)