Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e0296ef
create `PushSource` and `catalogSource` classes
y-lakhdar May 19, 2023
419d177
add boilerplate for stream service
y-lakhdar May 19, 2023
4e8528c
Merge branch 'main' of github.com:coveo/push-api-client.java into LEN…
y-lakhdar May 19, 2023
1c5adec
revert master merge
y-lakhdar May 19, 2023
f5a2557
add required POST calls
y-lakhdar May 19, 2023
673b76a
update streamService
y-lakhdar May 19, 2023
5ef6772
apply ApiUrl corrections
y-lakhdar May 23, 2023
93c9867
Merge branch 'LEN-839' of github.com:coveo/push-api-client.java into …
y-lakhdar May 23, 2023
8e1e3e7
document StreamService
y-lakhdar May 23, 2023
de36933
Merge branch 'main' of github.com:coveo/push-api-client.java into LEN…
y-lakhdar May 25, 2023
7f9ad5e
Update src/main/java/com/coveo/pushapiclient/StreamService.java
y-lakhdar May 26, 2023
3499404
Update src/main/java/com/coveo/pushapiclient/StreamService.java
y-lakhdar May 26, 2023
f3ec845
Apply suggestions from code review
y-lakhdar May 26, 2023
508564e
refactor unit tests
y-lakhdar May 26, 2023
954ad44
Merge branch 'LEN-838' of github.com:coveo/push-api-client.java into …
y-lakhdar May 26, 2023
0e9acac
Merge branch 'main' into LEN-838
y-lakhdar May 29, 2023
c90446c
apply corrections
y-lakhdar May 30, 2023
cd43eb9
Merge branch 'LEN-838' of github.com:coveo/push-api-client.java into …
y-lakhdar May 30, 2023
933be43
Update codeql.yml
louis-bompart May 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Set up JDK 16
- name: Set up JDK 11
uses: actions/setup-java@v2
with:
java-version: '16'
java-version: '11'
distribution: 'adopt'
- name: Build with Maven
run: mvn -B package --file pom.xml
2 changes: 1 addition & 1 deletion .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ jobs:
analyze-java:
uses: coveo/public-actions/.github/workflows/java-maven-openjdk11-codeql.yml@main
with:
runs-on: "['linux', 'x64', 'ec2.instance-type = t3.large']"
runs-on: ubuntu-latest
21 changes: 21 additions & 0 deletions src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.coveo.pushapiclient;

import java.io.IOException;

// TODO: LENS-851 - Make public
class DocumentUploadQueue {
private final UploadStrategy uploader;

public DocumentUploadQueue(UploadStrategy uploader) {
this.uploader = uploader;
}

public void flush() throws IOException, InterruptedException {
throw new UnsupportedOperationException("Unimplemented method (TODO: LENS-856)");
}

public void add(DocumentBuilder document) throws IOException, InterruptedException {
throw new UnsupportedOperationException("Unimplemented method (TODO: LENS-856)");
}

}
41 changes: 41 additions & 0 deletions src/main/java/com/coveo/pushapiclient/PlatformClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,47 @@ public HttpResponse<String> deleteDocument(String sourceId, String documentId, B
return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
}

public HttpResponse<String> openStream(String sourceId) throws IOException, InterruptedException {
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
// TODO: LENS-875: standardize string manipulation
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/open", sourceId));

// TODO: LENS-876: reduce code duplication
HttpRequest request = HttpRequest.newBuilder()
.headers(headers)
.uri(uri)
.POST(HttpRequest.BodyPublishers.ofString(""))
.build();

return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
}

public HttpResponse<String> closeStream(String sourceId, String streamId) throws IOException, InterruptedException {
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/%s/close", sourceId, streamId));

HttpRequest request = HttpRequest.newBuilder()
.headers(headers)
.uri(uri)
.POST(HttpRequest.BodyPublishers.ofString(""))
.build();

return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
}

public HttpResponse<String> requireStreamChunk(String sourceId, String streamId) throws IOException, InterruptedException {
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/%s/chunk", sourceId, streamId));

HttpRequest request = HttpRequest.newBuilder()
.headers(headers)
.uri(uri)
.POST(HttpRequest.BodyPublishers.ofString(""))
.build();

return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
}

/**
* Create a file container. See [Creating a File Container](https://docs.coveo.com/en/43).
*
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/coveo/pushapiclient/StreamResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.coveo.pushapiclient;

import java.util.Map;

public class StreamResponse {
public String uploadUri;
public String fileId;
public String streamId;
public Map<String, String> requiredHeaders;

}
116 changes: 116 additions & 0 deletions src/main/java/com/coveo/pushapiclient/StreamService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.coveo.pushapiclient;

import java.io.IOException;
import java.net.http.HttpResponse;

import com.coveo.pushapiclient.exceptions.NoOpenStreamException;
import com.google.gson.Gson;

// TODO: LENS-851 - Make public
class StreamService {
private final StreamEnabledSource source;
private final PlatformClient platformClient;
private StreamServiceInternal service;
private String streamId;
private DocumentUploadQueue queue;

/**
* Creates a service to stream your documents to the provided source by
* interacting with the Stream API.
*
* <p>
* To perform <a href="https://docs.coveo.com/en/l62e0540">full document
* updates</a>, use the {@PushService}, since pushing documents with the
* {@StreamService} is equivalent to triggering a full source rebuild. The
* {@StreamService} can also be used for an initial catalog upload.
*
* @param source The source to which you want to send your documents.
*/
public StreamService(StreamEnabledSource source) {
String apiKey = source.getApiKey();
String organizationId = source.getOrganizationId();
PlatformUrl platformUrl = source.getPlatformUrl();
UploadStrategy uploader = this.getUploadStrategy();

this.source = source;
this.queue = new DocumentUploadQueue(uploader);
this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl);
this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient);
}

/**
* Adds documents to the previously specified source.
* This function will open a stream before uploading documents into it.
*
* <p>
* If called several times, the service will automatically batch documents and
* create new stream chunks whenever the data payload exceeds the
* <a href="https://docs.coveo.com/en/lb4a0344#stream-api-limits">batch size limit</a>
* set for the Stream API.
*
* <p>
* Once there are no more documents to add, it is important to call the {@link StreamService#close} function
* in order to send any buffered documents and close the open stream.
* Otherwise, changes will not be reflected in the index.
*
* <p>
* <pre>
* {@code
* //...
* StreamService service = new StreamService(source));
* for (DocumentBuilder document : fictionalDocumentList) {
* service.add(document);
* }
* service.close(document);
* </pre>
*
* <p>
* For more code samples, visit <a href="TODO: LENS-840">Stream data to your catalog source</a>
*
* @param document The documentBuilder to add to your source
* @throws InterruptedException
* @throws IOException
*/
public void add(DocumentBuilder document) throws IOException, InterruptedException {
this.service.add(document);
}

/**
* Sends any buffered documents and <a href="https://docs.coveo.com/en/lb4a0344#step-3-close-the-stream">closes the stream</a>.
*
* <p>
* Upon invoking this method, any indexed items not added through this {@link StreamService} instance will be removed.
* All documents added from the initialization of the service until the invocation of the {@link StreamService#close} function
* will completely replace the previous content of the source.
*
* <p>
* When you upload a catalog into a source, it will replace the previous content
* of the source completely. Expect a 15-minute delay for the removal of the old
* items from the index.
*
* @return
* @throws IOException
* @throws InterruptedException
* @throws NoOpenStreamException
*/
public HttpResponse<String> close() throws IOException, InterruptedException, NoOpenStreamException {
return this.service.close();
}

private UploadStrategy getUploadStrategy() {
return (batchUpdate) -> {
String sourceId = this.getSourceId();
HttpResponse<String> resFileContainer = this.platformClient.requireStreamChunk(sourceId, this.streamId);
FileContainer fileContainer = new Gson().fromJson(resFileContainer.body(), FileContainer.class);
String batchUpdateJson = new Gson().toJson(batchUpdate.marshal());
return this.platformClient.uploadContentToFileContainer(fileContainer,
batchUpdateJson);

};
}

private String getSourceId() {
return this.source.getId();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.coveo.pushapiclient;

import java.io.IOException;
import java.net.http.HttpResponse;

import com.coveo.pushapiclient.exceptions.NoOpenStreamException;
import com.google.gson.Gson;

/**
* For internal use only. Made to easily test the service without having to use PowerMock
*/
class StreamServiceInternal {
private final StreamEnabledSource source;
private final PlatformClient platformClient;
private String streamId;
private DocumentUploadQueue queue;

public StreamServiceInternal(StreamEnabledSource source, DocumentUploadQueue queue, PlatformClient platformClient) {
this.source = source;
this.queue = queue;
this.platformClient = platformClient;
}

public void add(DocumentBuilder document) throws IOException, InterruptedException {
if (this.streamId == null) {
this.streamId = this.getStreamId();
}
queue.add(document);
}

public HttpResponse<String> close() throws IOException, InterruptedException, NoOpenStreamException {
if (this.streamId == null) {
throw new NoOpenStreamException(
"No open stream detected. A stream will automatically be opened once you start adding documents.");
}
queue.flush();
String sourceId = this.getSourceId();
return this.platformClient.closeStream(sourceId, this.streamId);
}

private String getStreamId() throws IOException, InterruptedException {
String sourceId = this.getSourceId();
HttpResponse<String> response = this.platformClient.openStream(sourceId);
StreamResponse streamResponse = new Gson().fromJson(response.body(), StreamResponse.class);
return streamResponse.streamId;
}

private String getSourceId() {
return this.source.getId();
}

}
9 changes: 9 additions & 0 deletions src/main/java/com/coveo/pushapiclient/UploadStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.coveo.pushapiclient;

import java.io.IOException;
import java.net.http.HttpResponse;

@FunctionalInterface
public interface UploadStrategy {
HttpResponse<String> apply(BatchUpdate batchUpdate) throws IOException, InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.coveo.pushapiclient.exceptions;

public class NoOpenStreamException extends Exception {
public NoOpenStreamException(String errorMessage) {
super(errorMessage);
}
}
33 changes: 33 additions & 0 deletions src/test/java/com/coveo/pushapiclient/PlatformClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,39 @@ public void testPushFileContainerContent() throws IOException, InterruptedExcept
assertAuthorizationHeader();
}

@Test
public void testOpenStream() throws IOException, InterruptedException {
client.openStream("my_source");
verify(httpClient).send(argument.capture(), any(HttpResponse.BodyHandlers.ofString().getClass()));

assertEquals("POST", argument.getValue().method());
assertTrue(argument.getValue().uri().getPath().contains("the_org_id/sources/my_source/stream/open"));
assertApplicationJsonHeader();
assertAuthorizationHeader();
}

@Test
public void testRequireStreamChunk() throws IOException, InterruptedException {
client.requireStreamChunk("my_source", "stream_id");
verify(httpClient).send(argument.capture(), any(HttpResponse.BodyHandlers.ofString().getClass()));

assertEquals("POST", argument.getValue().method());
assertTrue(argument.getValue().uri().getPath().contains("the_org_id/sources/my_source/stream/stream_id/chunk"));
assertApplicationJsonHeader();
assertAuthorizationHeader();
}

@Test
public void testCloseStream() throws IOException, InterruptedException {
client.closeStream("my_source", "stream_id");
verify(httpClient).send(argument.capture(), any(HttpResponse.BodyHandlers.ofString().getClass()));

assertEquals("POST", argument.getValue().method());
assertTrue(argument.getValue().uri().getPath().contains("the_org_id/sources/my_source/stream/stream_id/close"));
assertApplicationJsonHeader();
assertAuthorizationHeader();
}

@Test
public void testDeleteDocument() throws IOException, InterruptedException {
client.deleteDocument("my_source", document().uri, true);
Expand Down
Loading