diff --git a/document-store/build.gradle.kts b/document-store/build.gradle.kts
index e2e1372c..bd69e53c 100644
--- a/document-store/build.gradle.kts
+++ b/document-store/build.gradle.kts
@@ -26,6 +26,7 @@ dependencies {
   testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
   testImplementation("org.mockito:mockito-core:2.19.0")
   integrationTestImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
+  integrationTestImplementation("com.github.java-json-tools:json-patch:1.13")
 }
 
 tasks.test {
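The json-patch test dependency added above is what the Mongo integration test below uses to compare document versions: JsonDiff produces an RFC 6902 patch (a list of add/remove/replace operations) between two Jackson trees. A minimal, self-contained sketch of that API, with illustrative values:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import com.github.fge.jsonpatch.diff.JsonDiff;

    public class JsonDiffExample {
      public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode before = mapper.createObjectNode().put("id", "1");
        ObjectNode after = mapper.createObjectNode().put("id", "1").put("createdTime", 1234L);

        // Prints [{"op":"add","path":"/createdTime","value":1234}]. When the stored
        // copy only gains fields, the only op type in the patch is "add" -- the
        // property the test below asserts on.
        JsonNode patch = JsonDiff.asJson(before, after);
        System.out.println(patch);
      }
    }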
diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java
index 20ed32e4..0d56449b 100644
--- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java
+++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java
@@ -5,9 +5,11 @@
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.github.fge.jsonpatch.diff.JsonDiff;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DBObject;
 import com.mongodb.client.FindIterable;
@@ -22,9 +24,13 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.hypertrace.core.documentstore.Collection;
 import org.hypertrace.core.documentstore.Datastore;
 import org.hypertrace.core.documentstore.DatastoreProvider;
@@ -112,8 +118,8 @@ public void testIgnoreCaseLikeQuery() throws IOException {
       String persistedDocument = documents.get(0).toJson();
       JsonNode jsonNode = OBJECT_MAPPER.reader().readTree(persistedDocument);
       Assertions.assertTrue(persistedDocument.contains("Bob"));
-      Assertions.assertTrue(jsonNode.findValue("createdTime").asLong(0) > now);
-      Assertions.assertTrue(jsonNode.findValue("lastUpdatedTime").asLong(0) > now);
+      Assertions.assertTrue(jsonNode.findValue("createdTime").asLong(0) >= now);
+      Assertions.assertTrue(jsonNode.findValue("lastUpdatedTime").asLong(0) >= now);
     }
   }
 
@@ -394,6 +400,74 @@ public void testBulkUpsert() {
     assertEquals(1, collection.count());
   }
 
+  @Test
+  public void testReturnAndBulkUpsert() throws IOException {
+    datastore.createCollection(COLLECTION_NAME, null);
+    Collection collection = datastore.getCollection(COLLECTION_NAME);
+    Map<Key, Document> documentMapV1 = Map.of(
+        new SingleValueKey("default", "testKey1"), createDocument("id", "1", "testKey1", "abc-v1"),
+        new SingleValueKey("default", "testKey2"), createDocument("id", "2", "testKey2", "xyz-v1")
+    );
+
+    Iterator<Document> iterator = collection.bulkUpsertAndReturnOlderDocuments(documentMapV1);
+    // Initially there shouldn't be any documents.
+    Assertions.assertFalse(iterator.hasNext());
+
+    // Add more details to the documents and bulk upsert again.
+    Map<Key, Document> documentMapV2 = Map.of(
+        new SingleValueKey("default", "testKey1"), createDocument("id", "1", "testKey1", "abc-v2"),
+        new SingleValueKey("default", "testKey2"), createDocument("id", "2", "testKey2", "xyz-v2")
+    );
+    iterator = collection.bulkUpsertAndReturnOlderDocuments(documentMapV2);
+    assertEquals(2, collection.count());
+    List<Document> documents = new ArrayList<>();
+    while (iterator.hasNext()) {
+      documents.add(iterator.next());
+    }
+    assertEquals(2, documents.size());
+
+    Map<String, JsonNode> expectedDocs = convertToMap(documentMapV1.values(), "id");
+    Map<String, JsonNode> actualDocs = convertToMap(documents, "id");
+
+    // Verify that the documents returned were the previous copies.
+    for (Map.Entry<String, JsonNode> entry : expectedDocs.entrySet()) {
+      JsonNode expected = entry.getValue();
+      JsonNode actual = actualDocs.get(entry.getKey());
+
+      Assertions.assertNotNull(actual);
+      JsonNode patch = JsonDiff.asJson(expected, actual);
+
+      // Verify that the patch contains only additions and no removals.
+      Set<String> ops = new HashSet<>();
+      patch.elements().forEachRemaining(e -> {
+        if (e.has("op")) {
+          ops.add(e.get("op").asText());
+        }
+      });
+
+      Assertions.assertTrue(ops.contains("add"));
+      Assertions.assertEquals(1, ops.size());
+    }
+
+    // Delete one of the documents and test again.
+    collection.delete(new SingleValueKey("default", "testKey1"));
+    assertEquals(1, collection.count());
+  }
+
+  private Map<String, JsonNode> convertToMap(java.util.Collection<Document> docs, String key) {
+    return docs.stream()
+        .map(d -> {
+          try {
+            return OBJECT_MAPPER.reader().readTree(d.toJson());
+          } catch (JsonProcessingException e) {
+            e.printStackTrace();
+          }
+          return null;
+        })
+        .filter(Objects::nonNull)
+        .collect(Collectors.toMap(d -> d.get(key).asText(), d -> d));
+  }
+
   @Test
   public void testLike() {
     MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
@@ -431,6 +505,14 @@ public void testLike() {
     assertEquals(1, results.size());
   }
 
+  private Document createDocument(String... keys) {
+    ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
+    for (int i = 0; i < keys.length - 1; i += 2) {
+      objectNode.put(keys[i], keys[i + 1]);
+    }
+    return new JSONDocument(objectNode);
+  }
+
   private Document createDocument(String key, String value) {
     ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
     objectNode.put(key, value);
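The varargs createDocument overload added above consumes its arguments as alternating key/value pairs (hence the loop stepping by two), so a call such as

    // Builds the document {"id": "1", "testKey1": "abc-v1"}
    Document document = createDocument("id", "1", "testKey1", "abc-v1");

produces a two-field document, matching how the test constructs its fixtures.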
diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java
index 7e8c888b..5d11dd84 100644
--- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java
+++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java
@@ -99,7 +99,7 @@ public void testUpsert() throws IOException {
     query.setFilter(Filter.eq(ID, "default:testKey"));
     Iterator<Document> results = collection.search(query);
     List<Document> documents = new ArrayList<>();
-    for (; results.hasNext(); ) {
+    while (results.hasNext()) {
      documents.add(results.next());
     }
     Assertions.assertFalse(documents.isEmpty());
@@ -119,7 +119,7 @@ public void testUpsertAndReturn() throws IOException {
   }
 
   @Test
-  public void testBulkUpsert() throws IOException {
+  public void testBulkUpsert() {
     Collection collection = datastore.getCollection(COLLECTION_NAME);
     Map<Key, Document> bulkMap = new HashMap<>();
     bulkMap.put(new SingleValueKey("default", "testKey1"), createDocument("name", "Bob"));
@@ -153,6 +153,51 @@ public void testBulkUpsert() {
     }
   }
 
+  @Test
+  public void testBulkUpsertAndReturn() throws IOException {
+    Collection collection = datastore.getCollection(COLLECTION_NAME);
+    Map<Key, Document> bulkMap = new HashMap<>();
+    bulkMap.put(new SingleValueKey("default", "testKey1"), createDocument("name", "Bob"));
+    bulkMap.put(new SingleValueKey("default", "testKey2"), createDocument("name", "Alice"));
+    bulkMap.put(new SingleValueKey("default", "testKey3"), createDocument("name", "Alice"));
+    bulkMap.put(new SingleValueKey("default", "testKey4"), createDocument("name", "Bob"));
+    bulkMap.put(new SingleValueKey("default", "testKey5"), createDocument("name", "Alice"));
+    bulkMap.put(
+        new SingleValueKey("default", "testKey6"), createDocument("email", "bob@example.com"));
+
+    Iterator<Document> iterator = collection.bulkUpsertAndReturnOlderDocuments(bulkMap);
+    // Initially there shouldn't be any documents.
+    Assertions.assertFalse(iterator.hasNext());
+
+    // The operation should be idempotent, so go ahead and try again.
+    iterator = collection.bulkUpsertAndReturnOlderDocuments(bulkMap);
+    List<Document> documents = new ArrayList<>();
+    while (iterator.hasNext()) {
+      documents.add(iterator.next());
+    }
+    Assertions.assertEquals(6, documents.size());
+
+    {
+      // empty query returns all the documents
+      Query query = new Query();
+      Assertions.assertEquals(6, collection.total(query));
+    }
+
+    {
+      Query query = new Query();
+      query.setFilter(Filter.eq("name", "Bob"));
+      Assertions.assertEquals(2, collection.total(query));
+    }
+
+    {
+      // limit should not affect the total
+      Query query = new Query();
+      query.setFilter(Filter.eq("name", "Bob"));
+      query.setLimit(1);
+      Assertions.assertEquals(2, collection.total(query));
+    }
+  }
+
   @Test
   public void testSubDocumentUpdate() throws IOException {
     Collection collection = datastore.getCollection(COLLECTION_NAME);
@@ -169,7 +214,7 @@ public void testSubDocumentUpdate() throws IOException {
     query.setFilter(Filter.eq(ID, "default:testKey"));
     Iterator<Document> results = collection.search(query);
     List<Document> documents = new ArrayList<>();
-    for (; results.hasNext(); ) {
+    while (results.hasNext()) {
       documents.add(results.next());
     }
     Assertions.assertFalse(documents.isEmpty());
@@ -229,7 +274,7 @@ public void testDeleteAll() throws IOException {
   }
 
   @Test
-  public void testDrop() throws IOException {
+  public void testDrop() {
     Collection collection = datastore.getCollection(COLLECTION_NAME);
     Assertions.assertTrue(datastore.listCollections().contains("postgres."
        + COLLECTION_NAME));
@@ -249,7 +294,7 @@ public void testIgnoreCaseLikeQuery() throws IOException {
     query.setFilter(new Filter(Filter.Op.LIKE, "name", searchValue));
     Iterator<Document> results = collection.search(query);
     List<Document> documents = new ArrayList<>();
-    for (; results.hasNext(); ) {
+    while (results.hasNext()) {
       documents.add(results.next());
     }
     Assertions.assertFalse(documents.isEmpty());
@@ -271,7 +316,7 @@ public void testSearch() throws IOException {
     query.setFilter(new Filter(Filter.Op.EQ, DOCUMENT_ID, key.toString()));
     Iterator<Document> results = collection.search(query);
     List<Document> documents = new ArrayList<>();
-    for (; results.hasNext(); ) {
+    while (results.hasNext()) {
       documents.add(results.next());
     }
     Assertions.assertEquals(documents.size(), 1);
@@ -291,7 +336,7 @@ public void testSearchForNestedKey() throws IOException {
         .setFilter(new Filter(Filter.Op.EQ, "attributes.span_id.value.string", "6449f1f720c93a67"));
     Iterator<Document> results = collection.search(query);
     List<Document> documents = new ArrayList<>();
-    for (; results.hasNext(); ) {
+    while (results.hasNext()) {
       documents.add(results.next());
     }
     Assertions.assertEquals(documents.size(), 1);
@@ -347,7 +392,7 @@ public void testOffsetLimitAndOrderBY() throws IOException {
 
     Iterator<Document> results = collection.search(query);
     List<Document> documents = new ArrayList<>();
-    for (; results.hasNext(); ) {
+    while (results.hasNext()) {
       documents.add(results.next());
     }
 
diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java
index 85b89cc5..626a4af5 100644
--- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java
+++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java
@@ -80,6 +80,13 @@ public interface Collection {
    */
  boolean bulkUpsert(Map<Key, Document> documents);
 
+  /**
+   * Bulk-upserts the given documents and returns the previous copies of those documents,
+   * if they existed. This lets clients see how the documents looked before the upsert
+   * without paying for a separate read, i.e. in one less round trip.
+   */
+  Iterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Document> documents) throws IOException;
+
   /**
    * Drops a collections
    */
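For context, a sketch of how a client is expected to consume the new interface method; the wrapper class and document arguments here are hypothetical, but the document-store types are the ones declared above:

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.Map;
    import org.hypertrace.core.documentstore.Collection;
    import org.hypertrace.core.documentstore.Document;
    import org.hypertrace.core.documentstore.Key;
    import org.hypertrace.core.documentstore.SingleValueKey;

    class BulkUpsertExample {
      // Upsert a batch and receive the pre-upsert copies in the same round trip.
      static void upsertAndPrintPrevious(Collection collection, Document d1, Document d2)
          throws IOException {
        Map<Key, Document> batch = Map.of(
            new SingleValueKey("default", "testKey1"), d1,
            new SingleValueKey("default", "testKey2"), d2);

        Iterator<Document> older = collection.bulkUpsertAndReturnOlderDocuments(batch);
        while (older.hasNext()) {
          // Empty on the first insert; afterwards, each document's previous version.
          System.out.println(older.next().toJson());
        }
      }
    }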
diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java
index 11edb224..451cb72a 100644
--- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java
+++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import net.jodah.failsafe.Failsafe;
@@ -386,28 +387,58 @@ public long total(Query query) {
 
   @Override
   public boolean bulkUpsert(Map<Key, Document> documents) {
     try {
-      List<UpdateOneModel<BasicDBObject>> bulkCollection = new ArrayList<>();
-      for (Entry<Key, Document> entry : documents.entrySet()) {
-        Key key = entry.getKey();
-        // insert or overwrite
-        bulkCollection.add(new UpdateOneModel<>(
-            this.selectionCriteriaForKey(key),
-            prepareUpsert(key, entry.getValue()),
-            new UpdateOptions().upsert(true)));
-      }
-
-      BulkWriteResult result = Failsafe.with(bulkWriteRetryPolicy)
-          .get(() -> collection.bulkWrite(bulkCollection, new BulkWriteOptions().ordered(false)));
+      BulkWriteResult result = bulkUpsertImpl(documents);
       LOGGER.debug(result.toString());
-
       return true;
-
     } catch (IOException | MongoServerException e) {
       LOGGER.error("Error during bulk upsert for documents:{}", documents, e);
       return false;
     }
   }
 
+  private BulkWriteResult bulkUpsertImpl(Map<Key, Document> documents) throws JsonProcessingException {
+    List<UpdateOneModel<BasicDBObject>> bulkCollection = new ArrayList<>();
+    for (Entry<Key, Document> entry : documents.entrySet()) {
+      Key key = entry.getKey();
+      // insert or overwrite
+      bulkCollection.add(new UpdateOneModel<>(
+          this.selectionCriteriaForKey(key),
+          prepareUpsert(key, entry.getValue()),
+          new UpdateOptions().upsert(true)));
+    }
+
+    return Failsafe.with(bulkWriteRetryPolicy)
+        .get(() -> collection.bulkWrite(bulkCollection, new BulkWriteOptions().ordered(false)));
+  }
+
+  @Override
+  public Iterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Document> documents) throws IOException {
+    try {
+      // First get all the documents for the given keys.
+      FindIterable<BasicDBObject> cursor = collection.find(selectionCriteriaForKeys(documents.keySet()));
+      final MongoCursor<BasicDBObject> mongoCursor = cursor.cursor();
+
+      // Now go ahead and do the bulk upsert.
+      BulkWriteResult result = bulkUpsertImpl(documents);
+      LOGGER.debug(result.toString());
+
+      return new Iterator<>() {
+        @Override
+        public boolean hasNext() {
+          return mongoCursor.hasNext();
+        }
+
+        @Override
+        public Document next() {
+          return MongoCollection.this.dbObjectToDocument(mongoCursor.next());
+        }
+      };
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Error during bulk upsert for documents:{}", documents, e);
+      throw new IOException("Error during bulk upsert.", e);
+    }
+  }
+
   @Override
   public void drop() {
     collection.drop();
@@ -417,6 +448,11 @@ private BasicDBObject selectionCriteriaForKey(Key key) {
     return new BasicDBObject(ID_KEY, key.toString());
   }
 
+  private BasicDBObject selectionCriteriaForKeys(Set<Key> keys) {
+    return new BasicDBObject(Map.of(ID_KEY, new BasicDBObject("$in",
+        keys.stream().map(Key::toString).collect(Collectors.toList()))));
+  }
+
   private Document dbObjectToDocument(BasicDBObject dbObject) {
     try {
       // Hack: Remove the _id field since it's an unrecognized field for Proto layer.
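One caveat worth noting: the find-then-bulkWrite sequence above is not atomic, so a write that lands between the find and the bulkWrite will not appear in the returned snapshot. If per-document atomicity were required, the driver's findOneAndUpdate with ReturnDocument.BEFORE could be used instead, at the cost of one round trip per key. A sketch (it would live inside MongoCollection and reuse its existing helpers; the method itself is hypothetical):

    // Upserts each document and atomically returns its pre-update copy (skipped if absent).
    // Needs com.mongodb.client.model.FindOneAndUpdateOptions and ReturnDocument.
    private List<BasicDBObject> upsertAndReturnBefore(Map<Key, Document> documents)
        throws JsonProcessingException {
      List<BasicDBObject> olderDocuments = new ArrayList<>();
      FindOneAndUpdateOptions options = new FindOneAndUpdateOptions()
          .upsert(true)
          .returnDocument(ReturnDocument.BEFORE);
      for (Entry<Key, Document> entry : documents.entrySet()) {
        BasicDBObject older = collection.findOneAndUpdate(
            selectionCriteriaForKey(entry.getKey()),
            prepareUpsert(entry.getKey(), entry.getValue()),
            options);
        if (older != null) { // null when the key did not exist before
          olderDocuments.add(older);
        }
      }
      return olderDocuments;
    }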
diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java
index a6b7b4f6..6eaab35c 100644
--- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java
+++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java
@@ -49,8 +49,8 @@ public class PostgresCollection implements Collection {
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final String DOC_PATH_SEPARATOR = "\\.";
 
-  private Connection client;
-  private String collectionName;
+  private final Connection client;
+  private final String collectionName;
 
   public PostgresCollection(Connection client, String collectionName) {
     this.client = client;
@@ -77,11 +77,11 @@ public boolean upsert(Key key, Document document) throws IOException {
     }
   }
 
-  @Override
   /**
    * For Postgres upsertAndReturn functionality is not supported directly.
    * So using direct upsert and then return document.
    */
+  @Override
   public Document upsertAndReturn(Key key, Document document) throws IOException {
     try {
       boolean upsert = upsert(key, document);
@@ -92,7 +92,6 @@ public Document upsertAndReturn(Key key, Document document) throws IOException {
     }
   }
 
-  @Override
   /**
    * Update the sub document based on subDocPath based on longest key match.
    *
@@ -119,6 +118,7 @@ public Document upsertAndReturn(Key key, Document document) throws IOException {
    * address.street.longitude.degree
    *
    */
+  @Override
   public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) {
     String updateSubDocSQL = String
         .format("UPDATE %s SET %s=jsonb_set(%s, ?::text[], ?::jsonb) WHERE %s=?",
@@ -238,7 +238,7 @@ protected String parseQueryForNonCompositeFilter(Filter filter) {
             .stream()
             .map(val -> "'" + val + "'")
             .collect(Collectors.joining(", "));
-        return filterString.append("(" + collect + ")").toString();
+        return filterString.append("(").append(collect).append(")").toString();
       case CONTAINS:
         // TODO: Matches condition inside an array of documents
       case EXISTS:
@@ -307,13 +307,11 @@ private String getFieldPrefix(String fieldName) {
   }
 
   private String parseOrderByQuery(List<OrderBy> orderBys) {
-    String orderBySQL = orderBys
+    return orderBys
         .stream()
         .map(orderBy -> getFieldPrefix(orderBy.getField()) + " " + (orderBy.isAsc() ? "ASC" : "DESC"))
"ASC" : "DESC")) .filter(str -> !StringUtils.isEmpty(str)) .collect(Collectors.joining(" , ")); - - return orderBySQL; } @Override @@ -412,21 +410,7 @@ public long total(Query query) { @Override public boolean bulkUpsert(Map documents) { try { - PreparedStatement preparedStatement = client - .prepareStatement(getUpsertSQL(), Statement.RETURN_GENERATED_KEYS); - for (Map.Entry entry : documents.entrySet()) { - - Key key = entry.getKey(); - String jsonString = prepareUpsertDocument(key, entry.getValue()); - - preparedStatement.setString(1, key.toString()); - preparedStatement.setString(2, jsonString); - preparedStatement.setString(3, jsonString); - - preparedStatement.addBatch(); - } - - int[] updateCounts = preparedStatement.executeBatch(); + int[] updateCounts = bulkUpsertImpl(documents); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Write result: " + Arrays.toString(updateCounts)); @@ -445,6 +429,58 @@ public boolean bulkUpsert(Map documents) { return false; } + private int[] bulkUpsertImpl(Map documents) throws SQLException, IOException { + PreparedStatement preparedStatement = client + .prepareStatement(getUpsertSQL(), Statement.RETURN_GENERATED_KEYS); + for (Map.Entry entry : documents.entrySet()) { + + Key key = entry.getKey(); + String jsonString = prepareUpsertDocument(key, entry.getValue()); + + preparedStatement.setString(1, key.toString()); + preparedStatement.setString(2, jsonString); + preparedStatement.setString(3, jsonString); + + preparedStatement.addBatch(); + } + + return preparedStatement.executeBatch(); + } + + @Override + public Iterator bulkUpsertAndReturnOlderDocuments(Map documents) throws IOException { + try { + String collect = documents.keySet().stream() + .map(val -> "'" + val.toString() + "'") + .collect(Collectors.joining(", ")); + + String space = " "; + String query = new StringBuilder("SELECT * FROM") + .append(space).append(collectionName) + .append(" WHERE ").append(ID).append(" IN ") + .append("(").append(collect).append(")").toString(); + + try { + PreparedStatement preparedStatement = client.prepareStatement(query); + ResultSet resultSet = preparedStatement.executeQuery(); + + // Now go ahead and bulk upsert the documents. + int[] updateCounts = bulkUpsertImpl(documents); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Write result: " + Arrays.toString(updateCounts)); + } + + return new PostgresResultIterator(resultSet); + } catch (SQLException e) { + LOGGER.error("SQLException querying documents. query: {}", query, e); + } + } catch (IOException e) { + LOGGER.error("SQLException bulk inserting documents. documents: {}", documents, e); + } + + throw new IOException("Could not bulk upsert the documents."); + } + private String prepareUpsertDocument(Key key, Document document) throws IOException { String jsonString = document.toJson(); @@ -472,7 +508,7 @@ public void drop() { } } - class PostgresResultIterator implements Iterator { + static class PostgresResultIterator implements Iterator { private final ObjectMapper MAPPER = new ObjectMapper(); ResultSet resultSet;