@@ -104,19 +104,14 @@ private boolean allBulkWriteErrorsAreDueToDuplicateKey(MongoBulkWriteException b
*/
@Override
public boolean upsert(Key key, Document document) throws IOException {
try {
UpdateOptions options = new UpdateOptions().upsert(true);
UpdateResult writeResult =
collection.updateOne(this.selectionCriteriaForKey(key), this.prepareUpsert(key, document), options);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Write result: " + writeResult.toString());
}

return writeResult.getModifiedCount() > 0;
} catch (IOException e) {
LOGGER.error("Exception upserting document. key: {} content:{}", key, document, e);
throw e;
UpdateOptions options = new UpdateOptions().upsert(true);
UpdateResult writeResult =
collection.updateOne(selectionCriteriaForKey(key), prepareUpsert(key, document), options);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Write result: {}", writeResult);
}

return writeResult.getModifiedCount() > 0;
}

/**
@@ -126,22 +121,22 @@ public boolean upsert(Key key, Document document) throws IOException {
@Override
public Document upsertAndReturn(Key key, Document document) throws IOException {
BasicDBObject upsertResult = Failsafe.with(upsertRetryPolicy).get(() -> collection.findOneAndUpdate(
this.selectionCriteriaForKey(key),
this.prepareUpsert(key, document),
selectionCriteriaForKey(key),
prepareUpsert(key, document),
new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)));
if (upsertResult == null) {
throw new IOException("Could not upsert the document with key: " + key);
}

return this.dbObjectToDocument(upsertResult);
return dbObjectToDocument(upsertResult);
}

private BasicDBObject prepareUpsert(Key key, Document document) throws JsonProcessingException {
private static BasicDBObject prepareUpsert(Key key, Document document) throws JsonProcessingException {
String jsonString = document.toJson();
JsonNode jsonNode = MAPPER.readTree(jsonString);

// escape "." and "$" in field names since Mongo DB does not like them
JsonNode sanitizedJsonNode = recursiveClone(jsonNode, this::encodeKey);
JsonNode sanitizedJsonNode = recursiveClone(jsonNode, MongoCollection::encodeKey);
BasicDBObject setObject = BasicDBObject.parse(MAPPER.writeValueAsString(sanitizedJsonNode));
long now = System.currentTimeMillis();
setObject.put(ID_KEY, key.toString());
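For reference, a minimal sketch of how the upsertRetryPolicy used by upsertAndReturn above might be configured with Failsafe (net.jodah.failsafe); the retried exception type, delay, and retry count here are assumptions, not values taken from this change:

// Sketch only: an assumed retry policy for the Failsafe.with(upsertRetryPolicy) call above.
// Uses net.jodah.failsafe.RetryPolicy and java.time.Duration.
private static RetryPolicy<BasicDBObject> buildUpsertRetryPolicy() {
  return new RetryPolicy<BasicDBObject>()
      .handle(MongoCommandException.class) // assumed retryable failure type
      .withDelay(Duration.ofMillis(100))   // assumed backoff between attempts
      .withMaxRetries(3);                  // assumed retry budget
}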
@@ -161,7 +156,7 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) {
JsonNode jsonNode = MAPPER.readTree(jsonString);

// escape "." and "$" in field names since Mongo DB does not like them
JsonNode sanitizedJsonNode = recursiveClone(jsonNode, this::encodeKey);
JsonNode sanitizedJsonNode = recursiveClone(jsonNode, MongoCollection::encodeKey);
BasicDBObject dbObject =
new BasicDBObject(
subDocPath, BasicDBObject.parse(MAPPER.writeValueAsString(sanitizedJsonNode)));
@@ -171,7 +166,7 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) {
UpdateResult writeResult = collection.updateOne(selectionCriteriaForKey(key), setObject,
new UpdateOptions());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Write result: " + writeResult.toString());
LOGGER.debug("Write result: {}", writeResult);
}
// TODO:look into the writeResult to ensure it was successful. Was not easy to find this from
// docs.
@@ -182,7 +177,7 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) {
}
}

private JsonNode recursiveClone(JsonNode src, Function<String, String> function) {
private static JsonNode recursiveClone(JsonNode src, Function<String, String> function) {
if (!src.isObject()) {
return src;
}
@@ -202,11 +197,11 @@ private JsonNode recursiveClone(JsonNode src, Function<String, String> function)
return tgt;
}

private String encodeKey(String key) {
private static String encodeKey(String key) {
return key.replace("\\", "\\\\").replace("$", "\\u0024").replace(".", "\\u002e");
}

private String decodeKey(String key) {
private static String decodeKey(String key) {
return key.replace("\\u002e", ".").replace("\\u0024", "$").replace("\\\\", "\\");
}
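As an aside, a minimal sketch (purely illustrative, with a made-up field name) of the round trip these two helpers provide for field names containing '.' or '$', which MongoDB rejects in document keys:

// Illustrative only: encodeKey/decodeKey round-trip check.
String original = "metrics.cpu$usage";      // hypothetical field name
String encoded = encodeKey(original);       // "metrics\\u002ecpu\\u0024usage" as a Java literal
String decoded = decodeKey(encoded);        // back to "metrics.cpu$usage"
assert original.equals(decoded);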

@@ -255,24 +250,24 @@ public boolean hasNext() {

@Override
public Document next() {
return MongoCollection.this.dbObjectToDocument(mongoCursor.next());
return dbObjectToDocument(mongoCursor.next());
}
};
}

@Override
public boolean delete(Key key) {
DeleteResult deleteResult = collection.deleteOne(this.selectionCriteriaForKey(key));
DeleteResult deleteResult = collection.deleteOne(selectionCriteriaForKey(key));
return deleteResult.getDeletedCount() > 0;
}

@Override
public boolean deleteSubDoc(Key key, String subDocPath) {
BasicDBObject unsetObject = new BasicDBObject("$unset", new BasicDBObject(subDocPath, ""));

UpdateResult updateResult = collection.updateOne(this.selectionCriteriaForKey(key), unsetObject);
UpdateResult updateResult = collection.updateOne(selectionCriteriaForKey(key), unsetObject);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Write result: " + updateResult.toString());
LOGGER.debug("Update result: {}", updateResult);
}

return updateResult.getModifiedCount() > 0;
@@ -286,7 +281,7 @@ public boolean deleteAll() {
return true;
}

private void parseOrderByQuery(List<OrderBy> orderBys, Map<String, Object> orderbyMap) {
private static void parseOrderByQuery(List<OrderBy> orderBys, Map<String, Object> orderbyMap) {
for (OrderBy orderBy : orderBys) {
orderbyMap.put(orderBy.getField(), (orderBy.isAsc() ? 1 : -1));
}
@@ -388,7 +383,7 @@ public long total(Query query) {
public boolean bulkUpsert(Map<Key, Document> documents) {
try {
BulkWriteResult result = bulkUpsertImpl(documents);
LOGGER.debug(result.toString());
LOGGER.debug("BulkWriteResult: {}", result);
return true;
} catch (IOException | MongoServerException e) {
LOGGER.error("Error during bulk upsert for documents:{}", documents, e);
@@ -402,7 +397,7 @@ private BulkWriteResult bulkUpsertImpl(Map<Key, Document> documents) throws Json
Key key = entry.getKey();
// insert or overwrite
bulkCollection.add(new UpdateOneModel<>(
this.selectionCriteriaForKey(key),
selectionCriteriaForKey(key),
prepareUpsert(key, entry.getValue()),
new UpdateOptions().upsert(true)));
}
@@ -420,7 +415,7 @@ public Iterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Document> d

// Now go ahead and do the bulk upsert.
BulkWriteResult result = bulkUpsertImpl(documents);
LOGGER.debug(result.toString());
LOGGER.debug("BulkWriteResult: {}", result);

return new Iterator<>() {
@Override
@@ -430,12 +425,11 @@ public boolean hasNext() {

@Override
public Document next() {
return MongoCollection.this.dbObjectToDocument(mongoCursor.next());
return dbObjectToDocument(mongoCursor.next());
}
};
} catch (JsonProcessingException e) {
LOGGER.error("Error during bulk upsert for documents:{}", documents, e);
throw new IOException("Error during bulk upsert.");
throw new IOException("Error during bulk upsert.", e);
}
}

Expand All @@ -444,16 +438,16 @@ public void drop() {
collection.drop();
}

private BasicDBObject selectionCriteriaForKey(Key key) {
private static BasicDBObject selectionCriteriaForKey(Key key) {
return new BasicDBObject(ID_KEY, key.toString());
}

private BasicDBObject selectionCriteriaForKeys(Set<Key> keys) {
private static BasicDBObject selectionCriteriaForKeys(Set<Key> keys) {
return new BasicDBObject(Map.of(ID_KEY, new BasicDBObject("$in",
keys.stream().map(Key::toString).collect(Collectors.toList()))));
}

private Document dbObjectToDocument(BasicDBObject dbObject) {
private static Document dbObjectToDocument(BasicDBObject dbObject) {
try {
// Hack: Remove the _id field since it's an unrecognized field for Proto layer.
// TODO: We should rather use separate DAO classes instead of using the
@@ -464,7 +458,7 @@ private Document dbObjectToDocument(BasicDBObject dbObject) {
JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build();
jsonString = dbObject.toJson(relaxed);
JsonNode jsonNode = MAPPER.readTree(jsonString);
JsonNode decodedJsonNode = recursiveClone(jsonNode, this::decodeKey);
JsonNode decodedJsonNode = recursiveClone(jsonNode, MongoCollection::decodeKey);
return new JSONDocument(decodedJsonNode);
} catch (IOException e) {
// throwing exception is not very useful here.