Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e0296ef
create `PushSource` and `catalogSource` classes
y-lakhdar May 19, 2023
419d177
add boilerplate for stream service
y-lakhdar May 19, 2023
4e8528c
Merge branch 'main' of github.com:coveo/push-api-client.java into LEN…
y-lakhdar May 19, 2023
1c5adec
revert master merge
y-lakhdar May 19, 2023
f5a2557
add required POST calls
y-lakhdar May 19, 2023
673b76a
update streamService
y-lakhdar May 19, 2023
5ef6772
apply ApiUrl corrections
y-lakhdar May 23, 2023
93c9867
Merge branch 'LEN-839' of github.com:coveo/push-api-client.java into …
y-lakhdar May 23, 2023
8e1e3e7
document StreamService
y-lakhdar May 23, 2023
de36933
Merge branch 'main' of github.com:coveo/push-api-client.java into LEN…
y-lakhdar May 25, 2023
7f9ad5e
Update src/main/java/com/coveo/pushapiclient/StreamService.java
y-lakhdar May 26, 2023
3499404
Update src/main/java/com/coveo/pushapiclient/StreamService.java
y-lakhdar May 26, 2023
f3ec845
Apply suggestions from code review
y-lakhdar May 26, 2023
508564e
refactor unit tests
y-lakhdar May 26, 2023
954ad44
Merge branch 'LEN-838' of github.com:coveo/push-api-client.java into …
y-lakhdar May 26, 2023
0e9acac
Merge branch 'main' into LEN-838
y-lakhdar May 29, 2023
b121952
create batch update accumulator
y-lakhdar May 29, 2023
476a09d
rework queue
y-lakhdar May 29, 2023
141889d
complete unit tests
y-lakhdar May 30, 2023
aa68240
remove comments
y-lakhdar May 30, 2023
369525a
update comment
y-lakhdar May 30, 2023
7d5dc2c
draft PushService
y-lakhdar May 30, 2023
c90446c
apply corrections
y-lakhdar May 30, 2023
cd43eb9
Merge branch 'LEN-838' of github.com:coveo/push-api-client.java into …
y-lakhdar May 30, 2023
9a7c88b
Merge branch 'LEN-838' of github.com:coveo/push-api-client.java into …
y-lakhdar May 30, 2023
cad5aac
add gitignore
y-lakhdar May 30, 2023
bd5ab59
Merge branch 'LENS-856' of github.com:coveo/push-api-client.java into…
y-lakhdar May 30, 2023
0b5baa1
fix merge conflict
y-lakhdar May 30, 2023
ce594de
Merge branch 'LENS-856' of github.com:coveo/push-api-client.java into…
y-lakhdar May 30, 2023
40a73dd
add unit tests
y-lakhdar May 30, 2023
b5bbee0
Apply suggestions from code review
y-lakhdar Jun 2, 2023
4d6ff14
Merge branch 'main' of github.com:coveo/push-api-client.java into LEN…
y-lakhdar Jun 2, 2023
018998b
Merge branch 'LENS-837' of github.com:coveo/push-api-client.java into…
y-lakhdar Jun 2, 2023
e429d3e
applying corrections
y-lakhdar Jun 2, 2023
d50d170
Merge branch 'main' into LENS-837
y-lakhdar Jun 8, 2023
cce0ca3
Merge branch 'main' into LENS-837
y-lakhdar Jun 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Set up JDK 16
- name: Set up JDK 11
uses: actions/setup-java@v2
with:
java-version: '16'
java-version: '11'
distribution: 'adopt'
- name: Build with Maven
run: mvn -B package --file pom.xml
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/target/
.env
/.idea/
.vscode
105 changes: 105 additions & 0 deletions src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.coveo.pushapiclient;

import java.io.IOException;
import java.util.ArrayList;

/**
* Represents a queue for uploading documents using a specified upload strategy
*/
class DocumentUploadQueue {
private final UploadStrategy uploader;
private final int maxQueueSize = 5 * 1024 * 1024;
private ArrayList<DocumentBuilder> documentToAddList;
private ArrayList<DeleteDocument> documentToDeleteList;
private int size;

/**
* Constructs a new DocumentUploadQueue object with a default maximum queue size
* limit of 5MB.
*
* @param uploader The upload strategy to be used for document uploads.
*/
public DocumentUploadQueue(UploadStrategy uploader) {
this.documentToAddList = new ArrayList<>();
this.documentToDeleteList = new ArrayList<>();
this.uploader = uploader;
}

/**
* Flushes the accumulated documents by applying the upload strategy.
*
* @throws IOException If an I/O error occurs during the upload.
* @throws InterruptedException If the upload process is interrupted.
*/
public void flush() throws IOException, InterruptedException {
if (this.isEmpty()) {
return;
}
BatchUpdate batch = this.getBatch();
// TODO: LENS-871: support concurrent requests
this.uploader.apply(batch);
this.size = 0;
this.documentToAddList.clear();
this.documentToDeleteList.clear();
}

/**
* Adds a {@link DocumentBuilder} to the upload queue and flushes the queue if
* it exceeds the maximum content length.
* See {@link DocumentUploadQueue#flush}.
*
* @param document The document to be added to the index.
* @throws IOException If an I/O error occurs during the upload.
* @throws InterruptedException If the upload process is interrupted.
*/
public void add(DocumentBuilder document) throws IOException, InterruptedException {
if (document == null) {
return;
}

final int sizeOfDoc = document.marshal().getBytes().length;
if (this.size + sizeOfDoc >= this.maxQueueSize) {
this.flush();
}
if (document != null) {
documentToAddList.add(document);
this.size += sizeOfDoc;
}
}

/**
* Adds a {@link DeleteDocument} to the upload queue and flushes the queue if
* it exceeds the maximum content length.
* See {@link DocumentUploadQueue#flush}.
*
* @param document The document to be delete from the index.
* @throws IOException If an I/O error occurs during the upload.
* @throws InterruptedException If the upload process is interrupted.
*/
public void add(DeleteDocument document) throws IOException, InterruptedException {
if (document == null) {
return;
}

final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length;
if (this.size + sizeOfDoc >= this.maxQueueSize) {
this.flush();
}
if (document != null) {
documentToDeleteList.add(document);
this.size += sizeOfDoc;
}
}

public BatchUpdate getBatch() {
return new BatchUpdate(
new ArrayList<DocumentBuilder>(this.documentToAddList),
new ArrayList<DeleteDocument>(this.documentToDeleteList));
}

public boolean isEmpty() {
// TODO: LENS-843: include partial document updates
return documentToAddList.isEmpty() && documentToDeleteList.isEmpty();
}

}
41 changes: 41 additions & 0 deletions src/main/java/com/coveo/pushapiclient/PlatformClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,47 @@ public HttpResponse<String> deleteDocument(String sourceId, String documentId, B
return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
}

public HttpResponse<String> openStream(String sourceId) throws IOException, InterruptedException {
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
// TODO: LENS-875: standardize string manipulation
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/open", sourceId));

// TODO: LENS-876: reduce code duplication
HttpRequest request = HttpRequest.newBuilder()
.headers(headers)
.uri(uri)
.POST(HttpRequest.BodyPublishers.ofString(""))
.build();

return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
}

public HttpResponse<String> closeStream(String sourceId, String streamId) throws IOException, InterruptedException {
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/%s/close", sourceId, streamId));

HttpRequest request = HttpRequest.newBuilder()
.headers(headers)
.uri(uri)
.POST(HttpRequest.BodyPublishers.ofString(""))
.build();

return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
}

public HttpResponse<String> requireStreamChunk(String sourceId, String streamId) throws IOException, InterruptedException {
String[] headers = this.getHeaders(this.getAuthorizationHeader(), this.getContentTypeApplicationJSONHeader());
URI uri = URI.create(this.getBasePushURL() + String.format("/sources/%s/stream/%s/chunk", sourceId, streamId));

HttpRequest request = HttpRequest.newBuilder()
.headers(headers)
.uri(uri)
.POST(HttpRequest.BodyPublishers.ofString(""))
.build();

return this.httpClient.send(request, HttpResponse.BodyHandlers.ofString());
}

/**
* Create a file container. See [Creating a File Container](https://docs.coveo.com/en/43).
*
Expand Down
51 changes: 51 additions & 0 deletions src/main/java/com/coveo/pushapiclient/PushService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.coveo.pushapiclient;

import java.io.IOException;
import java.net.http.HttpResponse;

import com.google.gson.Gson;

public class PushService {
private final PushEnabledSource source;
private final PlatformClient platformClient;
private PushServiceInternal service;

public PushService(PushEnabledSource source) {
String apiKey = source.getApiKey();
String organizationId = source.getOrganizationId();
PlatformUrl platformUrl = source.getPlatformUrl();
UploadStrategy uploader = this.getUploadStrategy();
DocumentUploadQueue queue = new DocumentUploadQueue(uploader);

this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl);
this.service = new PushServiceInternal(queue);
this.source = source;
}

public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException {
// TODO: LENS-843: include partial document updates
this.service.addOrUpdate(document);
}

public void delete(DeleteDocument document) throws IOException, InterruptedException {
this.service.delete(document);
}

public void close() throws IOException, InterruptedException {
this.service.close();
}

private UploadStrategy getUploadStrategy() {
return (batchUpdate) -> {
String sourceId = this.getSourceId();
HttpResponse<String> resFileContainer = this.platformClient.createFileContainer();
FileContainer fileContainer = new Gson().fromJson(resFileContainer.body(), FileContainer.class);
this.platformClient.uploadContentToFileContainer(fileContainer, new Gson().toJson(batchUpdate.marshal()));
return this.platformClient.pushFileContainerContent(sourceId, fileContainer);
};
}

private String getSourceId() {
return this.source.getId();
}
}
24 changes: 24 additions & 0 deletions src/main/java/com/coveo/pushapiclient/PushServiceInternal.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.coveo.pushapiclient;

import java.io.IOException;

public class PushServiceInternal {
private DocumentUploadQueue queue;

public PushServiceInternal(DocumentUploadQueue queue) {
this.queue = queue;
}

public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException {
this.queue.add(document);
}

public void delete(DeleteDocument document) throws IOException, InterruptedException {
this.queue.add(document);
}

public void close() throws IOException, InterruptedException {
queue.flush();
}

}
11 changes: 11 additions & 0 deletions src/main/java/com/coveo/pushapiclient/StreamResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.coveo.pushapiclient;

import java.util.Map;

public class StreamResponse {
public String uploadUri;
public String fileId;
public String streamId;
public Map<String, String> requiredHeaders;

}
116 changes: 116 additions & 0 deletions src/main/java/com/coveo/pushapiclient/StreamService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.coveo.pushapiclient;

import java.io.IOException;
import java.net.http.HttpResponse;

import com.coveo.pushapiclient.exceptions.NoOpenStreamException;
import com.google.gson.Gson;

// TODO: LENS-851 - Make public
class StreamService {
private final StreamEnabledSource source;
private final PlatformClient platformClient;
private StreamServiceInternal service;
private String streamId;
private DocumentUploadQueue queue;

/**
* Creates a service to stream your documents to the provided source by
* interacting with the Stream API.
*
* <p>
* To perform <a href="https://docs.coveo.com/en/l62e0540">full document
* updates</a>, use the {@PushService}, since pushing documents with the
* {@StreamService} is equivalent to triggering a full source rebuild. The
* {@StreamService} can also be used for an initial catalog upload.
*
* @param source The source to which you want to send your documents.
*/
public StreamService(StreamEnabledSource source) {
String apiKey = source.getApiKey();
String organizationId = source.getOrganizationId();
PlatformUrl platformUrl = source.getPlatformUrl();
UploadStrategy uploader = this.getUploadStrategy();

this.source = source;
this.queue = new DocumentUploadQueue(uploader);
this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl);
this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient);
}

/**
* Adds documents to the previously specified source.
* This function will open a stream before uploading documents into it.
*
* <p>
* If called several times, the service will automatically batch documents and
* create new stream chunks whenever the data payload exceeds the
* <a href="https://docs.coveo.com/en/lb4a0344#stream-api-limits">batch size limit</a>
* set for the Stream API.
*
* <p>
* Once there are no more documents to add, it is important to call the {@link StreamService#close} function
* in order to send any buffered documents and close the open stream.
* Otherwise, changes will not be reflected in the index.
*
* <p>
* <pre>
* {@code
* //...
* StreamService service = new StreamService(source));
* for (DocumentBuilder document : fictionalDocumentList) {
* service.add(document);
* }
* service.close(document);
* </pre>
*
* <p>
* For more code samples, visit <a href="TODO: LENS-840">Stream data to your catalog source</a>
*
* @param document The documentBuilder to add to your source
* @throws InterruptedException
* @throws IOException
*/
public void add(DocumentBuilder document) throws IOException, InterruptedException {
this.service.add(document);
}

/**
* Sends any buffered documents and <a href="https://docs.coveo.com/en/lb4a0344#step-3-close-the-stream">closes the stream</a>.
*
* <p>
* Upon invoking this method, any indexed items not added through this {@link StreamService} instance will be removed.
* All documents added from the initialization of the service until the invocation of the {@link StreamService#close} function
* will completely replace the previous content of the source.
*
* <p>
* When you upload a catalog into a source, it will replace the previous content
* of the source completely. Expect a 15-minute delay for the removal of the old
* items from the index.
*
* @return
* @throws IOException
* @throws InterruptedException
* @throws NoOpenStreamException
*/
public HttpResponse<String> close() throws IOException, InterruptedException, NoOpenStreamException {
return this.service.close();
}

private UploadStrategy getUploadStrategy() {
return (batchUpdate) -> {
String sourceId = this.getSourceId();
HttpResponse<String> resFileContainer = this.platformClient.requireStreamChunk(sourceId, this.streamId);
FileContainer fileContainer = new Gson().fromJson(resFileContainer.body(), FileContainer.class);
String batchUpdateJson = new Gson().toJson(batchUpdate.marshal());
return this.platformClient.uploadContentToFileContainer(fileContainer,
batchUpdateJson);

};
}

private String getSourceId() {
return this.source.getId();
}

}
Loading