Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions samples/UpdateStreamDocuments.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ public static void main(String[] args) throws IOException, InterruptedException,
DeleteDocument document3 = new DeleteDocument("https://my.document3.uri");
updateStreamService.delete(document3);

PartialUpdateDocument document4 = new PartialUpdateDocument("https://my.document4.uri", PartialUpdateOperator.FIELD_VALUE_REPLACE, "title", "My new title");
updateStreamService.addPartialUpdate(document4);

PartialUpdateDocument document5 = new PartialUpdateDocument("https://my.document5.uri", PartialUpdateOperator.DICTIONARY_PUT, "dictionaryAttribute", new HashMap<>() {{
put("newkey", "newvalue");
}});
updateStreamService.addPartialUpdate(document5);

PartialUpdateDocument document6 = new PartialUpdateDocument("https://my.document6.uri", PartialUpdateOperator.ARRAY_APPEND, "arrayAttribute", new String[]{"newValue"});
updateStreamService.addPartialUpdate(document6);

PartialUpdateDocument document7 = new PartialUpdateDocument("https://my.document7.uri", PartialUpdateOperator.ARRAY_REMOVE, "arrayAttribute", new String[]{"oldValue"});
updateStreamService.addPartialUpdate(document7);

PartialUpdateDocument document8 = new PartialUpdateDocument("https://my.document8.uri", PartialUpdateOperator.DICIONARY_REMOVE, "dictionaryAttribute", "oldkey");
updateStreamService.addPartialUpdate(document8);

updateStreamService.close();
}
}
10 changes: 5 additions & 5 deletions src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
/** Represents a queue for uploading documents using a specified upload strategy */
class DocumentUploadQueue {
private static final Logger logger = LogManager.getLogger(DocumentUploadQueue.class);
private final UploadStrategy uploader;
private final int maxQueueSize = 5 * 1024 * 1024;
private ArrayList<DocumentBuilder> documentToAddList;
private ArrayList<DeleteDocument> documentToDeleteList;
private int size;
protected final UploadStrategy uploader;
protected final int maxQueueSize = 5 * 1024 * 1024;
protected ArrayList<DocumentBuilder> documentToAddList;
protected ArrayList<DeleteDocument> documentToDeleteList;
protected int size;

/**
* Constructs a new DocumentUploadQueue object with a default maximum queue size limit of 5MB.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.coveo.pushapiclient;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.util.Map;

public class PartialUpdateDocument {

/** The documentId of the document. */
public String documentId;

/** The operator of the document. */
public PartialUpdateOperator operator;

/** The field to update. */
public String field;

/** The value of the field to be updated. */
public Object value;

public JsonObject marshalJsonObject() {
return new Gson().toJsonTree(this).getAsJsonObject();
}

/**
* Creates a new PartialUpdateDocument. The type of the value provided is constrained by the
* operator.
*
* <ul>
* <li>PartialUpdateOperator.ARRAY_APPEND: value must be an array
* <li>PartialUpdateOperator.ARRAY_REMOVE: value must be an array
* <li>PartialUpdateOperator.FIELD_VALUE_REPLACE: value can be any type
* <li>PartialUpdateOperator.DICTIONARY_PUT: value must be a Map
* <li>PartialUpdateOperator.DICTIONARY_REMOVE: value must be a String or an Array
* </ul>
*
* @param documentId The id of the document.
* @param operator The operator to use.
* @param field The field to update.
* @param value The value to update the field with.
*/
public PartialUpdateDocument(
String documentId, PartialUpdateOperator operator, String field, Object value) {
if (value == null) throw new IllegalArgumentException("Value cannot be null");
if (operator == null) throw new IllegalArgumentException("Operator cannot be null");
if (field == null) throw new IllegalArgumentException("Field cannot be null");
if (documentId == null) throw new IllegalArgumentException("DocumentId cannot be null");

this.documentId = documentId;
this.operator = operator;
this.field = field;

switch (operator) {
case ARRAYAPPEND:
case ARRAYREMOVE:
if (!value.getClass().isArray())
throw new IllegalArgumentException("Value must be an array for operator " + operator);
break;
case FIELDVALUEREPLACE:
break;
case DICTIONARYPUT:
if (!(value instanceof Map<?, ?>))
throw new IllegalArgumentException("Value must be a Map for operator " + operator);
break;
case DICTIONARYREMOVE:
if (!(value instanceof String) && !value.getClass().isArray())
throw new IllegalArgumentException(
"Value must be a String or an Array for operator " + operator);
break;
default:
throw new IllegalArgumentException("Invalid operator " + operator);
}
this.value = value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.coveo.pushapiclient;

public enum PartialUpdateOperator {
ARRAYAPPEND {
public String toString() {
return "arrayAppend";
}
},
ARRAYREMOVE {
public String toString() {
return "arrayRemove";
}
},
FIELDVALUEREPLACE {
public String toString() {
return "fieldValueReplace";
}
},
DICTIONARYPUT {
public String toString() {
return "dictionaryPut";
}
},
DICTIONARYREMOVE {
public String toString() {
return "dictionaryRemove";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.coveo.pushapiclient;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class StreamDocumentUploadQueue extends DocumentUploadQueue {

private static final Logger logger = LogManager.getLogger(StreamDocumentUploadQueue.class);
protected ArrayList<PartialUpdateDocument> documentToPartiallyUpdateList;

public StreamDocumentUploadQueue(UploadStrategy uploader) {
super(uploader);
this.documentToPartiallyUpdateList = new ArrayList<>();
}

/**
* Flushes the accumulated documents by applying the upload strategy.
*
* @throws IOException If an I/O error occurs during the upload.
* @throws InterruptedException If the upload process is interrupted.
*/
@Override
public void flush() throws IOException, InterruptedException {
if (this.isEmpty()) {
logger.debug("Empty batch. Skipping upload");
return;
}
// TODO: LENS-871: support concurrent requests
StreamUpdate stream = this.getStream();
logger.info("Uploading document Stream");
this.uploader.apply(stream);

this.size = 0;
this.documentToAddList.clear();
this.documentToDeleteList.clear();
this.documentToPartiallyUpdateList.clear();
}

/**
* Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds
* the maximum content length. See {@link PartialUpdateDocument#flush}.
*
* @param document The document to be deleted from the index.
* @throws IOException If an I/O error occurs during the upload.
* @throws InterruptedException If the upload process is interrupted.
*/
public void add(PartialUpdateDocument document) throws IOException, InterruptedException {
if (document == null) {
return;
}

final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length;
if (this.size + sizeOfDoc >= this.maxQueueSize) {
this.flush();
}
documentToPartiallyUpdateList.add(document);
logger.info("Adding document to batch: " + document.documentId);
this.size += sizeOfDoc;
}

public StreamUpdate getStream() {
return new StreamUpdate(
new ArrayList<>(this.documentToAddList),
new ArrayList<>(this.documentToDeleteList),
new ArrayList<>(this.documentToPartiallyUpdateList));
}

@Override
public BatchUpdate getBatch() {
throw new UnsupportedOperationException("StreamDocumentUploadQueue does not support getBatch");
}

@Override
public boolean isEmpty() {
return super.isEmpty() && documentToPartiallyUpdateList.isEmpty();
}
}
60 changes: 60 additions & 0 deletions src/main/java/com/coveo/pushapiclient/StreamUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.coveo.pushapiclient;

import com.google.gson.JsonObject;
import java.util.List;

public class StreamUpdate extends BatchUpdate {

private final List<PartialUpdateDocument> partialUpdate;

public StreamUpdate(
List<DocumentBuilder> addOrUpdate,
List<DeleteDocument> delete,
List<PartialUpdateDocument> partialUpdate) {
super(addOrUpdate, delete);
this.partialUpdate = partialUpdate;
}

@Override
public StreamUpdateRecord marshal() {
return new StreamUpdateRecord(
this.getAddOrUpdate().stream()
.map(DocumentBuilder::marshalJsonObject)
.toArray(JsonObject[]::new),
this.getDelete().stream().map(DeleteDocument::marshalJsonObject).toArray(JsonObject[]::new),
this.partialUpdate.stream()
.map(PartialUpdateDocument::marshalJsonObject)
.toArray(JsonObject[]::new));
}

public List<PartialUpdateDocument> getPartialUpdate() {
return partialUpdate;
}

@Override
public String toString() {
return "StreamUpdate["
+ "addOrUpdate="
+ getAddOrUpdate()
+ ", delete="
+ getDelete()
+ ", partialUpdate="
+ partialUpdate
+ ']';
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
StreamUpdate that = (StreamUpdate) obj;
return getAddOrUpdate().equals(that.getAddOrUpdate())
&& getDelete().equals(that.getDelete())
&& partialUpdate.equals(that.partialUpdate);
}

@Override
public int hashCode() {
return super.hashCode() + partialUpdate.hashCode();
}
}
48 changes: 48 additions & 0 deletions src/main/java/com/coveo/pushapiclient/StreamUpdateRecord.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.coveo.pushapiclient;

import com.google.gson.JsonObject;
import java.util.Arrays;

public class StreamUpdateRecord extends BatchUpdateRecord {

private final JsonObject[] partialUpdate;

public StreamUpdateRecord(
JsonObject[] addOrUpdate, JsonObject[] delete, JsonObject[] partialUpdate) {
super(addOrUpdate, delete);
this.partialUpdate = partialUpdate;
}

public JsonObject[] getPartialUpdate() {
return partialUpdate;
}

@Override
public String toString() {
return "StreamUpdateRecord["
+ "addOrUpdate="
+ Arrays.toString(this.getAddOrUpdate())
+ ", delete="
+ Arrays.toString(this.getDelete())
+ ", partialUpdate="
+ Arrays.toString(partialUpdate)
+ ']';
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
StreamUpdateRecord that = (StreamUpdateRecord) obj;
return Arrays.equals(this.getAddOrUpdate(), that.getAddOrUpdate())
&& Arrays.equals(this.getDelete(), that.getDelete())
&& Arrays.equals(partialUpdate, that.partialUpdate);
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + Arrays.hashCode(partialUpdate);
return result;
}
}
Loading