diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java index 451cb72a..18dd1c80 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java @@ -104,19 +104,14 @@ private boolean allBulkWriteErrorsAreDueToDuplicateKey(MongoBulkWriteException b */ @Override public boolean upsert(Key key, Document document) throws IOException { - try { - UpdateOptions options = new UpdateOptions().upsert(true); - UpdateResult writeResult = - collection.updateOne(this.selectionCriteriaForKey(key), this.prepareUpsert(key, document), options); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Write result: " + writeResult.toString()); - } - - return writeResult.getModifiedCount() > 0; - } catch (IOException e) { - LOGGER.error("Exception upserting document. key: {} content:{}", key, document, e); - throw e; + UpdateOptions options = new UpdateOptions().upsert(true); + UpdateResult writeResult = + collection.updateOne(selectionCriteriaForKey(key), prepareUpsert(key, document), options); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Write result: {}", writeResult); } + + return writeResult.getModifiedCount() > 0; } /** @@ -126,22 +121,22 @@ public boolean upsert(Key key, Document document) throws IOException { @Override public Document upsertAndReturn(Key key, Document document) throws IOException { BasicDBObject upsertResult = Failsafe.with(upsertRetryPolicy).get(() -> collection.findOneAndUpdate( - this.selectionCriteriaForKey(key), - this.prepareUpsert(key, document), + selectionCriteriaForKey(key), + prepareUpsert(key, document), new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER))); if (upsertResult == null) { throw new IOException("Could not upsert the document with key: " + key); } - return this.dbObjectToDocument(upsertResult); + return dbObjectToDocument(upsertResult); } - private BasicDBObject prepareUpsert(Key key, Document document) throws JsonProcessingException { + private static BasicDBObject prepareUpsert(Key key, Document document) throws JsonProcessingException { String jsonString = document.toJson(); JsonNode jsonNode = MAPPER.readTree(jsonString); // escape "." and "$" in field names since Mongo DB does not like them - JsonNode sanitizedJsonNode = recursiveClone(jsonNode, this::encodeKey); + JsonNode sanitizedJsonNode = recursiveClone(jsonNode, MongoCollection::encodeKey); BasicDBObject setObject = BasicDBObject.parse(MAPPER.writeValueAsString(sanitizedJsonNode)); long now = System.currentTimeMillis(); setObject.put(ID_KEY, key.toString()); @@ -161,7 +156,7 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) { JsonNode jsonNode = MAPPER.readTree(jsonString); // escape "." and "$" in field names since Mongo DB does not like them - JsonNode sanitizedJsonNode = recursiveClone(jsonNode, this::encodeKey); + JsonNode sanitizedJsonNode = recursiveClone(jsonNode, MongoCollection::encodeKey); BasicDBObject dbObject = new BasicDBObject( subDocPath, BasicDBObject.parse(MAPPER.writeValueAsString(sanitizedJsonNode))); @@ -171,7 +166,7 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) { UpdateResult writeResult = collection.updateOne(selectionCriteriaForKey(key), setObject, new UpdateOptions()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Write result: " + writeResult.toString()); + LOGGER.debug("Write result: {}", writeResult); } // TODO:look into the writeResult to ensure it was successful. Was not easy to find this from // docs. @@ -182,7 +177,7 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) { } } - private JsonNode recursiveClone(JsonNode src, Function function) { + private static JsonNode recursiveClone(JsonNode src, Function function) { if (!src.isObject()) { return src; } @@ -202,11 +197,11 @@ private JsonNode recursiveClone(JsonNode src, Function function) return tgt; } - private String encodeKey(String key) { + private static String encodeKey(String key) { return key.replace("\\", "\\\\").replace("$", "\\u0024").replace(".", "\\u002e"); } - private String decodeKey(String key) { + private static String decodeKey(String key) { return key.replace("\\u002e", ".").replace("\\u0024", "$").replace("\\\\", "\\"); } @@ -255,14 +250,14 @@ public boolean hasNext() { @Override public Document next() { - return MongoCollection.this.dbObjectToDocument(mongoCursor.next()); + return dbObjectToDocument(mongoCursor.next()); } }; } @Override public boolean delete(Key key) { - DeleteResult deleteResult = collection.deleteOne(this.selectionCriteriaForKey(key)); + DeleteResult deleteResult = collection.deleteOne(selectionCriteriaForKey(key)); return deleteResult.getDeletedCount() > 0; } @@ -270,9 +265,9 @@ public boolean delete(Key key) { public boolean deleteSubDoc(Key key, String subDocPath) { BasicDBObject unsetObject = new BasicDBObject("$unset", new BasicDBObject(subDocPath, "")); - UpdateResult updateResult = collection.updateOne(this.selectionCriteriaForKey(key), unsetObject); + UpdateResult updateResult = collection.updateOne(selectionCriteriaForKey(key), unsetObject); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Write result: " + updateResult.toString()); + LOGGER.debug("Update result: {}", updateResult); } return updateResult.getModifiedCount() > 0; @@ -286,7 +281,7 @@ public boolean deleteAll() { return true; } - private void parseOrderByQuery(List orderBys, Map orderbyMap) { + private static void parseOrderByQuery(List orderBys, Map orderbyMap) { for (OrderBy orderBy : orderBys) { orderbyMap.put(orderBy.getField(), (orderBy.isAsc() ? 1 : -1)); } @@ -388,7 +383,7 @@ public long total(Query query) { public boolean bulkUpsert(Map documents) { try { BulkWriteResult result = bulkUpsertImpl(documents); - LOGGER.debug(result.toString()); + LOGGER.debug("BulkWriteResult: {}", result); return true; } catch (IOException | MongoServerException e) { LOGGER.error("Error during bulk upsert for documents:{}", documents, e); @@ -402,7 +397,7 @@ private BulkWriteResult bulkUpsertImpl(Map documents) throws Json Key key = entry.getKey(); // insert or overwrite bulkCollection.add(new UpdateOneModel<>( - this.selectionCriteriaForKey(key), + selectionCriteriaForKey(key), prepareUpsert(key, entry.getValue()), new UpdateOptions().upsert(true))); } @@ -420,7 +415,7 @@ public Iterator bulkUpsertAndReturnOlderDocuments(Map d // Now go ahead and do the bulk upsert. BulkWriteResult result = bulkUpsertImpl(documents); - LOGGER.debug(result.toString()); + LOGGER.debug("BulkWriteResult: {}", result); return new Iterator<>() { @Override @@ -430,12 +425,11 @@ public boolean hasNext() { @Override public Document next() { - return MongoCollection.this.dbObjectToDocument(mongoCursor.next()); + return dbObjectToDocument(mongoCursor.next()); } }; } catch (JsonProcessingException e) { - LOGGER.error("Error during bulk upsert for documents:{}", documents, e); - throw new IOException("Error during bulk upsert."); + throw new IOException("Error during bulk upsert.", e); } } @@ -444,16 +438,16 @@ public void drop() { collection.drop(); } - private BasicDBObject selectionCriteriaForKey(Key key) { + private static BasicDBObject selectionCriteriaForKey(Key key) { return new BasicDBObject(ID_KEY, key.toString()); } - private BasicDBObject selectionCriteriaForKeys(Set keys) { + private static BasicDBObject selectionCriteriaForKeys(Set keys) { return new BasicDBObject(Map.of(ID_KEY, new BasicDBObject("$in", keys.stream().map(Key::toString).collect(Collectors.toList())))); } - private Document dbObjectToDocument(BasicDBObject dbObject) { + private static Document dbObjectToDocument(BasicDBObject dbObject) { try { // Hack: Remove the _id field since it's an unrecognized field for Proto layer. // TODO: We should rather use separate DAO classes instead of using the @@ -464,7 +458,7 @@ private Document dbObjectToDocument(BasicDBObject dbObject) { JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build(); jsonString = dbObject.toJson(relaxed); JsonNode jsonNode = MAPPER.readTree(jsonString); - JsonNode decodedJsonNode = recursiveClone(jsonNode, this::decodeKey); + JsonNode decodedJsonNode = recursiveClone(jsonNode, MongoCollection::decodeKey); return new JSONDocument(decodedJsonNode); } catch (IOException e) { // throwing exception is not very useful here.